当前位置: 代码迷 >> python >> 你如何在 Flask 中安排定时事件?
  详细解决方案

你如何在 Flask 中安排定时事件?

热度:126   发布时间:2023-06-13 16:55:05.0

这是一些代码:

from flask import Flask, request
import time, threading

class MyServer(Flask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.reset()

    def reset(self):
        self.string = "hello"

application = MyServer(__name__)

@application.route("/set")
def set():
    application.string = request.args["string"]
    threading.Timer(10, application.reset()).start()
    return request.args["string"] + " stored for 10 seconds"

@application.route("/get")
def get():
    return application.string

if __name__ == "__main__":
    application.debug = True
    application.run()

我的期望/目标是,如果您访问端点/set?string=foo那么在接下来的 10 秒内,应用程序将在您每次访问/get时返回“foo”,从那时起它将返回“hello”,直到您点击/set端点。

相反,如果我点击/set?string=foo然后立即点击 '/get',应用程序将返回“hello”并且我在控制台中看到“TypeError: 'NoneType' object is not callable”。 任何人都可以帮忙吗?

以下是不正确的:

threading.Timer(10, application.reset()).start()

Timer 的第二个参数需要是一个函数,但您实际上是在调用 reset() 函数,从而将该方法调用的结果传递给 Timer。 就好像你做了以下...

result = application.reset()
threading.Timer(10, result).start()

你可能想做的是以下...

threading.Timer(10, application.reset).start()

话虽如此,我会非常犹豫将此解决方案用于玩具项目以外的任何事情:根据您的 Flask 应用程序的部署方式,您实际上可能同时运行多个 Flask 进程。 在这种情况下,此解决方案只会更改您当前正在访问的进程。 此外,根据您的负载,在每个请求中生成一个新线程可能会导致相当多的开销。

更好的方法可能是将此数据保存到用户的会话(cookie)或数据库中。 您可以使用 Celery 之类的系统或其他消息队列来运行异步代码。

如果您需要数据在一定时间后“过期”,还可以考虑设置数据应该过期的日期,然后引入检查过期的代码。 因此,您的“set”方法可以同步设置过期时间,并且“get”端点可以检查过期时间是否已过并根据该时间选择要返回的值。 现在您不需要进行异步调用。