@@ -41,7 +41,7 @@ class Connection(metaclass=ConnectionMeta):
4141 '_stmt_cache' , '_stmts_to_close' , '_listeners' ,
4242 '_server_version' , '_server_caps' , '_intro_query' ,
4343 '_reset_query' , '_proxy' , '_stmt_exclusive_section' ,
44-  '_config' , '_params' , '_addr' )
44+  '_config' , '_params' , '_addr' ,  '_notice_callbacks' )
4545
4646 def  __init__ (self , protocol , transport , loop ,
4747 addr : (str , int ) or  str ,
@@ -69,6 +69,7 @@ def __init__(self, protocol, transport, loop,
6969 self ._stmts_to_close  =  set ()
7070
7171 self ._listeners  =  {}
72+  self ._notice_callbacks  =  set ()
7273
7374 settings  =  self ._protocol .get_settings ()
7475 ver_string  =  settings .server_version 
@@ -126,6 +127,26 @@ async def remove_listener(self, channel, callback):
126127 del  self ._listeners [channel ]
127128 await  self .fetch ('UNLISTEN {}' .format (channel ))
128129
130+  def  add_notice_callback (self , callback ):
131+  """Add a callback for Postgres notices (NOTICE, DEBUG, LOG etc.). 
132+ 
133+  It will be called when asyncronous NoticeResponse is received 
134+  from the connection. Possible message types are: WARNING, NOTICE, DEBUG, 
135+  INFO, or LOG. 
136+ 
137+  :param callable callback: 
138+  A callable receiving the following arguments: 
139+  **connection**: a Connection the callback is registered with; 
140+  **message**: the `exceptions.PostgresNotice` message. 
141+  """ 
142+  if  self .is_closed ():
143+  raise  exceptions .InterfaceError ('connection is closed' )
144+  self ._notice_callbacks .add (callback )
145+ 
146+  def  remove_notice_callback (self , callback ):
147+  """Remove a callback for notices.""" 
148+  self ._notice_callbacks .discard (callback )
149+ 
129150 def  get_server_pid (self ):
130151 """Return the PID of the Postgres server the connection is bound to.""" 
131152 return  self ._protocol .get_server_pid ()
@@ -821,13 +842,15 @@ async def close(self):
821842 self ._listeners  =  {}
822843 self ._aborted  =  True 
823844 await  self ._protocol .close ()
845+  self ._notice_callbacks  =  set ()
824846
825847 def  terminate (self ):
826848 """Terminate the connection without waiting for pending data.""" 
827849 self ._mark_stmts_as_closed ()
828850 self ._listeners  =  {}
829851 self ._aborted  =  True 
830852 self ._protocol .abort ()
853+  self ._notice_callbacks  =  set ()
831854
832855 async  def  reset (self ):
833856 self ._check_open ()
@@ -909,6 +932,26 @@ async def cancel():
909932
910933 self ._loop .create_task (cancel ())
911934
935+  def  _notice (self , message ):
936+  if  self ._proxy  is  None :
937+  con_ref  =  self 
938+  else :
939+  # See the comment in the `_notify` below. 
940+  con_ref  =  self ._proxy 
941+ 
942+  for  cb  in  self ._notice_callbacks :
943+  self ._loop .call_soon (self ._call_notice_cb , cb , con_ref , message )
944+ 
945+  def  _call_notice_cb (self , cb , con_ref , message ):
946+  try :
947+  cb (con_ref , message )
948+  except  Exception  as  ex :
949+  self ._loop .call_exception_handler ({
950+  'message' : 'Unhandled exception in asyncpg notice message ' 
951+  'callback {!r}' .format (cb ),
952+  'exception' : ex 
953+  })
954+ 
912955 def  _notify (self , pid , channel , payload ):
913956 if  channel  not  in self ._listeners :
914957 return 
0 commit comments