【4858.com】多进程的着力语法,知识十遗篇

By admin in 4858.com on 2019年4月5日

 

多进程

进度之间是互为独立的,python是开行进度的时候,是运维的是原生进度。过程是未曾GIL锁的,而且不存在锁的定义,进程之间的数据式无法共享的,而线程是足以的。

当10贰线程创制完结之后,start并从未了立时运营,仍然须求和其他线程抢CPU的资格,只是
光阴非常的短。
经过之间的通信分为两种,queue和pipe

写一个game 循环
game
loop是种种游戏的中央.它不停的取得用户输入,更新游戏状态,渲染游戏结果到荧屏上.网页游戏分为客户端和服务端两部分.两边的loop通过网络连接起来.平常状态下,客户端获取用户输入,发送到服务端,服务端处理总括数据,更新玩家状态,发送结果个客户端.比如玩家或然游戏物体的地方.分外重大的是,不要把客户端和服务端的成效混淆了,假使没有充裕的理由的话.
假诺在客户端做游戏总计,那么分化的客户端格外简单就差异步了.

进程

一、进度的概念

用muliprocessing那一个包中的Process来定义多进度,跟定义多线程类似

from multiprocessing import Process   # 导入进程模块
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放进程对象
    for i in range(10):    # 启动10个进程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 产生一个进程实例
        p.start()   # 启动进程
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()   # 等待进程结果
 1 import multiprocessing
 2 def foo(q):
 3     q.put([1,'hello',True])
 4 if __name__=='__main__':
 5     q=multiprocessing.Queue()#通过multiprocessing建立一个队列
 6     p=multiprocessing.Process(target=foo,args=(q,))
 7   #用multiprocessing在调用Process,建立一个子进程,定义函数名,将q作为参数传到foo函数,
 8     #foo函数就可以通过这个参数来与主进程做交互了。
 9     p.start()#激活这个子进程
10     print(q.get())#主进程

A game loop iteration is often called a tick. Tick is an event
meaning that current game loop iteration is over and the data for the
next frame(s) is ready.

1.含义:总括机中的程序关于某数码集合上的2次运营活动,是系统开始展览财富分配和调度的主导单位。说白了正是2个主次的推行实例。

实施3个主次正是一个历程,比如你打开浏览器看到本人的博客,浏览器自身是七个软件程序,你此时打开的浏览器正是二个经过。

 

二、进程中参加线程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定义线程执行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入线程
    t.start()   # 执行线程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

上面函数通过multiprocessing的queue来实现进程间通讯。

在下3个例证中,大家写多少个客户端,那几个客户端通过WebSocket连接服务器,同时运营3个简便的loop,接受输入发送给服务器,回显消息.Client
source code is located
here.

二.历程的特征

  • ### 二个进度里能够有多少个子进度

  • ### 新的经过的创造是一点1滴拷贝整个主进度

  • ### 进度里能够包括线程

  • ### 进度之间(包涵主进度和子进度)不设有多中国少年共产党享,相互通讯(浏览器和python之间的数据不能互通的),要通讯则要注重队列,管道之类的

 

三、老爹和儿子进度

各样子进程都以由多个父进度运营的,每一种程序也是有二个父进度

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获得父进程ID
    print('process id:', os.getpid())  # 获得子进程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

 1 from multiprocessing import  Pipe,Process
 2 def foo(sk):
 3     sk.send('hello world')#通过管道sk发送内容
 4     print(sk.recv())#打印接收到的内容
 5 if __name__ == '__main__':
 6     sock,conn=Pipe()#定义一个管道的两头
 7     p=Process(target=foo,args=(sock,))#由于上面已经通过multiprocessing导入了Process,
 8     # 所以这里直接就可以创建一个子进程,并将sock(管道的一头)作为参数给foo函数
 9     p.start()#激活这个进程
10     print(conn.recv())#打印接收到的内容,conn是管道的另一头
11     conn.send('hi son')#通过管道发送内容

3.1

Example 3.1 source
code

