ActiveMQ

ActiveMQ 官方网址:http://activemq.apache.org

前言

Java 消息服务

不同系统之间的信息交换,是我们开发中比较常见的场景,比如系统 A 要把数据发送给系统 B,这个问题我们应该如何去处理?1994年,原来的 Sun 公司领衔提出了一种面向消息的中间件服务–JMS 规范(标准);常用的几种信息交互技术(httpClient、hession、dubbo、jms、webservice 五种)。

JMS 概述

JMS 即 Java 消息服务(Java Message Service 的简称),是 Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS 只是 Java EE 中定义的一组标准的 API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。

ActiveMQ 概述

我们知道 JMS 只是消息服务的一组规范和接口,并没有具体的实现,而 ActiveMQ 就是 JMS 规范的具体实现;它是 Apache 下的一个项目,采用 Java 语言开发;是一款非常流行的开源消息服务器。

ActiveMQ 与 JMS 关系

JMS 只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就是说 JMS 只是定义了一组接口而已;就像 JDBC 抽象了关系数据库访问、JPA 抽象了对象与关系数据库映射、JNDI 抽象了命名目录服务访问一样,JMS 具体的实现由不同的消息中间件厂商提供,比如 Apache ActiveMQ 就是 JMS 规范的具体实现,Apache ActiveMQ 才是一个消息服务系统,而 JMS 不是。

ActiveMQ 下载地址

http://activemq.apache.org/download-archives.html


ActiveMQ 依赖 JDK 版本

MQ 版本号 Build-JDK 依赖 JDK
apache-activemq-5.0.0 1.5.0_12 1.5 +
apache-activemq-5.1.0 1.5.0_12 1.5 +
apache-activemq-5.2.0 1.5.0_15 1.5 +
apache-activemq-5.3.0 1.5.0_17 1.5 +
apache-activemq-5.4.0 1.5.0_19 1.5 +
apache-activemq-5.5.0 1.6.0_23 1.6 +
apache-activemq-5.6.0 1.6.0_26 1.6 +
apache-activemq-5.7.0 1.6.0_33 1.6 +
apache-activemq-5.8.0 1.6.0_37 1.6 +
apache-activemq-5.9.0 1.6.0_51 1.6 +
apache-activemq-5.10.0 1.7.0_12-ea 1.7 +
apache-activemq-5.11.0 1.7.0_60 1.7 +
apache-activemq-5.12.0 1.7.0_80 1.7 +
apache-activemq-5.13.0 1.7.0_80 1.7 +
apache-activemq-5.14.0 1.7.0_80 1.7 +
apache-activemq-5.15.0 1.8.0_112 1.8 +

如何查看官方发布 ActiveMQ 依赖 JDK 版本
查看文件 activemq-all-*.jar\META-INF\MANIFEST.MF 的属性值 Build-Jdk

图一


JMS 中的关键接口

  • ConnectionFactory:用于创建连接到消息中间件的连接工厂。
  • Connection:代表了应用程序和服务之间的连接通路。
  • Destination:指消息发布的地点,包括队列模式和主体模式。
  • Session:表示一个单线程的上下文,用于发送和接受消息。
  • MessageConsumer:由会话创建,用于接受发送到目的的消息。
  • MessageProducer:由会话创建,用于发送消息。
  • Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。

图二


Mac 上安装 ActiveMQ

  • 下载 ActiveMQ(博主准备的版本是 5.15.9)
  • 通过命令解压到本地
  • 启动 ActiveMQ,执行命令 ./activemq start(见图二)。启动后有两个端口号,一个是 Web 控制台的 8161,一个是消息服务 Broker 连接端口 61616
  • 浏览器访问 http://127.0.0.1:8161/admin/ ,账号 admin 密码 admin(见图三)
  • 成功登录(见图四)

图三

图四

图五

  • Home:查看 ActiveMQ 的常见信息
  • Queues:查看 ActiveMQ 的队列信息
  • Topics:查看 ActiveMQ 的主题信息
  • Subscribers:查看主题的订阅者信息
  • Connections:查看 ActiveMQ 客户端的连接信息
  • Network:查看 ActiveMQ 的网络信息
  • Scheduled:查看 ActiveMQ 的定时任务
  • Send:用于通过表单方式向队列或者主题发送具体的消息

ActiveMQ 名词解释

  • Number Of Consumers 消费者端的消费者数量
  • Number Of Pending Messages 等待消费的消息,这个是当前未出队列的数量。可以理解为总接收数-总出队列数
  • Messages Enqueued 进入队列的消息,进入队列的总数量,包括出队列的。这个数量只增不减
  • Messages Dequeued 出了队列的消息,可以理解为是消费这消费掉的数量

这个要分两种情况理解

在 queues 里出队列消息和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
在 topics 里出队列消息因为多消费者从而导致数量会比入队列数高。
简单的理解上面的意思就是
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1;
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1;
在来一条消息时,等待消费的消息是1,进入队列的消息就是2;

没有消费者时 Pending Messages 和入队列数量一样;
有消费者消费的时候 Pedding 会减少,出队列会增加;
到最后就是入队列和出队列的数量一样多;
以此类推,进入队列的消息和出队列的消息是池子,等待消费的消息是水流。


