Flask 中的 Long Polling

Long Polling 并不是什么稀奇的东西,其实就是一个普通的 HTTP 请求,只不过在没数据时我们直接把他阻塞掉,直到有数据直接返回完成本次请求。

那么说白了代码其实就像是下面这样:

from flask import Flask, jsonify
from time import sleep

app = Flask(__name__)
app.debug = True


@app.route('/long-polling')
def long_polling():
    i = 0
    while True:
        if i == 10:
            return jsonify({'error': 0})
        else:
            i += 1
            sleep(5)


if __name__ == '__main__':
    app.run()

long_polling中用sleep(5)来阻塞,模拟一些耗时的操作,也就是在i == 10时返回一个 JSON 串。

我们把他跑起来试试:

curl http://127.0.0.1:5000/long-polling
{
  "error": 0
}

漫长的等待后我们发现终于返回了数据。

更远的问题

但是这么一来有问题啊,如果有两个人同时访问的话,第一个人阻塞的时候第二个人完全就是需要等待额外的时间,知道第一个人处理完数据后这个进程才能继续接受请求,这特么该怎么办呢?

既然一个请求不够的话那我们多跑几个进程,绑定到不同的端口上,然后用 Nginx 做个反代均衡负载?等等,那要是同时有好多人过来那我们不是需要成堆的进程?

为了解决这个问题,我们要引入一个新模型——协程。

协程是用户级别的线程,这里需要补下操作系统的知识。CPU 在处理指令时都是一条一条执行的(其实现代的 CPU 中还有更好的处理方式),那么为什么我们看起来是很多任务并行的呢?可以简单的认为是 CPU 先跑一段这个线程里的指令,然后中断切换到另外一个线程里去跑一段(其实切换的时候还有更多的要讲),因为切换的速度非常快,所以我们感觉起来就像是多进程并行了。

协程要怎么理解呢?协程就是把切换的过程交给用户自己来处理,用户自己处理几个上下文。具体要怎么做呢?Python 中有一个非常强大的语句叫做yield

yield其实是返回了一个生成器,简单的理解起来就是执行到yield时抛出一个值,下一次在此执行时从上次抛出值的yield的下一句开始执行,直到再一次遇到yield或者结束循环。

使用yield来做协程操作的话简单来讲就是一个生产者-消费者模型,生产者yield出一个产品,消费者解决掉然后再次调用生产者产出产品,这里就有了一个非常基本的协程,生产者与消费者协作完成整个任务。

好了我们简单了解了协程是什么东西后回到 Long Polling 的内容。刚才我们实现了一个非常简单的 Long Polling,也遇到了一个高并发的问题,现在我们尝试用gevent来解决这个问题。

使用 gevent

gevent是一个基于协程的 Python 网络库,接下来我们就要来应用它了(当然pip install gevent是绝对必要的)。

我们可以简单的使用 monkey patch 来应用到我们的 Flask APP 中。你的代码看起来就会像下面这样:

from gevent.monkey import patch_all; patch_all()
from gevent.pywsgi import WSGIServer
from flask import Flask, jsonify
from time import sleep

app = Flask(__name__)
app.debug = True


@app.route('/long-polling')
def long_polling():
    i = 0
    while True:
        if i == 10:
            return jsonify({'error': 0})
        else:
            i += 1
            sleep(5)


if __name__ == '__main__':
    server = WSGIServer(('127.0.0.1', 5000), app)
    server.serve_forever()

我们主要增加的是第一行patch_all()这里我们做了一个 Monkey Patch(Monkey Patch 其实就是在运行时偷偷地把一些函数代换掉,这也是 Python 这门语言特性所决定 Python 可以进行 Monkey Patch),所以之后我们使用的sleep函数其实已经不是 Pytho 内建的sleep了,怎么观察到呢?可以这么做个实验:

>>> from time import sleep
>>> sleep
<built-in function sleep>
>>> from gevent.monkey import patch_all
>>> patch_all()
>>> from time import sleep
>>> sleep
<function sleep at 0x10a375758>

可以观察到后面的sleep函数已经不是原来的build-in版本了。这里需要注意的一点就是patch_all()后需要重新import time这个模块,或者把patch_all()的操作就放到最前面进行。

还有一点是我们把app.run()换成了geventWSGIServer,因为app.run()使用的其实是werkzeug.serving中的一个run_simple,仅仅能够在开发中使用?

接下来我们就可以实验下使用了gevent后的 Long Polling 效果。

我们用siege来模拟多用户并发访问:

siege http://127.0.0.1:5000/long-polling -c10 -t60s
** SIEGE 3.1.0
** Preparing 10 concurrent users for battle.
The server is now under siege...
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.05 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.05 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling
HTTP/1.1 200  50.04 secs:      16 bytes ==> GET  /long-polling

Lifting the server siege...      done.

Transactions:		          10 hits
Availability:		      100.00 %
Elapsed time:		       59.33 secs
Data transferred:	        0.00 MB
Response time:		       50.04 secs
Transaction rate:	        0.17 trans/sec
Throughput:		        0.00 MB/sec
Concurrency:		        8.43
Successful transactions:          10
Failed transactions:	           0
Longest transaction:	       50.05
Shortest transaction:	       50.04
 
FILE: /usr/local/var/siege.log
You can disable this annoying message by editing
the .siegerc file in your home directory; change
the directive 'show-logfile' to false.

你可以看见光光一个进程我们就能够处理 10 个用户并发访问的情况,实际上他还能处理更多。

部署问题

有没有感觉每次都要把patch_all()放在最前面这样代码看起来实在是很恶心,而且我们不知道部署的时候要怎么处理好。

这里我们就可以用 Gunicorn,Gunicorn 可以使用-k指定 worker,当我们使用-k gevent时他会自己帮我们处理好需要的 Monkey Patch,那么我们就不用自己处理了。

完整的命令看起来会像是这样:

gunicorn -k gevent -w 4 -b 127.0.0.1:8881 myproject:app

其中-w 4是指定 4 个 worker 进程,那样整个 APP 的处理能力又会有所提升。

这么一来你的代码里就不需要显式的出现patch_all(),而是可以像往常一样写好阻塞的过程,等着 Gunicorn 帮你做 Monkey Patch。

甚至你的代码就按照原来写法:

from flask import Flask, jsonify
from time import sleep

app = Flask(__name__)
app.debug = True


@app.route('/long-polling')
def long_polling():
    i = 0
    while True:
        if i == 10:
            return jsonify({'error': 0})
        else:
            i += 1
            sleep(5)


if __name__ == '__main__':
    app.run()

在应用的主逻辑部分完全按照原来的写法,而在部署时交给 Gunicorn 让他使用gevent处理。

最后的最后

使用 gevent 还可以做很多有趣的事情,比如 WebSocket,WebSocket 就像是一个传统的 Socket,可以实时双向地传输数据,这个坑就以后再填吧 _(:з」∠)_