我们使用aiohttp来创立二个game
server.那个库能够成立asyncio的client和server.这一个库的补益是同时帮衬http请求和websocket.所以服务器就不须求把结果处理成html了.
来看一下server怎么样运转:

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")
    return ws

async def game_loop(app):
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        await asyncio.sleep(2)


app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

以此代码就不翻译了,

叁.进程和线程之间的区分

  • ### 线程共享地址空间,而经过之间有相互独立的半空中

  • ### 线程之间数据互通,互相操作,而经过不得以

  • ### 新的线程比新的进程成立不难,比开进度的付出小很多

  • ### 主线程能够影响子线程,而主进度无法影响子进度

 

 

经过间数据交互与共享

精通分歧进程之间内部存款和储蓄器是不共享的,要想完成七个进程间的通讯供给选拔multiprocessing库中的queue(队列)模块,那个multiprocessing库中的queue模块跟单纯的queue库是差异的。进度导入前者(那里的queue是特意为经过之间的通讯设计的)不失误,导入后者(那里的queue主要是线程间数据交互)出错。

下边代码通过Pipe来兑现三个经过间的通信。

3.二 有请求才先河loop

地方的例证,server是不停的loop.以往改成有请求才loop.
同时,server上也许存在多个room.1个player创设了七个session(一场比赛照旧三个副本?),其余的player能够参加.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))
    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")

    return ws

async def game_loop(app):
    app["game_is_running"] = True
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        if len(app["sockets"]) == 0:
            break
        await asyncio.sleep(2)
    app["game_is_running"] = False


app = web.Application()

app["sockets"] = []
app["game_is_running"] = False

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

四.在python中,进程与线程的用法就只是名字不一致,使用的格局也是没多大分歧

一、线程访问queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']
 1 from multiprocessing import  Manager,Process
 2 def foo(l,i):#收到参数,l是Mlist,i是循环的i
 3     l.append(i*i)#将i平方添加到Mlist
 4 if __name__=='__main__':
 5     manager=Manager()
 6     Mlist=manager.list([11,22,33])#定义一个列表
 7 
 8     l=[]
 9     for i in range(5):#创建5个子进程
10         p=Process(target=foo,args=(Mlist,i))#定义一个进程,将Mlist和i作为参数传到foo
11         p.start()#激活这个进程,执行foo函数
12         l.append(p)#将5个进程添加到l这个列表
13     for i in l:
14         i.join()#循环这个列表,然后将每个进程join
15     print(Mlist)#当所有的子进程都结束,运行主进程

3.3 管理task

直接操作task对象.未有人的时候,能够cancel掉task.
注意!:
【4858.com】多进程的着力语法,知识十遗篇。This cancel()
call tells scheduler not to pass execution to this coroutine anymore and
sets its state tocancelled
which then can be checked by cancelled()
method. And here is one caveat worth to mention: when you have external
references to a task object and exception happens in this task, this
exception will not be raised. Instead, an exception is set to this task
and may be checked by exception()
method. Such silent fails are not useful when debugging a code. Thus,
you may want to raise all exceptions instead. To do so you need to call
result()
method of unfinished task explicitly. This can be done in a callback:

假如想要cancel掉,也不想触发exception,那么就检查一下canceled状态.
app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)

伍.归纳实例

一)创造三个简练的多进度:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(name):
    time.sleep(1)
    print('hello',name,time.ctime())

ml = []
for i in range(3):
    p = multiprocessing.Process(target=func,args=('yang',))
    p.start()
    ml.append(p)

for i in ml:
    i.join() #注意这里,进程必须加join方法,不然会导致僵尸进程

  

运维结果:

4858.com 1

 

不管怎么说,反正报错了,同样的代码,在python自带的IDLE里尝试:

4858.com 2

未有任杨晓伟西就得了了。好的,这里要说下了,遵照本身个人的知道,当您用pycharm可能IDLE时,pycharm可能IDLE在你的微机里作者也是三个进度,并且私下认可是主进度。所以在pycharm会报错,而在IDLE里运维正是空手,个人驾驭,对不对一时不谈,前期学到子进度时再说。

 

