99
1010import argparse
1111import asyncio
12- from concurrent import futures
1312import csv
1413import io
1514import itertools
1615import json
1716import re
1817import sys
1918import time
20-
21- import numpy as np
22- import uvloop
19+ from concurrent import futures
2320
2421import aiopg
2522import asyncpg
23+ import numpy as np
2624import postgresql
25+ import psycopg
2726import psycopg2
2827import psycopg2 .extras
28+ import uvloop
2929
3030
3131def _chunks (iterable , n ):
@@ -41,18 +41,26 @@ def _ctr(_):
4141
4242
4343def psycopg_connect (args ):
44- conn = psycopg2 .connect (user = args .pguser , host = args .pghost ,
44+ conn = psycopg .connect (user = args .pguser , host = args .pghost ,
4545 port = args .pgport )
4646 return conn
4747
48+ def psycopg2_connect (args ):
49+ conn = psycopg2 .connect (user = args .pguser , host = args .pghost ,
50+ port = args .pgport )
51+ return conn
4852
49- def psycopg_execute (conn , query , args ):
53+ def psycopg2_execute (conn , query , args ):
5054 cur = conn .cursor (cursor_factory = psycopg2 .extras .DictCursor )
5155 cur .execute (query , args )
5256 return len (cur .fetchall ())
5357
58+ def psycopg_execute (conn , query , args ):
59+ cur = conn .cursor (row_factory = psycopg .rows .dict_row )
60+ cur .execute (query , args )
61+ return len (cur .fetchall ())
5462
55- def psycopg_copy (conn , query , args ):
63+ def psycopg2_copy (conn , query , args ):
5664 rows , copy = args [:2 ]
5765 f = io .StringIO ()
5866 writer = csv .writer (f , delimiter = '\t ' )
@@ -64,6 +72,29 @@ def psycopg_copy(conn, query, args):
6472 conn .commit ()
6573 return cur .rowcount
6674
75+ def psycopg_copy (conn , query , args ):
76+ rows , copy = args [:2 ]
77+ f = io .StringIO ()
78+ writer = csv .writer (f , delimiter = '\t ' )
79+ for row in rows :
80+ writer .writerow (row )
81+ f .seek (0 )
82+ with conn .cursor () as cur :
83+ with cur .copy (query ) as copy :
84+ copy .write (f .read ())
85+ conn .commit ()
86+ return cur .rowcount
87+
88+ def psycopg2_executemany (conn , query , args ):
89+ cur = conn .cursor (cursor_factory = psycopg2 .extras .DictCursor )
90+ cur .executemany (query , args )
91+ return len (args )
92+
93+ def psycopg_executemany (conn , query , args ):
94+ with conn .cursor () as cur :
95+ cur .executemany (query , args )
96+ return len (args )
97+
6798
6899def pypostgresql_connect (args ):
69100 conn = postgresql .open (user = args .pguser , host = args .pghost ,
@@ -128,15 +159,33 @@ async def asyncpg_connect(args):
128159 return conn
129160
130161
162+ async def async_psycopg_connect (args ):
163+ conn = await psycopg .AsyncConnection .connect (
164+ user = args .pguser , host = args .pghost , port = args .pgport )
165+ return conn
166+
167+
131168async def asyncpg_execute (conn , query , args ):
132169 return len (await conn .fetch (query , * args ))
133170
134171
172+ async def async_psycopg_execute (conn , query , args ):
173+ cur = conn .cursor (row_factory = psycopg .rows .dict_row )
174+ await cur .execute (query , args )
175+ return len (await cur .fetchall ())
176+
177+
135178async def asyncpg_executemany (conn , query , args ):
136179 await conn .executemany (query , args )
137180 return len (args )
138181
139182
183+ async def async_psycopg_executemany (conn , query , args ):
184+ async with conn .cursor () as cur :
185+ await cur .executemany (query , args )
186+ return len (args )
187+
188+
140189async def asyncpg_copy (conn , query , args ):
141190 rows , copy = args [:2 ]
142191 result = await conn .copy_records_to_table (
@@ -145,6 +194,21 @@ async def asyncpg_copy(conn, query, args):
145194 return int (count )
146195
147196
197+ async def async_psycopg_copy (conn , query , args ):
198+ rows , copy = args [:2 ]
199+ f = io .StringIO ()
200+ writer = csv .writer (f , delimiter = '\t ' )
201+ for row in rows :
202+ writer .writerow (row )
203+ f .seek (0 )
204+
205+ async with conn .cursor () as cur :
206+ async with cur .copy (query ) as copy :
207+ await copy .write (f .read ())
208+ await conn .commit ()
209+ return cur .rowcount
210+
211+
148212async def worker (executor , eargs , start , duration , timeout ):
149213 queries = 0
150214 rows = 0
@@ -369,7 +433,15 @@ def die(msg):
369433 help = 'PostgreSQL server user' )
370434 parser .add_argument (
371435 'driver' , help = 'driver implementation to use' ,
372- choices = ['aiopg' , 'aiopg-tuples' , 'asyncpg' , 'psycopg' , 'postgresql' ])
436+ choices = [
437+ 'aiopg' ,
438+ 'aiopg-tuples' ,
439+ 'asyncpg' ,
440+ 'psycopg2' ,
441+ 'psycopg3' ,
442+ 'psycopg3-async' ,
443+ 'postgresql'
444+ ],
373445 parser .add_argument (
374446 'queryfile' , help = 'file to read benchmark query information from' )
375447
@@ -425,11 +497,25 @@ def die(msg):
425497 asyncpg_connect , asyncpg_execute , asyncpg_copy , asyncpg_executemany
426498 is_async = True
427499 arg_format = 'native'
428- elif args .driver == 'psycopg' :
429- connector , executor , copy_executor = \
430- psycopg_connect , psycopg_execute , psycopg_copy
500+ elif args .driver == 'psycopg2' :
501+ connector , executor , copy_executor , batch_executor = (
502+ psycopg2_connect , psycopg2_execute ,
503+ psycopg2_copy , psycopg2_executemany ,
504+ )
505+ is_async = False
506+ arg_format = 'python'
507+ elif args .driver == 'psycopg3' :
508+ connector , executor , copy_executor , batch_executor = \
509+ psycopg_connect , psycopg_execute , psycopg_copy , psycopg_executemany
431510 is_async = False
432511 arg_format = 'python'
512+ elif args .driver == 'psycopg3-async' :
513+ connector , executor , copy_executor , batch_executor = (
514+ async_psycopg_connect , async_psycopg_execute ,
515+ async_psycopg_copy , async_psycopg_executemany ,
516+ )
517+ is_async = True
518+ arg_format = 'python'
433519 elif args .driver == 'postgresql' :
434520 connector , executor = pypostgresql_connect , pypostgresql_execute
435521 is_async = False
0 commit comments