【4858.com】怎么样使用RabbitMQ

By admin in 4858.com on 2019年3月27日

网上参考大神们的博客,本身做了叁个RabbitMQ即时发音信的德姆o。

话不多说,间接上代码!

注:那份文档是自身和多少个朋友学习后联手形成的。

4858.com 1

 

一:搭建多少个消除方案框架:RabbitMQ_Demo

目录

  • RabbitMQ 概念
  • exchange沟通机机制
    • 何以是交流机
    • binding?
    • Direct Exchange交换机
    • Topic Exchange交换机
    • Fanout Exchange交换机
    • Header Exchange交换机
  • RabbitMQ 的 Hello – Demo(springboot实现)
  • RabbitMQ 的 Hello Demo(spring xml实现)
  • RabbitMQ 在生育条件下选用和出现的标题
    • Spring RabbitMQ 注解
    • 消息的 JSON 传输
    • 消息持久化,断线重连,ACK。

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但不可能只是了然其为消息队列,消息代理更适于。RabbitMQ
是贰个由 Erlang
语言开发的AMQP(高级音讯队列协议)的开源完毕,其内部结构如下:

4858.com 2

RabbitMQ作为二个新闻代理,首要和消息交际,负责接收并转账音信。RabbitMQ提供了可信的新闻机制、跟踪机制和灵活的新闻路由,接济音信集群和分布式安排。适用于排队算法、秒杀活动、新闻分发、异步处理、数据同步、处理耗费时间职责、CQ奇骏S等使用场景。

【4858.com】怎么样使用RabbitMQ。上边我们就来学习下RabbitMQ。

1.使用VS的NuGet安装包管理工科具安装RabbitMQ.Client:

中间包涵多少个部分:

RabbitMQ 概念

RabbitMQ
即八个音讯队列,重庆大学是用来落到实处应用程序的异步和平化解耦,同时也能起到音信缓冲,音讯分发的效应。RabbitMQ使用的是AMQP协议,它是一种二进制协议。暗中同意运营端口
5672。

在 RabbitMQ 中,如下图结构:

4858.com 3

rabbitmq

  • 右侧 P 代表 生产者,也便是往 RabbitMQ 发新闻的次第。
  • 个中正是 RabbitMQ,当中包含了 调换机 和 队列。
  • 左边 C 代表 消费者,也正是往 RabbitMQ 拿音信的程序。

那么,内部相比关键的概念有 五个,分别为:虚拟主机,交流机,队列,和绑定。

  • 虚拟主机:3个虚拟主机持有一组沟通机、队列和绑定。为啥供给多个虚拟主机呢?很简短,RabbitMQ在这之中,用户只可以在虚拟主机的粒度举办权力决定。
    因而,固然急需禁止A组访问B组的沟通机/队列/绑定,必须为A和B分别创设贰个虚拟主机。每1个RabbitMQ服务器都有一个默许的虚拟主机“/”。
  • 交换机:Exchange 用于转载新闻,但是它不会做存款和储蓄 ,即使没有
    Queue bind 到 Exchange 的话,它会直接甩掉掉 Producer
    发送过来的音信。

    • 此间有一个比较关键的定义:***路由键 ***
      。新闻到交流机的时候,交互机会转载到对应的队列中,那么毕竟转载到哪个队列,就要基于该路由键。
  • 绑定:约等于调换机要求和队列相绑定,那其间如上海教室所示,是多对多的关联。

2. 条件搭建

正文主要依照Windows下行使Vs Code 基于.net
core实行demo演示。起先以前大家须求准备好之下条件。

  • 安装Erlang运行条件
    下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows开端按钮,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理员身份运营。
  • 逐一执行以下命令运转RabbitMQ服务

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
  • 执行rabbitmqlctl status检查RabbitMQ状态

  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management即可成功安装,使用暗中认可账号密码(guest/guest)登录即可。

4858.com 4

1:RabbitMQ 公用类库项目

exchange调换机机制

3. Hello RabbitMQ

在上马此前大家先来打探下新闻模型:
4858.com 5
买主(consumer)订阅有个别队列。生产者(producer)创立音信,然后公布到行列(queue)中,队列再将新闻发送到监听的买主。

