ActiveMQ 官方网址:http://activemq.apache.org
前言
Java 消息服务
不同系统之间的信息交换,是我们开发中比较常见的场景,比如系统 A 要把数据发送给系统 B,这个问题我们应该如何去处理?1994年,原来的 Sun 公司领衔提出了一种面向消息的中间件服务–JMS 规范(标准);常用的几种信息交互技术(httpClient、hession、dubbo、jms、webservice 五种)。
JMS 概述
JMS 即 Java 消息服务(Java Message Service 的简称),是 Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS 只是 Java EE 中定义的一组标准的 API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。
ActiveMQ 概述
我们知道 JMS 只是消息服务的一组规范和接口,并没有具体的实现,而 ActiveMQ 就是 JMS 规范的具体实现;它是 Apache 下的一个项目,采用 Java 语言开发;是一款非常流行的开源消息服务器。
ActiveMQ 与 JMS 关系
JMS 只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就是说 JMS 只是定义了一组接口而已;就像 JDBC 抽象了关系数据库访问、JPA 抽象了对象与关系数据库映射、JNDI 抽象了命名目录服务访问一样,JMS 具体的实现由不同的消息中间件厂商提供,比如 Apache ActiveMQ 就是 JMS 规范的具体实现,Apache ActiveMQ 才是一个消息服务系统,而 JMS 不是。
ActiveMQ 下载地址
http://activemq.apache.org/download-archives.html
ActiveMQ 依赖 JDK 版本
| MQ 版本号 | Build-JDK | 依赖 JDK |
|---|---|---|
| apache-activemq-5.0.0 | 1.5.0_12 | 1.5 + |
| apache-activemq-5.1.0 | 1.5.0_12 | 1.5 + |
| apache-activemq-5.2.0 | 1.5.0_15 | 1.5 + |
| apache-activemq-5.3.0 | 1.5.0_17 | 1.5 + |
| apache-activemq-5.4.0 | 1.5.0_19 | 1.5 + |
| apache-activemq-5.5.0 | 1.6.0_23 | 1.6 + |
| apache-activemq-5.6.0 | 1.6.0_26 | 1.6 + |
| apache-activemq-5.7.0 | 1.6.0_33 | 1.6 + |
| apache-activemq-5.8.0 | 1.6.0_37 | 1.6 + |
| apache-activemq-5.9.0 | 1.6.0_51 | 1.6 + |
| apache-activemq-5.10.0 | 1.7.0_12-ea | 1.7 + |
| apache-activemq-5.11.0 | 1.7.0_60 | 1.7 + |
| apache-activemq-5.12.0 | 1.7.0_80 | 1.7 + |
| apache-activemq-5.13.0 | 1.7.0_80 | 1.7 + |
| apache-activemq-5.14.0 | 1.7.0_80 | 1.7 + |
| apache-activemq-5.15.0 | 1.8.0_112 | 1.8 + |
如何查看官方发布 ActiveMQ 依赖 JDK 版本
查看文件 activemq-all-*.jar\META-INF\MANIFEST.MF 的属性值 Build-Jdk

JMS 中的关键接口
- ConnectionFactory:用于创建连接到消息中间件的连接工厂。
- Connection:代表了应用程序和服务之间的连接通路。
- Destination:指消息发布的地点,包括队列模式和主体模式。
- Session:表示一个单线程的上下文,用于发送和接受消息。
- MessageConsumer:由会话创建,用于接受发送到目的的消息。
- MessageProducer:由会话创建,用于发送消息。
- Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。

Mac 上安装 ActiveMQ
- 下载 ActiveMQ(博主准备的版本是 5.15.9)
- 通过命令解压到本地
- 启动 ActiveMQ,执行命令 ./activemq start(见图二)。启动后有两个端口号,一个是 Web 控制台的 8161,一个是消息服务 Broker 连接端口 61616
- 浏览器访问 http://127.0.0.1:8161/admin/ ,账号 admin 密码 admin(见图三)
- 成功登录(见图四)



