Skip to content

Commit fd03020

Browse files
committed
feat: (WIP) - placeholder. Adds timeout property to WriteOptions and proposed tests
1 parent b22870b commit fd03020

File tree

8 files changed

+231
-6
lines changed

8 files changed

+231
-6
lines changed

influxdb_client_3/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,13 @@ def __init__(
220220
write_type = DefaultWriteOptions.write_type.value
221221
write_precision = DefaultWriteOptions.write_precision.value
222222
write_no_sync = DefaultWriteOptions.no_sync.value
223+
write_timeout = DefaultWriteOptions.timeout.value
223224
if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
224225
write_opts = write_client_options['write_options']
225226
write_type = getattr(write_opts, 'write_type', write_type)
226227
write_precision = getattr(write_opts, 'write_precision', write_precision)
227228
write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
229+
write_timeout = getattr(write_opts, 'timeout', write_timeout)
228230

229231
write_options = WriteOptions(
230232
write_type=write_type,
@@ -253,6 +255,7 @@ def __init__(
253255
url=f"{scheme}://{hostname}:{port}",
254256
token=self._token,
255257
org=self._org,
258+
timeout=write_timeout,
256259
**kwargs)
257260

258261
self._write_api = _WriteApi(influxdb_client=self._client, **self._write_client_options)
@@ -344,6 +347,7 @@ def write(self, record=None, database=None, **kwargs):
344347
:type database: str
345348
:param kwargs: Additional arguments to pass to the write API.
346349
"""
350+
print(f"DEBUG InfluxDBClient3.write {record}")
347351
if database is None:
348352
database = self._database
349353

influxdb_client_3/write_client/_sync/api_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,14 +357,18 @@ def call_api(self, resource_path, method,
357357
If parameter async_req is False or missing,
358358
then the method will return the response directly.
359359
"""
360+
print("DEBUG ApiClient.call_api()")
360361
if not async_req:
362+
print(" DEBUG synchronous call")
361363
return self.__call_api(resource_path, method,
362364
path_params, query_params, header_params,
363365
body, post_params, files,
364366
response_type, auth_settings,
365367
_return_http_data_only, collection_formats,
366368
_preload_content, _request_timeout, urlopen_kw)
367369
else:
370+
# TODO possible refactor - async handler inside package `_sync`?
371+
print(" DEBUG asynchronous call")
368372
thread = self.pool.apply_async(self.__call_api, (resource_path,
369373
method, path_params, query_params,
370374
header_params, body,

influxdb_client_3/write_client/client/write_api.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from influxdb_client_3.write_client.rest import _UTF_8_encoding
2626

2727
DEFAULT_WRITE_NO_SYNC = False
28+
DEFAULT_WRITE_TIMEOUT = 10_000
2829

2930
logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
3031

@@ -45,6 +46,7 @@ class DefaultWriteOptions(Enum):
4546
write_type = WriteType.synchronous
4647
write_precision = DEFAULT_WRITE_PRECISION
4748
no_sync = DEFAULT_WRITE_NO_SYNC
49+
timeout = DEFAULT_WRITE_TIMEOUT
4850

4951

5052
class WriteOptions(object):
@@ -61,6 +63,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
6163
max_close_wait=300_000,
6264
write_precision=DEFAULT_WRITE_PRECISION,
6365
no_sync=DEFAULT_WRITE_NO_SYNC,
66+
timeout=DEFAULT_WRITE_TIMEOUT,
6467
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
6568
"""
6669
Create write api configuration.
@@ -93,6 +96,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
9396
self.write_scheduler = write_scheduler
9497
self.max_close_wait = max_close_wait
9598
self.write_precision = write_precision
99+
self.timeout = timeout
96100
self.no_sync = no_sync
97101

98102
def to_retry_strategy(self, **kwargs):
@@ -268,6 +272,8 @@ def __init__(self,
268272
# Define Subject that listen incoming data and produces writes into InfluxDB
269273
self._subject = Subject()
270274

275+
print(f"DEBUG batching write with subject {self._subject}")
276+
271277
self._disposable = self._subject.pipe(
272278
# Split incoming data to windows by batch_size or flush_interval
273279
ops.window_with_time_or_count(count=write_options.batch_size,
@@ -372,6 +378,7 @@ def write(self, bucket: str, org: str = None,
372378
373379
""" # noqa: E501
374380
org = get_org_query_param(org=org, client=self._influxdb_client)
381+
print("DEBUG WriteApi.write()")
375382

376383
self._append_default_tags(record)
377384

@@ -390,13 +397,21 @@ def write(self, bucket: str, org: str = None,
390397
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
391398

392399
def write_payload(payload):
400+
print("DEBUG WriteApi.write_payload()")
393401
final_string = b'\n'.join(payload[1])
394-
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
402+
result = self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
403+
# return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync)
404+
print(f" DEBUG write_payload() result {result}")
405+
return result
395406

396407
results = list(map(write_payload, payloads.items()))
408+
print(f" DEBUG WriteApi.write() results {results}")
397409
if not _async_req:
410+
print(f" --->DEBUG not async_request")
398411
return None
399412
elif len(results) == 1:
413+
# TODO if results contains error or exception ensure handled
414+
print(f" --->DEBUG async_request() results {results[0]}")
400415
return results[0]
401416
return results
402417

@@ -464,11 +479,13 @@ def __del__(self):
464479
def _write_batching(self, bucket, org, data,
465480
precision=None,
466481
**kwargs):
482+
print("DEBUG _write_batching()")
467483
if precision is None:
468484
precision = self._write_options.write_precision
469485

470486
if isinstance(data, bytes):
471487
_key = _BatchItemKey(bucket, org, precision)
488+
print(f" DEBUG _write_batching() data bytes {_key}")
472489
self._subject.on_next(_BatchItem(key=_key, data=data))
473490

474491
elif isinstance(data, str):
@@ -520,6 +537,8 @@ def _write_batching(self, bucket, org, data,
520537

521538
def _http(self, batch_item: _BatchItem):
522539

540+
print("DEBUG _http()")
541+
523542
logger.debug("Write time series data into InfluxDB: %s", batch_item)
524543

525544
if self._retry_callback:
@@ -540,6 +559,7 @@ def _retry_callback_delegate(exception):
540559
return _BatchResponse(data=batch_item)
541560

542561
def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs):
562+
print("DEBUG write_api._post_write()")
543563

544564
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
545565
no_sync=no_sync,
@@ -549,6 +569,8 @@ def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwarg
549569

550570
def _to_response(self, data: _BatchItem, delay: timedelta):
551571

572+
print("DEBUG _to_response()")
573+
552574
return rx.of(data).pipe(
553575
ops.subscribe_on(self._write_options.write_scheduler),
554576
# use delay if its specified

influxdb_client_3/write_client/service/write_service.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,15 @@ def post_write(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403
4242
returns the request thread.
4343
""" # noqa: E501
4444
kwargs['_return_http_data_only'] = True
45+
print("DEBUG WriteService.post_write()")
4546
if kwargs.get('async_req'):
46-
return self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
47+
print(" DEBUG making asynchronous request ")
48+
thread = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
49+
print(f" DEBUG thread: {thread} ")
50+
# return self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
51+
return thread
4752
else:
53+
print(" DEBUG making synchronous request ")
4854
(data) = self.post_write_with_http_info(org, bucket, body, **kwargs) # noqa: E501
4955
return data
5056

@@ -72,10 +78,14 @@ def post_write_with_http_info(self, org, bucket, body, **kwargs): # noqa: E501,
7278
:return: None
7379
If the method is called asynchronously,
7480
returns the request thread.
75-
""" # noqa: E501
81+
"""
82+
print("WriteService.post_write_with_http_info()")
83+
# noqa: E501
7684
local_var_params, path, path_params, query_params, header_params, body_params = \
7785
self._post_write_prepare(org, bucket, body, **kwargs) # noqa: E501
7886

87+
print(f"DEBUG local_var_params: {local_var_params}")
88+
7989
try:
8090
return self.api_client.call_api(
8191
path, 'POST',
@@ -146,6 +156,8 @@ async def post_write_async(self, org, bucket, body, **kwargs): # noqa: E501,D40
146156
def _post_write_prepare(self, org, bucket, body, **kwargs): # noqa: E501,D401,D403
147157
local_var_params = dict(locals())
148158

159+
print(f"DEBUG local_var_params: {local_var_params}")
160+
149161
all_params = ['org', 'bucket', 'body', 'zap_trace_span', 'content_encoding', 'content_type', 'content_length', 'accept', 'org_id', 'precision', 'no_sync'] # noqa: E501
150162
self._check_operation_params('post_write', all_params, local_var_params)
151163
# verify the required parameter 'org' is set

tests/test_api_client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ def test_api_error_headers(self):
140140
self.assertEqual(headers['X-Influxdb-Request-Id'], requestid)
141141
self.assertEqual(headers['X-Influxdb-Build'], 'Mock')
142142

143+
def test_request_timeout_from_config(self):
144+
# TODO
145+
conf = Configuration()
146+
local_client = ApiClient(conf)
147+
143148
def test_should_gzip(self):
144149
# Test when gzip is disabled
145150
self.assertFalse(ApiClient.should_gzip("test", enable_gzip=False, gzip_threshold=1))

tests/test_influxdb_client_3.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
from pytest_httpserver import HTTPServer
66

7-
from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType
7+
from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType, \
8+
write_client_options
89
from influxdb_client_3.exceptions import InfluxDB3ClientQueryError
910
from influxdb_client_3.write_client.rest import ApiException
1011
from tests.util import asyncio_run
@@ -66,6 +67,51 @@ def test_token_auth_scheme_explicit(self):
6667
)
6768
self.assertEqual(client._client.auth_header_value, "my_scheme my_token")
6869

70+
def test_write_options(self):
71+
client = InfluxDBClient3(
72+
host="localhost",
73+
org="my_org",
74+
token="my_token",
75+
auth_scheme="my_scheme",
76+
write_client_options=write_client_options(
77+
success_callback=lambda _: True,
78+
error_callback=lambda _: False,
79+
extra_arg="ignored",
80+
write_options=WriteOptions(write_type=WriteType.synchronous,
81+
max_retries=0,
82+
max_retry_time=0,
83+
max_retry_delay=0,
84+
timeout=30_000,
85+
flush_interval=500,))
86+
)
87+
88+
assert isinstance(client._write_client_options["write_options"], WriteOptions)
89+
assert client._write_client_options["success_callback"]("an_arg") == True
90+
assert client._write_client_options["error_callback"]("an_arg") == False
91+
assert client._write_client_options["extra_arg"] == "ignored"
92+
assert client._write_client_options["write_options"].timeout == 30_000
93+
assert client._write_client_options["write_options"].max_retries == 0
94+
assert client._write_client_options["write_options"].max_retry_time == 0
95+
assert client._write_client_options["write_options"].max_retry_delay == 0
96+
assert client._write_client_options["write_options"].write_type == WriteType.synchronous
97+
assert client._write_client_options["write_options"].flush_interval == 500
98+
print(f"DEBUG client._client._base._Configuration {client._client.conf.__dict__}")
99+
print(f"DEBUG client._client._base._Configuration.timeout {client._client.conf.timeout}")
100+
101+
102+
def test_default_write_options(self):
103+
client = InfluxDBClient3(
104+
host="localhost",
105+
token="my_token",
106+
org="my_org",
107+
database="my_db",
108+
)
109+
110+
assert client._write_client_options["write_options"].write_type == DefaultWriteOptions.write_type.value
111+
assert client._write_client_options["write_options"].no_sync == DefaultWriteOptions.no_sync.value
112+
assert client._write_client_options["write_options"].write_precision == DefaultWriteOptions.write_precision.value
113+
assert client._write_client_options["write_options"].timeout == DefaultWriteOptions.timeout.value
114+
69115
@asyncio_run
70116
async def test_query_async(self):
71117
with ConstantFlightServer() as server:

tests/test_influxdb_client_3_integration.py

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import time
88
import unittest
99

10-
from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions
10+
from urllib3.exceptions import MaxRetryError, ConnectTimeoutError
11+
12+
from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, SYNCHRONOUS, flight_client_options, \
13+
WriteType
1114
from influxdb_client_3.exceptions import InfluxDBError
1215
from tests.util import asyncio_run, lp_to_py_object
1316

@@ -247,3 +250,110 @@ async def test_verify_query_async(self):
247250
def test_get_server_version(self):
248251
version = self.client.get_server_version()
249252
assert version is not None
253+
254+
# TODO set sync test, also investigate behavior with batcher and retry
255+
# TODO do these need to be run with integration - won't mock suffice?
256+
def test_write_timeout_sync(self):
257+
258+
ErrorRecord = None
259+
def set_error_record(error):
260+
nonlocal ErrorRecord
261+
ErrorRecord = error
262+
263+
with pytest.raises(ConnectTimeoutError) as e:
264+
localClient = InfluxDBClient3(
265+
host=self.host,
266+
database=self.database,
267+
token=self.token,
268+
write_client_options=flight_client_options(
269+
error_callback=set_error_record,
270+
write_options=WriteOptions(
271+
max_retry_time=0,
272+
timeout=20,
273+
write_type=WriteType.synchronous
274+
)
275+
)
276+
)
277+
278+
localClient.write("test_write_timeout,location=harfa fVal=3.14,iVal=42i")
279+
280+
281+
@pytest.mark.skip(reason="placeholder - partially implemented")
282+
@asyncio_run
283+
async def test_write_timeout_async(self):
284+
# fco = flight_client_options(max_retries=10, timeout=30_000)
285+
# print(f"DEBUG fco: {fco}")
286+
# TODO ensure API can handle either callback or thrown exception
287+
# TODO asserts based on solution
288+
289+
ErrorRecord = None
290+
def set_error_record(error):
291+
nonlocal ErrorRecord
292+
ErrorRecord = error
293+
294+
295+
localClient = InfluxDBClient3(
296+
host=self.host,
297+
database=self.database,
298+
token=self.token,
299+
write_client_options=flight_client_options(
300+
error_callback=set_error_record,
301+
write_options=WriteOptions(
302+
max_retry_time=0,
303+
timeout=20,
304+
write_type=WriteType.asynchronous
305+
)
306+
)
307+
)
308+
309+
print(f"DEBUG localClient._write_client_options: {localClient._write_client_options['write_options'].__dict__}")
310+
print(f"DEBUG localClient._client._base._Configuration {localClient._client.conf.timeout}")
311+
312+
applyResult = localClient.write("test_write_timeout,location=harfa fVal=3.14,iVal=42i")
313+
print(f"DEBUG applyResult: {applyResult}")
314+
result = applyResult.get()
315+
print(f"DEBUG result: {result}")
316+
317+
318+
def test_write_timeout_batching(self):
319+
320+
ErrorResult = {"rt": None, "rd": None, "rx": None}
321+
322+
def set_error_result(rt, rd, rx):
323+
nonlocal ErrorResult
324+
ErrorResult = {"rt": rt, "rd": rd, "rx": rx}
325+
326+
localClient = InfluxDBClient3(
327+
host=self.host,
328+
database=self.database,
329+
token=self.token,
330+
write_client_options=flight_client_options(
331+
error_callback=set_error_result,
332+
write_options=WriteOptions(
333+
max_retry_time=0,
334+
timeout=20,
335+
write_type=WriteType.batching,
336+
max_retries=1,
337+
batch_size=1,
338+
)
339+
)
340+
)
341+
lp = "test_write_timeout,location=harfa fVal=3.14,iVal=42i"
342+
localClient.write(lp)
343+
344+
# wait for batcher attempt last write retry
345+
time.sleep(0.1)
346+
347+
assert ErrorResult["rt"] == (self.database, 'default', 'ns')
348+
assert ErrorResult["rd"] is not None
349+
assert isinstance(ErrorResult["rd"], bytes)
350+
assert ErrorResult["rd"].decode('utf-8') == lp
351+
assert ErrorResult["rx"] is not None
352+
assert isinstance(ErrorResult["rx"], MaxRetryError)
353+
mre = ErrorResult["rx"]
354+
assert isinstance(mre.reason, ConnectTimeoutError)
355+
356+
@pytest.mark.skip("place holder")
357+
def test_write_timeout_retry(self):
358+
# TODO
359+
print("DEBUG test_write_timeout_retry")

0 commit comments

Comments
 (0)