十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
多进程/多线程+Queue
创新互联建站成立与2013年,先为西青等服务建站,西青等地企业,进行企业商务咨询服务。为西青企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
一般来说,在Python中编写并发程序的经验是:计算密集型任务使用多进程,IO密集型任务使用多进程或者多线程.另外,因为涉及到资源共享,所以需要同步锁等一系列麻烦的步骤,代码编写不直观.另外一种好的思路是利用多进程/多线程+Queue的方法,可以避免加锁这样麻烦低效的方式.
现在在Python2中利用Queue+多进程的方法来处理一个IO密集型任务.
假设现在需要下载多个网页内容并进行解析,单进程的方式效率很低,所以使用多进程/多线程势在必行.
我们可以先初始化一个tasks队列,里面将要存储的是一系列dest_url,同时开启4个进程向tasks中取任务然后执行,处理结果存储在一个results队列中,最后对results中的结果进行解析.最后关闭两个队列.
下面是一些主要的逻辑代码.
# -*- coding:utf-8 -*-
#IO密集型任务
#多个进程同时下载多个网页
#利用Queue+多进程
#由于是IO密集型,所以同样可以利用threading模块
import multiprocessing
def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #进程数目==CPU核数目
create_process(tasks, results, cpu_count) #主进程马上创建一系列进程,但是由于阻塞队列tasks开始为空,副进程全部被阻塞
add_tasks(tasks) #开始往tasks中添加任务
parse(tasks, results) #最后主进程等待其他线程处理完成结果
def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根据_worker创建对应的进程
p.daemon = True #让所有进程可以随主进程结束而结束
p.start() #启动
def _worker(tasks, results):
while True: #因为前面所有线程都设置了daemon=True,故不会无限循环
try:
task = tasks.get() #如果tasks中没有任务,则阻塞
result = _download(task)
results.put(result) #some exceptions do not handled
finally:
tasks.task_done()
def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)
def parse(tasks, results):
try:
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err
while not results.empty():
_parse(results)
if __name__ == '__main__':
main()
利用Python3中的concurrent.futures包
在Python3中可以利用concurrent.futures包,编写更加简单易用的多线程/多进程代码.其使用感觉和Java的concurrent框架很相似(借鉴?)
比如下面的简单代码示例
def handler():
futures = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)
def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result
总结
要是一些大型Python项目也这般编写,那么效率也太低了.在Python中有许多已有的框架使用,使用它们起来更加高效.
但是自己的一些"小打小闹"的程序这样来编写还是不错的.:)
#!/usr/bin/envpython#-*-coding:utf-8-*-#author:ChanghuaGongimporttime,threading#fromurllib.requestimportRequest,urlopenpy3#fromurllib.errorimportURLErrorpy3importurllib2#URLreq=urllib2.Request('
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author: Changhua Gong
import time,threading
# from urllib.request import Request, urlopen py3
# from urllib.error import URLError py3
import urllib2
#URL
req = urllib2.Request('')
#
rule = {0:500,1:30}
'''
Rule规则:0:50,第一次运行不睡眠即为0,直接并发50次;1:20,第二秒,相当于睡眠1秒,然后并发20次,
如第三秒需并发500次,则rule = {0:50,1:20,1:500}
'''
#Open url
def geturl():
time_b = time.time()
try:
response = urllib2.urlopen(req)
print(response.read().decode("utf-8")) # 打印输出内容
except urllib2.URLError as e:
if hasattr(e, 'reason'):
print('We failed to reach a server.')
print('Reason: ', e.reason)
elif hasattr(e, 'code'):
print('The server couldn/'t fulfill the request.')
print('Error code: ', e.code)
time_e = time.time()
print("Thread %s runned for %ss" % (threading.current_thread().name, (time_e - time_b))) #线程访问时效
if __name__=='__main__':
for k in rule:
time.sleep(k)
for i in range(rule[k]):
t = threading.Thread(target=geturl)
t.start()
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
遇到IO阻塞时会自动切换任务
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前
我们可以用threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程
通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
服务端
客户端
多线程并发多个客户端
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数..
ps: 对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
参数介绍:
方法介绍:
主要方法:
其他方法(了解部分)
应用:
发现:并发开启多个客户端,服务端同一时间只有3个不同的pid,干掉一个客户端,另外一个客户端才会进来,被3个进程之一处理
回调函数:
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数
并行和并发
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务。
并发是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发),简单的可以理解为快速在多个线程来回切换,感觉好像同时在做多个事情。
只有具备多个cpu才能实现并行,单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)。 有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术 ,而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行。
相关推荐:《Python视频教程》
多道技术:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)。
同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行。
异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率。
举个例子,打电话时就是同步通信,发短息时就是异步通信。
相关推荐:
Python如何实现线程间同步
为了让代码能够并发执行,向创建线程并在核实的时候销毁它。
由于目的比较单纯,只是讲解基础的线程创建方法,所以可以直接使用threading库中的Thread类来实例化一个线程对象。
例子,用户输入两个数字,并且求其两个数字的四则运算的结果:
除了以上的一些功能以外,在python线程
中没有其他的诸如给线程发信号、设置线程调度属性、执行任何其他高级操作的功能了,如果需要这些功能,就需要手工编写了。
另外,需要注意的是,由于GIL(全局解释器锁)的存在,限制了在python解释器当中只允许运行一个线程。基于这个原因,不停该使用python线程来处理计算密集型的任务,因为在这种任务重我们希望在多个CPU核心上实现并行处理。Python线程更适合于IO处理以及设计阻塞操作的并发执行任务(即等待IO响应或等待数据库取出结果等)。
如何判断线程是否已经启动?
目的:我们加载了一个线程,但是想要知道这个线程什么时候才会开始运行?
方法:
线程的核心特征我认为就是不确定性,因为其什么时候开始运行,什么时候被打断,什么时候恢复执行,这不是程序员能够控制的,而是有系统调度
来完成的。如果遇到像某个线程的运行依托于其他某个线程运行到某个状态时该线程才能开始运行,那么这就是线程同步
问题,同样这个问题非常棘手。要解决这类问题我们要借助threading中的Event对象。
Event其实和条件标记类似,匀速线程
等待某个时间发生。初始状态时事件被设置成0。如果事件没有被设置而线程正在等待该事件,那么线程就会被阻塞,直到事件被设置位置,当有线程设置了这个事件之后,那么就会唤醒正在等待事件的线程,如果线程等待的事件已经设置了,那么线程会继续执行。
一个例子:
如上能够确定的是,主线程会在线程t运行结束时再运行。