原创

ActiveMQ概念及java使用demo

一、概念:

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的消息模式,另一种是订阅\发布的消息模式

官方文档

1.点对点的消息模式

点对点的模式主要建立在一个队列上,当连接一个列队时,发送方不需要知道接收方是否正在接收消息,可以直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,如果接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式可以有多个接收方和发送方,但是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而之后的接收方则接收不到已被接收过的消息。

2.订阅/发布的消息模式

订阅/发布模式有多个接收方和发送方,但是接收方与发送方存在时间上的依赖,如果发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特点就是发送方发送的消息会被所有的接收方接收到,与点对点模式恰恰相反。

二、队列模式和主题模式的区别

1.是否需要提前订阅

队列模式:消费者不需要提前订阅也可以消费消息

主题模式:只有提前进行订阅的消费者才能成功消费消息

2.多个消费者如何分配消息

队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费

主题模式:每个订阅者都可以消费主题模式中的每一条消息

三、TransportConnector支持的协议

ActiveMQ通过网络连接器这种连接机制来实现客户端与服务端之间的通信。

ActiveMQ支持的client-broker通讯协议有:TCPNIOUDPSSLHttp(s)VM。  其中配置Transport  Connector的文件在activeMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内。

1.TCP协议

该协议是ActiveMQ默认的Broker配置,TCPClient监听端口61616。连接的URI格式为tcp://hostname:portkey=value&key=value(参数可选)。

使用该协议,在网络传输数据前,必须要实现序列化,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下,ActiveMQwire protocol叫做OpenWire

2. NIO协议

NIO协议和TCP协议类似,但NIO的性能比TCP更好,因此建议使用NIO协议。

NIO连接的URI格式为nio://hostname:portkey=value&key=value(参数可选)

3.UDP协议

UDP协议是不保证数据的可靠性的,因此UDP通常用在快速数据传递和不怕数据丢失的场景中。

UDP连接的URI格式为udp://hostname:portkey=value&key=value(参数可选)。

4.SSL协议

连接的URL形式: ssl://hostname:port?key=value

5.HTTP协议

连接的URL形式: http://hostname:port?key=value

6.VM协议

如果客户端和broker在一个虚拟机内的话,通过VM协议通讯在VM内通讯,从而减少网络传输的开销。

VM协议的连接的URI形式: vm//brokerName?key=value

四、java使用demo

注:下面demo未处理异常,只是作为简单的演示

官网helloWorld地址:http://activemq.apache.org/hello-world

ActiveMQProducerTest.java

package base.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @Description:
* @author: www.gameboys.cn(1084038709@qq.com)
* @date:2019年7月30日 上午11:44:16
*/
public class ActiveMQProducerTest {
// activemq服务器的url地址,默认通信端口为61616
// private static final String URL = "tcp://192.168.1.1:61616";
private static final String URL = "nio://192.168.1.1:61616";
private static final String userName = "gameboys";
private static final String password = "gameboys";
// 队列的名称
private static final String QUEUE_NAME = "queue-test";
// 队列的名称
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws JMSException, Exception {
queueTest();
// topicTest();
}
public static void queueTest() throws JMSException, Exception {
// 1.创建连接工厂对象(ConnectionFactory)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, URL);
// 2.创建连接对象(Connection)
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目的地(destination)
Destination destination = session.createQueue(QUEUE_NAME);
// 6.创建生产者
MessageProducer producer = session.createProducer(destination);
// 循环发送消息
for (int i = 0; i < 10000; i++) {
// 7.创建消息,这里创建的是简单的文本消息体
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.使用消息生产者往目的地发送消息
producer.send(destination, textMessage);
System.out.println("消息发送成功:" + textMessage.getText());
Thread.currentThread().sleep(500);
}
// 9.关闭连接
connection.close();
}
public static void topicTest() throws JMSException, Exception {
// 1.创建连接工厂对象(ConnectionFactory)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, URL);
// 2.创建连接对象(Connection)
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目的地(destination)
Destination destination = session.createTopic(TOPIC_NAME);
// 6.创建生产者
MessageProducer producer = session.createProducer(destination);
// 循环发送消息
for (int i = 0; i < 10000; i++) {
// 7.创建消息,这里创建的是简单的文本消息体
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.使用消息生产者往目的地发送消息
producer.send(destination, textMessage);
Thread.currentThread().sleep(500);
System.out.println("消息发送成功:" + textMessage.getText());
}
// 9.关闭连接
connection.close();
}
}

ActiveMQConsumerTest.java

package base.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @Description:
* @author: www.gameboys.cn(1084038709@qq.com)
* @date:2019年7月30日 上午11:44:16
*/
public class ActiveMQConsumerTest {

// activemq服务器的url地址,默认通信端口为61616
// private static final String URL = "tcp://192.168.1.1:61616";
private static final String URL = "nio://192.168.1.1:8261?jms.useAsyncSend=true&jms.prefetchPolicy.queuePrefetch=1";
private static final String userName = "gameboys";
private static final String password = "gameboys";
// 队列的名称
private static final String QUEUE_NAME = "queue-test";
// 队列的名称
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws Exception {
queueTest1();
// topicTest1();
}
/**
* 使用监听器
*
* @throws Exception
*/
public static void queueTest1() throws Exception {
// 连接工厂
// 使用默认用户名、密码、路径
// 路径 tcp://host:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, URL);
// ActiveMQConnectionFactory
// 获取一个连接
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 建立会话
// 第一个参数,是否使用事务,如果设置true,操作消息队列后,必须使用 session.commit();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建队列或者话题对象
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
// 每次接收消息,自动调用 onMessage
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
Thread.currentThread().sleep(1000);
System.out.println(1 / 0);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
});
while (true) {
// 不能让junit线程死掉
}
}

/**
* 使用监听器
*
* @throws Exception
*/
public static void topicTest1() throws Exception {
// 1.创建连接工厂对象(ConnectionFactory)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, URL);
// 2.创建连接对象(Connection)
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建目的地(destination)
Destination destination = session.createTopic(TOPIC_NAME);
// 6.创建消费者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
// 每次接收消息,自动调用 onMessage
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
Thread.currentThread().sleep(1000);
} catch (JMSException e) {
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
}

pom.xml添加

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>





正文到此结束
本文目录