消除办法正是,其余的不变,加一个if __name == ‘__main__’判断就行:

4858.com 3

 

如此那般就化解了,好的,你未来得以回味到那句话了,进度与线程的用法就只是名字差别,使用的不二等秘书籍也是没多大分别。不多说,自行体会。而运作结果看出的时日是一道的,那么那进程才是的确含义上的交互运转。

 

二)自定义类式进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

class myprocess(multiprocessing.Process):
    def __init__(self,name):
        super(myprocess,self).__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    ml = []
    for i in range(3):
        p = myprocess('yang')
        p.start()
        ml.append(p)

    for j in ml:
        j.join()

  

运营结果:

4858.com 4

 

 

下一场setDaemon之类的形式和线程也是完全壹致的。

 

三)每一个进程都有根进度,换句话,每四个进度都有父进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time,os

def info():
    print('mudule name:',__name__)
    print('parent process:',os.getppid()) #父进程号
    print('son process:',os.getpid())     #子进程号

if __name__ == '__main__':
    info()
    print('-----')
    p = multiprocessing.Process(target=info,args=[])
    p.start()
    p.join()

  

运营结果:

 

4858.com 5

 

而查看本人本机的长河:

4858.com 6

 

可以驾驭,620肆就是pycharm,就是那时的根进程,而主进度就是自家那些py文件(由__main__能够),接着再往下的子进程等等等的。

 

二、进度访问queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

上边代码通过Manger达成子进程间的通信。

三.肆 等待七个事件

Example 3.4 source
code
在无数状态下,须要在服务器处理客户端的handler中,
等待多少个事件.除了等候客户端的音信,只怕还亟需拭目以俟不一样的新闻产生.比如,
游戏1局的小运到了,须要八个timer的能量信号.或许,供给此外进程的音讯,可能其余server的音讯.(使用分布式音讯系统).
上面这几个例子使用了Condition.那里不保留全体的socket,而是在每趟循环结束通过Condition.notify_all来公告.那一个利用pub/sub情势达成.
为了在叁个handler中,等待七个事件,首先我们运用ensure_future来包装一下.

if not recv_task: 
  recv_task = asyncio.ensure_future(ws.receive())
if not tick_task: 
  await tick.acquire() 
  tick_task = asyncio.ensure_future(tick.wait())```

在我们调用Condition.call之前,我们需要获取一下锁.这个锁在调用了tick.wait之后就释放掉.这样其他的协程也可以用了.但是当我们得到一个notification, 会重新获取锁.所以我们在收到notification之后要release一下.

done, pending = await asyncio.wait( [recv_task, tick_task],
return_when=asyncio.FIRST_COMPLETED)“`
以此会阻塞住直到有三个职务成功,今年会回到八个列表,实现的和依旧在运行的.若是task
is done,大家再安装为None,那样下3个循环里会再二次创设.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)



tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

async def game_loop():
    while 1:
        await tick.acquire()
        tick.notify_all()
        tick.release()
        await asyncio.sleep(1)

asyncio.ensure_future(game_loop())

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

(那几个根本是asyncio.Condition的用法)

陆.多进度间的通讯和多中国少年共产党享

第1大家都早已精晓进度之间是单身的,不得以互通,并且数据交互独立,而在其实费用中,一定会境遇必要经过间通信的情景供给,那么大家怎么搞呢

有三种办法:

  • pipe
  • queue

1)使用queue通信

在10二线程那里已经学过queue了,创设queue的诀要,q =
queue.Queue(),那种创建是创办的线程queue,并不是进程queue。制程queue的措施是:

4858.com 7

 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age): #这里必须要把q对象作为参数传入才能实现进程之间通信
    q.put({'name':name,'age':age})

if __name__ == '__main__':
    q = multiprocessing.Queue() #创建进程queue对象
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get()) #获取queue信息
    print(q.get()) 
    print(q.get())
    for i in ml:
        i.join()

  

