python使用Flask,Redis和Celery的异步任务

随着Web应用程序的发展和使用的增加,用例也变得多样化。

由Kaizong Ye,Weilong Zhang撰写

我们现在正在建设和使用网站来执行比以往任何时候都更复杂的任务。其中一些任务可以进行处理,并将反馈立即转发给用户,而其他任务则需要稍后进行进一步处理和结果转发。

越来越多地采用Internet访问和支持Internet的设备导致最终用户流量增加。

×

参考资料:
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery简介
除Celery是一个异步任务的调度工具。 Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。
Broker
在 Python 中定义 Celery 的时候,我们要引入 Broker(消息中间件),中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。
Backend
这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
Celery应用场景
1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

Celery的特点
1.简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
2.高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
3.快速:一个单进程的celery每分钟可处理上百万个任务
3.灵活: 几乎celery的各个组件都可以被扩展及自定制


在本文中,我们将探讨Celery在Flask应用程序中安排后台任务的使用,以减轻资源密集型任务的负担并确定对最终用户的响应的优先级。

什么是任务队列?

任务队列是一种分配小的工作单元或任务的机制,可以在不干扰大多数基于Web的应用程序的请求-响应周期的情况下执行这些任务。


课程

R语言数据分析挖掘必知必会

从数据获取和清理开始,有目的的进行探索性分析与可视化。让数据从生涩的资料,摇身成为有温度的故事。

立即参加

任务队列有助于委派工作,否则将在等待响应时降低应用程序的速度。它们还可以用于在主机或进程与用户交互时处理资源密集型任务。

示范 

我们将构建一个Flask应用程序,该应用程序允许用户设置提醒,该提醒将在设定的时间传递到他们的电子邮件中。

我们还将提供自定义消息或提醒被调用并将消息发送给用户之前的时间的功能。

设定

与其他项目一样,我们的工作将在虚拟环境中进行 :

$ pipenv install --three
$ pipenv shell

对于此项目,我们将需要安装Flask和Celery软件包以开始:



$ pipenv install flask celery

我们的Flask应用程序文件结构如下所示:

.
├── Pipfile                    #管理我们的环境
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py                     # Flask应用程序的主要实现
├── config.py                  # 托管配置
├── requirements.txt           # 储存我们的要求
└── templates
    └── index.html             # 登陆页面
 
 
 
1 directory, 8 files

让我们从创建Flask应用程序开始,该应用程序将呈现一个表单,该表单允许用户输入将来发送的消息的详细信息。

我们将以下内容添加到我们的app.py文件中:

from flask import Flask, flash, render_template, request, redirect, url_for
 
app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']
 
@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')
 
    elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']
 
        flash(“Message scheduled”)
        return redirect(url_for('index'))
 
 
if __name__ == '__main__':
    app.run(debug=True)

这是一个非常简单的应用程序,只需一条路由即可处理GETPOST请求表单。提交详细信息后,我们可以将数据交给计划工作的功能。

为了整理主应用程序文件,我们将配置变量放在单独的config.py文件中,然后从文件中加载配置:

app.config.from_object("config")

我们的config.py文件将与该app.py文件位于同一文件夹中,并包含一些基本配置:

SECRET_KEY = 'very_very_secure_and_secret'
# 更多配置
 

现在,让我们将目标网页实现为index.html

{% for message in get_flashed_messages() %}
  <p style="color: red;">{{ message }}</p>
{% endfor %}
 
<form method="POST">
    First Name: <input id="first_name" name="first_name" type="text">
    Last Name: <input id="last_name" name="last_name" type="text">
    Email: <input id="email" name="email" type="email">
    Message: <textarea id="textarea" name="message"></textarea>
    Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">
 
   <select name="duration_unit">
      <option value="" disabled selected>Choose the duration</option>
      <option value="1">Minutes</option>
      <option value="2">Hours</option>
      <option value="3">Days</option>
   </select>
 
   <button type="submit" name="action">Submit </button>
</form>

 现在,我们可以启动我们的应用程序:

1_landing_page

使用邮件发送电子邮件

为了从Flask应用程序发送电子邮件,我们将使用Flask-Mail库,该库如下所示添加到我们的项目中:

$ pipenv install flask-mail

有了Flask应用程序和表单,我们现在可以将Flask-Mail集成到我们的app.py

 
from flask_mail import Mail, Message
 
app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']
 
# 设置Flask-Mail集成
 
mail = Mail(app)
 