下边我们我们经过demo来精通RabbitMQ的宗旨用法。

 

2:七个劳动者控制台项目

怎么是调换机

rabbitmq的message
model实际上海消防息不直接发送到queue中,中间有八个exchange是做新闻分发,producer甚至不知道音信发送到那个队列中去。由此,当exchange收到message时,必须精确了然该如何分发。是append到一定规则的queue,照旧append到四个queue中,照旧被撇下?这一个规则都以通过exchagne的4种type去定义的。

The core idea in the messaging model in RabbitMQ is that the producer
never sends any messages directly to a queue. Actually, quite often
the producer doesn’t even know if a message will be delivered to any
queue at all.

Instead, the producer can only send messages to an exchange. An
exchange is a very simple thing. On one side it receives messages from
producers and the other side it pushes them to queues. The exchange
must know exactly what to do with a message it receives. Should it be
appended to a particular queue? Should it be appended to many queues?
Or should it get discarded. The rules for that are defined by the
exchange type.

exchange是四个音讯的agent,每二个虚构的host中都有定义。它的职责是把message路由到分歧的queue中。

3.1.音信的发送和选拔

创办RabbitMQ文件夹,打开命令提醒符,分别创设四个控制台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

大家先来添加新闻发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再来完善音讯接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运转信息接收端,再运营新闻发送端,结果如下图。

4858.com 6

从地点的代码中得以观看,发送端和消费端的代码前4步都是如出一辙的。首要的界别在于发送端调用channel.BasicPublish办法发送新闻;而接收端供给实例化一个EventingBasicConsumer实例来展开新闻处理逻辑。其它一些需求小心的是:音信接收端和发送端的队列名称(queue)必须保持一致,这里钦点的种类名称为hello。

2.劳动者端代码:

3:多个顾客控制台项目

binding?

exchange和queue通过routing-key关联,那两者之间的涉及是就是binding。如下图所示,X表示沟通机,品红代表队列,调换机通过一个routing-key去binding一个queue,routing-key有如何效能吗?看Direct
exchange类型交换机。

4858.com 7

3.2. 循环调度

运用工作行列的益处正是它亦可互为的拍卖队列。借使堆积了重重职分,大家只须求丰裕越多的劳力(workers)就能够了。我们先运营八个接收端,等待消息接收,再起步3个出殡和埋葬端实行音信发送。

4858.com 8

大家扩展运维贰个消费端后的运维结果:

4858.com 9

从图中可见,我们循环境与发展送4条消息,八个新闻接收端按梯次被循环分配。
暗许景况下,RabbitMQ将按梯次将每条消息发送给下二个买主。平均每种消费者将获得一致数量的新闻。那种分发音信的不二法门叫做循环(round-robin)。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Producter
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName="localhost",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31         static void Main(string[] args)
32         {
33             DirectExchangeSendMsg();
34             Console.WriteLine("按任意键退出程序!");
35             Console.ReadKey();
36         }
37         /// <summary>
38         /// 单点精确路由模式
39         /// </summary>
40         private static void DirectExchangeSendMsg()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     var props = channel.CreateBasicProperties();
51                     props.Persistent = true;
52                     Console.WriteLine("请输入需要发送的消息:");
53                     string vadata = Console.ReadLine();
54                     while (vadata != "exit")
55                     {
56                         var msgBody = Encoding.UTF8.GetBytes(vadata);
57                         channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
58                         Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
59                         vadata = Console.ReadLine();
60                     }
61                 }
62             }
63         }
64     }
65 }

品种布局如图:

Directed Exchange

路由键exchange,该调换机械收割到新闻后会把信息发送到钦点routing-key的queue中。那新闻交流机是怎么通晓的吗?其实,producer
deliver新闻的时候会把routing-key add到 message
header中。routing-key只是贰个messgae的attribute。

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
那种是非凡的Direct
Exchange,是rabbitmq内部暗中认可的贰个交流机。该交流机的name是空字符串,全部queue都私下认可binding
到该调换机上。全数binding到该沟通机上的queue,routing-key都和queue的name一样。

