快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

python函数多进程 python中的多线程和多进程

Python多进程运行——Multiprocessing基础教程2

上篇文章简单介绍了multiprocessing模块,本文将要介绍进程之间的数据共享和信息传递的概念。

10年积累的网站制作、成都网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先做网站后付款的网站建设流程,更有献县免费网站建设让你可以放心的选择与我们合作。

在多进程处理中,所有新创建的进程都会有这两个特点:独立运行,有自己的内存空间。

我们来举个例子展示一下:

这个程序的输出结果是:

在上面的程序中我们尝试在两个地方打印全局列表result的内容:

我们再用一张图来帮助理解记忆不同进程间的数据关系:

如果程序需要在不同的进程之间共享一些数据的话,该怎么做呢?不用担心,multiprocessing模块提供了Array对象和Value对象,用来在进程之间共享数据。

所谓Array对象和Value对象分别是指从共享内存中分配的ctypes数组和对象。我们直接来看一个例子,展示如何用Array对象和Value对象在进程之间共享数据:

程序输出的结果如下:

成功了!主程序和p1进程输出了同样的结果,说明程序中确实完成了不同进程间的数据共享。那么我们来详细看一下上面的程序做了什么:

在主程序中我们首先创建了一个Array对象:

向这个对象输入的第一个参数是数据类型:i表示整数,d代表浮点数。第二个参数是数组的大小,在这个例子中我们创建了包含4个元素的数组。

类似的,我们创建了一个Value对象:

我们只对Value对象输入了一个参数,那就是数据类型,与上述的方法一致。当然,我们还可以对其指定一个初始值(比如10),就像这样:

随后,我们在创建进程对象时,将刚创建好的两个对象:result和square_sum作为参数输入给进程:

在函数中result元素通过索引进行数组赋值,square_sum通过 value 属性进行赋值。

注意:为了完整打印result数组的结果,需要使用 result[:] 进行打印,而square_sum也需要使用 value 属性进行打印:

每当python程序启动时,同时也会启动一个服务器进程。随后,只要我们需要生成一个新进程,父进程就会连接到服务器并请求它派生一个新进程。这个服务器进程可以保存Python对象,并允许其他进程使用代理来操作它们。

multiprocessing模块提供了能够控制服务器进程的Manager类。所以,Manager类也提供了一种创建可以在不同流程之间共享的数据的方法。

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型,如列表、字典、队列、值、数组等。此外,单个管理器可以由网络上不同计算机上的进程共享。

但是,服务器进程管理器的速度比使用共享内存要慢。

让我们来看一个例子:

这个程序的输出结果是:

我们来理解一下这个程序做了什么:首先我们创建了一个manager对象

在with语句下的所有行,都是在manager对象的范围内的。接下来我们使用这个manager对象创建了列表(类似的,我们还可以用 manager.dict() 创建字典)。

最后我们创建了进程p1(用于在records列表中插入一条新的record)和p2(将records打印出来),并将records作为参数进行传递。

服务器进程的概念再次用下图总结一下:

为了能使多个流程能够正常工作,常常需要在它们之间进行一些通信,以便能够划分工作并汇总最后的结果。multiprocessing模块支持进程之间的两种通信通道:Queue和Pipe。

使用队列来回处理多进程之间的通信是一种比较简单的方法。任何Python对象都可以使用队列进行传递。我们来看一个例子:

上面这个程序的输出结果是:

我们来看一下上面这个程序到底做了什么。首先我们创建了一个Queue对象:

然后,将这个空的Queue对象输入square_list函数。该函数会将列表中的数平方,再使用 put() 方法放入队列中:

随后使用 get() 方法,将q打印出来,直至q重新称为一个空的Queue对象:

我们还是用一张图来帮助理解记忆:

一个Pipe对象只能有两个端点。因此,当进程只需要双向通信时,它会比Queue对象更好用。

multiprocessing模块提供了 Pipe() 函数,该函数返回由管道连接的一对连接对象。 Pipe() 返回的两个连接对象分别表示管道的两端。每个连接对象都有 send() 和 recv() 方法。

我们来看一个例子:

上面这个程序的输出结果是:

我们还是来看一下这个程序到底做了什么。首先创建了一个Pipe对象:

与上文说的一样,该对象返回了一对管道两端的两个连接对象。然后使用 send() 方法和 recv() 方法进行信息的传递。就这么简单。在上面的程序中,我们从一端向另一端发送一串消息。在另一端,我们收到消息,并在收到END消息时退出。

要注意的是,如果两个进程(或线程)同时尝试从管道的同一端读取或写入管道中的数据,则管道中的数据可能会损坏。不过不同的进程同时使用管道的两端是没有问题的。还要注意,Queue对象在进程之间进行了适当的同步,但代价是增加了计算复杂度。因此,Queue对象对于线程和进程是相对安全的。

最后我们还是用一张图来示意:

Python的multiprocessing模块还剩最后一篇文章:多进程的同步与池化

敬请期待啦!

Python的多进程模块multiprocessing

众所周知,Python中不存在真正的多线程,Python中的多线程是一个并发过程。如果想要并行的执行程序,充分的利用cpu资源(cpu核心),还是需要使用多进程解决的。其中multiprocessing模块应该是Python中最常用的多进程模块了。

基本上multiprocessing这个模块和threading这个模块用法是相同的,也是可以通过函数和类创建进程。

上述案例基本上就是笔者搬用了上篇文章多线程的案例,可见其使用的相似之处。导入multiprocessing后实例化Process就可以创建一个进程,参数的话也是和多线程一样,target放置进程执行函数,args存放该函数的参数。

使用类来创建进程也是需要先继承multiprocessing.Process并且实现其init方法。

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求。

但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。

需要注意的是,在调用join方法阻塞进程前,需要先调用close方法,,否则程序会出错。

在上述案例中,提到了非阻塞,当把创建进程的方法换为pool.apply(func, (msg,))时,就会阻塞进程,出现下面的状况。

在multiprocessing模块中还存在Queue对象,这是一个进程的安全队列,近似queue.Queue。队列一般也是需要配合多线程或者多进程使用。

下列案例是一个使用进程队列实现的生产者消费者模式。

multiprocessing支持两种进程间的通信,其中一种便是上述案例的队列,另一种则称作管道。在官方文档的描述中,multiprocessing中的队列是基于管道实现的,并且拥有更高的读写效率。

管道可以理解为进程间的通道,使用Pipe([duplex])创建,并返回一个元组(conn1,conn2)。如果duplex被置为True(默认值),那么该管道是双向的,如果duplex被置为False,那么该管道是单向的,即conn1只能用于接收消息,而conn2仅能用于发送消息。

其中conn1、conn2表示管道两端的连接对象,每个连接对象都有send()和recv()方法。send和recv方法分别是发送和接受消息的方法。例如,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

关于multiprocessing模块其实还有很多实用的类和方法,由于篇幅有限(懒),笔者就先写到这里。该模块其实用起来很像threading模块,像锁对象和守护线程(进程)等multiprocessing模块也是有的,使用方法也近乎相同。

如果想要更加详细的了解multiprocessing模块,请参考官方文档。

python 多进程

基于官方文档:

日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥

我就是喜欢抄官方的,哈哈

通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。

这种方法和 Thread 是一样的。

上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")

否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:

主进加个 sleep

所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了

上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id

下面就讲一下,这两个函数的用法:

os.getpid()

返回当前进程的id

os.getppid()

返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个

windows返回相同的id (可能被其他进程使用了)

这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。

而子进程的父级 process id 是调用他的那个进程的 id : 1940

视频笔记:

多进程:使用大致方法:

参考: 进程通信(pipe和queue)

pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表

poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)

报错:

参考 :

把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。

结果:

这个肯定有解释的

测试多进程计算效果:

进程池运行:

结果:

普通计算:

我们同样传入 1 2 10 三个参数测试:

其实对比下来开始快了一半的;

我们把循环里的数字去掉一个 0;

单进程:

多进程:

两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。

问题 二:

视图:

post 视图里面

Music 类:

直接报错:

写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。

最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:

前台也能显示搜索的音乐结果了

总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。

还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??

使用 pool.map 子进程 函数报错,导致整个 pool 挂了:

参考:

主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。

关于map 传多个函数参数

我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:

报错:

参考:

普通的 process 当让可以穿多个参数,map 却不知道咋传的。

apply_async 和map 一样,不知道咋传的。

最简单的方法:

使用 starmap 而不是 map

结果:

子进程结束

1.8399453163146973

成功拿到结果了

关于map 和 starmap 不同的地方看源码:

关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈

关于 上面源码里面有 itertools.starmap

itertools 用法参考:

有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:

上面就是预留了 一个cpu 干其他事的

后面直接使用 Queue 遇到这个问题:

解决:

Manager().Queue() 代替 Queue()

因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:

使用 queue.empty() 空为True

Python多进程multiprocessing模块介绍

multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。

1、multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

2、相关方法

输出结果如下:

Pool提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool 。

将在标准输出中打印

其中:

(1)p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func( args, kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()

(2)p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func( args,**kwargs),然后返回结果。此方法的结果是 AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。多进程并发!

(3)p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

(4)p.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

python多进程报错load_eof

python 多进程报错(创建运行多进程)

简单说一下python的多进程包multiprocessing。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

1.当我们希望使用python创建一个多进程运行时,碰到下面的报错提示:

2.解决的方法很简单,只需要将你的代码放到 if __name__ == "__main__"下面,如下图:

3.下面是测试多进程运行的程序。

import time

import random

from multiprocessing import Process

def run(name):

print(f' 开始运行 {name} 进程...')

# 睡眠一个1~5的随机数,做进程对比

time.sleep(random.randrange(1,5))

print(f' {name} 进程运行结束。')

if __name__ == "__main__":

p1 = Process(target=run, args=('my_jcy',)) # 必须加,号

p2 = Process(target=run, args=('my_mm',)) # 必须加,号

p1.start()

p2.start()

print('这里是主进程,已结束!')

如何使用Python实现多进程编程

1. Process

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

例1.1:创建函数并将其作为单个进程

import multiprocessing

import time

def worker(interval):

n = 5

while n  0:

print("The time is {0}".format(time.ctime()))

time.sleep(interval)

n -= 1

if __name__ == "__main__":

p = multiprocessing.Process(target = worker, args = (3,))

p.start()

print "p.pid:", p.pid

print "p.name:", p.name

print "p.is_alive:", p.is_alive()

结果

12345678    p.pid: 8736p.name: Process-1p.is_alive: TrueThe time is Tue Apr 21 20:55:12 2015The time is Tue Apr 21 20:55:15 2015The time is Tue Apr 21 20:55:18 2015The time is Tue Apr 21 20:55:21 2015The time is Tue Apr 21 20:55:24 2015    

例1.2:创建函数并将其作为多个进程

import multiprocessing

import time

def worker_1(interval):

print "worker_1"

time.sleep(interval)

print "end worker_1"

def worker_2(interval):

print "worker_2"

time.sleep(interval)

print "end worker_2"

def worker_3(interval):

print "worker_3"

time.sleep(interval)

print "end worker_3"

if __name__ == "__main__":

p1 = multiprocessing.Process(target = worker_1, args = (2,))

p2 = multiprocessing.Process(target = worker_2, args = (3,))

p3 = multiprocessing.Process(target = worker_3, args = (4,))

p1.start()

p2.start()

p3.start()

print("The number of CPU is:" + str(multiprocessing.cpu_count()))

for p in multiprocessing.active_children():

print("child   p.name:" + p.name + "\tp.id" + str(p.pid))

print "END!!!!!!!!!!!!!!!!!"

结果

1234567891011    The number of CPU is:4child   p.name:Process-3    p.id7992child   p.name:Process-2    p.id4204child   p.name:Process-1    p.id6380END!!!!!!!!!!!!!!!!!worker_1worker_3worker_2end worker_1end worker_2end worker_3    

例1.3:将进程定义为类

import multiprocessing

import time

class ClockProcess(multiprocessing.Process):

def __init__(self, interval):

multiprocessing.Process.__init__(self)

self.interval = interval

def run(self):

n = 5

while n  0:

print("the time is {0}".format(time.ctime()))

time.sleep(self.interval)

n -= 1

if __name__ == '__main__':

p = ClockProcess(3)

p.start()      

注:进程p调用start()时,自动调用run()

结果

12345    the time is Tue Apr 21 20:31:30 2015the time is Tue Apr 21 20:31:33 2015the time is Tue Apr 21 20:31:36 2015the time is Tue Apr 21 20:31:39 2015the time is Tue Apr 21 20:31:42 2015


分享文章:python函数多进程 python中的多线程和多进程
分享链接:http://6mz.cn/article/docegdg.html

其他资讯