ActiveMQ 特点

  • 支持多语言编写客户端
  • 对 Spring 的支持,很容易和 Spring 整合
  • 支持多种传输协议:TCP、SSL、NIO、UDP 等
  • 支持 AJAX

ActiveMQ 的消息模式

  • 队列消息模式
  • 订阅模式

点对点的消息模式

  点对点的消息模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ 发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 ActiveMQ 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

特点:
(1)客户端包括生产者和消费者;
(2)队列中的一个消息只能被一个消费者使用;
(3)消费者可以随时取消息;

订阅模式

  订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么 ActiveMQ 将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。

特点:
(1)客户端包括发布者和订阅者;
(2)主题中的消息可以被所有订阅者消费;
(3)消费者不能消费订阅之前发送的消息;


点对点的消息模式和订阅模式区别

  • 订阅模式需要先订阅 TOPIC,即先启动消费者再启动生产者才能正常消费

ActiveMQ 的安全配置

  • 管理后台的密码设置
    打开 127.0.0.1:8161/admin/ 就是 ActiveMQ 的管理控制台,它的默认账号和密码都是 admin, 在生产环境肯定需要更改密码的,这要怎么做呢?
    在 activemq/conf/jetty.xml 中找到
1
2
3
4
5
6
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="user,admin" />
<!-- set authenticate=false to disable login -->
<property name="authenticate" value="true" />
</bean>

高版本的已经默认成为了 true。所以我们直接进行下一步即可

在 activemq/conf/jetty-realm.properties 文件中配置,打开如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: user, user

注意:重点看倒数第二行,那里三个分别是用户名,密码,角色,其中 admin 角色是固定的。

注意:ActiveMQ 默认是不需要密码,生产消费者就可以连接的。

需要经过配置,才能设置密码,这一步在生产环境中一定要配置。找到activemq/conf/activemq.xml, 并打开在 节点中,在 节点上面,增加如下的一个插件

1
2
3
4
5
6
7
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>

这样就开启了密码认证
然后账号密码的配置在 activemq/conf/credentials.properties 文件中

打开这个文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines credentials that will be used by components (like web console) to access the broker

# 账号
activemq.username=system
# 密码
activemq.password=manager
guest.password=password

这样就配置完毕了。


创建过程中的注意点
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

  • Boolean.FALSE 表示本次会话不开启事务管理,假如需要开启事务管理,将其改为 Boolean.TRUE,同时需要在发送消息后添加 session.commit(),否则,消息是不会被提交的
  • Session.AUTO_ACKNOWLEDGE 表示消息确认机制
    AUTO_ACKNOWLEDGE:自动确认
    CLIENT_ACKNOWLEDGE:客户端确认
    SESSION_TRANSACTED:事务确认,如果使用事务推荐使用该确认机制
    DUPS_OK_ACKNOWLEDGE:懒散式确认,消息偶尔不会被确认,也就是消息可能会被重复发送,但发生的概率很小

使用 Spring 集成 JMS 连接 ActiveMQ

  • CollectionFactory 用于管理连接的连接工厂
    • 一个 Spring 为我们提供的连接池
    • JmsTemplate 每次发消息都会重新创建连接,会话和 producer
    • Spring 中提供了 SingleConnectionFactory 和 CachingConnectionFactory
  • JmsTemplate 用于发送和接收消息的模板类
    • 是 Spring 提供的,只需向 Spring 容器内注册这个类就可以使用 JmsTemplate 方便的操作 jms
    • JmsTemplate 类是线程安全的,可以在整个应用范围使用。
  • MessageListener 消息监听器
    • 实现一个 onMessage 方法,该方法只接收一个 Message 参数

为什么要对消息中间件集群

  • 实现高可用,以单点故障引起的服务中断
  • 实现负载均衡,以提升效率为更多客户提供服务

集群方式

  • 客户端集群:让多个消费者消费同一个队列
  • Broker clusters:多个 Broker 之间同步消息
  • Master Slave:实现高可用

客户段配置

ActiveMQ 失效转移(failover)
允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器。

1
语法:failover : (uri1,...,uriN) ? transportOptions

transportOptions 参数说明

  • randomize 默认为 true,表示在 URI 列表中选择 URI 连接时是否采用随机策略
  • initialReconnectDelay 默认为10,单位毫秒,表示第一次尝试重连之间等待的时间
  • maxReconnectDelay 默认30000,单位毫秒,最长重连的时间间隔

Broker Cluster 集群配置

原理
图六

NetworkConnector(网络连接器)

  网络连接器主要用于配置 ActiveMQ 服务器与服务器之间的网络通讯方式,用于服务器透传消息。
  网络连接器分为静态连接器和动态连接器。

  • 静态连接器

    1
    2
    3
    <networkConnectors>
    <networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
    </networkConnectors>
  • 动态连接器

    1
    2
    3
    4
    5
    6
    7
    <networkConnectors>
    <networkConnector uri="multicast://default"/>
    </networkConnectors>

    <transportConnectors>
    <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    </transportConnectors>

Master/Slave 集群配置

ActiveMQ Master Slave 集群方案

  • Share nothing storage master/slave(已过时,5.8+后移除)
  • Shared storage master/slave 共享存储
  • Replicated LevelDB Store 基于复制的 LevelDB Store

共享存储集群的原理
图七


本文结束啦 感谢您阅读
如果你觉得这篇文章对你有用,欢迎赞赏哦~
0%