消息队列 MQ
Message Queue
是构建分布式互联网应用的基础设施
通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性
是适用于现代应用的最佳设计方案
MQ 产品生态丰富多个子产品线联合打造金融级
高可用消息服务以及对物联网的原生支持
覆盖金融保险、(新)零售、物联网、移动互联网、传媒泛娱乐、教育、物流、能源、交通等行业

一、什么是消息中间件
消息中间件顾名思义实现两个系统或两个客户端之间进行消息传送

二、什么是ActiveMQ
ActiveMQ开源的基于JMS(Java Message Servie)规范的消息中间件
ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件

三、什么时候用ActiveMQ
ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验
在比较耗时且异步的远程开锁操作时

四、如何使用ActiveMQ
1.AcitveMQ的数据传送流程
2.ActiveMQ的两种消息传递类型
1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据
2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的
两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据
而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据

3.ActiveMQ的安装与启动
1官网下载对应服务器版本
2解压apache-activemq-5.15.9/bin目录
3执行./activemq start启动ActiveMQ
4浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面
点击Manage ActiveMQ broker可以查看消息推送的状态
默认账号密码为admin,admin

5启动错误分析
进入/root/apache-activemq-5.15.9/data目录
查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等
4.ActiveMQ的代码测试
1)构建maven项目引入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
2)生产者类

public class MyProducer {
    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createQueue("myQueue");
        // 创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        // 向队列推送10个文本消息数据
        for (int i = 1 ; i <= 5 ; i++){ // 创建文本消息
            TextMessage message = session.createTextMessage("第" + i + "个文本消息");
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已发送的消息:" + message.getText());
        }
        //关闭连接
        connection.close();
    }

}

运行结果:
已发送的消息:第1个文本消息
已发送的消息:第2个文本消息
已发送的消息:第3个文本消息
已发送的消息:第4个文本消息
已发送的消息:第5个文本消息
测试查看web后台显示,消息在队列中等待消费

(3)消费者类

public class MyConsumer {
    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createQueue("myQueue");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

测试结果:
消费的消息:第1个文本消息
消费的消息:第2个文本消息
消费的消息:第3个文本消息
消费的消息:第4个文本消息
消费的消息:第5个文本消息
web后台显示有一个消费者处于连接状态,且已消费了5个message,而该条队列已没有message待消费了

4)当运行两个消费者类,消息是怎么被消费的呢?
是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?
先运行两个消费者,在运行一个生产者对目标队列生产message,会发现有以下情况
// Consumer1控制台
消费的消息:第1个文本消息
消费的消息:第3个文本消息
消费的消息:第5个文本消息
// Consumer2控制台
消费的消息:第2个文本消息
消费的消息:第4个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次
5)以上基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试

public class MyProducerForTopic {
    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createTopic("topicTest");
        // 创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        // 向队列推送10个文本消息数据
        for (int i = 1 ; i <= 10 ; i++){
            // 创建文本消息
            TextMessage message = session.createTextMessage("第" + i + "个文本消息");
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已发送的消息:" + message.getText());
        }
        //关闭连接
        connection.close();
    }

}


public class MyConsumerForTopic {
    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createTopic("topicTest");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

现在如果先启动生产者,再启动消费者
会发现消费者是无法接收到之前生产者之前所生产的数据
只有消费者先启动,再让生产者消费才可以正常接收数据
这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。
如果启动两个消费者
那么每一个消费者都能完整的接收到生产者生产的数据
即每一条数据都被消费了两次
这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别