Skip to content
This repository was archived by the owner on Jun 8, 2023. It is now read-only.

Commit 4b3463d

Browse files
committed
queue_message exposed by api
1 parent c2335aa commit 4b3463d

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

pq/api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .utils.serializers import Message, MessageDict
1+
from .utils.serializers import Message, MessageDict, queue_message
22
from .utils.concurrency import ASYNC_IO, THREAD_IO, CPUBOUND
33
from .mq import MessageFuture, MQ, Manager, register_broker
44
from .consumer import ConsumerAPI
@@ -17,6 +17,7 @@
1717
'PulsarQueue',
1818
#
1919
'Message',
20+
'queue_message',
2021
'MessageDict',
2122
'MessageFuture',
2223
'ConsumerAPI',

pq/utils/serializers.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class Json:
5858
def decode(cls, data):
5959
if isinstance(data, bytes):
6060
data = data.decode('utf-8')
61-
return json.loads(data, object_hook=object_hook)
61+
return json.loads(data, object_hook=queue_message)
6262

6363
@classmethod
6464
def encode(cls, message):
@@ -72,15 +72,15 @@ class MsgPack:
7272

7373
@classmethod
7474
def decode(cls, data):
75-
return msgpack.unpackb(data, object_hook=object_hook,
75+
return msgpack.unpackb(data, object_hook=queue_message,
7676
encoding='utf-8')
7777

7878
@classmethod
7979
def encode(cls, message):
8080
return msgpack.packb(message, default=as_message)
8181

8282

83-
def object_hook(d):
83+
def queue_message(d):
8484
type = d.get('type')
8585
MsgType = message_types.get(type)
8686
if MsgType:

0 commit comments

Comments
 (0)