Open
Description
- asyncpg version: 0.24.0
- PostgreSQL version: 12.8
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce the issue with a local PostgreSQL install?: Local PostgreSQL install
- Python version: 3.9.1
- Platform: Linux Mint
- Do you use pgbouncer?: No
- Did you install asyncpg with pip?: Yes
- If you built asyncpg locally, which version of Cython did you use?: N/A
- Can the issue be reproduced under both asyncio and uvloop?: Have not tested with uvloop yet
When a row in my table is inserted or updated, I need a listener to pull data from the table and perform operations with it. The callback is successfully triggered on updates and inserts, but within the callback the database cannot be accessed. A new connection is successfully acquired from the pool (line [1]) but the fetch operation does not complete (line [2]).
Is this an issue with my code, or is it a limitation of asyncpg?
Sorry for the enormous code example. It comes from a microservice application in which several Python applications all access the database, so this is about as minimal as I could make it while keeping it vaguely resembling the existing program structure.
```python
import asyncio
import asyncpg
from os import getenv
from dotenv import load_dotenv
import json

load_dotenv("../../.env")


class DatabaseInterface:
    """
    Multiple different applications need to access the database, and will be performing similar operations.
    Methods and properties to do with interacting with the database are all put into the DatabaseInterface class,
    with each application creating a new instance of the class.
    """

    def __init__(self):
        self.pool = None
        self.listeners = []

    async def connect(self):
        self.pool = await asyncpg.create_pool(port=int(getenv("POSTGRES_PORT")),
                                              user=getenv("POSTGRES_USER"),
                                              password=getenv("POSTGRES_PASSWORD"),
                                              database=getenv("POSTGRES_DB"))

    async def create_schema(self):
        async with self.pool.acquire() as connection:
            await connection.execute("""
                DROP TABLE IF EXISTS table1;

                CREATE TABLE table1 (key INTEGER,
                                     value INTEGER,
                                     PRIMARY KEY (key));

                CREATE OR REPLACE FUNCTION notify_mychannel()
                    RETURNS TRIGGER AS
                $$
                DECLARE
                    payload TEXT;
                BEGIN
                    -- This is a simplified payload. In the actual program,
                    -- the payload contains a key and an event label
                    payload := json_build_object('key', NEW.key);
                    PERFORM pg_notify('mychannel', payload);
                    RETURN NULL;
                END
                $$
                LANGUAGE plpgsql;

                CREATE TRIGGER update_value
                    AFTER UPDATE OF value ON table1
                    FOR EACH ROW
                    EXECUTE PROCEDURE notify_mychannel();

                CREATE TRIGGER new_value
                    AFTER INSERT ON table1
                    FOR EACH ROW
                    EXECUTE PROCEDURE notify_mychannel();
            """)

    async def add_listener(self, channel, callback):
        """
        This wrapper function saves the connection in self.listeners, so that it is not garbage-collected.
        """
        connection = await self.pool.acquire()
        await connection.add_listener(channel, callback)
        self.listeners.append({'connection': connection, 'channel': channel, 'callback': callback})

    async def upsert(self, key: int, value: int):
        async with self.pool.acquire() as connection:
            await connection.execute("INSERT INTO table1(key, value)"
                                     " VALUES ($1, $2)"
                                     " ON CONFLICT (key)"
                                     " DO UPDATE SET value = $2;",
                                     key, value)

    async def fetch(self, key: int):
        async with self.pool.acquire() as connection:  # [1]
            record = await connection.fetch("SELECT key, value FROM table1"
                                            " WHERE key = $1;")  # [2]
            return record['value']


class ExampleApplication:
    def __init__(self):
        self.database_interface = DatabaseInterface()

    async def start(self):
        await self.database_interface.connect()
        await self.database_interface.create_schema()
        await self.database_interface.add_listener('mychannel', self.callback)

    async def callback(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
        """
        This is an example callback function. In the actual app, the payload contains a key and event value,
        which, depending on the event, should trigger fetches from multiple different tables in the database,
        return data which will be used in other async operations.
        """
        payload_json = json.loads(payload)
        value = await self.database_interface.fetch(payload_json['key'])  # This is the line that does not complete
        print(f"Value {value}")  # This line is never run


async def main():
    app = ExampleApplication()
    await app.start()
    await app.database_interface.upsert(1, 10)


if __name__ == "__main__":
    asyncio.run(main())
```
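One thing that may be worth checking in the example above, sketched here with plain asyncio and no database: `main()` returns right after `upsert`, and `asyncio.run()` cancels any tasks that are still pending at that point. Assuming asyncpg schedules a coroutine callback as a separate task (an assumption about its internals, not something established in this report), the callback could be cancelled partway through its fetch. The names `slow_callback`, `fire_notification`, and `run_demo` are hypothetical stand-ins, not part of asyncpg or of the program above.

```python
import asyncio


async def slow_callback(results: list, done: asyncio.Event) -> None:
    # Hypothetical stand-in for a listener callback that awaits a database fetch.
    await asyncio.sleep(0.05)
    results.append("fetched")
    done.set()


def fire_notification(results: list, done: asyncio.Event) -> asyncio.Task:
    # Assumption: the callback runs as an independent task, so whoever
    # triggers it does not wait for it to finish.
    return asyncio.get_running_loop().create_task(slow_callback(results, done))


async def run_demo(wait_for_callback: bool) -> list:
    results: list = []
    done = asyncio.Event()
    task = fire_notification(results, done)  # keep a reference to the task
    if wait_for_callback:
        # Keep this coroutine alive until the callback reports completion.
        await asyncio.wait_for(done.wait(), timeout=1.0)
    # Without the wait, returning here lets asyncio.run() cancel the task.
    return results


if __name__ == "__main__":
    print(asyncio.run(run_demo(wait_for_callback=False)))  # callback never finishes
    print(asyncio.run(run_demo(wait_for_callback=True)))   # callback completes
```

In the program above, the analogous change would be for `main()` to wait (for instance on an `asyncio.Event` set at the end of `callback`) instead of returning right after `upsert`. Whether that resolves the behaviour reported here is untested. Separately, `fetch` passes no argument for `$1` and indexes the list returned by `connection.fetch` with a string key, so it would likely raise once the call is allowed to run.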