55# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
66# SPDX-License-Identifier: Apache-2.0.
77
8+ from collections .abc import Sequence
9+ from enum import IntEnum
810from dataclasses import dataclass
9- from typing import Union
11+ from typing import Callable , Union , Optional
1012from awscrt import NativeResource , mqtt5 , mqtt , exceptions
1113from concurrent .futures import Future
1214import _awscrt
13- import collections .abc
15+
16+
17+ class SubscriptionStatusEventType (IntEnum ):
18+ """
19+ The type of change to the state of a streaming operation subscription
20+ """
21+
22+ SUBSCRIPTION_ESTABLISHED = 0
23+ """
24+ The streaming operation is successfully subscribed to its topic (filter)
25+ """
26+
27+ SUBSCRIPTION_LOST = 1
28+ """
29+ The streaming operation has temporarily lost its subscription to its topic (filter)
30+ """
31+
32+ SUBSCRIPTION_HALTED = 2
33+ """
34+ The streaming operation has entered a terminal state where it has given up trying to subscribe
35+ to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy).
36+ """
37+
38+
39+ @dataclass
40+ class SubscriptionStatusEvent :
41+ """
42+ An event that describes a change in subscription status for a streaming operation.
43+
44+ Args:
45+ type (SubscriptionStatusEventType): The type of status change represented by the event
46+ error (Optional[Exception]): Describes an underlying reason for the event. Only set for SubscriptionLost and SubscriptionHalted.
47+ """
48+ type : SubscriptionStatusEventType = None
49+ error : 'Optional[Exception]' = None
50+
51+
52+ @dataclass
53+ class IncomingPublishEvent :
54+ """
55+ An event that describes an incoming message on a streaming operation.
56+
57+ Args:
58+ topic (str): MQTT Topic that the response was received on.
59+ payload (Optional[bytes]): The payload of the incoming message.
60+ """
61+ topic : str
62+ payload : 'Optional[bytes]' = None
63+
64+
65+ """
66+ Signature for a handler that listens to subscription status events.
67+ """
68+ SubscriptionStatusListener = Callable [[SubscriptionStatusEvent ], None ]
69+
70+ """
71+ Signature for a handler that listens to incoming publish events.
72+ """
73+ IncomingPublishListener = Callable [[IncomingPublishEvent ], None ]
74+
75+
76+ @dataclass
77+ class StreamingOperationOptions :
78+ """
79+ Configuration options for an MQTT-based streaming operation.
80+
81+ Args:
82+ subscription_topic_filter (str): Topic filter that the streaming operation should listen on
83+ subscription_status_listener (SubscriptionStatusListener): function object to invoke when the operation's subscription status changes
84+ incoming_publish_listener (IncomingPublishListener): function object to invoke when a publish packet arrives that matches the subscription topic filter
85+ """
86+ subscription_topic_filter : str
87+ subscription_status_listener : 'Optional[SubscriptionStatusListener]' = None
88+ incoming_publish_listener : 'Optional[IncomingPublishListener]' = None
89+
90+ def validate (self ):
91+ """
92+ Stringently type-checks an instance's field values.
93+ """
94+ assert isinstance (self .subscription_topic_filter , str )
95+ assert callable (self .subscription_status_listener ) or self .subscription_status_listener is None
96+ assert callable (self .incoming_publish_listener ) or self .incoming_publish_listener is None
1497
1598
1699@dataclass
@@ -41,12 +124,15 @@ class ResponsePath:
41124 correlation_token_json_path : 'Optional[str]' = None
42125
43126 def validate (self ):
127+ """
128+ Stringently type-checks an instance's field values.
129+ """
44130 assert isinstance (self .topic , str )
45131 assert isinstance (self .correlation_token_json_path , str ) or self .correlation_token_json_path is None
46132
47133
48134@dataclass
49- class RequestResponseOperationOptions :
135+ class RequestOptions :
50136 """
51137 Configuration options for an MQTT-based request-response operation.
52138
@@ -64,11 +150,14 @@ class RequestResponseOperationOptions:
64150 correlation_token : 'Optional[str]' = None
65151
66152 def validate (self ):
67- assert isinstance (self .subscription_topic_filters , collections .abc .Sequence )
153+ """
154+ Stringently type-checks an instance's field values.
155+ """
156+ assert isinstance (self .subscription_topic_filters , Sequence )
68157 for topic_filter in self .subscription_topic_filters :
69158 assert isinstance (topic_filter , str )
70159
71- assert isinstance (self .response_paths , collections . abc . Sequence )
160+ assert isinstance (self .response_paths , Sequence )
72161 for response_path in self .response_paths :
73162 response_path .validate ()
74163
@@ -78,18 +167,7 @@ def validate(self):
78167
79168
80169@dataclass
81- class StreamingOperationOptions :
82- """
83- Configuration options for an MQTT-based streaming operation.
84-
85- Args:
86- subscription_topic_filter (str): Topic filter that the streaming operation should listen on
87- """
88- subscription_topic_filter : str
89-
90-
91- @dataclass
92- class RequestResponseClientOptions :
170+ class ClientOptions :
93171 """
94172 MQTT-based request-response client configuration options
95173
@@ -103,6 +181,9 @@ class RequestResponseClientOptions:
103181 operation_timeout_in_seconds : 'Optional[int]' = 60
104182
105183 def validate (self ):
184+ """
185+ Stringently type-checks an instance's field values.
186+ """
106187 assert isinstance (self .max_request_response_subscriptions , int )
107188 assert isinstance (self .max_streaming_subscriptions , int )
108189 assert isinstance (self .operation_timeout_in_seconds , int )
@@ -123,10 +204,10 @@ class Client(NativeResource):
123204 """
124205
125206 def __init__ (self , protocol_client : Union [mqtt5 .Client , mqtt .Connection ],
126- client_options : RequestResponseClientOptions ):
207+ client_options : ClientOptions ):
127208
128209 assert isinstance (protocol_client , mqtt5 .Client ) or isinstance (protocol_client , mqtt .Connection )
129- assert isinstance (client_options , RequestResponseClientOptions )
210+ assert isinstance (client_options , ClientOptions )
130211 client_options .validate ()
131212
132213 super ().__init__ ()
@@ -136,7 +217,17 @@ def __init__(self, protocol_client: Union[mqtt5.Client, mqtt.Connection],
136217 else :
137218 self ._binding = _awscrt .mqtt_request_response_client_new_from_311 (protocol_client , client_options )
138219
139- def make_request (self , options : RequestResponseOperationOptions ):
220+ def make_request (self , options : RequestOptions ):
221+ """
222+ Initiate an MQTT-based request-response async workflow
223+
224+ Args:
225+ options (RequestOptions): Configuration options for the request to perform
226+
227+ Returns:
228+ concurrent.futures.Future: A Future whose result will contain the topic and payload of a response
229+ to the request. The future will contain an exception if the request fails.
230+ """
140231 options .validate ()
141232
142233 future = Future ()
@@ -157,3 +248,52 @@ def on_request_complete(error_code, topic, payload):
157248 on_request_complete )
158249
159250 return future
251+
252+ def create_stream (self , options : StreamingOperationOptions ):
253+ """
254+ Creates a new streaming operation
255+
256+ Args:
257+ options (StreamingOperationOptions): Configuration options for the streaming operation
258+
259+ Returns:
260+ StreamingOperation: a new streaming operation. Opening the operation triggers the client to maintain
261+ an MQTT subscription for relevant events. Matching publishes and subscription status changes are
262+ communicated by invoking configuration-controlled callbacks.
263+ """
264+ options .validate ()
265+
266+ def on_subscription_status_event (event_type , error_code ):
267+ if options .subscription_status_listener is not None :
268+ event = SubscriptionStatusEvent (event_type )
269+ if error_code != 0 :
270+ event .error = exceptions .from_code (error_code )
271+ options .subscription_status_listener (event )
272+
273+ def on_incoming_publish_event (topic , payload ):
274+ if options .incoming_publish_listener is not None :
275+ event = IncomingPublishEvent (topic , payload )
276+ options .incoming_publish_listener (event )
277+
278+ stream_binding = _awscrt .mqtt_request_response_client_create_stream (
279+ self ._binding , options .subscription_topic_filter , on_subscription_status_event , on_incoming_publish_event )
280+
281+ return StreamingOperation (stream_binding )
282+
283+
284+ class StreamingOperation (NativeResource ):
285+ """
286+ An operation that represents a stream of events broadcast to an MQTT topic
287+ """
288+
289+ def __init__ (self , binding ):
290+ super ().__init__ ()
291+
292+ self ._binding = binding
293+
294+ def open (self ):
295+ """
296+ Triggers the streaming operation to maintain an MQTT subscription for relevant events. Until a stream is
297+ opened, no events can be received.
298+ """
299+ _awscrt .mqtt_streaming_operation_open (self ._binding )
0 commit comments