from datetime import datetime, timezone
from typing import Optional

from croniter import croniter

from jivago.lang.annotations import Override
from jivago.scheduling.schedule import Schedule
[docs]
class CronSchedule(Schedule):
    """Schedule whose start times are produced from a cron expression.

    Wraps a ``croniter`` iterator; every call to :meth:`next_start_time`
    advances that iterator by one occurrence.
    """

    def __init__(self, cron_string: str, start_time: Optional[datetime] = None):
        """Build the underlying cron iterator.

        :param cron_string: Cron expression, passed to ``croniter`` unchanged.
        :param start_time: Moment from which occurrences are computed. It is
            converted to UTC before being handed to ``croniter``. When omitted
            or ``None``, ``croniter`` is created without an explicit start
            time and uses its own default (the current time, per the croniter
            documentation).
        """
        # Explicit None check: the parameter is optional, and datetime
        # instances are always truthy, so this matches the intent exactly.
        if start_time is not None:
            # astimezone() treats a naive datetime as local time before
            # converting, so the iterator always works from a UTC value.
            start_utc = start_time.astimezone(timezone.utc)
            self.iterator = croniter(cron_string, start_time=start_utc)
        else:
            self.iterator = croniter(cron_string)

    @Override
    def next_start_time(self) -> datetime:
        """Advance the cron iterator and return the next occurrence.

        Each call consumes one occurrence, so successive calls return
        successive start times.

        :return: The next occurrence as a ``datetime``.
        """
        return self.iterator.get_next(ret_type=datetime)