运转结果:

4858.com 8

 

好的,已经经过queue实现通讯,那么细心的情侣恐怕会想,此时的queue到底是同二个吧还是copy的吧?早先测试,码如下:

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age):
    q.put({'name':name,'age':age})
    print('id:',id(q))
if __name__ == '__main__':
    q = multiprocessing.Queue()
    ml = []
    print('id:',id(q))
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get())
    print(q.get())
    print(q.get())
    for i in ml:
        i.join()

  

在Windows平台运营结果:

4858.com 9

 

Linux的ubuntu下是那般的:

4858.com 10

 

那就不佳怎么说了,作者个人的通晓,线程和经过这类与电脑硬件(CPU,RAM)等有关系的都有不明确因素,姑且认为在Windows平台里queue是copy的,在Linux里是同2个啊,并且据经验职员表示,在macbook上也是同3个。

 

还有个难题, 要是使用的queue是线程式的吗?

代码别的都没变,只改了此间:

4858.com 11

 

结果:

4858.com 12

 

就算报错了,可是却有八个关键点,提醒的是不可能pickle线程锁对象,也等于说刚才我们运用的queue是进度对象,所以能够pickle,注意了,那里正是关键点,使用了pickle,那么也即是说,在Windows平台里是copy的,借使不是copy,就不须要存在pickle对吧?直接拿来用就是啊,干嘛要pickle之后取的时候再反pickle呢对吗?

 

再看Linux下吧,由于Linux暗许是python二,所以模块包名稍微有点差别

4858.com 13

结果阻塞住了,然而前边的要么出来了,看到的id果然如故1样的。

 

那边就有三点须求专注:(个人精通,如有误望指正)

1.进程里的确不能够使用线程式queue

2.Windows平台的进度式queue是copy的

三.Linux平台的线程式和进度式都以同一个,不过如果在经过里使用线程式queue会阻塞住

但自个儿个人认为copy更有安全性

 

2)使用pipe通信

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())  #接受数据,不能加参数1024之类的
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    p = multiprocessing.Process(target=func,args=(son_conn,))
    p.start()
    print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
    parent_conn.send('不约')    #发送数据
    p.join()                   #join方法是进程特有

 

  

运维结果:

4858.com 14

 

那般就联系上了,相信你发觉了,基本和前边的socket大约,可是唯一的两样是recv()方法不能够加参数,不信的话,你加来尝试

回望线程通讯,相信你会认为进度比线程更利于

 

自然pipe也足以有多少个:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(son_conn,))
        p.start()
        ml.append(p)
        print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
        parent_conn.send('不约')
    for i in ml:
        i.join()

  

运转结果:

4858.com 15

 

7.进程之间数据共享——manager

相比较不难,就使用了经过里的manager对象下的1壹数据类型,别的的很简短的,小编就不注释了

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(l,d,num):
    l.append(num)
    d[num] = num

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list()
        d = manager.dict()
        ml = []
        for i in range(6):
            p = multiprocessing.Process(target=func,args=(l,d,i))
            p.start()
            ml.append(p)
        for i in ml:
            i.join()
        print('d:',d)
        print('l:',l)

  

运营结果:

4858.com 16

 

那样是否就贯彻了数码共享了?

 

好的,进程也剖析完了

 

3、进度访问`multiprocessing库中的Queue模块`

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']

父进程约等于克隆2个Q,把温馨的Q克隆了一份交给子进度,子进度那一年往Q里面放了1份数据,然后父进程又能实际的得到到。可是你克隆了一份是还是不是就和父进度未有关联了,为何还是能维系在1道吧?不过实际上:等于那多少个Q里面包车型客车数目又把它类别化了,种类化到1个中等的地点,类似于翻译,然后反系列化给那几个父过程那边来了,其实那五个Q正是透过pickle来体系化的,不是1个真正的Q。

4858.com ,总计:五个线程之间能够修改贰个数额,不加锁,恐怕就会出错。未来经过中的Queue,是完成了数量的传递,不是在修改同一份数据,只是实现3个进度的数目传给了其它二个进度。

