Python装饰器如何增加任务超时退出功能

描述

任务超时退出

我们日常在使用的各种网络请求库时都带有timeout参数,例如request库。这个参数可以使请求超时就不再继续了,直接抛出超时错误,避免等太久。

如果我们自己开发的方法也希望增加这个功能,该如何做呢?

方法很多,但最简单直接的是使用并发库futures,为了使用方便,我将其封装成了一个装饰器,代码如下:

import functools
from concurrent import futures

executor = futures.ThreadPoolExecutor(1)

def timeout(seconds):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            future = executor.submit(func, *args, **kw)
            return future.result(timeout=seconds)
        return wrapper
    return decorator

定义了以上函数,我们就有了一个超时结束的装饰器,下面可以测试一下:

import time

@timeout(1)
def task(a, b):
    time.sleep(1.2)
    return a+b

task(2, 3)

结果:

---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
...
D:Anaconda3libconcurrentfutures_base.py in result(self, timeout)
    432                 return self.__get_result()
    433             else:
-- > 434                 raise TimeoutError()
    435 
    436     def exception(self, timeout=None):

TimeoutError:

上面我们通过装饰器定义了函数的超时时间为1秒,通过睡眠模拟函数执行超过1秒时,成功的抛出了超时异常。

程序能够在超时时间内完成时:

@timeout(1)
def task(a, b):
    time.sleep(0.9)
    return a+b

task(2, 3)

结果:

5

可以看到,顺利的得到了结果。

这样我们就可以通过一个装饰器给任何函数增加超时时间,这个函数在规定时间内还处理不完就可以直接结束任务。

前面我将这个装饰器将所需的变量定义到了外部,其实我们还可以通过类装饰器进一步封装,代码如下:

import functools
from concurrent import futures

class timeout:
    __executor = futures.ThreadPoolExecutor(1)

    def __init__(self, seconds):
        self.seconds = seconds

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            future = timeout.__executor.submit(func, *args, **kw)
            return future.result(timeout=self.seconds)
        return wrapper

经测试使用类装饰器能得到同样的效果。

注意:使用@functools.wraps的目的是因为被装饰的func函数元信息会被替换为wrapper函数的元信息,而@functools.wraps(func)将wrapper函数的元信息替换为func函数的元信息。最终虽然返回的是wrapper函数,元信息却依然是原有的func函数。

在函数式编程中,函数的返回值是函数对象被称为闭包。

日志记录

如果我们需要记录部分函数的执行时间,函数执行前后打印一些日志,装饰器是一种很方便的选择。

代码如下:

import time
import functools

def log(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        res = func(*args, **kwargs)
        end = time.perf_counter()
        print(f'函数 {func.__name__} 耗时 {(end - start) * 1000} ms')
        return res
    return wrapper

装饰器 log 记录某个函数的运行时间,并返回其执行结果。

测试一下:

@log
def now():
    print('2021-7-1')

now()

结果:

2021-7-1
函数 now 耗时 0.09933599994838005 ms

缓存

如果经常调用一个函数,而且参数经常会产生重复,如果把结果缓存起来,下次调用同样参数时就会节省处理时间。

定义函数:

import math
import random
import time


def task(x):
    time.sleep(0.01)
    return round(math.log(x**3 / 15), 4)

执行:

%%time
for i in range(500):
    task(random.randrange(5, 10))

结果:

Wall time: 5.01 s

此时如果我们使用缓存的效果就会大不一样,实现缓存的装饰器有很多,我就不重复造轮子了,这里使用functools包下的LRU缓存:

from functools import lru_cache

@lru_cache()
def task(x):
    time.sleep(0.01)
    return round(math.log(x**3 / 15), 4)

执行:

%%time
for i in range(500):
    task(random.randrange(5, 10))

结果:

Wall time: 50 ms

约束某个函数的可执行次数

如果我们希望程序中的某个函数在整个程序的生命周期中只执行一次或N次,可以写一个这样的装饰器:

import functools


class allow_count:
    def __init__(self, count):
        self.count = count
        self.i = 0

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if self.i >= self.count:
                return
            self.i += 1
            return func(*args, **kw)
        return wrapper

测试:

@allow_count(3)
def job(x):
    x += 1
    return x


for i in range(5):
    print(job(i))

结果:

1
2
3
None
None
打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分