博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python-- python threadpool 的前世今生
阅读量:5104 次
发布时间:2019-06-13

本文共 3115 字,大约阅读时间需要 10 分钟。

引出

首先需要了解的是threadpool 的用途,他更适合于用到一些大量的短任务合集,而非一些时间长的任务,换句话说,适合大量的CPU密集型短任务,那些消耗时间较长的IO密集型长任务适合用去解决。

目前,python 标准库(特指python2.X)中的threadpool模块是在 multiprocessing.pool.threadpool,或者multiprocessing.dummy.ThreadPool(dummy模块是针对threading 多线程的进一步封装)。该模块有个缺点就是在所有线程执行完之前无法强制退出。实现原理大同小异:实例化pool的时候会创建指定数目的线程,把task 传给一个task-queue,线程会读取task-queue 的task,没有就阻塞,读取到后就执行,并将结果交给一个result-queue。

除了标准库中的threadpool,还有一些使用比较多的threadpool,以下展开。

pip 中的 ThreadPool

安装简单:pip install threadpool

使用如下:

1 2 3 4
pool = ThreadPool(poolsize)   # 定义线程池,指定线程数量 requests = makeRequests(some_callable, list_of_args, callback) # 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数  [pool.putRequest(req) for req in requests] # 所有要运行多线程的请求扔进线程池 pool.wait() # 等待所有线程完成后退出

原理类似,源码解读可以参考 ,该博客还给出了对其的一些优化。

自己定制 threadpool

根据需要的功能定制适合自己的threadpool 也是一种常见的手段,常用的功能比如:是否需要返回线程执行后的返回值,线程执行完之后销毁还是阻塞等等。以下为自己经常用的的一个比较简洁的threadpool,感谢@kaito-kidd提供,:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
# coding: utf8 """ 线程池,用于高效执行某些任务。 """ import Queue import threading class Task(threading.Thread): """ 任务 """ def __init__(self, num, input_queue, output_queue, error_queue): super(Task, self).__init__() self.thread_name = "thread-%s" % num self.input_queue = input_queue self.output_queue = output_queue self.error_queue = error_queue self.deamon = True def run(self): """run """ while 1: try: func, args = self.input_queue.get(block=False) except Queue.Empty: print "%s finished!" % self.thread_name break try: result = func(*args) except Exception as exc: self.error_queue.put((func.func_name, args, str(exc))) else: self.output_queue.put(result) class Pool(object): """ 线程池 """ def __init__(self, size): self.input_queue = Queue.Queue() self.output_queue = Queue.Queue() self.error_queue = Queue.Queue() self.tasks = [ Task(i, self.input_queue, self.output_queue, self.error_queue) for i in range(size) ] def add_task(self, func, args): """添加单个任务 """ if not isinstance(args, tuple): raise TypeError("args must be tuple type!") self.input_queue.put((func, args)) def add_tasks(self, tasks): """批量添加任务 """ if not isinstance(tasks, list): raise TypeError("tasks must be list type!") for func, args in tasks: self.add_task(func, args) def get_results(self): """获取执行结果集 """ while not self.output_queue.empty(): print "Result: ", self.output_queue.get() def get_errors(self): """获取执行失败的结果集 """ while not self.error_queue.empty(): func, args, error_info = self.error_queue.get() print "Error: func: %s, args : %s, error_info : %s" \ % (func.func_name, args, error_info) def run(self): """执行 """ for task in self.tasks: task.start() for task in self.tasks: task.join() def test(i): """test """ result = i * 10 return result def main(): """ main """ pool = Pool(size=5) pool.add_tasks([(test, (i,)) for i in range(100)]) pool.run() if __name__ == "__main__": main()

 

转载于:https://www.cnblogs.com/276815076/p/11233385.html

你可能感兴趣的文章
如何处理大量数据并发操作(转)
查看>>
JavaScript实现强制重定向至HTTPS页面
查看>>
2019年2月备战春招最新大数据+Java岗位+人工智能岗位资料免费送【限时领取】...
查看>>
.NET设计模式简析
查看>>
SQL高效率语句(二)
查看>>
web优化之-js动态合并 动态压缩 去掉js重复引用 js缓存 js延迟加载
查看>>
201704221048_《ES6模板字符串》
查看>>
【BZOJ-2595】游览计划 斯坦纳树
查看>>
Ubuntu——配置JDK
查看>>
导弹拦截版
查看>>
jzoj5195. 【NOIP2017提高组模拟7.3】A(递推,打表)
查看>>
robot framework接口测试之一-完整的测试用例
查看>>
IOS开发:使用lipo合并armv7,i386,armv7s库文件
查看>>
使用 udev 高效、动态地管理 Linux 设备文件
查看>>
Java8函数之旅(四) --四大函数接口
查看>>
django环境处理
查看>>
记一次企业级爬虫系统升级改造(三):文本分析与数据建模规则化处理
查看>>
javascript window对象
查看>>
Android定制组件之Widget之昨天今天明天
查看>>
JSON
查看>>