Publish & Subscribe,消息的发布 / 订阅模型

消息生产(发布)者将消息发布到topic中,同时有多个消息消费(订阅)者消费该消息

和点对点方式不同,发布到topic中的消息会被所有订阅者消费

当生产者发布消息后,不管是否有消费者,都不会保存消息

一定要先有消息的消费者,再有消息的生产者




消息消费者

package activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息消费者
*/
public class ConsumerTopic {

public String receiveTextMessage() {
String returnMsg = "";
// 连接工厂
ConnectionFactory factory = null;
// 连接对象
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息消费者,用于接收消息的对象
MessageConsumer consumer = null;
// 消息对象
Message message = null;

try {
// 创建连接ActiveMQ服务的连接工厂
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");

// 通过工厂,创建连接对象
connection = factory.createConnection();
// 消息的消费者必须启动连接,否则无法处理消息
connection.start();

// 通过连接对象,创建会话对象,必须绑定目的地
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地。参数是目的地名称(唯一标识)
destination = session.createTopic("test-topic");

// 创建消息消费者对象,在指定目的地中获取消息
consumer = session.createConsumer(destination);

// 获取队列中的消息
// receive方法是一个主动获取消息的方法。执行一次,拉取一个消息,开发少用
message = consumer.receive();

// 处理文本消息
returnMsg = ((TextMessage)message).getText();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收资源
// 回收消息消费者
if (consumer!=null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收会话对象
if (session!=null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收连接对象
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

return returnMsg;
}

public static void main(String[] args) {
ConsumerTopic temp = new ConsumerTopic();
String returnMsg = temp.receiveTextMessage();

System.out.println(returnMsg);
}

}


消息生产者

package activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息生产者
*/
public class ProducerTopic {

/**
* 发送一个字符串文本消息到ActiveMQ中
* @param args
*/
public void sendTextMessage(String msg) {
// 连接工厂
ConnectionFactory factory = null;
// 连接对象
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息发送者
MessageProducer producer = null;
// 消息对象
Message message = null;

try {
// 创建连接ActiveMQ服务的连接工厂
factory = new ActiveMQConnectionFactory("guest", "guest", "tcp://127.0.0.1:61616");

// 通过工厂,创建连接对象
connection = factory.createConnection();
// 消息的发送者可以不启动连接(建议启动连接),消息的消费者必须启动连接
// producer在发送消息的时候,会检查是否启动了连接。如果未启动,则自动启动
// 如果没有特殊的配置,建议配置完毕后再启动连接
connection.start();

/*
* 通过连接对象,创建会话对象,必须绑定目的地
*
* transacted:是否支持事务
* true:支持事务,第二个参数默认无效,建议传递的数据是Session.SESSION_TRANSACTED
* false:不支持事务,常用参数,第二个参数必须传递,且必须有效
*
* acknowledgeMode:如何确认消息的处理,使用确认机制实现
* AUTO_ACKNOWLEDGE:自动确认消息,消息的消费者处理消息后,自动确认
* CLIENT_ACKNOWLEDGE:客户端手动确认,消息的消费者处理消息后,必须手动确认
* DUPS_OK_ACKNOWLEDGE:有副本的客户端手动确认。一个消息可以多次处理;可以降低Session的消耗,在可以容忍重复消息时使用(不推荐使用)
*/
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建目的地。参数是目的地名称(唯一标识)
destination = session.createTopic("test-topic");

// 通过会话对象,创建消息的发送者
// 创建producer的时候,可以不提供目的地,而在发送消息的时候指定目的地
producer = session.createProducer(destination);

// 创建文本消息对象,作为具体数据内容的载体
message = session.createTextMessage(msg);

// 使用producer,发送消息到目的地
producer.send(message);

System.out.println("消息已发送");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收资源
// 回收消息发送者
if (producer!=null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收会话对象
if (session!=null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收连接对象
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
ProducerTopic temp = new ProducerTopic();
temp.sendTextMessage("我是来测试的");
}

}


回到顶部