Skip to content

Commit bb89973

Browse files
bretambroseBret Ambrose
andauthored
MQTT request response client creation/destruction (awslabs#643)
Co-authored-by: Bret Ambrose <bambrose@amazon.com>
1 parent 0a9b3c5 commit bb89973

File tree

5 files changed

+592
-0
lines changed

5 files changed

+592
-0
lines changed

awscrt/mqtt_request_response.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
"""
2+
MQTT Request Response module
3+
"""
4+
5+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
6+
# SPDX-License-Identifier: Apache-2.0.
7+
8+
from dataclasses import dataclass
9+
from typing import Callable, Union
10+
from awscrt import NativeResource, mqtt5, mqtt
11+
import _awscrt
12+
13+
14+
@dataclass
15+
class RequestResponseClientOptions:
16+
"""
17+
MQTT-based request-response client configuration options
18+
19+
Args:
20+
max_request_response_subscriptions (int): Maximum number of subscriptions that the client will concurrently use for request-response operations
21+
max_streaming_subscriptions (int): Maximum number of subscriptions that the client will concurrently use for streaming operations
22+
operation_timeout_in_seconds (Optional[int]): Duration, in seconds, that a request-response operation will wait for completion before giving up
23+
"""
24+
max_request_response_subscriptions: int
25+
max_streaming_subscriptions: int
26+
operation_timeout_in_seconds: 'Optional[int]' = 60
27+
28+
def validate(self):
29+
assert isinstance(self.max_request_response_subscriptions, int)
30+
assert isinstance(self.max_streaming_subscriptions, int)
31+
assert isinstance(self.operation_timeout_in_seconds, int)
32+
33+
34+
class Client(NativeResource):
35+
"""
36+
MQTT-based request-response client tuned for AWS MQTT services.
37+
38+
Supports streaming operations (listen to a stream of modeled events from an MQTT topic) and request-response
39+
operations (performs the subscribes, publish, and incoming publish correlation and error checking needed to
40+
perform simple request-response operations over MQTT).
41+
42+
Args:
43+
protocol_client (Union[mqtt5.Client, mqtt.Connection]): MQTT client to use as transport
44+
client_options (ClientOptions): The ClientOptions dataclass to used to configure the new request response Client.
45+
46+
"""
47+
48+
def __init__(self, protocol_client: Union[mqtt5.Client, mqtt.Connection],
49+
client_options: RequestResponseClientOptions):
50+
51+
assert isinstance(protocol_client, mqtt5.Client) or isinstance(protocol_client, mqtt.Connection)
52+
assert isinstance(client_options, RequestResponseClientOptions)
53+
client_options.validate()
54+
55+
super().__init__()
56+
57+
if isinstance(protocol_client, mqtt5.Client):
58+
self._binding = _awscrt.mqtt_request_response_client_new_from_5(protocol_client, client_options)
59+
else:
60+
self._binding = _awscrt.mqtt_request_response_client_new_from_311(protocol_client, client_options)

source/module.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "mqtt5_client.h"
1515
#include "mqtt_client.h"
1616
#include "mqtt_client_connection.h"
17+
#include "mqtt_request_response.h"
1718
#include "s3.h"
1819
#include "websocket.h"
1920

@@ -776,6 +777,10 @@ static PyMethodDef s_module_methods[] = {
776777
AWS_PY_METHOD_DEF(mqtt5_client_get_stats, METH_VARARGS),
777778
AWS_PY_METHOD_DEF(mqtt5_ws_handshake_transform_complete, METH_VARARGS),
778779

780+
/* MQTT Request Response Client */
781+
AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_5, METH_VARARGS),
782+
AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_311, METH_VARARGS),
783+
779784
/* Cryptographic primitives */
780785
AWS_PY_METHOD_DEF(md5_new, METH_NOARGS),
781786
AWS_PY_METHOD_DEF(sha256_new, METH_NOARGS),