3.3. 音信确认

安份守己大家地点的demo,一旦RabbitMQ将音信发送到消费端,新闻就会立马从内部存款和储蓄器中移出,无论消费端是或不是处理到位。在那种情形下,新闻就会丢掉。

为了保险多少个新闻永远不会丢掉,RabbitMQ帮衬新闻确认(message
acknowledgments)
。当消费端接收新闻还要处理完结后,会发送1个ack(音信确认)信号到RabbitMQ,RabbitMQ接收到这么些信号后,就足以去除掉那条已经处理的音讯职务。但万一消费端挂掉了(比如,通道关闭、连接丢失等)没有发送ack信号。RabbitMQ就会知晓有个别新闻并未正规处理,RabbitMQ将会重复将音讯入队,要是有此外2个费用端在线,就会急忙的再一次发送到其它一个消费端。

RabbitMQ中平素不新闻超时的概念,唯有当消费端关闭或奔溃时,RabbitMQ才会再也分发音信。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

重在改动的是将
autoAck:true修改为autoAck:fasle,以及在音讯处理完成后手动调用BasicAck方法举办手动音讯确认。

4858.com 10

从图中可见,音讯发送端连接发送4条消息,个中消费端1先被分配处理第③条音讯,消费端2被循环分配第壹条消息,第壹条消息由于并未空闲消费者还是在队列中。
在费用端2未处理完第2条音信在此之前,手动中断(ctrl+c)。我们可以发现RabbitMQ在下一遍分发时,会先行将被中止的新闻分发给消费端1甩卖。

3.顾客端代码:

4858.com 11

Topic Exchange

通配符交流机,exchange会把消息发送到一个或然多少个满足通配符规则的routing-key的queue。其中*表号匹配多个word,#同盟五个word和路线,路径之间通过.隔离。如满意a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

3.4. 消息持久化

消息确认确认保障了正是消费端很是,新闻也不会丢掉能够被重新分发处理。可是一旦RabbitMQ服务端格外,信息依然会丢掉。除非我们钦点durable:true,不然当RabbitMQ退出或奔溃时,消息将依旧会丢掉。通过点名durable:true,并指定Persistent=true,来告诉RabbitMQ将音讯持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将音讯标记为持久性不能够完全保险音信不会丢掉。尽管它报告RabbitMQ将音讯保存到磁盘,可是当RabbitMQ接受音讯还要还不曾保存时​​,依然有三个十分的短的年华窗口。RabbitMQ
或然只是将消息保存到了缓存中,并从未将其写入到磁盘上。持久化是不可见肯定有限支持的,可是对于四个简短职务队列来说早已足足。假若急需确定保障新闻队列的持久化,能够使用publisher
confirms.

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Consumer
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName = "127.0.0.1",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31 
32         static void Main(string[] args)
33         {
34             DirectAcceptExchange();
35 
36             Console.WriteLine("输入任意值退出程序!");
37             Console.ReadKey();
38         }
39 
40         private static void DirectAcceptExchange()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     while (true)
51                     {
52                         BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
53                         if (msgResponse != null)
54                         {
55                             var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
56                             Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
57                         }
58                         //System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
59                     }
60                 }
61             }
62         }
63     }
64 }

 

Fanout Exchange

扇形交流机,该交流机会把音讯发送到全数binding到该沟通机上的queue。那种是publisher/subcribe形式。用来做广播最棒。
全数该exchagne上内定的routing-key都会被ignore掉。

The fanout copies and routes a received message to all queues that are
bound to it regardless of routing keys or pattern matching as with
direct and topic exchanges. Keys provided will simply be ignored.

3.5. 公而忘私分发

RabbitMQ的音讯分发暗许依据消费端的数量,按顺序循环分发。那样仅是有限辅助了消费端被平均分发音信的数码,但却忽视了消费端的闲忙情形。那就恐怕出现有个别消费端直接处理耗费时间任务处于阻塞状态,某个消费端直接处理一般职责处于空置状态,而只是它们分配的天职位数量量同样。

4858.com 12

