Source code for jivago.scheduling.cron_schedule

from datetime import datetime, timezone

from croniter import croniter

from jivago.lang.annotations import Override
from jivago.scheduling.schedule import Schedule


[docs] class CronSchedule(Schedule): def __init__(self, cron_string: str, start_time: datetime): if start_time: self.iterator = croniter(cron_string, start_time=start_time.astimezone(timezone.utc)) else: self.iterator = croniter(cron_string)
[docs] @Override def next_start_time(self) -> datetime: return self.iterator.get_next(ret_type=datetime)