十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
1、 rabbitmq介绍
RabbitMQ是一个开源的靠AMQP协议实现的服务,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
它可以使对应的客户端(client)与对应的消息中间件(broker)进行交互。消息中间件从发布者(publisher)那里收到消息(发布消息的应用,也称为producer),然后将他们转发给消费者(consumers,处理消息的应用)。由于AMQP是一个网络协议,所以发布者、消费者以及消息中间件可以部署到不同的物理机器上面
2、 消息队列的概念
消息即是信息的载体。为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的。
3、 消息队列应用的场景
消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景
3.1异步处理
场景说明:如用户注册后,需要发送邮件和注册短信,传统的做法有两种
1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。
3.2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口
传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
3.3、流量削峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理
3.4、日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据
3.5、消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
聊天室通讯:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果
4、 常见的消息队列产品
4.1、redis
是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完成可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时,Redis的性能要高于RabbitMQ,而如否数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
4.2、 mecahceq
持久化消息队列(简称mcq)是一个轻量级的消息队列,特性如下:
简单易用
处理速度快
多条队列
并发性能好
与memcache的协议兼容。意味着只要装了前者的extension即可不需要额外的插件
在zend framework中使用很方便 php开发框架
4.3、 MSMQ
这是微软的产品里唯一被认为有价值的东西关键是它并不复杂,除了接收和发送,没有别的,它有一些硬性限制,比如大消息体积是4MB。
4.4、 ZeroMQ
ZeroMQ是一个非常轻量级的消息系统,号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常可以发现它。
与RabbitMQ相比,ZeroMQ支持许多高级消息场景,能够实现RabbitMQ不擅长的高级/复杂的队列,但是你必须实现ZeroMQ框架中的各个块(比如Socket或Device等)。
ZeroMQ具有一个独特的非中间件的模式
你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务角色。你只需要简单地引用ZeroMQ程序库,可以使用NuGet安装(微软开发的.Net平台),然后你就可以愉快地在应用程序之间发送消息了。
但是ZeroMQ仅提供非持久性的队列,即没有地方可以观察它是否有问题出现,也就是说如果down机,数据将会丢失。
4.5、 Jafka/Kafka
Jafka/Kafka(能将消息分散到不同的节点上)是LinkedIn于2010年12月开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:
1)快速持久化,可以在O(1)的系统开销下进行消息持久化;
2)高吞吐,在一台普通的服务器上既可以打到10W/s的吞吐速率;
3)完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;
4)支持Hadoop数据并行加载,统一了在线和离线的消息处理,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
5)相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统
4.6、 Apache ActiveMQ
ActiveMQ居于(RabbitMQ&ZeroMQ)之间,类似于ZemoMQ,支持高级消息场景,支持数据的持久化
ActiveMQ被誉为Java世界的中坚力量。它有很长的历史,且被广泛使用。它还是跨平台的,给那些非微软平台的产品提供了一个天然的集成接入点。
然而它只有跑过了MSMQ才有可能被考虑。如需配置ActiveMQ则需要在目标机器上安装Java环境。
类似于RabbitMQ,它易于实现高级场景,而且只需付出低消耗。它被誉为消息中间件的“瑞士军刀”。
4.7、 RabbitMQ
RabbitMQ是使用Erlang编写的一个开源消息队列,本身支持很多的协议:AMQP(高级消息队列协议), XMPP(可扩展消息处理现场协议), SMTP(简单邮件传输协议), STONP(简单文本定向消息协议),也正是如此,使的它变的非常重量级,更适合于企业级的开发。
它实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等,用消息队列只需几行代码即可搞定。但是,这使得它的可扩展性差,速度较慢,因为中央节点增加了延迟,消息封装后也比较大。如需配置RabbitMQ则需要在目标机器上安装Erlang环境
最终,上述同类产品:
1. 都有各自客户端API或支持多种编程语言
2. 都有大量的文档
3. 都提供了积极的支持
4. ActiveMQ、RabbitMQ、MSMQ、Redis都需要启动服务进程,这些都可以监控 和配置,其他几个就有问题了
5. 都相对提供了良好的可靠性(一致性)、扩展性和负载均衡,当然还有性能
5、Rabbitmq基础概念
应用场景框架
RabbitMQ Server:也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard(监控是否执行成功),就可以彻底保证系统的一致性了。
Client A & B:也叫Producer,数据的发送方。Create messages and Publish (Send) them to a broker server (RabbitMQ)。一个Message有两个部分:Payload(有效载荷)和Label(标签)。Payload顾名思义就是传输的数据,Label是Exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。
Client 1,2,3:也叫Consumer,数据的接收方。Consumers attach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。
对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。
Exchanges:消息交换机,它指定消息按什么规则,路由到哪个队列
Queues:消息队列载体,每个消息都会被投入到一个或多个队列
Bindings:它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
Connection:就是一个TCP的连接。
Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
Channel:虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
Vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。
6、Channel的选择
那么,为什么使用Channel,而不是直接使用TCP连接?
对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。
7、消息队列执行过程
客户端连接到消息队列服务器,打开一个Channel。
客户端声明一个Exchange,并设置相关属性。
客户端声明一个Queue,并设置相关属性。
客户端使用Routing key,在Exchange和Queue之间建立好绑定关系。
客户端投递消息到Exchange。
Exchange接收到消息后,就根据消息的key和已经设置的Binding,进行消息路由,将消息投递到一个或多个队列里。
8、消息持久化
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,大多数用户都会选择持久化。消息队列持久化包括3个部分:
Exchange持久化,在声明时指定durable => 1
Queue持久化,在声明时指定durable => 1
消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
若Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;而Exchange和Queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
Consumer从durable queue中取回一条消息之后并发回了ack消息,RabbitMQ就会将其标记,方便后续垃圾回收。如果一条持久化的消息没有被consumer取走,RabbitMQ重启之后会自动重建exchange和queue(以及bingding关系),消息通过持久化日志重建再次进入对应的queues,exchanges。
9、安装rabbitmq
24 yum -y localinstall erlang-18.1-1.el6.x86_64.rpm rabbitmq-server-3.6.6-1.el6.noarch.rpm socat-1.7.3.2-2.el7.x86_64.rpm (安装rabbitmq)
22 chkconfig --add rabbitmq-server (加入开机自启)
25 chkconfig rabbitmq-server on (开启rabbitmq开机自启)
26 /etc/init.d/rabbitmq-server start (开启rabbitmq)
查看rebbitmq是否运行
27 ps -ef | grep rabbitmq
30 rabbitmq-plugins enable rabbitmq_management
#开启rabbitmq的web管理插件,用户可以通过浏览器进行访问
31 rabbitmqctl add_user admin 123.com
#创建登录用户admin 密码123.com
32 rabbitmqctl set_user_tags admin administrator
#将admin用户添加到管理员组当中
查看端口netstat -anpt | grep 15672
浏览器访问IP地址:15672
10、Rabbitmq集群
(1.40-1.60的操作)
集群方式
Rabbitmq集群大概分为二种方式:
24 yum -y localinstall erlang-18.1-1.el6.x86_64.rpm rabbitmq-server-3.6.6-1.el6.noarch.rpm socat-1.7.3.2-2.el7.x86_64.rpm (安装rabbitmq)
22 chkconfig --add abbitmq-server (加入开机自启)
25 chkconfig rabbitmq-server on (开启rabbitmq开机自启)
26 /etc/init.d/rabbitmq-server start (开启rabbitmq)
查看rebbitmq是否运行
27 ps -ef | grep rabbitmq
[root@localhost ~]# vim /etc/hosts (四台主机都要操作)
192.168.1.30 rabbitmq1
192.168.1.40 rabbitmq2
192.168.1.50 rabbitmq3
192.168.1.60 rabbitmq4
##不想每台都手写可以使用scp
[root@rabbitmq1 ~]# scp /etc/hosts root@192.168.1.40:/etc/hosts ##提示输入yes
在rabbitmq1上查看cookie节点信息并复制
##安装集群的时候需要节点cookie信息一致
rabbitmq2,3和4上将文件cookie信息和rabbitmq1改成相同
[root@rabbitmq2 ~]# echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq3 ~]#echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq4 ~]#echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie
配置完成后reboot重启虚拟机,
##注意:2和3需要手动重启,重启完成后主机名会变为rabbitmq1、2、3,启动不了的开机界面会一直停留在这个地方5-6分钟恢复,可以再次手动重启一下就不会这样了
[root@rabbitmq1 ~]# ps -ef | grep rabbit ##重启完成后查看是否启动
在rabbitmq1上操作
45 rabbitmqctl stop_app
46 rabbitmqctl reset
47 rabbitmqctl start_app
##设置完成后会提示节点名称并复制
在rabbitmq2、3,4上加入节点(2,3和4操作相同)
[root@rabbitmq2 ~]# rabbitmqctl stop_app
Stopping node rabbit@rabbitmq2 ...
[root@rabbitmq2 ~]# rabbitmqctl reset
Resetting node rabbit@rabbitmq2 ...
[root@rabbitmq2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq1
Clustering node rabbit@rabbitmq2 with rabbit@rabbitmq1 ...
##-join_cluter加入集群 --ram 以内存节点方式加入后面跟节点名rabbit@rabbitmq1
[root@rabbitmq2 ~]# rabbitmqctl start_app
Starting node rabbit@rabbitmq2 ...
[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@rabbitmq2... started 6 plugins.
##开启rabbitmq插件,不打开的话,就不能使用浏览器访问rabbit页面进行管理
回到rabbitmq1上创建管理用户和查看集群状态
[root@rabbitmq1 ~]# rabbitmqctl add_user admin redhat
Creating user "admin" ...
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
##创建用户admin密码为redhat,并加入到管理员组中
[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
Plugin configuration unchanged.
Applying plugin configuration to rabbit@rabbitmq1... nothing to do.
[root@rabbitmq1 ~]# rabbitmqctl cluster_status ##查看节点状态
##rabbit1工作模式为磁盘节点 rabbit2、3,4为ram内存节点模式
##running_nodes:正在运行的节点
##cluster_name:节点名称
##alarms:发生问题时rabbit1、2、3,4会进行报警
浏览器访问进行管理:
http://192.168.1.30:15672 用户名为admin密码redhat
点击vxgp后在下面找到premissions选项设置为admin用户权限访问
设置完成后再次查看
设置匹配策略
发布消息:
设置发布消息内容
可以看到已经有通知了
添加rabbitmq节点:rabbitmq4 192.168.83.4
rabbitmq1、2、3添加hosts主机都添加192.168.83.4 rabbitmq4
rabbitmq4安装同上面安装一样
[root@localhost ~]# chkconfig rabbitmq-server on
[root@localhost ~]# /etc/init.d/rabbitmq-server start
[root@localhost ~]# echo MIIPQWTLIDGVGKMDWQFX > /var/lib/rabbitmq/.erlang.cookie
[root@localhost ~]# reboot
[root@rabbitmq4 ~]# rabbitmqctl stop_app
[root@rabbitmq4 ~]# rabbitmqctl reset
[root@rabbitmq4 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq1
删除节点:
##在rabbitmq4上先停止节点
[root@rabbitmq1 ~]# rabbitmqctl stop_app
回到主节点上rabbitmq1
[root@rabbitmq1 ~]# rabbitmqctl -n rabbit@rabbitmq1 forget_cluster_node rabbit@rabbitmq4
Removing node rabbit@rabbitmq4 from cluster ...
##-n 指定节点名称
#=forget_cluster_node 后面跟要删除的节点名称
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。