@@ -489,7 +489,7 @@ cdef class ReadBuffer:
489489
490490 self ._ensure_first_buf()
491491
492-  cdef int32_t has_message (self ) except  - 1 :
492+  cdef int32_t take_message (self ) except  - 1 :
493493 cdef:
494494 const char  * cbuf
495495
@@ -525,8 +525,24 @@ cdef class ReadBuffer:
525525 self ._current_message_ready =  1 
526526 return  1 
527527
528-  cdef inline int32_t has_message_type(self , char  mtype) except  - 1 :
529-  return  self .has_message() and  self .get_message_type() ==  mtype
528+  cdef inline int32_t take_message_type(self , char  mtype) except  - 1 :
529+  cdef const char  * buf0
530+ 
531+  if  self ._current_message_ready:
532+  return  self ._current_message_type ==  mtype
533+  elif  self ._length >=  1 :
534+  self ._ensure_first_buf()
535+  buf0 =  cpython.PyBytes_AS_STRING(self ._buf0)
536+ 
537+  return  buf0[self ._pos0] ==  mtype and  self .take_message()
538+  else :
539+  return  0 
540+ 
541+  cdef int32_t put_message(self ) except  - 1 :
542+  if  not  self ._current_message_ready:
543+  raise  BufferError(' cannot put message: no message taken' 
544+  self ._current_message_ready =  False 
545+  return  0 
530546
531547 cdef inline const char *  try_consume_message(self , ssize_t*  len ):
532548 cdef:
@@ -541,7 +557,7 @@ cdef class ReadBuffer:
541557 buf =  self ._try_read_bytes(buf_len)
542558 if  buf !=  NULL :
543559 len [0 ] =  buf_len
544-  self ._discard_message ()
560+  self ._finish_message ()
545561 return  buf
546562
547563 cdef Memory consume_message(self ):
@@ -551,7 +567,7 @@ cdef class ReadBuffer:
551567 mem =  self .read(self ._current_message_len_unread)
552568 else :
553569 mem =  None 
554-  self ._discard_message ()
570+  self ._finish_message ()
555571 return  mem
556572
557573 cdef bytearray consume_messages(self , char  mtype):
@@ -562,7 +578,7 @@ cdef class ReadBuffer:
562578 ssize_t total_bytes =  0 
563579 bytearray result
564580
565-  if  not  self .has_message_type (mtype):
581+  if  not  self .take_message_type (mtype):
566582 return  None 
567583
568584 #  consume_messages is a volume-oriented method, so
@@ -571,26 +587,24 @@ cdef class ReadBuffer:
571587 result =  PyByteArray_FromStringAndSize(NULL , self ._length)
572588 buf =  PyByteArray_AsString(result)
573589
574-  while  self .has_message_type (mtype):
590+  while  self .take_message_type (mtype):
575591 nbytes =  self ._current_message_len_unread
576592 self ._read(buf, nbytes)
577593 buf +=  nbytes
578594 total_bytes +=  nbytes
579-  self ._discard_message ()
595+  self ._finish_message ()
580596
581597 #  Clamp the result to an actual size read.
582598 PyByteArray_Resize(result, total_bytes)
583599
584600 return  result
585601
586-  cdef discard_message(self ):
587-  if  self ._current_message_type ==  0 :
588-  #  Already discarded
602+  cdef finish_message(self ):
603+  if  self ._current_message_type ==  0  or  not  self ._current_message_ready:
604+  #  The message has already been finished (e.g by consume_message()),
605+  #  or has been put back by put_message().
589606 return 
590607
591-  if  not  self ._current_message_ready:
592-  raise  BufferError(' no message to discard' 
593- 
594608 if  self ._current_message_len_unread:
595609 if  ASYNCPG_DEBUG:
596610 mtype =  chr (self ._current_message_type)
@@ -602,9 +616,9 @@ cdef class ReadBuffer:
602616 mtype,
603617 (< Memory> discarded).as_bytes()))
604618
605-  self ._discard_message ()
619+  self ._finish_message ()
606620
607-  cdef inline _discard_message (self ):
621+  cdef inline _finish_message (self ):
608622 self ._current_message_type =  0 
609623 self ._current_message_len =  0 
610624 self ._current_message_ready =  0 
0 commit comments