Message Queues with RabbitMQ

By admin, March 31, 2019

RabbitMQ Queues

Before we talk about RabbitMQ, a word about Python's own queues. They do the same kind of job: both are queues, used to pass messages.

Python has two of them: the thread queue and the process queue (the Queue in multiprocessing). A thread queue cannot cross process boundaries; it is only for exchanging data between threads of one process. A process queue is only for a parent process and its child processes, or for several children of the same parent, to talk to each other. In other words, two completely independent programs cannot communicate through these queues, even if both are written in Python. So what do we do when we have two independent Python programs in two separate processes, or a Python program and a program in another language? That is the gap a message broker fills.
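A minimal illustration of that limit (this snippet is mine, not from the original post): a queue.Queue is visible only to threads inside the same process.

import queue
import threading

q = queue.Queue()

def worker():
    # Runs in another thread of the same process, so it can see q.
    print("got:", q.get())

threading.Thread(target=worker).start()
q.put("hello")          # delivered to the worker thread
# A separate Python process (or a program written in another language) has no
# way to reach this q object -- that is what a broker such as RabbitMQ is for.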

Installation (Windows)

First install the Erlang runtime.

Official site:

Windows download:

Linux: install via yum

Then install RabbitMQ itself.

First download the Windows build of RabbitMQ.

Download link:

Install pika:

pip is already installed, so just open cmd and run pip install pika.

With that done, let's implement the simplest possible queue communication:


producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# open a channel
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello')

# routing_key is the queue name
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue name
                      body='Hello World!',
                      )
print("[x] Sent 'Hello World!'")
connection.close()

 

First a basic socket connection is created, then a channel is opened on top of it; messages travel through the channel. Next a queue is declared and given a name, and only then is the message actually sent (basic_publish).

consumer:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):  # callback function
    print("---->", ch, method, properties)
    print(" [x] Received %r" % body)

channel.basic_consume(callback,  # when a message arrives, call callback to handle it
                      queue='hello',
                      no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

Once start_consuming() is called it runs forever: it does not stop after one message, it blocks here and keeps receiving.

Notice that both the producer and the consumer declare the queue. Why? Because we do not know whether the consumer or the producer will be started first; whichever side starts first must have declared the queue, or it would hit an error. (Re-declaring an existing queue is harmless, as the small example below shows.)
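A quick illustration (my own snippet, not from the post): queue_declare with identical arguments is idempotent, so both sides can safely declare the same queue.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.queue_declare(queue='hello')   # re-declaring with the same arguments is a no-op, not an error
connection.close()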

Now let's look at one-to-many, i.e. one producer with several consumers:

Start several consumers first, then keep sending data with the producer. You can see that the consumers receive the data in a round-robin fashion, each consumer taking one message in turn.

Now suppose a consumer receives a message and processing it takes 30 minutes, and halfway through the consumer loses power or crashes, so the message has not been fully processed. If the task absolutely must be completed, we need an acknowledgement saying whether it finished or not: the producer side has to confirm that the consumer really finished. Only after the consumer finishes and sends a confirmation back to the server does the server delete the task from the message queue. If the consumer crashes without sending the confirmation, the task counts as unfinished. Let's see how RabbitMQ handles this.

We can add a time.sleep() inside the consumer's callback to simulate the long processing / crash. callback is a callback function: it is invoked whenever the event fires. When the function returns, the message counts as processed; if the function never finishes, that means....

In the consumer's basic_consume() there is a parameter no_ack=True, which means that no acknowledgement is sent whether or not the message was fully processed. Normally we do not pass this parameter; by default RabbitMQ sends the acknowledgement automatically once the message has been processed. Now remove that parameter and add one line to the callback for manual acknowledgement: ch.basic_ack(delivery_tag=method.delivery_tag)

def callback(ch, method, properties, body):  # callback function
    print("---->", ch, method, properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

The result: I run the producer once and consumer 1 receives the message, but I then stop consumer 1 before it finishes. Consumer 2 receives the same message. In other words, as long as a consumer has not sent the acknowledgement, the broker will not delete the message.

RabbitMQ message persistence:

We may end up creating many message queues. How do we inspect their state? With rabbitmqctl.bat list_queues

The situation right now: there are still messages in the queue, but if the server goes down those messages are lost. So I need to force the messages to be persisted:

channel.queue_declare(queue='hello2',durable=True)

 

Add the durable parameter every time the queue is declared (both the client side and the server side must add it).

With only that in place, restarting the RabbitMQ server shows that the queue name survives but the messages in it are gone. We also need to add a properties argument to the producer's basic_publish:

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# open a channel
channel = connection.channel()

# declare the queue as durable
channel.queue_declare(queue='hello2', durable=True)
# routing_key is the queue name
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print("[x] Sent 'Hello World!'")
connection.close()

 

That makes the messages themselves persistent.

如今是二个劳动者对应多个顾客,很公正的收发收发,但是实际的情事是,我们机器的配备是不平等的,有的配置是单核的一些配置是多核的,只怕i7处理器处理4条新闻的时候和别的的微处理器处理1条音信的时日大致,那差的处理器那里就会积聚新闻,而好的电脑那里就会形成闲置,在切实可行中做运转的,大家会在负载均衡中安装权重,何人的计划高权重高,职责就多一些,不过在rabbitMQ中,大家只做了二个简单的拍卖就可以达成公正的消息分发,你有多大的力量就处理多少新闻

即:server端给客户端发送新闻的时候,先反省今后还有稍稍新闻,若是当前音讯尚未处理完结,就不会发送给这一个消费者音讯。即便当前的顾客没有音信就发送

这么些只需求在顾客端实行修改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2', durable=True)


def callback(ch, method, properties, body):  # callback function
    print("---->", ch, method, properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,  # when a message arrives, call callback to handle it
                      queue='hello2',
                      #no_ack=False
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

We make a second consumer, consume2, and sleep 20 seconds in its callback to simulate slow processing.

I start the producer once and the message is taken by consume; I start it again and that message goes to consumer2. But because consumer2 sleeps for 20 seconds and is slow, when I start the producer yet again the next message goes back to consume for processing.

 


Publish / Subscribe (message broadcasting)

So far everything has been one-to-one send and receive. What if I want one-to-many, like a broadcast: the producer sends one message and every consumer receives it? This is where exchanges come in.

An exchange receives messages on one side and pushes them into queues on the other. The exchange must know exactly what to do with each message it receives: should it go to one particular queue, to many queues, or be discarded? Those rules are defined by the exchange type.

An exchange is declared with a type, which decides which queues qualify to receive a message:

fanout: every queue bound to this exchange receives the message
direct: only the unique queue determined by the routingKey and the exchange receives the message
topic: every queue whose binding key matches the routingKey receives the message
headers: the message headers decide which queues receive it (see the sketch after this list)
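The post never demonstrates a headers exchange, so here is a hedged sketch of one (my own example - the exchange name, header values and x-match choice are invented; depending on the pika version the declare parameter is type= or exchange_type=):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()

ch.exchange_declare(exchange='headers_logs', exchange_type='headers')
result = ch.queue_declare(exclusive=True)          # let RabbitMQ pick a queue name
queue_name = result.method.queue

# x-match=all: every listed header must match; use 'any' to match on just one.
ch.queue_bind(exchange='headers_logs', queue=queue_name,
              arguments={'x-match': 'all', 'source': 'cron', 'level': 'error'})

ch.basic_publish(exchange='headers_logs', routing_key='',
                 body='cron job failed',
                 properties=pika.BasicProperties(
                     headers={'source': 'cron', 'level': 'error'}))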

Message publisher:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
channel.basic_publish(exchange='logs', routing_key='', body=message)
print("[x] Sent %r " % message)
connection.close()

 

Here the exchange argument, which was an empty string before, is given the name logs. Note also that no queue is declared: a broadcast does not need to name a queue.

Message subscriber:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# exclusive: do not name the queue; RabbitMQ assigns a random name
# exclusive=True also deletes the queue once the consumer using it disconnects
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[X] %r" % body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()

 

On the consumer side we do declare a queue - note what the declaration contains. The sender declares no queue at all; why does the receiver need one but not the sender? Because the consumer calls channel.queue_bind(), which binds its queue (queue_name) to the exchange; the exchange then delivers the broadcast into every bound queue.

Result:

It works like a radio broadcast, in real time: open three consumers, have the producer send one message, and all three consumers receive it at the same moment.


RabbitMQ is a message queue. Recall the queues we studied earlier: the threading queue (for data exchange between threads of one process) and the multiprocessing Queue (for a parent process and its children, or children of the same parent, to interact). Two completely independent programs cannot interact through those queues; for that we need a middleman, a broker - and that is RabbitMQ.

Receiving messages selectively (exchange_type = direct)

RabbitMQ also supports routing by keyword: a queue is bound to the exchange with a binding key, the sender publishes the message to the exchange with a routing key, and the exchange uses that key to decide which queue the message should go to.


publisher:

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)

print("[X] Sent %r:%r" % (severity, message))
connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

Finer-grained filtering (exchange_type = topic)


 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

 

Everything so far has the server sending and the client receiving: the message flow is one-way. If we want to send a command to a remote client, have it execute, and get the result back, that pattern is called RPC.

RabbitMQ RPC


rpc server:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print("[.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='', routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue')

print("[x] Awaiting rpc requests")
channel.start_consuming()

 

 

rpc client:

import pika
import uuid, time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,  # callback: invoked as soon as a reply arrives
                                   no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='', routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id
                                   ),
                                   body=str(n),  # the message body must be a string
                                   )
        while self.response is None:
            self.connection.process_data_events()  # non-blocking counterpart of start_consuming
            print("no message....")
            time.sleep(0.5)
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)


The start_consuming() used earlier enters a blocking mode: with no message it just waits, and when a message arrives it takes it.

self.connection.process_data_events() is a non-blocking version of start_consuming: after sending something to the server, the client checks every so often whether a reply has arrived, and if not it can go off and do other work.

reply_to=self.callback_queue tells the server the name of the queue on which to return the reply.

corr_id = str(uuid.uuid4()): the correlation_id is first generated on the client with uuid4, and the server sends it back together with the execution result. If the correlation_id the server returns equals the client's own id, then that result must belong to the command this client just sent. With one server and one client this hardly matters, but the client is now non-blocking: instead of printing "no message" it could go on and issue a new command, so two requests may be in flight and their replies will not necessarily come back in order. We therefore need a way to tell which returned result belongs to which command.

The complete pattern is: the producer sends a command to the consumer without knowing when the reply will come back. It still has to collect the result, but it does not want to block, so every so often it checks whether the reply has arrived; once it has, the call is complete.
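To make the correlation-id bookkeeping concrete, here is a hedged sketch of a client with two requests in flight at once (my own example against the same rpc_queue server; the dict and variable names are assumptions):

import pika
import uuid

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
replies = {}

def on_response(ch, method, props, body):
    replies[props.correlation_id] = body          # file each reply under its id

channel.basic_consume(on_response, no_ack=True, queue=callback_queue)

pending = []
for n in (10, 20):                                # two requests in flight at once
    corr_id = str(uuid.uuid4())
    pending.append(corr_id)
    channel.basic_publish(exchange='', routing_key='rpc_queue',
                          properties=pika.BasicProperties(
                              reply_to=callback_queue, correlation_id=corr_id),
                          body=str(n))

while len(replies) < len(pending):                # poll until both replies are back
    connection.process_data_events()

for corr_id in pending:
    print(corr_id, "->", replies[corr_id])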

Result:

Start the server first, then the client. The client waits for the reply while the request is being processed, then reacts once the Fibonacci number has been computed.


Commonly used message queues include:

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • ...

1. Simple RabbitMQ queue communication


Data is first sent to an exchange, and the exchange passes it on to the appropriate queue (the diagram is omitted here). The pika module is Python's API for RabbitMQ. The receiving side registers a callback function that is invoked as soon as data arrives, and once a message has been received by one consumer it is deleted from the queue. With that in mind, here is a simple RabbitMQ queue communication.

Send side:

import pika
# connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()       # open a channel; different queues run over it

# declare the queue
channel.queue_declare(queue='hello1')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
# publish data into the queue
channel.basic_publish(exchange='',       # data goes to the exchange first, which forwards it to the matching queue
                      routing_key='hello1',  # send to the 'hello1' queue
                      body='HelloWorld!!')   # the message itself
print("[x]Sent'HelloWorld!'")
connection.close()

Receive side:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# You may ask why we declare the queue again - we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello1')  # declaring it again keeps the program from erroring out


def callback(ch, method, properties, body):
    print("-->ch", ch)
    print("-->method", method)
    print("-->properties", properties)
    print("[x] Received %r" % body)         # once one consumer has received the message, it is removed from the queue


channel.basic_consume(callback,              # callback: invoked as soon as a message arrives
                      queue='hello1',
                      no_ack=False)    # send a confirmation to the server after consuming; False is the default

print('[*] Waiting for messages.To exit press CTRL+C')
channel.start_consuming()

Run result (the comments in the code above should make things clear):

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'


Some deeper testing turns up two findings:

  1. Run rabbitMQ_1_send.py first to send data, with rabbitMQ_2_receive.py not yet running: when receive is started later it can still pick the data up.
  2. Start several (e.g. three) receiving clients, then run the sender repeatedly: client 1 gets the first message, client 2 the second, client 3 the third.

By default RabbitMQ hands the producer's (p) messages to each consumer (c) in turn - much like load balancing.

 

2. Message acknowledgements (ack)

Looking at the example above you will notice the line no_ack=False (send a confirmation back to the server after consuming; the default is False). The official explanation of acknowledgements is excellent, so I quote it here:

Doing a task can take a few seconds. You
may wonder what happens if one of the consumers starts a long task and
dies with it only partly done. With our current code once RabbitMQ
delivers message to the customer it immediately removes it from memory.
In this case, if you kill a worker we will lose the message it was just
processing. We’ll also lose all the messages that were dispatched to
this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a
worker dies, we’d like the task to be delivered to another
worker.

In order to make sure a message is never
lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is
sent back from the consumer to tell RabbitMQ that a particular message
had been received, processed and that RabbitMQ is free to delete
it.

If a consumer dies (its channel is
closed, connection is closed, or TCP connection is lost) without sending
an ack, RabbitMQ will understand that a message wasn’t processed fully
and will re-queue it. If there are other consumers online at the same
time, it will then quickly redeliver it to another consumer. That way
you can be sure that no message is lost, even if the workers
occasionally die.

There aren’t any message timeouts;
RabbitMQ will redeliver the message when the consumer dies. It’s fine
even if processing a message takes a very, very long time.

Message
acknowledgments are turned on by default. In previous examples we
explicitly turned them off via the no_ack=True flag. It’s time to
remove this flag and send a proper acknowledgment from the worker, once
we’re done with a task.

Using this code we can be sure that even
if you kill a worker using CTRL+C while it was processing a message,
nothing will be lost. Soon after the worker dies all unacknowledged
messages will be redelivered.

Think of the sender as the producer and the receiver as the consumer. The producer sends task A; a consumer receives it and starts processing; once it is done the broker deletes task A from the queue. The problem: the consumer receives task A but crashes halfway through, while task A has already been removed from the queue. The task was never actually finished - the task/message is lost. To fix this, the consumer should send an ack only after it has received and successfully processed the task. Only when the broker gets that ack does it know task A is done and delete it from the queue; if no ack arrives, task A is handed to the next consumer, until it is processed successfully. A minimal sketch of this handshake follows.

 

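A minimal sketch of the acknowledge-then-delete handshake just described (my own snippet; it reuses the hello1 queue from the example above):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1')

def do_work(body):
    print("processing", body)            # stands in for a long-running task

def callback(ch, method, properties, body):
    do_work(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)   # only now may the broker delete it

channel.basic_consume(callback, queue='hello1')      # note: no no_ack=True
channel.start_consuming()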

3. Message persistence

We have already seen that a consumer started after the producer can still pick up the data the producer queued.

However, if the producer queues data and RabbitMQ itself is restarted, the consumer can no longer receive it.

E.g. if the RabbitMQ server dies while messages are in transit, you will find the old message queue simply no longer exists. This is where message persistence comes in: a persistent queue does not vanish when the server goes down, it is kept durably. The English explanation of persistence below is worth reading:

We have learned how to make sure that
even if the consumer dies, the task isn’t lost(by default, if wanna
disable  use no_ack=True). But our tasks will still be lost if RabbitMQ
server stops.

When RabbitMQ quits or crashes it will forget the
queues and messages unless you tell it not to. Two things are
required to make sure that messages aren’t lost: we need to mark both
the queue and messages as durable.

First, we
need to make sure that RabbitMQ will never lose our queue. In order to
do so, we need to declare it as durable:

      channel.queue_declare(queue='hello', durable=True)

Although this command is correct by
itself, it won’t work in our setup. That’s because we’ve already defined
a queue called hello which is not durable. RabbitMQ doesn’t allow you to redefine an
existing queue with different parameters and will return an
error to any program that tries to do that. But there is
a quick workaround - let's declare a queue with different name, for
example task_queue:

      channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change needs to be
applied to both the producer and consumer code.

At that point we’re sure that
the task_queue queue won’t be lost even if RabbitMQ restarts. Now we
need to mark our messages as persistent –
by supplying a delivery_mode property with a value 2.

      channel.basic_publish(exchange='',
                            routing_key="task_queue",
                            body=message,
                            properties=pika.BasicProperties(
                               delivery_mode = 2,      # make message persistent
                            ))

The passage above explains persistence well. It takes two steps:

  • Make the queue durable. In code: channel.queue_declare(queue='hello', durable=True)
  • Make the messages in the queue persistent. In code: properties=pika.BasicProperties(delivery_mode=2)

One thing to watch out for:

If your code has already persisted the hello queue and its messages, re-running the program after restarting RabbitMQ may raise an error,

because: RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error.

To get around this, declare a queue under a different name (queue_name) from the one that existed before the restart.
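A hedged illustration of that workaround (my own snippet; task_queue is just an example of a fresh name, as in the official tutorial):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 'hello' already exists as non-durable, so declare a *new* durable queue instead
# of re-declaring 'hello' with different arguments (which RabbitMQ rejects).
channel.queue_declare(queue='task_queue', durable=True)
connection.close()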

 


4. Fair dispatch

If RabbitMQ just handed messages to the consumers in order without considering their load, a consumer on a weak machine could easily pile up a backlog it never clears while a well-specced consumer stays idle. To solve this, configure prefetch=1 on each consumer: it tells RabbitMQ not to send this consumer a new message until the message it currently holds has been processed.

 

Complete code with message persistence + fair dispatch

Producer side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)  # durable queue

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

Consumer side:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

When I first ran the code above, the line ch.basic_ack(delivery_tag=method.delivery_tag) in the consumer's callback puzzled me - the consumer still receives messages without it, so what is it for?

Once the producer marks messages as persistent, the consumer side needs ch.basic_ack(delivery_tag=method.delivery_tag): it guarantees that only after a message has been consumed does the consumer send an ack, and only then does the server delete that message from the queue.

 

5. Publish / Subscribe

The examples so far have been essentially one-to-one: a message goes only to the specified queue. Sometimes you want a message to reach every queue, like a broadcast, and that is what exchanges are for. (If Redis publish/subscribe interests you, see my post "python之redis".)

An exchange is a very simple thing. On
one side it receives messages from producers and the other side it
pushes them to queues. The exchange must know exactly what to do with a
message it receives. Should it be appended to a particular queue? Should
it be appended to many queues? Or should it get discarded. The
rules for that are defined by the exchange type.

An exchange is declared with a type, which decides which queues qualify to receive a message:

 

fanout: every queue bound to this exchange receives the message

direct: only the unique queue determined by the routingKey and the exchange receives the message

topic: every queue whose binding key matches the routingKey (which may now be a pattern) receives the message

 

Pattern symbols: # matches zero or more words, * matches exactly one word.

    Example: #.a matches a.a, aa.a, aaa.a, ...
             *.a matches a.a, b.a, c.a, ...

Note: binding with routing key # on a topic exchange is equivalent to fanout. (A small sketch of these rules follows.)
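A hedged sketch of those matching rules in code (my own example; the exchange name, bindings and routing key are invented):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='topic_logs', type='topic')
result = ch.queue_declare(exclusive=True)
queue_name = result.method.queue

# 'kern.*'     matches kern.info, kern.critical, ... (exactly one extra word)
# '*.critical' matches kern.critical, cron.critical, ...
# '#'          matches every routing key (behaves like fanout)
for binding_key in ('kern.*', '*.critical'):
    ch.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

ch.basic_publish(exchange='topic_logs', routing_key='kern.critical',
                 body='kernel panic')     # matches both bindings above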

 

Below I cover fanout, direct and topic in turn:

1. fanout

fanout: every queue bound to this exchange receives the message

Send side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                      type='fanout')

message = ''.join(sys.argv[1:]) or "info:HelloWorld!"
channel.basic_publish(exchange='logs',
                      routing_key='',  # empty (the default) for a fanout broadcast
                      body=message)
print("[x]Sent%r" % message)
connection.close()


Receive side:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

# no queue name given (so the broadcast can be received); RabbitMQ assigns a random name
# exclusive=True deletes the queue automatically once the consumer using it disconnects
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind the declared queue to the exchange
channel.queue_bind(exchange='logs',
                queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x]%r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()


Two points to note:

  • fanout is a broadcast: on the send side routing_key='' (empty, the default).

  • On the receive side, result = channel.queue_declare(exclusive=True): no queue name is given (so the broadcast can be received), RabbitMQ assigns a random name, and exclusive=True deletes the queue automatically once its consumer disconnects. (A small sketch follows.)
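A hedged illustration of that server-named exclusive queue (my own snippet; the printed name is only an example of the amq.gen-... format):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
result = ch.queue_declare(exclusive=True)   # no name given -> the broker picks one
print(result.method.queue)                  # e.g. 'amq.gen-JzTY20BRgKO-HjmUJj0wLg'
# The queue is deleted automatically once this exclusive consumer's connection closes.
conn.close()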

 

2. Receiving messages selectively (exchange type = direct)

RabbitMQ also supports routing by keyword: a queue is bound to the exchange with a key, the sender publishes the data to the exchange with a routing key, and the exchange uses that key to decide which queue the data should be sent to.

Send side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,  # non-empty key telling the exchange where to route (info, error, ...)
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()


Receive side:

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result =channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" %(method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

To be honest the code alone left me a little lost at first. I tested it from cmd with one send side and two receive sides (start the receive sides first, then the send side); the screenshots of the send side and of receive sides 1 and 2 are omitted here.

 

3. Finer-grained message filtering: topic (for reference)

Although using the direct exchange
improved our system, it still has limitations – it can’t do routing
based on multiple criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix tool, which routes logs based on both severity
(info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’.

My English is not great, but with a translator and some guesswork the gist of the passage above is this:

For example: system errors are sent to consumer A and MySQL errors to consumer B. For B, "receive only MySQL errors" is already covered by selective receiving (exchange type=direct) with the key error. But now B has a new requirement: not every error, only certain specified ones. Filtering further within one category of message is exactly what the finer-grained topic exchange provides.

 

Send side:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')  # exchange type: topic

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()


Receive side:

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()


 

 

6. RPC (Remote Procedure Call)

Here is a definition of RPC I looked up (it is essentially like the FTP tool I built before: the client sends a command and the server returns the related information):

RPC uses a client/server model. The requesting program is the client and the service provider is the server. The client sends a call message containing the procedure's parameters to the server process, then waits for the reply. On the server side, the process sleeps until a call message arrives; when it does, the server extracts the parameters, computes the result, sends the reply, and waits for the next call. Finally, the client receives the reply, obtains the result, and execution continues.


Now the key part: RPC communication over RabbitMQ. I found it hard at first, but the idea is genuinely instructive and the example code is well worth studying.

After the server receives the client's message it calls the callback and does the work; it then needs to send the result back to the client. But how does the server return it, and if several clients are connected, how does it know which client the reply belongs to?

The RPC server listens on rpc_queue by default. It clearly cannot push the replies onto rpc_queue as well (rpc_queue carries the client-to-server requests).

The reasonable scheme is for the server to use another queue and return the result to the right client through it. But then a new problem: that queue would be created by the server, so the client would not know its queue_name - and without the name it cannot receive anything.

The solution:

When the client sends the command it also tells the server: when the task is done, return the result through this particular queue. The client then simply listens on that queue.

Client side:

import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()
        # declare a random queue on which to listen for the returned result
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue   # its name

        self.channel.basic_consume(self.on_response,  # invoke on_response as soon as a reply arrives
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):  # callback
        # Commands may run at different speeds: command 1 may be sent before command 2,
        # yet command 2's result may come back first. Without the check below the client
        # would mistake command 2's result for command 1's.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None    # the reply returned after the command runs
        self.corr_id = str(uuid.uuid4())   # identifies the command (and its ordering)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',  # the client sends its command to rpc_queue
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,  # return the result via the reply_to queue
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()  # poll the queue for data (non-blocking)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

Server side:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)  # result for the message received from the client

    ch.basic_publish(exchange='',   # send the result back on the props.reply_to queue (declared by the client when it sent the command)
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),  # each command carries its own random correlation_id
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge, so the request is removed from rpc_queue


channel.basic_qos(prefetch_count=1)  # fair dispatch
channel.basic_consume(on_request,    # call on_request as soon as a message arrives
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

Installing the pika module:

pip install pika
or
easy_install pika
or from source:

https://pypi.python.org/pypi/pika

2. The simplest queue communication

Sender:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent through it)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

Receiver:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch: <pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    the channel object
    #method: <Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   # delivery details
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties: <BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      no_ack=True)   # no ack is returned, so the message leaves the queue whether or not it was fully processed locally
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving

Note: connecting from Windows to RabbitMQ running on a Linux host fails unless a username and password are supplied.
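A hedged sketch of supplying those credentials with pika (my own snippet; the host address and the account are placeholders for whatever exists on your server):

import pika

credentials = pika.PlainCredentials('guest', 'guest')      # an account on the RabbitMQ host
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.1.10',           # the Linux server running RabbitMQ (example address)
        credentials=credentials))
channel = connection.channel()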

3. Round-robin message dispatch

Start the message producer first, then start three consumers. Send several messages from the producer and you will see them handed out to the consumers one after another, in turn.
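To watch that rotation, here is a hedged helper that sends several messages in a row (my own snippet; it reuses the lzl queue from the surrounding code):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='lzl')
for i in range(6):
    channel.basic_publish(exchange='', routing_key='lzl', body='msg %d' % i)
    print("sent msg", i)
connection.close()
# With three copies of the consumer running, messages 0..5 are delivered
# 0->c1, 1->c2, 2->c3, 3->c1, ... in turn.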


 

In this mode RabbitMQ by default distributes the producer's (p) messages to the consumers (c) fairly and in sequence, much like load balancing.


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent through it)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

pubulish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch: <pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    the channel object
    #method: <Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   # delivery details
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties: <BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving

consume.py

Running pubulish.py and consume.py gives the fair distribution described above. But what happens if c1 crashes after receiving a message - how does RabbitMQ handle it? Let's simulate it.


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent through it)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

publish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # simulate processing time
    print("Received %r"%body)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving

consume.py

In consume.py the callback gained a time.sleep to simulate processing time. Running this simulation shows that if c1 receives a message and dies before finishing, the message is gone from the queue - RabbitMQ has already deleted it. If the program requires that a message only disappear once it has been fully processed, the code needs a small change.


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent through it)

channel.queue_declare(queue='lzl')    # declare the queue

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

publish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    #time.sleep(15)              # simulate processing time
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving

consume.py

By removing no_ack=True from the consume.py receiver and adding ch.basic_ack(delivery_tag=method.delivery_tag) inside the callback, a message can no longer be cleared from the queue until it has been fully processed.

Check the number of messages in the queues (rabbitmqctl list_queues):

4. Message persistence

If the RabbitMQ server goes down while messages are in transit, the old message queues no longer exist when it comes back. Message persistence prevents this: a persistent queue does not disappear when the server crashes; it is kept durably.

Sender:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent through it)

channel.queue_declare(queue='lzl',durable=True)    # make the queue durable

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     # make the message persistent
                      )
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

Receiver:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # simulate processing time
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving

5. Fair dispatch

If RabbitMQ just handed messages to each consumer in order without considering load, a consumer on a weak machine could pile up messages it never clears while a powerful one stays idle. To solve this, configure prefetch=1 on each consumer: RabbitMQ will not send that consumer a new message until its current message has been processed.

channel.basic_qos(prefetch_count=1)

With message persistence + fair dispatch:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      # open a channel (messages are sent through it)

channel.queue_declare(queue='lzl',durable=True)    # make the queue durable

channel.basic_publish(exchange='',
                      routing_key='lzl',  # routing_key is the queue name
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     # make the message persistent
                      )
)
print("Sent 'Hello,World!'")
connection.close()      # close the connection

pubulish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # simulate processing time
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,     # when a message arrives, call callback to handle it
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # start receiving

consume.py

6. Publish / Subscribe

The examples so far have been essentially one-to-one: a message can only go to the specified queue. Sometimes you want a message to be received by every queue, like a broadcast; that is what exchanges are for.

An exchange is a very simple thing. On one
side it receives messages from producers and the other side it pushes
them to queues. The exchange must know exactly what to do with a message
it receives. Should it be appended to a particular queue? Should it be
appended to many queues? Or should it get discarded. The rules for that
are defined by the exchange type.

An exchange is declared with a type, which decides which queues qualify to receive a message:

fanout: every queue bound to this exchange receives the message
direct: only the unique queue determined by the routingKey and the exchange receives the message
topic: every queue whose binding key matches the routingKey (which may now be a pattern) receives the message

headers: the message headers decide which queues receive it

Pattern symbols: # matches zero or more words, * matches exactly one word.

    Example: #.a matches a.a, aa.a, aaa.a, ...
             *.a matches a.a, b.a, c.a, ...

Note: binding with routing key # on a topic exchange is equivalent to fanout.


fanout receives every broadcast. A broadcast is real-time: if no consumer is listening at the moment a message is sent, the message is simply lost. Here the consumer's no_ack no longer matters, because fanout does not care whether you finished processing - a message that has been sent is never redelivered. Remember: broadcast is real-time.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',   # a broadcast does not need a queue name
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

publish.py


#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # no queue name given: RabbitMQ assigns a random one;
                                                # exclusive=True deletes the queue once this consumer disconnects
queue_name = result.method.queue

channel.queue_bind(exchange='logs',         # bind to the exchange and receive whatever it forwards
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

consume.py
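The snippets in this post are written against the older pika 0.x API (the type= keyword and the positional callback in basic_consume). If you happen to be running pika 1.x, the same fanout subscriber would look roughly like the following; this is a sketch under that assumption, where exchange_type, on_message_callback and auto_ack replaced the older parameter names:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')   # 'type=' became 'exchange_type='

result = channel.queue_declare(queue='', exclusive=True)   # queue='' lets the broker pick a random name
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,   # 'no_ack=True' became 'auto_ack=True'
                      auto_ack=True)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()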


Selective message receiving with direct:
As with fanout, no_ack here should be set to True, otherwise the data in the queue will not be cleared (although it will not be redelivered either).

RabbitMQ also supports routing by key: a queue is bound to the exchange with a routing key, the producer publishes the message to the exchange with a routing key, and the exchange uses that key to decide which queue(s) the message is delivered to.


 


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

publish.py


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py


More fine-grained message filtering with topic:

Although using the direct exchange improved our system, it still has limitations – it can’t do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility – we may want to listen to just critical errors coming from ‘cron’ but also all logs from ‘kern’.



import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

publish.py


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

 

RPC (Remote Procedure Call): two-way communication

To illustrate how an RPC service could be used we’re going to create a simple client class. It’s going to expose a method named call which sends an RPC request and blocks until the answer is received:


rpc client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import uuid,time


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, # run on_response whenever a message arrives
                                   no_ack=True,     # no ack needed for the reply
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    # make sure the reply belongs to our request
            self.response = body


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    # queue the server should send the reply to
                                       correlation_id=self.corr_id,     # a uuid that acts as the request id
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   # non-blocking alternative to start_consuming
            print("no messages")
            time.sleep(0.5)     # only for demonstration
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()    #实例化
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)       #执行call方法
print(" [.] Got %r" % response)

rpc server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    # reply goes back to the queue named by the client
                     properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
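A note on the commented-out basic_qos line in the server: if you start several copies of this server against the same rpc_queue, enabling it applies the fair-dispatch rule from section 5, so a worker that is busy computing a large fib() will not have additional requests stacked up on it:

channel.basic_qos(prefetch_count=1)     # at most one unacknowledged request per worker
channel.basic_consume(on_request,
                      queue='rpc_queue')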

 