但大家得以经过channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来报告RabbitMQ,在未收裁撤费端的音讯确认时,不再分发音信,也就确认保障了当消费端处于忙绿景观时,不再分配任务。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

此刻你要求小心的是一旦具有的消费端都地处辛勤景色,你的连串可能会被塞满。你供给注意那一点,要么添加越多的消费端,要么采用其它策略。

4.主次结果:

二:开发以前,必要引用RabbitMQ包

Header Exchange

安装header attribute参数类型的调换机。

4. Exchange

仔细的你只怕发现上面包车型大巴demo,生产者和买主直接是透过一致队列名称进行匹配衔接的。消费者订阅有些队列,生产者成立音信发布到行列中,队列再将音讯转载到订阅的主顾。那样就会有3个局限性,即消费者三回只好发送新闻到某多个队列。

那消费者如何才能发送消息到四个音讯队列呢?
RabbitMQ提供了Exchange,它相仿于路由器的遵从,它用于对音讯实行路由,将音讯发送到多个类别上。Exchange一方面从劳动者接收新闻,另一方面将音讯推送到行列。但exchange必须了解怎么样处理接收到的消息,是将其附加到特定队列依然增大到七个类别,依然一向忽略。而这一个规则由exchange
type定义,exchange的法则如下图所示。
4858.com 13

大规模的exchange type 有以下三种:

  • direct(分明的路由规则:消费端绑定的类别名称必须和新闻公布时钦定的路由名称相同)
  • topic (格局匹配的路由规则:协理通配符)
  • fanout (新闻广播,将消息分发到exchange上绑定的拥有队列上)

上边大家就来挨家挨户那介绍它们的用法。

4858.com 14

设置相应的Nuget包,可能下载相关dll也能够,然而建议在线安装nuget,更便宜

RabbitMQ 的 Hello Demo

设置就隐瞒了,建议遵照法定文档上做。先贴代码,稍后解释,代码如下:

布局 沟通机,队列,交流机与队列的绑定,新闻监视容器:

@Configuration
@Data
public class RabbitMQConfig {

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("spring-boot-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

配备接收新闻者(即消费者):

public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

安插发送新闻者(即生产者):

@RestController
public class Test {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)
    public String test(@PathVariable(value = "abc") String abc){
        rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");
        return  "abc";
    }
}

上述便可完毕三个不难易行的 RabbitMQ
德姆o,具体代码在:点这里

那么,那里,分为八个部分分析:发新闻,沟通机队列,收音讯。

  • 对于发送信息:大家一般能够应用 RabbitTemplate,这一个是 Spring
    封装给了小编们,便于大家发送音信,我们调用
    rabbitTemplate.convertAndSend("spring-boot", xxx); 即可发送消息。
  • 对此调换机队列:如上代码,大家须求配备沟通机
    TopicExchange,配置队列 Queue,并且布署他们中间的绑定 Binding
  • 对此收受音信:首先需求成立三个音信监听容器,然后把大家的接受者注册到该容器中,那样,队列中有音讯,那么就会调用接收者的照应的艺术。如上代码
    container.setMessageListener(listenerAdapter);
    其中,MessageListenerAdapter 能够看作是
    大家接收者的三个包裹类,new MessageListenerAdapter(receiver, "receiveMessage");
    指明了一旦有新闻来,那么调用接收者哪个方法举行拍卖。

4.1 fanout

针对由表及里的牵记,大家先来领悟下fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的音讯都会散发到独具绑定该exchange的连串上去。

4858.com 15

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

顾客示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

 

搜索:RabbitMQ.Client  
安装新型版即可,不明白怎么设置nuget,请移步 

RabbitMQ 的 Hello Demo(spring xml实现)

spring
xml情势贯彻RabbitMQ简单,可读性较好,配置简单,配置和落实如下所示。

上文已经讲述了rabbitmq的布局,xml格局通过properites文件存放用户配置音讯:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,表明连接、交流机、queue以及consumer监听。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <context:property-placeholder location="classpath:mq.properties" />
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- spring template声明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" />

