- Notifications
You must be signed in to change notification settings - Fork 435
Closed
Description
- asyncpg version: 0.28.0
- PostgreSQL version: 15 (latest docker container)
- Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
the issue with a local PostgreSQL install?:
- Python version: 3.10
- Platform: windows or debian in WSL
- Do you use pgbouncer?: no
- Did you install asyncpg with pip?: yes
- If you built asyncpg locally, which version of Cython did you use?:
- Can the issue be reproduced under both asyncio and
uvloop?: yes
Hi,
For a project, I need to store complex numbers (numpy.complex64, to be exact) in a database, and for that I created a PostgreSQL composite data type.
I was able to make it work with asyncpg in text format for simple INSERT, SELECT, etc. statements, after some work and with patches found in previous issue reports.
However, with the binary format I get a strange error: asyncpg.exceptions.DatatypeMismatchError: wrong number of columns: 1082549862, expected 2
I made a "simple" Python script to reproduce the issue:
from __future__ import annotations import asyncpg import asyncio import numpy as np from ast import literal_eval import struct SQL_CREATE = """ DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'complex') THEN CREATE TYPE complex AS ( r float4, i float4 ); END IF; CREATE TABLE dummy_table (dummy_column complex); DROP TABLE dummy_table; END $$; DROP TABLE IF EXISTS "poc_asyncpg"; CREATE TABLE "poc_asyncpg" ( "id" SERIAL PRIMARY KEY, "float_value" float4 NULL, "complex_value" complex NULL, "complex_array" complex[] NULL ) """ def _cplx_decode(val) -> np.complex64: cplx = complex(*literal_eval(val)) return np.complex64(cplx) def _cplx_encode(val: np.complex64 | complex) -> str: return str((np.float32(val.real), np.float32(val.imag))) async def set_type_codec(conn): """ had to use this patch since the conn.set_type_codec does not work for scalar variables """ schema = 'public' format = 'text' conn._check_open() typenames = ('complex',) for typename in typenames: typeinfo = await conn.fetchrow( asyncpg.introspection.TYPE_BY_NAME, typename, schema) if not typeinfo: raise ValueError('unknown type: {}.{}'.format(schema, typename)) oid = typeinfo['oid'] conn._protocol.get_settings().add_python_codec( oid, typename, schema, 'scalar', lambda a: _cplx_encode(a), lambda a: _cplx_decode(a), format) # if this part is commented, error message is : # asyncpg.exceptions._base.InternalClientError: no binary format encoder for type complex conn._protocol.get_settings().add_python_codec( oid, typename, schema, 'scalar', encoder=lambda x: struct.pack('!2f', x.real, x.imag), decoder=lambda x: np.frombuffer(x, dtype=np.complex64)[0], format="binary") # Statement cache is no longer valid due to codec changes. 
conn._drop_local_statement_cache() async def init_connection(conn): await set_type_codec(conn) await conn.set_type_codec( 'numeric', encoder=str, decoder=np.float32, schema='pg_catalog', format='text' ) await conn.set_type_codec( 'float4', encoder=str, decoder=np.float32, schema='pg_catalog', format='text' ) await conn.set_type_codec( 'float4', encoder=struct.Struct("!f").pack, decoder=struct.Struct("!f").unpack, schema='pg_catalog', format='binary' ) async def trunc(pool): async with pool.acquire() as conn: async with conn.transaction(): query = "TRUNCATE poc_asyncpg" await conn.execute(query) async def worker_copy(pool, column_name, data): await trunc(pool) async with pool.acquire() as conn: async with conn.transaction(): await conn.copy_records_to_table("poc_asyncpg", records=[(data,)], columns=(column_name,) ) try: from common import dbinfo except ImportError as e: class DB: user = "user" password = "password" database = "db" dbinfo = DB() def create_pool(db_info): pool = asyncpg.create_pool( user=db_info.user, password=db_info.password, database=dbinfo.database, host="127.0.0.1", init=init_connection ) return pool async def main(info): pool = await create_pool(info) async with pool.acquire() as conn: async with conn.transaction(): await conn.execute(SQL_CREATE) await worker_copy(pool, "float_value", np.float32(4.2)) await worker_copy(pool, "complex_value", np.complex64(4.2 + 1j * 4.2)) if __name__ == '__main__': loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main(dbinfo))Metadata
Metadata
Assignees
Labels
No labels