33import sys
44from datetime import datetime , timedelta
55from logging import basicConfig , getLevelName , getLogger
6- from typing import Dict , List , Optional
6+ from typing import Any , Dict , List , Optional , Set , Tuple
77
88import pytz
99from pycron import is_now
@@ -55,7 +55,7 @@ async def get_schedules(source: ScheduleSource) -> List[ScheduledTask]:
5555
5656async def get_all_schedules (
5757 scheduler : TaskiqScheduler ,
58- ) -> Dict [ ScheduleSource , List [ScheduledTask ]]:
58+ ) -> List [ Tuple [ ScheduleSource , List [ScheduledTask ] ]]:
5959 """
6060 Task to update all schedules.
6161
@@ -71,7 +71,7 @@ async def get_all_schedules(
7171 schedules = await asyncio .gather (
7272 * [get_schedules (source ) for source in scheduler .sources ],
7373 )
74- return dict (zip (scheduler .sources , schedules ))
74+ return list (zip (scheduler .sources , schedules ))
7575
7676
7777def get_task_delay (task : ScheduledTask ) -> Optional [int ]:
@@ -98,12 +98,10 @@ def get_task_delay(task: ScheduledTask) -> Optional[int]:
9898 task_time = to_tz_aware (task .time )
9999 if task_time <= now :
100100 return 0
101- one_min_ahead = (now + timedelta (minutes = 1 )).replace (second = 1 , microsecond = 0 )
102- if task_time <= one_min_ahead :
103- delay = task_time - now
104- if delay .microseconds :
105- return int (delay .total_seconds ()) + 1
106- return int (delay .total_seconds ())
101+ delay = task_time - now
102+ if delay .microseconds :
103+ return int (delay .total_seconds ()) + 1
104+ return int (delay .total_seconds ())
107105 return None
108106
109107
@@ -145,21 +143,41 @@ async def delayed_send(
145143 await scheduler .on_ready (source , task )
146144
147145
148- async def run_scheduler_loop (scheduler : TaskiqScheduler ) -> None :
146+ async def run_scheduler_loop ( # noqa: C901
147+ scheduler : TaskiqScheduler ,
148+ interval : Optional [timedelta ] = None ,
149+ ) -> None :
149150 """
150151 Runs scheduler loop.
151152
152153 This function imports taskiq scheduler
153154 and runs tasks when needed.
154155
155156 :param scheduler: current scheduler.
157+ :param interval: interval to check for schedule updates.
156158 """
157159 loop = asyncio .get_event_loop ()
158- running_schedules = set ()
160+ running_schedules : Dict [str , asyncio .Task [Any ]] = {}
161+ ran_cron_jobs : Set [str ] = set ()
162+ current_minute = datetime .now (tz = pytz .UTC ).minute
159163 while True :
160- # We use this method to correctly sleep for one minute.
164+ now = datetime .now (tz = pytz .UTC )
165+ # If minute changed, we need to clear
166+ # ran_cron_jobs set and update current minute.
167+ if now .minute != current_minute :
168+ current_minute = now .minute
169+ ran_cron_jobs .clear ()
170+ # If interval is not None, we need to
171+ # calculate next run time using it.
172+ if interval is not None :
173+ next_run = now + interval
174+ # otherwise we need assume that
175+ # we will run it at the start of the next minute.
176+ # as crontab does.
177+ else :
178+ next_run = (now + timedelta (minutes = 1 )).replace (second = 1 , microsecond = 0 )
161179 scheduled_tasks = await get_all_schedules (scheduler )
162- for source , task_list in scheduled_tasks . items () :
180+ for source , task_list in scheduled_tasks :
163181 logger .debug ("Got %d schedules from source %s." , len (task_list ), source )
164182 for task in task_list :
165183 try :
@@ -172,16 +190,37 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
172190 task .schedule_id ,
173191 )
174192 continue
175- if task_delay is not None :
176- send_task = loop .create_task (
177- delayed_send (scheduler , source , task , task_delay ),
178- )
179- running_schedules .add (send_task )
180- send_task .add_done_callback (running_schedules .discard )
181- next_minute = datetime .now ().replace (second = 0 , microsecond = 0 ) + timedelta (
182- minutes = 1 ,
183- )
184- delay = next_minute - datetime .now ()
193+ # If task delay is None, we don't need to run it.
194+ if task_delay is None :
195+ continue
196+ # If task is delayed for more than next_run,
197+ # we don't need to run it, because we will
198+ # run it in the next iteration.
199+ if now + timedelta (seconds = task_delay ) >= next_run :
200+ continue
201+ # If task is already running, we don't need to run it again.
202+ if task .schedule_id in running_schedules and task_delay < 1 :
203+ continue
204+ # If task is cron job, we need to check if
205+ # we already ran it this minute.
206+ if task .cron is not None :
207+ if task .schedule_id in ran_cron_jobs :
208+ continue
209+ ran_cron_jobs .add (task .schedule_id )
210+ send_task = loop .create_task (
211+ delayed_send (scheduler , source , task , task_delay ),
212+ # We need to set the name of the task
213+ # to be able to discard its reference
214+ # after it is done.
215+ name = f"schedule_{ task .schedule_id } " ,
216+ )
217+ running_schedules [task .schedule_id ] = send_task
218+ send_task .add_done_callback (
219+ lambda task_future : running_schedules .pop (
220+ task_future .get_name ().removeprefix ("schedule_" ),
221+ ),
222+ )
223+ delay = next_run - datetime .now (tz = pytz .UTC )
185224 logger .debug (
186225 "Sleeping for %.2f seconds before getting schedules." ,
187226 delay .total_seconds (),
@@ -226,6 +265,10 @@ async def run_scheduler(args: SchedulerArgs) -> None:
226265 for source in scheduler .sources :
227266 await source .startup ()
228267
268+ interval = None
269+ if args .update_interval :
270+ interval = timedelta (seconds = args .update_interval )
271+
229272 logger .info ("Starting scheduler." )
230273 await scheduler .startup ()
231274 logger .info ("Startup completed." )
@@ -239,7 +282,7 @@ async def run_scheduler(args: SchedulerArgs) -> None:
239282 await asyncio .sleep (delay .total_seconds ())
240283 logger .info ("First run skipped. The scheduler is now running." )
241284 try :
242- await run_scheduler_loop (scheduler )
285+ await run_scheduler_loop (scheduler , interval )
243286 except asyncio .CancelledError :
244287 logger .warning ("Shutting down scheduler." )
245288 await scheduler .shutdown ()
0 commit comments