    <!--申明queue-->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    <!--申明exchange交换机并绑定queue-->
    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--consumer配置监听-->
    <bean id="reveiver" class="com.demo.mq.receive.Reveiver" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/>
    </rabbit:listener-container>
</beans>

上述代码中,引入properties文件就不多说了。

<rabbit:connection-factory>标签注解创制connection的factory工厂。

<rabbit-template>声明spring
template,和上文spring中使用template一样。template可声明exchange。

<rabbit:queue>扬言一个queue并设置queue的布置项,直接看标签属性就可以明白queue的计划项。

<rabbit:direct-exchange>宣称调换机并绑定queue。

<rabbit:listener-container>发明监听container并配备consumer和监听routing-key。

剩余就不难了,application-context.xml中把rabbitmq配置import进去。

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

    <context:component-scan base-package="com.demo.**" />
    <import resource="application-mq.xml" />
</beans>

Producer落成,发送新闻照旧使用template的convertAndSend() deliver音信。

@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger logger = LoggerFactory.getLogger(Producer.class);

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("exeception={}",e);
        }

    }
}

配置consumer

package com.demo.mq.receive;

import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;

@Service
public class Reveiver {
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("reveice msg=" + message.toString());
        latch.countDown();
    }
}

测试deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Resource
    private Producer producer;


    @RequestMapping("/test/{msg}")
    public String send(@PathVariable("msg") String msg){
        logger.info("#TestController.send#abc={msg}", msg);
        System.out.println("msg="+msg);
        producer.sendDataToQueue("test_queue_key",msg);
        return "index";
    }
}

4.2. direct

direct相对于fanout就属于完全匹配、单播的情势,路由体制如下图,即队列名称和新闻发送时钦点的路由完全匹配时,新闻才会发送到钦赐队列上。
4858.com 16

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

消费者示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

注:在率先步事先,你必要安装RabbitMQ客户端,可从

安装好今后,开端激动的写代码啦!

RabbitMQ 在生产条件下选取和产出的标题

在生产环境中,由于 Spring 对 RabbitMQ
提供了有的利于的笺注,所以首先能够运用这么些申明。例如:

  • @EnableRabbit:@EnableRabbit 和 @Configuration
    评释在3个类中组成使用,借使此类可以回来一个RabbitListenerContainerFactory 类型的
    bean,那么就相当于能够把该终端(消费端)和 RabbitMQ
    进行一连。Ps:(生成端不是通过 RabbitListenerContainerFactory 来和
    RabbitMQ 连接,而是经过 RabbitTemplate )
  • @RabbitListener:当对应的队列中有音信的时候,该评释修饰下的不二法门会被实践。
  • @RabbitHandler:接收者能够监听五个类别,分裂的行列音信的类型大概区别,该表明能够使得分歧的音信让分歧形式来响应。

实际那些评释的利用,可以参考这里的代码:点这里

第②,生产环境下的 RabbitMQ
大概不会在劳动者只怕消费者本机上,所以需求再度定义
ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(vhost);
    return connectionFactory;
}

此处,能够另行设置须求延续的 RabbitMQ 的
ip,端口,虚拟主机,用户名,密码。

然后,能够先从生产端考虑,生产端须要连接 RabbitMQ,那么能够经过
RabbitTemplate 举行一连。 Ps:(RabbitTemplate
用于生产端发送新闻到交流机中),如下代码:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(integrationEventMessageConverter());
    template.setExchange(exchangeName);
    return template;
}

在该代码中,new RabbitTemplate(connectionFactory);
设置了生产端连接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
设置了 生产端发送给沟通机的音信是以怎么样格式的,在
integrationEventMessageConverter() 代码中:

public MessageConverter integrationEventMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明了 JSON。上述代码的最后
template.setExchange(exchangeName); 指明了
要把劳动者要把消息发送到哪个调换机上。

有了上述,那么,大家即可使用
rabbitTemplate.convertAndSend("spring-boot", xxx); 发送消息,xxx
表示任意档次,因为上述的安装会帮大家把这一个项目转化成 JSON 传输。

随着,生产端发送大家说过了,那么未来得以看看消费端:

