Skip to content

Commit 1f2e24e

Browse files
authored
BUG FIX: issue:493 Loss of task_id when using retry middlewares (#496)
* issue:493 add: test case add: task_id for retry scheduled task --------- Co-authored-by: SARomanchuk
1 parent 3b714ae commit 1f2e24e

File tree

5 files changed

+52
-1
lines changed

5 files changed

+52
-1
lines changed

taskiq/kicker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ def with_labels(
6969
self.labels.update(labels)
7070
return self
7171

72-
def with_task_id(self, task_id: str) -> "AsyncKicker[_FuncParams, _ReturnType]":
72+
def with_task_id(
73+
self,
74+
task_id: Optional[str],
75+
) -> "AsyncKicker[_FuncParams, _ReturnType]":
7376
"""
7477
Set task_id for current execution.
7578
@@ -208,6 +211,7 @@ async def schedule_by_cron(
208211
labels=message.labels,
209212
args=message.args,
210213
kwargs=message.kwargs,
214+
task_id=self.custom_task_id,
211215
cron=cron_str,
212216
cron_offset=cron_offset,
213217
)
@@ -239,6 +243,7 @@ async def schedule_by_time(
239243
labels=message.labels,
240244
args=message.args,
241245
kwargs=message.kwargs,
246+
task_id=self.custom_task_id,
242247
time=time,
243248
)
244249
await source.add_schedule(scheduled)

taskiq/scheduler/scheduled_task/v1.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class ScheduledTask(BaseModel):
1212
labels: Dict[str, Any]
1313
args: List[Any]
1414
kwargs: Dict[str, Any]
15+
task_id: Optional[str] = None
1516
schedule_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
1617
cron: Optional[str] = None
1718
cron_offset: Optional[Union[str, timedelta]] = None

taskiq/scheduler/scheduled_task/v2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class ScheduledTask(BaseModel):
1313
labels: Dict[str, Any]
1414
args: List[Any]
1515
kwargs: Dict[str, Any]
16+
task_id: Optional[str] = None
1617
schedule_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
1718
cron: Optional[str] = None
1819
cron_offset: Optional[Union[str, timedelta]] = None

taskiq/scheduler/scheduler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
5151
.with_labels(
5252
schedule_id=task.schedule_id,
5353
)
54+
.with_task_id(task_id=task.task_id)
5455
.kiq(
5556
*task.args,
5657
**task.kwargs,

tests/test_retry_task.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import pytest
2+
3+
from taskiq import (
4+
Context,
5+
InMemoryBroker,
6+
SmartRetryMiddleware,
7+
TaskiqDepends,
8+
TaskiqScheduler,
9+
)
10+
from taskiq.schedule_sources import LabelScheduleSource
11+
12+
13+
@pytest.mark.parametrize(
14+
"retry_count",
15+
range(5),
16+
)
17+
@pytest.mark.anyio
18+
async def test_save_task_id_for_retry(retry_count: int) -> None:
19+
broker = InMemoryBroker().with_middlewares(
20+
SmartRetryMiddleware(
21+
default_retry_count=retry_count + 1,
22+
default_delay=0.1,
23+
),
24+
)
25+
scheduler = TaskiqScheduler(broker, [LabelScheduleSource(broker)])
26+
27+
check_interval = 0.5
28+
29+
@broker.task("exc_task", retry_on_error=True)
30+
async def exc_task(count: int = 0, context: "Context" = TaskiqDepends()) -> int:
31+
retry = int(context.message.labels.get("_retries", 0))
32+
if retry < count:
33+
raise Exception("test")
34+
return retry
35+
36+
await broker.startup()
37+
await scheduler.startup()
38+
39+
task_with_retry = await exc_task.kiq(retry_count)
40+
task_with_retry_result = await task_with_retry.wait_result(
41+
check_interval=check_interval,
42+
)
43+
assert task_with_retry_result.return_value == retry_count

0 commit comments

Comments
 (0)