Để giúp bạn bắt đầu, chúng tôi đã chọn một vài ví dụ Apscheduler, dựa trên những cách phổ biến nó được sử dụng trong các dự án công cộng.
Agronholm / Apscheduler / tests / test_triggers.pyview trên github
View on Github
def test_cron_trigger_1[self, timezone]:
trigger = CronTrigger[year='2009/2', month='1/3', day='5-13', timezone=timezone]
assert repr[trigger] == [""]
assert str[trigger] == "cron[year='2009/2', month='1/3', day='5-13']"
start_date = timezone.localize[datetime[2008, 12, 1]]
correct_next_date = timezone.localize[datetime[2009, 1, 5]]
assert trigger.get_next_fire_time[None, start_date] == correct_next_date
Agronholm / Apscheduler / Tests / TestTriggers.PyView trên GitHub View on Github
def test_cron_year_list[self]:
trigger = CronTrigger[self.defaults, year='2009,2008']
eq_[repr[trigger], ""]
eq_[str[trigger], "cron[year='2009,2008']"]
start_date = datetime[2009, 1, 1, tzinfo=local_tz]
correct_next_date = datetime[2009, 1, 1, tzinfo=local_tz]
eq_[trigger.get_next_fire_time[start_date], correct_next_date]
AfourMy / ENMS / ENMS / Model / Lập lịch.PyView trên GitHub View on Github
if self.scheduling_mode == "cron":
self.periodic = True
expression = self.crontab_expression.split[]
mapping = {
"0": "sun",
"1": "mon",
"2": "tue",
"3": "wed",
"4": "thu",
"5": "fri",
"6": "sat",
"7": "sun",
"*": "*",
}
expression[-1] = ",".join[mapping[day] for day in expression[-1].split[","]]
trigger = {"trigger": CronTrigger.from_crontab[" ".join[expression]]}
elif self.frequency:
self.periodic = True
frequency_in_seconds = [
int[self.frequency]
* {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}[
self.frequency_unit
]
]
trigger = {
"trigger": "interval",
"start_date": self.aps_date["start_date"],
"end_date": self.aps_date["end_date"],
"seconds": frequency_in_seconds,
}
else:
self.periodic = False
enms-tự động View on Github
def schedule_task[self, task]:
if task["scheduling_mode"] == "cron":
crontab = task["crontab_expression"].split[]
crontab[-1] = ",".join[self.days[day] for day in crontab[-1].split[","]]
trigger = {"trigger": CronTrigger.from_crontab[" ".join[crontab]]}
elif task["frequency"]:
trigger = {
"trigger": "interval",
"start_date": self.aps_date[task["start_date"]],
"end_date": self.aps_date[task["end_date"]],
"seconds": int[task["frequency"]]
* self.seconds[task["frequency_unit"]],
}
else:
trigger = {"trigger": "date", "run_date": self.aps_date[task["start_date"]]}
if not self.scheduler.get_job[task["id"]]:
job = self.scheduler.add_job[
id=str[task["id"]],
replace_existing=True,
func=self.run_service,
args=[task["id"]],
Sispheor / piclodio3 / back / utils / scoruler_manager.pyview trên github View on Github
def add_new_job[self, job_id, day_of_week_string, hour, minute, url, auto_stop_minutes] -> bool:
print["add a new job with id {}, {}, {}, {}, {}, {}".
format[job_id, day_of_week_string, hour, minute, url, auto_stop_minutes]]
my_cron = CronTrigger[hour=hour,
minute=minute,
day_of_week=day_of_week_string]
self.scheduler.add_job[func=self.play_web_radio,
trigger=my_cron,
id=str[job_id],
args=[url, auto_stop_minutes]]
return True
giả mạo / đọc có thể đọc được View on Github
continue
if job_str not in activeScheduledTasks.target_jobs:
print["Removing job: %s -> %s" % [job_str, job_id]]
self.scheduler_manager.remove_job[job_id]
else:
job_params = activeScheduledTasks.target_jobs[job_str]
if job_params.get['interval']:
trig = apscheduler.triggers.interval.IntervalTrigger[
seconds = job_params.get['interval'],
start_date = start_date,
]
start_date = start_date + datetime.timedelta[minutes=5]
else:
trig = apscheduler.triggers.cron.CronTrigger[
month = job_params.get['month', None],
day = job_params.get['day', None],
day_of_week = job_params.get['day_of_week', None],
hour = job_params.get['hour', None],
minute = job_params.get['minute', None],
]
if job.name != job_params['name']:
self.scheduler_manager.remove_job[job_id]
# So the apscheduler CronTrigger class doesn't provide the equality
# operator, so we compare the stringified version. Gah.
elif str[job.trigger] != str[trig]:
print["Removing trigger:", str[job.trigger], str[trig]]
self.scheduler_manager.remove_job[job_id]
Elemental-lf / benji / src / benji / k8s_operator / crd / operator_config.pyview trên github View on Github
def install_maintenance_jobs[*, parent_body: Dict[str, Any], logger] -> None:
reconciliation_schedule: Optional[str] = benji.k8s_operator.operator_config['spec']['reconciliationSchedule']
benji.k8s_operator.scheduler.add_job[lambda: reconciliate_versions_job[logger=logger],
CronTrigger[].from_crontab[reconciliation_schedule],
name=SCHED_VERSION_RECONCILIATION_JOB,
id=SCHED_VERSION_RECONCILIATION_JOB]
cleanup_schedule: Optional[str] = benji.k8s_operator.operator_config['spec'].get['cleanupSchedule', None]
if cleanup_schedule is not None and cleanup_schedule:
benji.k8s_operator.scheduler.add_job[lambda: cleanup_job[parent_body=parent_body, logger=logger],
CronTrigger[].from_crontab[cleanup_schedule],
name=SCHED_CLEANUP_JOB,
id=SCHED_CLEANUP_JOB]
# FIXME: Add extra schedule config for this job
benji.k8s_operator.scheduler.add_job[lambda: version_status_job[],
CronTrigger[].from_crontab[cleanup_schedule],
name=SCHED_VERSION_STATUS_JOB,
id=SCHED_VERSION_STATUS_JOB]