from apscheduler.scheduler import Scheduler
schedudler = Scheduler(daemonic = False)
@schedudler.cron_schedule(second='*', day_of_week='0-4', hour='9-12,13-15')
def quote_send_sh_job():
print 'a simple cron job start at', datetime.datetime.now()
schedudler.start()
對於定時任務我想大家都不陌生,簡單的講就是預先定義好在約定的時間執行約定的任務。這在程序開發中是一個很常用的功能在java 中可以使用Quartz,在.Net中也有對應的實現,而在python中對應的實現就是APScheduler。
利用APScheduler,可以實現給定的日期時間,固定間隔時間,以及crontab類型的任務。這裡解釋一下cron,cron就是在Linux下使用的定時任務器,可以通過定義特定格式的表達式來對觸發時間進行描述。在APScheduler中可以添加對應cron表達式的任務。
APScheduler可以選擇任務的存儲方式,如以內存存儲方式的非持久化存儲,或者以mongodb,redis,shelves,qlalchemy等的持久化方式存儲。當然你也可以寫自己的持久化存儲方式只要繼承JobStore類實現相關的方法就可以了。還是建議采用持久化的方式來存儲,來避免出現當機等突發現象引起的不必要的損失。
APScheduler使用起來非常的簡單。一種是通過裝飾器的方式來調用。
一種是直接在類中調用。
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
__MYSQL_url = 'mysql://root:123456@localhost/w'
uler(standalone=True)
if __name__ == '__main__':
scheduler = Sche
dscheduler.add_jobstore(SQLAlchemyJobStore(url=__MYSQL_url), 'default')
d_date_job(alarm, alarm_time, name='alarm1', args=[date
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.a
dtime.now()])
print 'alarms added: ', alarm_time
alarm_time = datetime.now() + timedelta(seconds=15)
, alarm_time
alarm_time = datetime.now() + timedelta(seconds=20)
scheduler
scheduler.add_date_job(alarm, alarm_time, name='alarm2', args=[datetime.now()])
print 'alarms added:
'.add_date_job(alarm, alarm_time, name='alarm3', args=[datetime.now()])
print 'alarms added: ', alarm_time
scheduler.add_cron_job(alarm, day_of_week='mon-fri', hour=5, minute=30)
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
注意代碼一種將daemonic = False,daemonic屬性是用來設置是否以後台線程的方式運行,默認值為true就是他會隨主程序的結束一起結束,若設成false則只能手動結束(實質是調用python內部類threading中的setDaemon方法)。APScheduler是以線程池的多線程的方式並發執行任務的(參見threadpool.py)。
代碼二演示了一個利用SQLAlchemy來實現持久化的方式,APScheduler會將定義好的將要執行的任務放入數據庫,在任務執行完成後刪除對應數據。記住在定義的多次任務中,數據庫中存的只是下一個時間要執行的單條任務信息,而不是所有將會執行的任務信息。
再講一下misfire_grace_time屬性的作用。 因為某種特定的原因導致定時任務服務掛掉,在重啟後如果任務的時間和實際的時間的差值小於定義的misfire_grace_time,任務還會執行,這就為任務提供了一定容錯機制。
另外APScheduler還可以監聽各種運行中的事件,來調用自己定義的方法如
def my_listener(event):
if event.exception:
print 'The job crashed :('
else:
print 'The job worked :)'
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)