消息队列 MQ
Message Queue
是构建分布式互联网应用的基础设施
通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性
是适用于现代应用的最佳设计方案
MQ 产品生态丰富多个子产品线联合打造金融级
高可用消息服务以及对物联网的原生支持
覆盖金融保险、(新)零售、物联网、移动互联网、传媒泛娱乐、教育、物流、能源、交通等行业
一、什么是消息中间件
消息中间件顾名思义实现两个系统或两个客户端之间进行消息传送
二、什么是ActiveMQ
ActiveMQ开源的基于JMS(Java Message Servie)规范的消息中间件
ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件
三、什么时候用ActiveMQ
ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验
在比较耗时且异步的远程开锁操作时
四、如何使用ActiveMQ
1.AcitveMQ的数据传送流程
2.ActiveMQ的两种消息传递类型
1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据
2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的
两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据
而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据
3.ActiveMQ的安装与启动
1官网下载对应服务器版本
2解压apache-activemq-5.15.9/bin目录
3执行./activemq start启动ActiveMQ
4浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面
点击Manage ActiveMQ broker可以查看消息推送的状态
默认账号密码为admin,admin
5启动错误分析
进入/root/apache-activemq-5.15.9/data目录
查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等
4.ActiveMQ的代码测试
1)构建maven项目引入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
2)生产者类
public class MyProducer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createQueue("myQueue");
// 创建一个生产者
MessageProducer producer = session.createProducer(destination);
// 向队列推送10个文本消息数据
for (int i = 1 ; i <= 5 ; i++){ // 创建文本消息
TextMessage message = session.createTextMessage("第" + i + "个文本消息");
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("已发送的消息:" + message.getText());
}
//关闭连接
connection.close();
}
}
运行结果:
已发送的消息:第1个文本消息
已发送的消息:第2个文本消息
已发送的消息:第3个文本消息
已发送的消息:第4个文本消息
已发送的消息:第5个文本消息
测试查看web后台显示,消息在队列中等待消费
(3)消费者类
public class MyConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createQueue("myQueue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 创建消费的监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
测试结果:
消费的消息:第1个文本消息
消费的消息:第2个文本消息
消费的消息:第3个文本消息
消费的消息:第4个文本消息
消费的消息:第5个文本消息
web后台显示有一个消费者处于连接状态,且已消费了5个message,而该条队列已没有message待消费了
4)当运行两个消费者类,消息是怎么被消费的呢?
是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?
先运行两个消费者,在运行一个生产者对目标队列生产message,会发现有以下情况
// Consumer1控制台
消费的消息:第1个文本消息
消费的消息:第3个文本消息
消费的消息:第5个文本消息
// Consumer2控制台
消费的消息:第2个文本消息
消费的消息:第4个文本消息
即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次
5)以上基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试
public class MyProducerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createTopic("topicTest");
// 创建一个生产者
MessageProducer producer = session.createProducer(destination);
// 向队列推送10个文本消息数据
for (int i = 1 ; i <= 10 ; i++){
// 创建文本消息
TextMessage message = session.createTextMessage("第" + i + "个文本消息");
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("已发送的消息:" + message.getText());
}
//关闭连接
connection.close();
}
}
public class MyConsumerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createTopic("topicTest");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 创建消费的监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
现在如果先启动生产者,再启动消费者
会发现消费者是无法接收到之前生产者之前所生产的数据
只有消费者先启动,再让生产者消费才可以正常接收数据
这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。
如果启动两个消费者
那么每一个消费者都能完整的接收到生产者生产的数据
即每一条数据都被消费了两次
这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别
Message Queue
是构建分布式互联网应用的基础设施
通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性
是适用于现代应用的最佳设计方案
MQ 产品生态丰富多个子产品线联合打造金融级
高可用消息服务以及对物联网的原生支持
覆盖金融保险、(新)零售、物联网、移动互联网、传媒泛娱乐、教育、物流、能源、交通等行业
一、什么是消息中间件
消息中间件顾名思义实现两个系统或两个客户端之间进行消息传送
二、什么是ActiveMQ
ActiveMQ开源的基于JMS(Java Message Servie)规范的消息中间件
ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件
三、什么时候用ActiveMQ
ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验
在比较耗时且异步的远程开锁操作时
四、如何使用ActiveMQ
1.AcitveMQ的数据传送流程
2.ActiveMQ的两种消息传递类型
1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据
2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的
两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据
而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据
3.ActiveMQ的安装与启动
1官网下载对应服务器版本
2解压apache-activemq-5.15.9/bin目录
3执行./activemq start启动ActiveMQ
4浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面
点击Manage ActiveMQ broker可以查看消息推送的状态
默认账号密码为admin,admin
5启动错误分析
进入/root/apache-activemq-5.15.9/data目录
查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等
4.ActiveMQ的代码测试
1)构建maven项目引入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
2)生产者类
public class MyProducer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createQueue("myQueue");
// 创建一个生产者
MessageProducer producer = session.createProducer(destination);
// 向队列推送10个文本消息数据
for (int i = 1 ; i <= 5 ; i++){ // 创建文本消息
TextMessage message = session.createTextMessage("第" + i + "个文本消息");
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("已发送的消息:" + message.getText());
}
//关闭连接
connection.close();
}
}
运行结果:
已发送的消息:第1个文本消息
已发送的消息:第2个文本消息
已发送的消息:第3个文本消息
已发送的消息:第4个文本消息
已发送的消息:第5个文本消息
测试查看web后台显示,消息在队列中等待消费
(3)消费者类
public class MyConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createQueue("myQueue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 创建消费的监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
测试结果:
消费的消息:第1个文本消息
消费的消息:第2个文本消息
消费的消息:第3个文本消息
消费的消息:第4个文本消息
消费的消息:第5个文本消息
web后台显示有一个消费者处于连接状态,且已消费了5个message,而该条队列已没有message待消费了
4)当运行两个消费者类,消息是怎么被消费的呢?
是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?
先运行两个消费者,在运行一个生产者对目标队列生产message,会发现有以下情况
// Consumer1控制台
消费的消息:第1个文本消息
消费的消息:第3个文本消息
消费的消息:第5个文本消息
// Consumer2控制台
消费的消息:第2个文本消息
消费的消息:第4个文本消息
即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次
5)以上基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试
public class MyProducerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createTopic("topicTest");
// 创建一个生产者
MessageProducer producer = session.createProducer(destination);
// 向队列推送10个文本消息数据
for (int i = 1 ; i <= 10 ; i++){
// 创建文本消息
TextMessage message = session.createTextMessage("第" + i + "个文本消息");
//发送消息
producer.send(message);
//在本地打印消息
System.out.println("已发送的消息:" + message.getText());
}
//关闭连接
connection.close();
}
}
public class MyConsumerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 打开连接
connection.start();
// 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
Destination destination = session.createTopic("topicTest");
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 创建消费的监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
现在如果先启动生产者,再启动消费者
会发现消费者是无法接收到之前生产者之前所生产的数据
只有消费者先启动,再让生产者消费才可以正常接收数据
这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。
如果启动两个消费者
那么每一个消费者都能完整的接收到生产者生产的数据
即每一条数据都被消费了两次
这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别
千年的回眸
ActiveMQ是Apache软件基金会研发的开放源代码消息中间件
ActiveMQ是纯Java程序,需要操作系统支持Java虚拟机
中文名 ActiveMQ
外文名 Apache ActiveMQ
支持语言 Java,C,C++,C#,Python,Ruby,Perl
应用协议 OpenWire,Stomp REST
支持规范 JMS1.1,J2EE 1.4 AMQP 1.0
出品厂商 Apache
支持系统 windows,linux
Spring Framework
集群 (Clustering)
支持的编程语言包括:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby
协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP