Pythonで始めるasyncpg入門:非同期PostgreSQLドライバを使いこなそう

PythonでPostgreSQLにアクセスする際、psycopg2などの同期ドライバを使うのが一般的ですが、非同期処理と組み合わせるとパフォーマンスを最大限に引き出せる場面があります。

本記事では、非同期対応のPostgreSQLドライバである asyncpg の基本から使い方、実践的なサンプルコードまで、初心者向けにわかりやすく解説します。


前提知識と準備

  • Python 3.7以上がインストールされていること
  • PostgreSQL 9.5以上が動作していること
  • 基本的なSQL文(SELECT/INSERT/UPDATE/DELETE)が理解できていること
  • 非同期プログラミング(async/await)の基礎知識があると望ましいですが、本記事では簡単に触れます

asyncpgのインストール

# pipでインストール
pip install asyncpg

仮想環境を使っている場合は、事前に python -m venv venv → source venv/bin/activate(Windowsの場合は venv\Scripts\activate)を実行しましょう。


非同期プログラミングの基礎

Pythonの非同期処理は、async defで定義した関数(コルーチン)をawaitで呼び出して実行します。

簡単な例
import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(say_hello())
  • asyncio.run(): コルーチンを実行するエントリポイント
  • await asyncio.sleep(1): 非同期に1秒待機

これをベースに、asyncpgも非同期I/Oでデータベースとやり取りします。


asyncpgでの基本操作

接続と切断

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect(
        user='your_user', password='your_pass',
        database='your_db', host='127.0.0.1'
    )
    # ここでクエリ実行など
    await conn.close()

asyncio.run(main())

クエリの実行

# テーブル作成
await conn.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        name TEXT,
        age INT
    )
''')

レコードの取得

# 単一レコード
row = await conn.fetchrow('SELECT * FROM users WHERE id = $1', 1)
print(row['name'], row['age'])

# 複数レコード
rows = await conn.fetch('SELECT * FROM users')
for r in rows:
    print(r['id'], r['name'])

プレースホルダーは $1, $2 のように数字で指定します。


コネクションプールの利用

多数のリクエストが発生するWebアプリなどでは、毎回接続を開閉するのはコストが高いです。

asyncpg.create_pool() でプールを作成し、使い回しましょう。

async def main():
    pool = await asyncpg.create_pool(
        user='your_user', password='your_pass',
        database='your_db', host='127.0.0.1',
        min_size=5, max_size=10
    )
    async with pool.acquire() as conn:
        await conn.execute('INSERT INTO users(name, age) VALUES($1, $2)', '太郎', 30)
    await pool.close()
  • min_size: プールに常に確保しておく接続数
  • max_size: プール内の最大接続数

トランザクション制御

トランザクションは transaction() コンテキストマネージャを使うと簡単です。

async with pool.acquire() as conn:
    async with conn.transaction():
        await conn.execute('UPDATE users SET age = age + 1 WHERE id = $1', 1)
        # ここでエラーが発生するとロールバックされる

実践サンプル:簡易TODOアプリ

以下は、非同期でTODOを管理する簡易CLIアプリの例です。

import asyncio
import asyncpg

DSN = 'postgresql://user:pass@127.0.0.1/todo_db'

async def init_db(conn):
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS todo (
            id SERIAL PRIMARY KEY,
            task TEXT,
            done BOOLEAN DEFAULT FALSE
        )
    ''')

async def add_task(pool, task):
    async with pool.acquire() as conn:
        await conn.execute('INSERT INTO todo(task) VALUES($1)', task)
        print(f"タスク追加: {task}")

async def list_tasks(pool):
    async with pool.acquire() as conn:
        rows = await conn.fetch('SELECT * FROM todo ORDER BY id')
        for r in rows:
            status = '✔' if r['done'] else '✖'
            print(f"{r['id']}. {r['task']} [{status}]")

async def mark_done(pool, task_id):
    async with pool.acquire() as conn:
        await conn.execute('UPDATE todo SET done = TRUE WHERE id = $1', task_id)
        print(f"タスク完了: ID={task_id}")

async def main():
    pool = await asyncpg.create_pool(dsn=DSN)
    async with pool.acquire() as conn:
        await init_db(conn)

    # サンプル操作
    await add_task(pool, 'asyncpgを学ぶ')
    await add_task(pool, 'サンプルアプリを作る')
    print('\n--- 現在のタスク ---')
    await list_tasks(pool)
    await mark_done(pool, 1)
    print('\n--- 更新後のタスク ---')
    await list_tasks(pool)

    await pool.close()

if __name__ == '__main__':
    asyncio.run(main())

演習問題

  1. テーブル設計
    • products テーブルを作成し、以下のカラムを持たせてください。
      • id (SERIAL, 主キー)
      • name (TEXT)
      • price (NUMERIC)
      • stock (INT)
  2. データ登録・取得
    • 1.で作成したテーブルに3件の商品を登録し、すべてのレコードを取得してコンソールに出力してください。
  3. 在庫更新トランザクション
    • 購入処理として、指定した product_id の在庫を quantity 分だけ減らす関数を作成し、在庫不足時は例外を発生させてトランザクションをロールバックしてください。
  4. コネクションプール応用
    • プールサイズを最小3、最大6で設定し、複数のタスクを同時に実行しても安全に動作することを確認してください。

解答例

import asyncio
import asyncpg
from decimal import Decimal

DSN = 'postgresql://user:pass@127.0.0.1/shop_db'

async def init_db(conn):
    # 演習1: productsテーブル作成
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name TEXT,
            price NUMERIC,
            stock INT
        )
    ''')

async def exercise2(pool):
    # 演習2: データ登録と取得
    async with pool.acquire() as conn:
        await conn.execute('''
            INSERT INTO products(name, price, stock) VALUES
            ('ペン', $1, 100),
            ('ノート', $2, 50),
            ('消しゴム', $3, 200)
        ''', Decimal('1.20'), Decimal('3.00'), Decimal('0.50'))
        rows = await conn.fetch('SELECT * FROM products')
        print('--- 商品一覧 ---')
        for r in rows:
            print(r['id'], r['name'], r['price'], r['stock'])

async def purchase(pool, product_id: int, quantity: int):
    # 演習3: 購入処理
    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow('SELECT stock FROM products WHERE id = $1', product_id)
            if row is None:
                raise ValueError('該当商品が存在しません')
            current_stock = row['stock']
            if current_stock < quantity:
                raise ValueError('在庫不足')
            await conn.execute(
                'UPDATE products SET stock = stock - $1 WHERE id = $2',
                quantity, product_id
            )
            print(f'購入成功: product_id={product_id}, quantity={quantity}')

async def main():
    pool = await asyncpg.create_pool(
        dsn=DSN,
        min_size=3,
        max_size=6
    )
    # 初期化
    async with pool.acquire() as conn:
        await init_db(conn)

    # 演習2実行
    await exercise2(pool)

    # 演習3実行例
    try:
        await purchase(pool, product_id=1, quantity=20)
        await purchase(pool, product_id=2, quantity=100)  # 在庫不足例
    except Exception as e:
        print('エラー:', e)

    await pool.close()

if __name__ == '__main__':
    asyncio.run(main())

まとめ

  • asyncpg は高速かつ軽量な非同期PostgreSQLドライバ
  • async/await と組み合わせることで、I/O待ち時間を有効活用できる
  • コネクションプールやトランザクション管理もシンプルなAPIで実装可能
  • 本記事の演習問題を通じて、実践的な使い方を身につけましょう

これで、Pythonでの非同期PostgreSQLアクセスがぐっと身近になったはずです。ぜひ日々の開発に活用してみてください。