Skip to content

Commit c4a41ab

Browse files
bretambrose and Bret Ambrose authored
Retry timeouts (awslabs#687)
Co-authored-by: Bret Ambrose <bambrose@amazon.com>
1 parent e6a48db commit c4a41ab

File tree

7 files changed

+616
-180
lines changed

7 files changed

+616
-180
lines changed

test/__init__.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010

1111
from awscrt import NativeResource
1212
from awscrt._test import check_for_leaks
13-
from awscrt.io import init_logging, LogLevel
13+
import time
1414
import unittest
15-
import sys
1615

1716
TIMEOUT = 30.0
1817

@@ -57,3 +56,23 @@ def tearDown(self):
5756
except Exception:
5857
NativeResourceTest._previous_test_failed = True
5958
raise
59+
60+
61+
# Total number of attempts test_retry_wrapper makes (the first try included)
# before it stops retrying and lets the exception propagate.
MAX_RETRIES = 5


def _is_retryable_exception(e):
    """Report whether exception ``e`` mentions a known timeout error name.

    The check is purely textual: ``str(e)`` is searched for
    ``AWS_IO_TLS_NEGOTIATION_TIMEOUT`` and ``AWS_IO_SOCKET_TIMEOUT``.

    Returns:
        True if either name occurs in the exception text, otherwise False.
    """
    message = str(e)
    timeout_names = ("AWS_IO_TLS_NEGOTIATION_TIMEOUT", "AWS_IO_SOCKET_TIMEOUT")
    return any(name in message for name in timeout_names)


def test_retry_wrapper(test_function):
    """Call ``test_function``, retrying when it fails with a timeout error.

    ``test_function`` is invoked with no arguments, at most ``MAX_RETRIES``
    times. The wrapper returns as soon as one call completes without raising.

    Args:
        test_function: Zero-argument callable holding the real test body.

    Returns:
        None. The return value of ``test_function`` is discarded.

    Raises:
        Exception: Whatever ``test_function`` raised, re-raised unchanged,
            when the exception is not accepted by ``_is_retryable_exception``
            or when it occurred on the final attempt.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            test_function()
        except Exception as e:
            # Give up on anything that is not a recognised timeout, and on
            # the last attempt even if it is one.
            if not _is_retryable_exception(e) or attempt >= MAX_RETRIES:
                raise
            # Pause for a second before the next attempt.
            time.sleep(1)
        else:
            return


# This helper's name matches the ``test_*`` pattern and it is imported into
# test modules; flag it as a non-test so pytest-style collectors do not try to
# run it as a test function (it takes a required positional argument).
test_retry_wrapper.__test__ = False

test/test_mqtt.py

Lines changed: 93 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from awscrt.io import ClientBootstrap, ClientTlsContext, DefaultHostResolver, EventLoopGroup, Pkcs11Lib, TlsContextOptions
55
from awscrt import http
66
from awscrt.mqtt import Client, Connection, QoS, Will, OnConnectionClosedData, OnConnectionFailureData, OnConnectionSuccessData, ConnectReturnCode
7-
from test import NativeResourceTest
7+
from test import test_retry_wrapper, NativeResourceTest
88
from concurrent.futures import Future
99
import os
1010
import unittest
@@ -59,7 +59,7 @@ def _create_connection(
5959
on_connection_resumed=on_connection_resumed_callback)
6060
return connection
6161

62-
def test_connect_disconnect(self):
62+
def _test_connect_disconnect(self):
6363
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
6464
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
6565
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -71,7 +71,10 @@ def test_connect_disconnect(self):
7171
connection.connect().result(TIMEOUT)
7272
connection.disconnect().result(TIMEOUT)
7373

74-
def test_ecc_connect_disconnect(self):
74+
# Runs self._test_connect_disconnect through test_retry_wrapper (imported from the test package).
def test_connect_disconnect(self):
75+
test_retry_wrapper(self._test_connect_disconnect)
76+
77+
def _test_ecc_connect_disconnect(self):
7578
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
7679
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_ECC_CERT")
7780
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_ECC_KEY")
@@ -83,7 +86,10 @@ def test_ecc_connect_disconnect(self):
8386
connection.connect().result(TIMEOUT)
8487
connection.disconnect().result(TIMEOUT)
8588

86-
def test_pkcs11(self):
89+
# Runs self._test_ecc_connect_disconnect through test_retry_wrapper (imported from the test package).
def test_ecc_connect_disconnect(self):
90+
test_retry_wrapper(self._test_ecc_connect_disconnect)
91+
92+
def _test_pkcs11(self):
8793
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
8894
test_input_pkcs11_lib = _get_env_variable("AWS_TEST_PKCS11_LIB")
8995
test_input_pkcs11_pin = _get_env_variable("AWS_TEST_PKCS11_PIN")
@@ -105,7 +111,10 @@ def test_pkcs11(self):
105111
connection.connect().result(TIMEOUT)
106112
connection.disconnect().result(TIMEOUT)
107113

108-
def test_pub_sub(self):
114+
# Runs self._test_pkcs11 through test_retry_wrapper (imported from the test package).
def test_pkcs11(self):
115+
test_retry_wrapper(self._test_pkcs11)
116+
117+
def _test_pub_sub(self):
109118
self.TEST_TOPIC = '/test/me/senpai/' + str(uuid.uuid4())
110119
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
111120
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
@@ -148,7 +157,10 @@ def on_message(**kwargs):
148157
# disconnect
149158
connection.disconnect().result(TIMEOUT)
150159

151-
def test_will(self):
160+
# Runs self._test_pub_sub through test_retry_wrapper (imported from the test package).
def test_pub_sub(self):
161+
test_retry_wrapper(self._test_pub_sub)
162+
163+
def _test_will(self):
152164
self.TEST_TOPIC = '/test/me/senpai/' + str(uuid.uuid4())
153165
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
154166
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
@@ -239,7 +251,10 @@ def on_message(**kwargs):
239251
# disconnect
240252
subscriber.disconnect().result(TIMEOUT)
241253

242-
def test_on_message(self):
254+
# Runs self._test_will through test_retry_wrapper (imported from the test package).
def test_will(self):
255+
test_retry_wrapper(self._test_will)
256+
257+
def _test_on_message(self):
243258
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
244259
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
245260
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -275,7 +290,10 @@ def on_message(**kwargs):
275290
# disconnect
276291
connection.disconnect().result(TIMEOUT)
277292

278-
def test_on_message_old_fn_signature(self):
293+
# Runs self._test_on_message through test_retry_wrapper (imported from the test package).
def test_on_message(self):
294+
test_retry_wrapper(self._test_on_message)
295+
296+
def _test_on_message_old_fn_signature(self):
279297
# ensure that message-received callbacks with the old function signature still work
280298

281299
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
@@ -320,7 +338,10 @@ def on_sub_message(topic, payload):
320338
# disconnect
321339
connection.disconnect().result(TIMEOUT)
322340

323-
def test_connect_disconnect_with_default_singletons(self):
341+
# Runs self._test_on_message_old_fn_signature through test_retry_wrapper (imported from the test package).
def test_on_message_old_fn_signature(self):
342+
test_retry_wrapper(self._test_on_message_old_fn_signature)
343+
344+
def _test_connect_disconnect_with_default_singletons(self):
324345
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
325346
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
326347
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -337,7 +358,10 @@ def test_connect_disconnect_with_default_singletons(self):
337358
EventLoopGroup.release_static_default()
338359
DefaultHostResolver.release_static_default()
339360

340-
def test_connect_publish_wait_statistics_disconnect(self):
361+
# Runs self._test_connect_disconnect_with_default_singletons through test_retry_wrapper (imported from the test package).
def test_connect_disconnect_with_default_singletons(self):
362+
test_retry_wrapper(self._test_connect_disconnect_with_default_singletons)
363+
364+
def _test_connect_publish_wait_statistics_disconnect(self):
341365
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
342366
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
343367
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -369,7 +393,10 @@ def test_connect_publish_wait_statistics_disconnect(self):
369393
# disconnect
370394
connection.disconnect().result(TIMEOUT)
371395

372-
def test_connect_publish_statistics_wait_disconnect(self):
396+
# Runs self._test_connect_publish_wait_statistics_disconnect through test_retry_wrapper (imported from the test package).
def test_connect_publish_wait_statistics_disconnect(self):
397+
test_retry_wrapper(self._test_connect_publish_wait_statistics_disconnect)
398+
399+
def _test_connect_publish_statistics_wait_disconnect(self):
373400
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
374401
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
375402
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -409,7 +436,10 @@ def test_connect_publish_statistics_wait_disconnect(self):
409436
# disconnect
410437
connection.disconnect().result(TIMEOUT)
411438

412-
def test_connect_disconnect_with_callbacks_happy(self):
439+
# Runs self._test_connect_publish_statistics_wait_disconnect through test_retry_wrapper (imported from the test package).
def test_connect_publish_statistics_wait_disconnect(self):
440+
test_retry_wrapper(self._test_connect_publish_statistics_wait_disconnect)
441+
442+
def _test_connect_disconnect_with_callbacks_happy(self):
413443
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
414444
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
415445
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -442,7 +472,10 @@ def on_connection_closed_callback(connection, callback_data: OnConnectionClosedD
442472
connection.disconnect().result(TIMEOUT)
443473
on_connection_closed_future.result(TIMEOUT)
444474

445-
def test_connect_disconnect_with_callbacks_unhappy(self):
475+
# Runs self._test_connect_disconnect_with_callbacks_happy through test_retry_wrapper (imported from the test package).
def test_connect_disconnect_with_callbacks_happy(self):
476+
test_retry_wrapper(self._test_connect_disconnect_with_callbacks_happy)
477+
478+
def _test_connect_disconnect_with_callbacks_unhappy(self):
446479
test_input_endpoint = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
447480
test_input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
448481
test_input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
@@ -478,7 +511,10 @@ def on_connection_closed_callback(connection, callback_data: OnConnectionClosedD
478511
failure_data = on_onnection_failure_future.result(TIMEOUT)
479512
self.assertTrue(failure_data['error'] is not None)
480513

481-
def test_connect_disconnect_with_callbacks_happy_on_resume(self):
514+
# Runs self._test_connect_disconnect_with_callbacks_unhappy through test_retry_wrapper (imported from the test package).
def test_connect_disconnect_with_callbacks_unhappy(self):
515+
test_retry_wrapper(self._test_connect_disconnect_with_callbacks_unhappy)
516+
517+
def _test_connect_disconnect_with_callbacks_happy_on_resume(self):
482518
# Check that an on_connection_success callback fires on a resumed connection.
483519

484520
# NOTE Since there is no mocked server available on this abstraction level, the only sensible approach
@@ -517,6 +553,10 @@ def on_connection_resumed_callback(connection, return_code: ConnectReturnCode, s
517553
self.assertEqual(success_data['return_code'], ConnectReturnCode.ACCEPTED)
518554
self.assertEqual(success_data['session_present'], False)
519555

556+
# Putting a sleep here helps prevent a "race" condition in IoT Core where the second connection can get
557+
# rejected rather than the first disconnected.
558+
time.sleep(5)
559+
520560
# Reset the future for the reconnect attempt.
521561
on_connection_success_future = Future()
522562

@@ -547,11 +587,14 @@ def on_connection_success_callback_dup(connection, callback_data: OnConnectionSu
547587
connection.disconnect().result(TIMEOUT)
548588
on_connection_closed_future.result(TIMEOUT)
549589

590+
# Runs self._test_connect_disconnect_with_callbacks_happy_on_resume through test_retry_wrapper (imported from the test package).
def test_connect_disconnect_with_callbacks_happy_on_resume(self):
591+
test_retry_wrapper(self._test_connect_disconnect_with_callbacks_happy_on_resume)
592+
550593
# ==============================================================
551594
# MOSQUITTO CONNECTION TESTS
552595
# ==============================================================
553596

554-
def test_mqtt311_direct_connect_minimum(self):
597+
def _test_mqtt311_direct_connect_minimum(self):
555598
input_host_name = _get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_HOST")
556599
input_port = int(_get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_PORT"))
557600

@@ -567,7 +610,10 @@ def test_mqtt311_direct_connect_minimum(self):
567610
connection.connect().result(TIMEOUT)
568611
connection.disconnect().result(TIMEOUT)
569612

570-
def test_mqtt311_direct_connect_basic_auth(self):
613+
# Runs self._test_mqtt311_direct_connect_minimum through test_retry_wrapper (imported from the test package).
def test_mqtt311_direct_connect_minimum(self):
614+
test_retry_wrapper(self._test_mqtt311_direct_connect_minimum)
615+
616+
def _test_mqtt311_direct_connect_basic_auth(self):
571617
input_host_name = _get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_HOST")
572618
input_port = int(_get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_BASIC_AUTH_PORT"))
573619
input_username = _get_env_variable("AWS_TEST_MQTT311_BASIC_AUTH_USERNAME")
@@ -587,7 +633,10 @@ def test_mqtt311_direct_connect_basic_auth(self):
587633
connection.connect().result(TIMEOUT)
588634
connection.disconnect().result(TIMEOUT)
589635

590-
def test_mqtt311_direct_connect_tls(self):
636+
# Runs self._test_mqtt311_direct_connect_basic_auth through test_retry_wrapper (imported from the test package).
def test_mqtt311_direct_connect_basic_auth(self):
637+
test_retry_wrapper(self._test_mqtt311_direct_connect_basic_auth)
638+
639+
def _test_mqtt311_direct_connect_tls(self):
591640
input_host_name = _get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_HOST")
592641
input_port = int(_get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_PORT"))
593642

@@ -605,7 +654,10 @@ def test_mqtt311_direct_connect_tls(self):
605654
connection.connect().result(TIMEOUT)
606655
connection.disconnect().result(TIMEOUT)
607656

608-
def test_mqtt311_direct_connect_mutual_tls(self):
657+
# Runs self._test_mqtt311_direct_connect_tls through test_retry_wrapper (imported from the test package).
def test_mqtt311_direct_connect_tls(self):
658+
test_retry_wrapper(self._test_mqtt311_direct_connect_tls)
659+
660+
def _test_mqtt311_direct_connect_mutual_tls(self):
609661
input_cert = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_CERT")
610662
input_key = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_RSA_KEY")
611663
input_host = _get_env_variable("AWS_TEST_MQTT311_IOT_CORE_HOST")
@@ -626,7 +678,10 @@ def test_mqtt311_direct_connect_mutual_tls(self):
626678
connection.connect().result(TIMEOUT)
627679
connection.disconnect().result(TIMEOUT)
628680

629-
def test_mqtt311_direct_connect_http_proxy_tls(self):
681+
# Runs self._test_mqtt311_direct_connect_mutual_tls through test_retry_wrapper (imported from the test package).
def test_mqtt311_direct_connect_mutual_tls(self):
682+
test_retry_wrapper(self._test_mqtt311_direct_connect_mutual_tls)
683+
684+
def _test_mqtt311_direct_connect_http_proxy_tls(self):
630685
input_proxy_host = _get_env_variable("AWS_TEST_MQTT311_PROXY_HOST")
631686
input_proxy_port = int(_get_env_variable("AWS_TEST_MQTT311_PROXY_PORT"))
632687
input_host_name = _get_env_variable("AWS_TEST_MQTT311_DIRECT_MQTT_TLS_HOST")
@@ -655,7 +710,10 @@ def test_mqtt311_direct_connect_http_proxy_tls(self):
655710
connection.connect().result(TIMEOUT)
656711
connection.disconnect().result(TIMEOUT)
657712

658-
def test_mqtt311_websocket_connect_minimum(self):
713+
# Runs self._test_mqtt311_direct_connect_http_proxy_tls through test_retry_wrapper (imported from the test package).
def test_mqtt311_direct_connect_http_proxy_tls(self):
714+
test_retry_wrapper(self._test_mqtt311_direct_connect_http_proxy_tls)
715+
716+
def _test_mqtt311_websocket_connect_minimum(self):
659717
input_host_name = _get_env_variable("AWS_TEST_MQTT311_WS_MQTT_HOST")
660718
input_port = int(_get_env_variable("AWS_TEST_MQTT311_WS_MQTT_PORT"))
661719

@@ -677,7 +735,10 @@ def sign_function(transform_args, **kwargs):
677735
connection.connect().result(TIMEOUT)
678736
connection.disconnect().result(TIMEOUT)
679737

680-
def test_mqtt311_websocket_connect_basic_auth(self):
738+
# Runs self._test_mqtt311_websocket_connect_minimum through test_retry_wrapper (imported from the test package).
def test_mqtt311_websocket_connect_minimum(self):
739+
test_retry_wrapper(self._test_mqtt311_websocket_connect_minimum)
740+
741+
def _test_mqtt311_websocket_connect_basic_auth(self):
681742
input_host_name = _get_env_variable("AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_HOST")
682743
input_port = int(_get_env_variable("AWS_TEST_MQTT311_WS_MQTT_BASIC_AUTH_PORT"))
683744
input_username = _get_env_variable("AWS_TEST_MQTT311_BASIC_AUTH_USERNAME")
@@ -703,7 +764,10 @@ def sign_function(transform_args, **kwargs):
703764
connection.connect().result(TIMEOUT)
704765
connection.disconnect().result(TIMEOUT)
705766

706-
def test_mqtt311_websocket_connect_tls(self):
767+
# Runs self._test_mqtt311_websocket_connect_basic_auth through test_retry_wrapper (imported from the test package).
def test_mqtt311_websocket_connect_basic_auth(self):
768+
test_retry_wrapper(self._test_mqtt311_websocket_connect_basic_auth)
769+
770+
def _test_mqtt311_websocket_connect_tls(self):
707771
input_host_name = _get_env_variable("AWS_TEST_MQTT311_WS_MQTT_TLS_HOST")
708772
input_port = int(_get_env_variable("AWS_TEST_MQTT311_WS_MQTT_TLS_PORT"))
709773

@@ -727,7 +791,10 @@ def sign_function(transform_args, **kwargs):
727791
connection.connect().result(TIMEOUT)
728792
connection.disconnect().result(TIMEOUT)
729793

730-
def test_mqtt311_websocket_connect_http_proxy_tls(self):
794+
# Runs self._test_mqtt311_websocket_connect_tls through test_retry_wrapper (imported from the test package).
def test_mqtt311_websocket_connect_tls(self):
795+
test_retry_wrapper(self._test_mqtt311_websocket_connect_tls)
796+
797+
def _test_mqtt311_websocket_connect_http_proxy_tls(self):
731798
input_proxy_host = _get_env_variable("AWS_TEST_MQTT311_PROXY_HOST")
732799
input_proxy_port = int(_get_env_variable("AWS_TEST_MQTT311_PROXY_PORT"))
733800
input_host_name = _get_env_variable("AWS_TEST_MQTT311_WS_MQTT_TLS_HOST")
@@ -761,6 +828,9 @@ def sign_function(transform_args, **kwargs):
761828
connection.connect().result(TIMEOUT)
762829
connection.disconnect().result(TIMEOUT)
763830

831+
# Runs self._test_mqtt311_websocket_connect_http_proxy_tls through test_retry_wrapper (imported from the test package).
def test_mqtt311_websocket_connect_http_proxy_tls(self):
832+
test_retry_wrapper(self._test_mqtt311_websocket_connect_http_proxy_tls)
833+
764834

765835
# Run the test suite when this file is executed directly as a script.
# The interpreter sets __name__ to '__main__' (not 'main') in that case.
if __name__ == '__main__':
    unittest.main()

0 commit comments

Comments
 (0)