def send_mail(data):
    """ Function to send emails.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)

功能 send_main(data)将接收要发送的消息和电子邮件的收件人,然后在经过指定的时间后将其调用以将电子邮件发送给用户。config.py为了使Flask-Mail正常运行,我们还需要向我们添加以下变量:

 
# Flask-邮件
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = 'mail-username'
MAIL_PASSWORD = 'mail-password'

 整合

在我们的Flask应用程序准备就绪并配备了电子邮件发送功能之后,我们现在可以集成Celery,以便计划在以后发送电子邮件。我们app.py将再次被修改:

 
# 现有导入保持不变
 
 
from celery import Celery
 
# Flask应用程序和flask-mail配置被截断
 
 
 
# 设置客户端
 
 
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)
 
# 将此装饰器添加到我们的send_mail函数中
 
 
@client.task
def send_mail(data):
    #函数保持不变
 
 
 
@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')
 
    elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']
 
        if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400
 
        send_mail.apply_async(args=[data], countdown=duration)
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")
 
        return redirect(url_for('index'))

celery通过附加消息传递代理的URL,我们导入并使用它在Flask应用程序中初始化Celery客户端。在我们的例子中,我们将使用Redis作为代理,因此我们将以下内容添加到我们的config.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

为了使我们的send_mail()功能作为后台任务执行,我们将添加@client.task装饰器,以便我们的Celery客户端会意识到这一点。

设置Celery客户端后,将修改还处理表单输入的主要功能。

首先,我们将send_mail()函数的输入数据打包在字典中。然后,我们使用函数通过Celery Task Calling API调用邮件功能,该函数apply_async接受函数所需的参数。

设置了一个可选countdown参数,定义了运行代码和执行任务之间的延迟。

汇集 

为了运行我们的项目,我们将需要两个终端,一个终端启动我们的Flask应用程序,另一个终端启动Celery worker,后者将在后台发送消息。

在第一个终端中启动Flask应用程序:

$ python app.py

在第二个终端中,启动虚拟环境,然后启动Celery worker:

 
# 启动virtualenv
 
$ pipenv shell
$ celery worker -A app.client --loglevel=info

如果一切顺利,我们将在运行Celery客户端的终端中获得以下反馈:

现在让我们导航到 http://localhost:5000并填写详细信息,以计划在提交2分钟后到达的电子邮件。

在表格上方,将显示一条消息,指示将接收电子邮件的地址以及发送电子邮件的持续时间。在我们的Celery终端中,我们还将能够看到一个日志条目,表明我们的电子邮件已被调度:

[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

ETA条目的部分显示何时send_email()调用我们的函数,以及何时发送电子邮件。

 因此,让我们为后台任务实现一个监视解决方案,以便我们可以查看任务,并注意出现问题以及未按计划执行任务的情况。

 监控我们的群集

 安装Flower非常简单:

$ pipenv install flower

之前,我们在app.py文件中指定了Celery客户的详细信息。我们需要将该客户传递给Flower以对其进行监控。

为此,我们需要打开第三个终端窗口,进入虚拟环境,然后启动监视工具:

$ pipenv shell
$ flower -A app.client --port=5555

启动Flower时,我们通过将其传递给application(-A)参数来指定Celery客户端,并通过该参数来指定要使用的端口--port

有了我们的监控功能后,让我们安排在仪表板上发送另一封电子邮件,然后导航到http://localhost:5555,在以下位置我们会对此表示欢迎:

在此页面上,我们可以看到Celery集群中的工作人员列表,该列表当前仅由我们的机器组成。

要查看我们刚刚计划的电子邮件,请单击仪表板左上方的“ 任务”按钮,这将带我们到可以查看已计划的任务的页面:

在本部分中,我们可以看到我们已计划了两封电子邮件,并且已在计划的时间成功发送了一封电子邮件。出于测试目的,计划分别在1分钟和5分钟后发送电子邮件。

结论

我们已经成功建立了Celery集群并将其集成到我们的Flask应用程序中,该应用程序允许用户计划在将来的某个时间后发送电子邮件。


可下载资源

关于作者

Kaizong Ye拓端研究室(TRL)的研究员。在此对他对本文所作的贡献表示诚挚感谢,他在上海财经大学完成了统计学专业的硕士学位,专注人工智能领域。擅长Python.Matlab仿真、视觉处理、神经网络、数据分析。

本文借鉴了作者最近为《R语言数据分析挖掘必知必会 》课堂做的准备。

​非常感谢您阅读本文,如需帮助请联系我们!

 
QQ在线咨询
售前咨询热线
15121130882
售后咨询热线
0571-63341498

关注有关新文章的微信公众号


永远不要错过任何见解。当新文章发表时,我们会通过微信公众号向您推送。

技术干货

最新洞察

This will close in 0 seconds