source/mqtt_request_response.c

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
#include "mqtt_request_response.h"
6+
7+
#include "mqtt5_client.h"
8+
#include "mqtt_client_connection.h"
9+
10+
#include "aws/mqtt/request-response/request_response_client.h"
11+
12+
static const char *s_capsule_name_mqtt_request_response_client = "aws_mqtt_request_response_client";
13+
14+
static const char *AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS = "RequestResponseClientOptions";
15+
static const char *AWS_PYOBJECT_KEY_MAX_REQUEST_RESPONSE_SUBSCRIPTIONS = "max_request_response_subscriptions";
16+
static const char *AWS_PYOBJECT_KEY_MAX_STREAMING_SUBSCRIPTIONS = "max_streaming_subscriptions";
17+
static const char *AWS_PYOBJECT_KEY_OPERATION_TIMEOUT_IN_SECONDS = "operation_timeout_in_seconds";
18+
19+
struct mqtt_request_response_client_binding {
20+
struct aws_mqtt_request_response_client *native;
21+
};
22+
23+
static void s_mqtt_request_response_python_client_destructor(PyObject *client_capsule) {
24+
struct mqtt_request_response_client_binding *client_binding =
25+
PyCapsule_GetPointer(client_capsule, s_capsule_name_mqtt_request_response_client);
26+
assert(client_binding);
27+
28+
client_binding->native = aws_mqtt_request_response_client_release(client_binding->native);
29+
30+
aws_mem_release(aws_py_get_allocator(), client_binding);
31+
}
32+
33+
/*
34+
* Returns success as true/false. If not successful, a python error will be set, so the caller does not need to check
35+
*/
36+
static bool s_init_mqtt_request_response_client_options(
37+
struct aws_mqtt_request_response_client_options *client_options,
38+
PyObject *client_options_py) {
39+
AWS_ZERO_STRUCT(*client_options);
40+
41+
uint32_t max_request_response_subscriptions = PyObject_GetAttrAsUint32(
42+
client_options_py,
43+
AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS,
44+
AWS_PYOBJECT_KEY_MAX_REQUEST_RESPONSE_SUBSCRIPTIONS);
45+
if (PyErr_Occurred()) {
46+
PyErr_Format(PyErr_Occurred(), "Cannot convert max_request_response_subscriptions to a C uint32");
47+
return false;
48+
}
49+
50+
uint32_t max_streaming_subscriptions = PyObject_GetAttrAsUint32(
51+
client_options_py,
52+
AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS,
53+
AWS_PYOBJECT_KEY_MAX_STREAMING_SUBSCRIPTIONS);
54+
if (PyErr_Occurred()) {
55+
PyErr_Format(PyErr_Occurred(), "Cannot convert max_streaming_subscriptions to a C uint32");
56+
return false;
57+
}
58+
59+
uint32_t timeout_in_seconds = PyObject_GetAttrAsUint32(
60+
client_options_py,
61+
AWS_PYOBJECT_KEY_REQUEST_RESPONSE_CLIENT_OPTIONS,
62+
AWS_PYOBJECT_KEY_OPERATION_TIMEOUT_IN_SECONDS);
63+
if (PyErr_Occurred()) {
64+
PyErr_Format(PyErr_Occurred(), "Cannot convert operation_timeout_in_seconds to a C uint32_t");
65+
return false;
66+
}
67+
68+
client_options->max_request_response_subscriptions = (size_t)max_request_response_subscriptions;
69+
client_options->max_streaming_subscriptions = (size_t)max_streaming_subscriptions;
70+
client_options->operation_timeout_seconds = (uint32_t)timeout_in_seconds;
71+
72+
return true;
73+
}
74+
75+
PyObject *aws_py_mqtt_request_response_client_new_from_5(PyObject *self, PyObject *args) {
76+
(void)self;
77+
78+
PyObject *mqtt5_client_py = NULL;
79+
PyObject *client_options_py = NULL;
80+
81+
if (!PyArg_ParseTuple(
82+
args,
83+
"OO",
84+
/* O */ &mqtt5_client_py,
85+
/* O */ &client_options_py)) {
86+
return NULL;
87+
}
88+
89+
struct aws_mqtt5_client *protocol_client = aws_py_get_mqtt5_client(mqtt5_client_py);
90+
if (protocol_client == NULL) {
91+
return NULL;
92+
}
93+
94+
struct aws_mqtt_request_response_client_options client_options;
95+
if (!s_init_mqtt_request_response_client_options(&client_options, client_options_py)) {
96+
return NULL;
97+
}
98+
99+
struct aws_allocator *allocator = aws_py_get_allocator();
100+
101+
struct aws_mqtt_request_response_client *rr_client =
102+
aws_mqtt_request_response_client_new_from_mqtt5_client(allocator, protocol_client, &client_options);
103+
if (rr_client == NULL) {
104+
PyErr_SetAwsLastError();
105+
return NULL;
106+
}
107+
108+
struct mqtt_request_response_client_binding *client_binding =
109+
aws_mem_calloc(allocator, 1, sizeof(struct mqtt_request_response_client_binding));
110+
// Python object that wraps a c struct and a function to call when its reference goes to zero
111+
PyObject *capsule = PyCapsule_New(
112+
client_binding, s_capsule_name_mqtt_request_response_client, s_mqtt_request_response_python_client_destructor);
113+
if (!capsule) {
114+
aws_mem_release(allocator, client_binding);
115+
aws_mqtt_request_response_client_release(rr_client);
116+
return NULL;
117+
}
118+
119+
client_binding->native = rr_client;
120+
121+
return capsule;
122+
}
123+
124+
PyObject *aws_py_mqtt_request_response_client_new_from_311(PyObject *self, PyObject *args) {
125+
(void)self;
126+
127+
PyObject *mqtt_connection_py = NULL;
128+
PyObject *client_options_py = NULL;
129+
130+
if (!PyArg_ParseTuple(
131+
args,
132+
"OO",
133+
/* O */ &mqtt_connection_py,
134+
/* O */ &client_options_py)) {
135+
return NULL;
136+
}
137+
138+
struct aws_mqtt_client_connection *protocol_client = aws_py_get_mqtt_client_connection(mqtt_connection_py);
139+
if (protocol_client == NULL) {
140+
return NULL;
141+
}
142+
143+
struct aws_mqtt_request_response_client_options client_options;
144+
if (!s_init_mqtt_request_response_client_options(&client_options, client_options_py)) {
145+
return NULL;
146+
}
147+
148+
struct aws_allocator *allocator = aws_py_get_allocator();
149+
150+
struct aws_mqtt_request_response_client *rr_client =
151+
aws_mqtt_request_response_client_new_from_mqtt311_client(allocator, protocol_client, &client_options);
152+
if (rr_client == NULL) {
153+
PyErr_SetAwsLastError();
154+
return NULL;
155+
}
156+
157+
struct mqtt_request_response_client_binding *client_binding =
158+
aws_mem_calloc(allocator, 1, sizeof(struct mqtt_request_response_client_binding));
159+
// Python object that wraps a c struct and a function to call when its reference goes to zero
160+
PyObject *capsule = PyCapsule_New(
161+
client_binding, s_capsule_name_mqtt_request_response_client, s_mqtt_request_response_python_client_destructor);
162+
if (!capsule) {
163+
aws_mem_release(allocator, client_binding);
164+
aws_mqtt_request_response_client_release(rr_client);
165+
return NULL;
166+
}
167+
168+
client_binding->native = rr_client;
169+
170+
return capsule;
171+
}
172+
173+
struct aws_mqtt_request_response_client *aws_py_get_mqtt_request_response_client(
174+
PyObject *mqtt_request_response_client) {
175+
AWS_PY_RETURN_NATIVE_FROM_BINDING(
176+
mqtt_request_response_client,
177+
s_capsule_name_mqtt_request_response_client,
178+
"Client",
179+
mqtt_request_response_client_binding);
180+
}

source/mqtt_request_response.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#ifndef AWS_CRT_PYTHON_MQTT_REQUEST_RESPONSE_H
2+
#define AWS_CRT_PYTHON_MQTT_REQUEST_RESPONSE_H
3+
/**
4+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
* SPDX-License-Identifier: Apache-2.0.
6+
*/
7+
8+
#include "module.h"
9+
10+
struct aws_mqtt_request_response_client;
11+
12+
PyObject *aws_py_mqtt_request_response_client_new_from_5(PyObject *self, PyObject *args);
13+
PyObject *aws_py_mqtt_request_response_client_new_from_311(PyObject *self, PyObject *args);
14+
15+
/* Given a python object, return a pointer to its underlying native type.
16+
* If NULL is returned, a python error has been set */
17+
struct aws_mqtt_request_response_client *aws_py_get_mqtt_request_response_client(
18+
PyObject *mqtt_request_response_client);
19+
20+
#endif /* AWS_CRT_PYTHON_MQTT_REQUEST_RESPONSE_H */

0 commit comments

Comments
 (0)