- Home:查看 ActiveMQ 的常见信息
- Queues:查看 ActiveMQ 的队列信息
- Topics:查看 ActiveMQ 的主题信息
- Subscribers:查看主题的订阅者信息
- Connections:查看 ActiveMQ 客户端的连接信息
- Network:查看 ActiveMQ 的网络信息
- Scheduled:查看 ActiveMQ 的定时任务
- Send:用于通过表单方式向队列或者主题发送具体的消息
ActiveMQ 名词解释
- Number Of Consumers 消费者端的消费者数量
- Number Of Pending Messages 等待消费的消息,这个是当前未出队列的数量。可以理解为总接收数-总出队列数
- Messages Enqueued 进入队列的消息,进入队列的总数量,包括出队列的。这个数量只增不减
- Messages Dequeued 出了队列的消息,可以理解为是消费这消费掉的数量
这个要分两种情况理解
在 queues 里出队列消息和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
在 topics 里出队列消息因为多消费者从而导致数量会比入队列数高。
简单的理解上面的意思就是
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1;
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1;
在来一条消息时,等待消费的消息是1,进入队列的消息就是2;
没有消费者时 Pending Messages 和入队列数量一样;
有消费者消费的时候 Pedding 会减少,出队列会增加;
到最后就是入队列和出队列的数量一样多;
以此类推,进入队列的消息和出队列的消息是池子,等待消费的消息是水流。
ActiveMQ 特点
- 支持多语言编写客户端
- 对 Spring 的支持,很容易和 Spring 整合
- 支持多种传输协议:TCP、SSL、NIO、UDP 等
- 支持 AJAX
ActiveMQ 的消息模式
- 队列消息模式
- 订阅模式
点对点的消息模式
点对点的消息模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ 发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 ActiveMQ 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。
特点:
(1)客户端包括生产者和消费者;
(2)队列中的一个消息只能被一个消费者使用;
(3)消费者可以随时取消息;
订阅模式
订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么 ActiveMQ 将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。
特点:
(1)客户端包括发布者和订阅者;
(2)主题中的消息可以被所有订阅者消费;
(3)消费者不能消费订阅之前发送的消息;
点对点的消息模式和订阅模式区别
- 订阅模式需要先订阅 TOPIC,即先启动消费者再启动生产者才能正常消费
ActiveMQ 的安全配置
- 管理后台的密码设置
打开 127.0.0.1:8161/admin/ 就是 ActiveMQ 的管理控制台,它的默认账号和密码都是 admin, 在生产环境肯定需要更改密码的,这要怎么做呢?
在 activemq/conf/jetty.xml 中找到
1 | <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> |
高版本的已经默认成为了 true。所以我们直接进行下一步即可
在 activemq/conf/jetty-realm.properties 文件中配置,打开如下
1 | ## --------------------------------------------------------------------------- |
注意:重点看倒数第二行,那里三个分别是用户名,密码,角色,其中 admin 角色是固定的。
注意:ActiveMQ 默认是不需要密码,生产消费者就可以连接的。
需要经过配置,才能设置密码,这一步在生产环境中一定要配置。找到activemq/conf/activemq.xml, 并打开在
1 | <plugins> |
这样就开启了密码认证
然后账号密码的配置在 activemq/conf/credentials.properties 文件中
打开这个文件如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
# Defines credentials that will be used by components (like web console) to access the broker
# 账号
activemq.username=system
# 密码
activemq.password=manager
guest.password=password
这样就配置完毕了。
创建过程中的注意点
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
- Boolean.FALSE 表示本次会话不开启事务管理,假如需要开启事务管理,将其改为 Boolean.TRUE,同时需要在发送消息后添加 session.commit(),否则,消息是不会被提交的
- Session.AUTO_ACKNOWLEDGE 表示消息确认机制
AUTO_ACKNOWLEDGE:自动确认
CLIENT_ACKNOWLEDGE:客户端确认
SESSION_TRANSACTED:事务确认,如果使用事务推荐使用该确认机制
DUPS_OK_ACKNOWLEDGE:懒散式确认,消息偶尔不会被确认,也就是消息可能会被重复发送,但发生的概率很小
使用 Spring 集成 JMS 连接 ActiveMQ
- CollectionFactory 用于管理连接的连接工厂
- 一个 Spring 为我们提供的连接池
- JmsTemplate 每次发消息都会重新创建连接,会话和 producer
- Spring 中提供了 SingleConnectionFactory 和 CachingConnectionFactory
- JmsTemplate 用于发送和接收消息的模板类
- 是 Spring 提供的,只需向 Spring 容器内注册这个类就可以使用 JmsTemplate 方便的操作 jms
- JmsTemplate 类是线程安全的,可以在整个应用范围使用。
- MessageListener 消息监听器
- 实现一个 onMessage 方法,该方法只接收一个 Message 参数
为什么要对消息中间件集群
- 实现高可用,以单点故障引起的服务中断
- 实现负载均衡,以提升效率为更多客户提供服务
集群方式
- 客户端集群:让多个消费者消费同一个队列
- Broker clusters:多个 Broker 之间同步消息
- Master Slave:实现高可用
客户段配置
ActiveMQ 失效转移(failover)
允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器。1
语法:failover : (uri1,...,uriN) ? transportOptions
transportOptions 参数说明
- randomize 默认为 true,表示在 URI 列表中选择 URI 连接时是否采用随机策略
- initialReconnectDelay 默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
- maxReconnectDelay 默认30000,单位毫秒,最长重连的时间间隔
Broker Cluster 集群配置
原理
NetworkConnector(网络连接器)
网络连接器主要用于配置 ActiveMQ 服务器与服务器之间的网络通讯方式,用于服务器透传消息。
网络连接器分为静态连接器和动态连接器。
静态连接器
1
2
3<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
</networkConnectors>动态连接器
1
2
3
4
5
6
7<networkConnectors>
<networkConnector uri="multicast://default"/>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
</transportConnectors>
Master/Slave 集群配置
ActiveMQ Master Slave 集群方案
- Share nothing storage master/slave(已过时,5.8+后移除)
- Shared storage master/slave 共享存储
- Replicated LevelDB Store 基于复制的 LevelDB Store
共享存储集群的原理