4858.com 17

叁.伍 和线程壹起利用

Example 3.5 source
code

其一事例,我们把asyncio的loop放到别的八个独立线程中.上边也说过了,因为python的GIL的宏图,不容许同时运营四个code.所以使用八线程来处理总括瓶颈的标题,并不是叁个好主意.然后还有此外二个行使线程原因正是:
若是局地函数或许库不协助asyncio,那么就会阻塞住主线程的运转.那种情状下唯一的办法就是放在其余多少个线程中.

要留心asyncio本人不是threadsafe的,可是提供了多个函数.call_soon_threadsafe和run_coroutine_threadsafe.
当你运营这几个事例的时候,你晤面到notify的线程id就是主线程的id,那是因为notify协程运营在主线程中,sleep运维在此外一个线程,所以不会阻塞住主线程.

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor
import threading
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

print("Main thread id {}".format(threading.get_ident()))

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

 

 

三.陆 多进度和扩展scaling up

二个线程的server能够干活了,可是那个server唯有1个cpu可用.
为了扩充,大家必要周转八个经过,各样进度包涵自个儿的eventloop.
所以大家须要进度之间通讯的形式.同时在娱乐世界,平常会有恢宏的盘算(寻路什么的).那些任务常常不会神速成功(一个tick内).
在协程中进行大气耗费时间的乘除未有意义,因为会阻塞住消息循环本人.所以在那种景观下,把大气的一个钱打二十七个结交给此外的长河就很有要求了
最不难易行的措施就是运行四个单线程的server.然后,能够使用haproxy那样的load
balancer,来把客户端的连接分散到差别的历程上去.进城之间的通讯有成都百货上千方法.一种是基于互连网连接,也足以扩张到八个server.今后已经有许多达成了音讯和仓库储存系统的框架(基于asyncio).
比如:

aiomcache
for memcached client
aiozmq
for zeroMQ
aioredis
for Redis storage and pub/sub

还有其余的有个别乱七八糟,在git上,抢先4分之3是aio打头.
动用网络新闻,能够丰裕有效的贮存数据,或然调换新闻.可是要是要拍卖多量实时的数码,而且有大气进程通讯的情事,就分外了.在这种情况下,四个更确切的秘籍是行使正式的unix
pipe.asyncio has support for pipes and there is a very low-level
example of the server which uses
pipes
inaiohttp repository.
在那一个例子中,我们接纳python的高层次的multiprocessing库来触发一个新的历程来展开总计,通过multiprocessing.Queue来拓展进程间通讯.不幸的是,方今的multiprocessing完成并不辅助asyncio.所以阻塞的调用就会堵塞住event
loop.
那正是利用线程的最棒案例.因为大家在别的3个线程运行multiprocessing的代码.看代码

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Queue, Process
import os
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

worker()在别的叁个进程中运营.包涵了1部分耗费时间总括,把结果放在queue中.得到结果随后,通告主线程的主eventloop这些等待的client.那么些例子十一分简陋,进度未有适当结束,同时worker也许须要此外一个queue来输入数据.

Important! If you are going to run anotherasyncio
event loop in a different thread or sub-process created from main
thread/process, you need to create a loop explicitly, using
asyncio.new_event_loop()
, otherwise, it will not work.

四、通过Pipe()完结进度间的数额交互,manger达成数据共享

下面的事例是经过进度中的Queue,来进展多中国少年共产党享的,其实还有1种艺术贯彻数据共享,那就是管道,pipe,以及数额共享manger。

4.1、Pipe()函数

