Python实现定时任务的方案

2023-08-21 21:29:12

在日常工作中,我们常常会用到需要周期性执行的任务,一种方式是采用 Linux 系统自带的 crond[1] 结合命令行实现。另外一种方式是直接使用 Python。接下里整理的是常见的 Python 定时任务的实现方式。
利用 while True: + sleep() 实现定时任务

位于 time 模块中的 sleep(secs) 函数,可以实现令当前执行的线程暂停 secs 秒后再继续执行。所谓暂停,即令当前线程进入阻塞状态,当达到 sleep() 函数规定的时间后,再由阻塞状态转为就绪状态,等待 CPU 调度。

基于这样的特性我们可以通过 while 死循环+sleep() 的方式实现简单的定时任务。

代码示例:
import datetime

import time

def time_printer():

    now = datetime.datetime.now()

    ts = now.strftime('%Y-%m-%d %H:%M:%S')

    print('do func time :', ts)

def loop_monitor():

    while True:

        time_printer()

        time.sleep(5)  # 暂停 5 秒

if __name__ == "__main__":

    loop_monitor()
主要缺点:

只能设定间隔,不能指定具体的时间,比如每天早上 8:00

sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作 。使用 Timeloop 库运行定时任务

Timeloop[2] 是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator 模式在线程中运行标记函数。

示例代码:
import time

from timeloop import Timeloop

from datetime import timedelta

tl = Timeloop()

@tl.job(interval=timedelta(seconds=2))

def sample_job_every_2s():

    print "2s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=5))

def sample_job_every_5s():

    print "5s job current time : {}".format(time.ctime())

@tl.job(interval=timedelta(seconds=10))

def sample_job_every_10s():

    print "10s job current time : {}".format(time.ctime())

利用 threading.Timer 实现定时任务

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

Timer(interval, function, args=[ ], kwargs={ })
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数 代码示例:
代码示例:
import datetime
from threading import Timer
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    t = Timer(5, time_printer)
    t.start()
if __name__ == "__main__":
    loop_monitor()
备注:Timer 只能执行一次,这里需要循环调用,否则只能执行一次
利用内置模块 sched 实现定时任务
sched 模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc) 这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc 是一个没有参数的返回时间类型数字的函数(常用使用的如 time 模块里面的 time),delayfunc 应该是一个需要一个参数来调用、与 timefunc 的输出兼容、并且作用为延迟多个时间单位的函数(常用的如 time 模块的 sleep)。
代码示例:
import datetime
import time
import sched
def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成调度器
    s.enter(5, 1, time_printer, ())
    s.run()
if __name__ == "__main__":
    loop_monitor()
scheduler 对象主要方法:
enter(delay, priority, action, argument),安排一个事件来延迟 delay 个时间单位。
cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个 ValueError。
run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。

个人点评:比 threading.Timer 更好,不需要循环调用。


利用调度模块 schedule 实现定时任务
schedule[3] 是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule[4] 允许用户使用简单、人性化的语法以预定的时间间隔定期运行 Python 函数(或其它可调用函数)。
先来看代码,是不是不看文档就能明白什么意思?
import schedule
import time
def job():
    print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)
装饰器:通过 @repeat() 装饰静态方法
import time
from schedule import every, repeat, run_pending
@repeat(every().second)
def job():
    print('working...')
while True:
    run_pending()
    time.sleep(1)
传递参数:
import schedule
def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
while True:
    schedule.run_pending()
装饰器同样能传递参数:
from schedule import every, repeat, run_pending
@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)
while True:
    run_pending()
取消任务:
import schedule
i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)
while True:
    schedule.run_pending()
运行一次任务:
import time
import schedule
def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob
schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)
根据标签检索任务:
# 检索所有任务:schedule.get_jobs()
import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
friends = schedule.get_jobs('friend')
print(friends)
根据标签取消任务:
# 取消所有任务:schedule.clear()
import schedule
def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
while True:
    schedule.run_pending()
运行任务到某时间:
import schedule
from datetime import datetime, timedelta, time
def job():
    print('working...')
schedule.every().second.until('23:59').do(job)  # 今天 23:59 停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30 停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8 小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天 23:59:59 停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30 停止
while True:
    schedule.run_pending()
马上运行所有任务(主要用于测试):
import schedule
def job():
    print('工作中...')
def job1():
    print('哈喽...')
schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟 3 秒
并行运行:使用 Python 内置队列实现:
import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()

    time.sleep(1)