博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
twisted的defer模式和线程池
阅读量:7250 次
发布时间:2019-06-29

本文共 3832 字,大约阅读时间需要 12 分钟。

Reference: http://www.cnblogs.com/mumuxinfei/p/4528910.html

 

前言:

  最近帮朋友review其模块服务代码, 使用的是python的twisted网络框架. 鉴于之前并没有使用过, 于是决定好好研究一番.
  twisted的reactor模型很好的处理了网络IO事件, 以及定时任务触发. 但包处理后的业务逻辑操作, 需要根据具体的场景来决定.
  本文将讲述twisted如何实现half-sync/half-async的模式, 其线程池和defer模式是如何设计和使用的.

场景构造:

  twisted服务接受业务请求, 后端需要访问mysql. 由于mysql的接口是同步的, 如果安装twisted默认的方式处理话, 其业务操作(mysql)会阻塞reactor的IO事件循环. 这大大降低了twisted的服务能力. 
  为了解决该类问题, twisted支持线程池. 把业务逻辑和IO事件分离, IO操作依旧是异步的, 而业务逻辑则采用线程池来处理.

  

工作线程池:

  在具体讲述defer模式之前, 先谈谈reactor自带的线程池, 这也符合使用half-sync/half-async模式的直观理解.
  先来构造下一个基础样例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#! /usr/bin/python
#-*- coding: UTF-8 -*-
 
from 
twisted.internet 
import 
reactor
from 
twisted.internet 
import 
protocol
from 
twisted.protocols.basic 
import 
LineReceiver
 
import 
time
 
class 
DemoProtocol(LineReceiver):
                
    
def 
lineReceived(
self
, line):
        
# 进行数据包的处理
        
reactor.callInThread(
self
.handle_request, line)
     
    
def 
handle_request(
self
, line):
        
"""
            
hanlde_request:
                
进行具体的业务逻辑处理
        
"""
        
# 边使用sleep(1)来代替模拟
        
time.sleep(
1
)
        
# 借助callFromThread响应结果
        
reactor.callFromThread(
self
.write_response, line)
     
    
def 
write_response(
self
, result):
        
self
.transport.write(
"ack:" 
+ 
str
(result) 
+ 
"\r\n"
)
 
class 
DemoProtocolFactory(protocol.Factory):
    
def 
buildProtocol(
self
, addr):
        
return 
DemoProtocol()
     
 
reactor.listenTCP(
9090
, DemoProtocolFactory())
reactor.run()

  DemoProtocol在收到一行消息, 需要处理一个业务需耗时一秒, 于是其调用callInThread来借助reactor的线程池来执行.

  其callInThread的函数定义如下:

1
2
def 
callInThread(
self
, _callable, 
*
args, 
*
*
kwargs):
        
self
.getThreadPool().callInThread(_callable, 
*
args, 
*
*
kwargs)

  从中, 我们可以印证之前的观点, 借助线程池来完成耗时阻塞的业务工作.

  再来看一下callFromThread的函数定义:

1
2
3
4
def 
callFromThread(
self
, f, 
*
args, 
*
*
kw):
        
assert 
callable
(f), 
"%s is not callable" 
% 
(f,)
        
self
.threadCallQueue.append((f, args, kw))
        
self
.wakeUp()

  其作用是把回调放入主线程(也是reactor主事件循环)的待执行队列中, 并及时唤醒reactor.

  我们把写入响应的操作放入主循环中, 是为了让IO集中在主循环中进行, 避免潜在的线程不安全的问题.

defer模式:

  直接使用reactor的线程池, 非常容易实现half-sync/half-async的模式, 也让IO和业务逻辑隔离. 但reactor设计之初, 更倾向于隐藏其内部的线程池. 于是其引入了defer模式.
  让我们实现与上等同的代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#! /usr/bin/python
#-*- coding: UTF-8 -*-
 
from 
twisted.internet 
import 
reactor
from 
twisted.internet 
import 
protocol
from 
twisted.protocols.basic 
import 
LineReceiver
from 
twisted.internet.threads 
import 
deferToThread
 
import 
time
 
class 
DemoProtocol(LineReceiver):
                
    
def 
lineReceived(
self
, line):
        
# 进行数据包的处理
        
deferToThread(
self
.handle_request, line).addCallback(
self
.write_response)
     
    
def 
handle_request(
self
, line):
        
"""
            
hanlde_request:
                
进行具体的业务逻辑处理
        
"""
        
# 边使用sleep(1)来代替模拟
        
time.sleep(
1
)
        
return 
line
     
    
def 
write_response(
self
, result):
        
self
.transport.write(
"ack:" 
+ 
str
(result) 
+ 
"\r\n"
)
     
 
class 
DemoProtocolFactory(protocol.Factory):
    
def 
buildProtocol(
self
, addr):
        
return 
DemoProtocol()
     
 
reactor.listenTCP(
9090
, DemoProtocolFactory())
reactor.run()

  使用defer后, 代码更加的简洁. 其defer对象, 其实借用了线程池. 

  threads.deferToThread定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def 
deferToThread(f, 
*
args, 
*
*
kwargs):
    
from 
twisted.internet 
import 
reactor
    
return 
deferToThreadPool(reactor, reactor.getThreadPool(),
                             
f, 
*
args, 
*
*
kwargs)
 
def 
deferToThreadPool(reactor, threadpool, f, 
*
args, 
*
*
kwargs):
    
= 
defer.Deferred()
 
    
def 
onResult(success, result):
        
if 
success:
            
reactor.callFromThread(d.callback, result)
        
else
:
            
reactor.callFromThread(d.errback, result)
 
    
threadpool.callInThreadWithCallback(onResult, f, 
*
args, 
*
*
kwargs)
 
    
return 
d

  这边我们可以发现deferToThread, 就是间接调用了callInThread函数, 另一方面, 对其回调函数的执行结果, 进行了onCallback, 以及onErrback的调用. 这些回调函数在主线程中运行.

  defer模式简化了程序编写, 也改变了人们开发的思维模式.

测试回顾:

  使用telnet进行测试, 结果正常.
  
  另一方面, twisted的线程池, 其默认是采用延迟初始化的方式.
  服务开启时, 只有主线程一个, 随着请求的到来, 其按需产生更多的worker thread.
  而其线程池默认为10. 我们可以借助suggestThreadPoolSize方法来修改.

你可能感兴趣的文章
使用Matplotlib绘制正余弦函数、抛物线
查看>>
四位辉光管时钟-学长毕设
查看>>
大话RAC介质恢复---联机日志损坏
查看>>
oracle 内存分配和调优 总结
查看>>
移植最新版libmemcached到VC++的艰苦历程和经验总结(上)
查看>>
诡异的bug: tcsh陷入死循环
查看>>
java-第一章-上机练习-04
查看>>
Active Directory 基础 (1)
查看>>
xml地图生成网址
查看>>
Python 练习1
查看>>
TCExam文件代码注释分析(后台首页admin/code/index.php)
查看>>
Finereport在企业级BI分析中的应用
查看>>
linux内核参数注释与优化
查看>>
linux 2.6x内核升级
查看>>
pxe
查看>>
NFS网络文件系统安装
查看>>
网页嵌入自动生成当前网页二维码图片代码
查看>>
Linux时间同步服务
查看>>
Python基础-----列表、元组、集合(2)
查看>>
iptables详解
查看>>