管道函数会回到由管道两方连日来的一组连接对象,该管道私下认可是双向的(双向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子进程的消息
    p.join()

4.贰、接受反复和发送多次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.send("QQ")  # 发送消息给父进程
    print(conn.recv())   # 接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收两次
    parent_conn.send("微信")   # 发送给子进程
    p.join()

4.3、manger

manger能够成功多少间的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的
        # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典
        l = manager.list(range(5))  # 同样声明一个列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

线程修改同一份数据的时候必要加锁,过程修改数据吧:不用加锁,因为那些manger已经帮您加锁了,它就默许不允许多个进程同时修改1份数据。七个经过未有艺术同时修改一份数据,进度之间是独自的,它自身也要加锁,因为它把团结的事物还要copy好几份,跟刚刚的不胜Queue壹样,copy11个字典最后合成2个字典

 

 

 

协程
协程,又叫微线程,实际上正是单线程,通过python语法,或模块来贯彻产出。
实质上正是二个历程一个线程。

进度锁和进度池的运用

4858.com 18

1、进程锁

因此multiprocessing中的Lock模块来促成进度锁

from multiprocessing import Process,Lock   # 导入进程锁


def f(l, i):
    l.acquire()    # 加锁
    try:
        print("hello word", i)
    finally:
        l.release()   # 释放锁

if __name__ == "__main__":
    lock = Lock()     # 定义锁
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把锁传入进程中

经过中不是相互独立的吧?为何还要加锁:就算各种进度都以单身运转的,可是难点来了,它们共享1块显示屏。这些锁存在的意义就是显示屏共享。如若经过1想着打字与印刷数据,而经过2想也想打字与印刷数据的情状,就有十分的大希望乱套了,然后通过那个锁来支配,去打字与印刷的时候,这一个显示器唯有自己独占,导致显示器不会乱。

4858.com 19

2、进程池apply和apply_saync

2.1、appley

一道实施,也便是串行执行的

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步执行,也正是并行执行。

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

二.三、异步下回调函数

程序执行落成之后,再回调过来执行这一个Bar函数。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子进程的进程号


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印进程号

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主进程", os.getpid())   # 主进程的进程号
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

#执行结果
主进程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回调函数表达fun=Foo干不完就不实施bar函数,等Foo执行完就去履行Bar
  2. 以此回调函数是主进度去调用的,而不是每一种子进度去调用的。
  3. 回调函数的用途:

      比如说你从各种机器上备份达成,在回调函数中机动写二个本子,说备份实现

  4. 回调函数是主进度调用的原由?

      固然是子进度去调用那个回调函数,有稍许个子进度就有微微个再而三,如若是主进度的话,只须求壹遍长连接就能够了,这一个成效就高了

  

 

上海体育场面是用yield达成了1个八个函数逇并发处理。

 1 from greenlet import greenlet#导入这个模块
 2 def foo():#定义一个函数
 3     print('ok1')#打印
 4     gr2.switch()#将程序切换到下面一个函数,按照名字切
 5     print('ok3')#打印
 6     gr2.switch()#将程序切换到下面一个函数,按照名字切
 7 def bar():
 8     print('ok2')#打印
 9     gr1.switch()#切到上面foo函数
10     print('ok4')
11 gr1=greenlet(foo)#实例化这个函数
12 gr2=greenlet(bar)
13 gr1.switch()#在外面写这个就执行了这个函数

通过greenlet模块的switch来落到实处协程的切换,greenlet模块供给手动去pycharm下载

 1 import gevent#导入这个模块
 2 def foo():
 3     print('running in foo')
 4     gevent.sleep(2)#打印之后睡一秒,模拟io操作
 5     print('switch to foo again')
 6 def bar():
 7     print('switch  to bar')
 8     gevent.sleep(1)#打印之后睡一秒,模拟io操作
 9     print('switch to bar again')
10 gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
11 '''
12 这个程序的运行过程是,先执行foo函数,
13 打印之后遇到了IO操作,然后自动切换到下一个函数执行,
14 打印之后又遇到了IO操作,然后切回foo函数发现IO2秒还没有结束,
15 然后又切到了bar函数发现IO结束,打印,再切回foo函数打印
16 '''

上边代码通过gevent模块来贯彻写成的IO时期活动切换完结产出的先后。
gevent需要从pycharm下载。

4858.com 20

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图
Copyright @ 2010-2019 美高梅手机版4858 版权所有