对此消费端,大家能够只开创
SimpleRabbitListenerContainerFactory,它可以帮我们转移
RabbitListenerContainer,然后大家再利用 @RabbitListener
钦命接收者收到音讯时处理的艺术。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(integrationEventMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    return factory;
}

这其中
factory.setMessageConverter(integrationEventMessageConverter());
钦点了我们承受音信的时候,以 JSON
传输的新闻能够转换来对应的门类传入到点子中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {
    @RabbitHandler
    public void receiveTeacher(Teacher teacher) {
        log.info("##### = {}",teacher);
    }
}

恐怕出现的标题:

4.3. topic

topic是direct的提拔版,是一种形式匹配的路由机制。它帮助采用二种通配符来展开方式匹配:符号#和符号*。其中*协作2个单词,
#则象征匹配0个或多少个单词,单词之间用.分割。如下图所示。
4858.com 17

劳动者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

顾客示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

        不过RabbitMQ又是依靠于Erlang
OTP平台,所以,安装RabbitMQ在此之前,必要先从

 

音信持久化

在生育条件中,咱们需求考虑万一劳动者挂了,消费者挂了,恐怕 rabbitmq
挂了何等。一般的话,借使劳动者挂了或许消费者挂了,其实是从未影响,因为音讯就在队列之中。那么万一
rabbitmq
挂了,此前在队列之中的音讯如何做,其实能够做音信持久化,RabbitMQ
会把新闻保存在磁盘上。

做法是能够先从 Connection 对象中获得三个 Channel
信道对象,然后再能够因而该对象设置 音讯持久化。

5. RPC

CR-VPC——Remote Procedure Call,远程进度调用。
那RabbitMQ怎么着实行远程调用呢?示意图如下:
4858.com 18
首先步,主若是展开远程调用的客户端供给钦定接收远程回调的队列,并申明消费者监听此行列。
4858.com ,其次步,远程调用的服务端除了要评释消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的队列中去。

远程调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

长距离调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

       
关于那有个其他始末,推荐阅读:

三:完成RabbitMQ基本收发音信的效益

劳动者也许消费者断线重连

此间 Spring 有活动重连机制。

6. 总结

听别人讲下面的demo和对两种不相同exchange路由体制的上学,大家发现RabbitMQ首如若关联到以下多少个主导概念:

  1. Publisher:生产者,音信的发送方。
  2. Connection:网络连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:调换器(路由器),负责音信的路由到对应队列。
  5. Binding:队列与调换器间的涉嫌绑定。消费者将关爱的行列绑定到钦定沟通器上,以便Exchange能准确分发音讯到钦命队列。
  6. Queue:队列,新闻的缓冲存储区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供能源的逻辑分组和分手。包罗连接,沟通,队列,绑定,用户权限,策略等。
  8. Broker:音讯队列的服务器实体。
  9. Consumer:消费者,音讯的接收方。

此次作为入门就讲到那里,下次我们来上课下EventBus +
RabbitMQ
何以达成事件的分发。

参考资料:
RabbitMQ Tutorials
Demo路径——RabbitMQ

 

1:在RabbitMQ_Lib类库中新建类:MyRabbitMQ.cs

ACK 确认机制

每一种Consumer恐怕须求一段时间才能处理完收到的多寡。即便在那几个历程中,Consumer出错了,非凡退出了,而数据还未曾拍卖完了,那么
非凡不幸,那段数据就不见了。因为我们选择no-ack的方法实行确认,也正是说,每一次Consumer接到数据后,而不管是或不是处理完成,RabbitMQ Server会立刻把那个Message标记为形成,然后从queue中除去了。

设若多个Consumer卓殊退出了,它处理的数额可见被此外的Consumer处理,那样数据在那种地方下就不会丢掉了(注意是那种气象下)。
为了保障数据不被遗失,RabbitMQ扶助新闻确认机制,即acknowledgments。为了保险数据能被正确处理而不仅是被Consumer收到,那么大家不可能动用no-ack。而相应是在处理完数据后发送ack。

在处理数量后发送的ack,便是告诉RabbitMQ数据已经被选拔,处理完了,RabbitMQ能够去安全的删减它了。
假如Consumer退出了解而并未发送ack,那么RabbitMQ就会把那个Message发送到下一个Consumer。这样就保证了在Consumer很是退出的情事下数据也不会丢掉。

  此德姆o只是‘direct’格局的新闻发送接收格局。

4858.com 194858.com 20

个人对 RabbitMQ ACK 的一些思疑,求助:点这里

public class MyRabbitMQ
    {
        //连接工厂
        private ConnectionFactory factory { get; set; } = new ConnectionFactory
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        private IConnection connection { get; set; }

        private IModel channel { get; set; }

        //生产方 构造函数
        public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个转发器
            channel.ExchangeDeclare(exchangeName, exchangeType);
        }

        //消费方 构造函数
        public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个队列
            channel.QueueDeclare(queueName, true, false, false);

            //队列绑定
            channel.QueueBind(queueName, exchangeName, routingKey);
        }

        public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
        {
            channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
        }

        public QueueingBasicConsumer ReceiveMessage(string queueName = "")
        {
            //EventingBasicConsumer
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, true, consumer);
            return consumer;
        }
    }

总结

  1. RabbitMQ 效用:异步,解耦,缓冲,音信分发。
  2. RabbitMQ 主要分为二个部分,生产者,沟通机和队列,消费者。
  3. 内需留意音信持久化,目标为了预防 RabbitMQ 宕机;考虑 ACK
    机制,目标为了假诺买主对音信的处理失利了,那么继续要怎么处理。

View Code

写在最终

  1. 写出来,说出来才了解对不对,知道不对才能改进,修正了才能成长。
  2. 在技术下边,希望大家眼里都容不得沙子。假诺有窘迫的地点可能要求校对的地方希望得以建议,格外谢谢。

能够看出,首先有五个构造函数,一个是给劳动者接纳,3个是给消费者应用,注意参数有所不相同,能够看看生产者与顾客关怀的点是不平等的。

随便生产可能消费,都以1个客户端,都亟待成立3个RabbitMQ连接并成立叁个channel,才能够开始展览有关的操作,那个操作都以由channel发起的,那样说应该比较白话了。

布局生产者的时候,主要是创办3个转载器,转载器的名字及项目须要定义,

转载器常用类型包蕴三种:direct、fanout、topic,

这几种类型那里说的更了解:

本例子中是以topic为例子的

 

2:MQ_Producter项目中发送消息(生产者中发送音信)

4858.com 214858.com 22

class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "07281616_exchange_topic";
            string routingkeya = "0728.a.c.routingkey";
            string routingkeyb = "0728.b.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
            for (int i = 0; i < 3600; i++)
            {
                System.Threading.Thread.Sleep(1000);
                if (i % 2 == 0)
                {
                    var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                    myMQ.SendMessage(message, exchangeName, routingkeyb);
                }
                else
                {
                    var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeya}");
                    myMQ.SendMessage(message, exchangeName, routingkeya);
                }

            }
        }
    }

View Code

此间发送了3600次,每秒发1遍音信,奇数和偶数发送的路由规则差异,会有四个例外的客户端来接过,那样方便大家测试音信是不是被分发到了不一致的队列上

 

3:五个顾客项目举办信息的吸收

顾客一:

4858.com 234858.com 24

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281616_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
            }
        }
    }

View Code

 

买主二:

4858.com 254858.com 26

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281626_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.a.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
            }
        }
    }

View Code

 

五个买主分别接受分裂队列上的音信

 

4:运行!

先编写翻译一下,到bin目录下先运维生产者,在运营几个买主

4858.com 27

 

也得以先关掉消费端,过7秒再关闭生产端,在web
管理界面能够看来今后有1个系列里有新闻,二个3条四个4条

4858.com 28

 

 四:总结

一体化例子的拥有代码都在那里了,代码里有关怀释也很精通,是自笔者自个儿实现的第3个RabbitMQ收发成效,实际选拔中毫无疑问能够有那么些恢弘,新手们有疑忌或然自个儿驾驭的有狼狈的地点,烦请评论处提议哈,大家共同进步!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图
Copyright @ 2010-2019 美高梅手机版4858 版权所有