消息处理

package activemq.listener;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 使用监听器的方式,实现消息处理
*/
public class ConsumerListener {

/**
* 处理消息
* @return
*/
public void consumMessage() {
// 连接工厂
ConnectionFactory factory = null;
// 连接对象
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息消费者,用于接收消息的对象
MessageConsumer consumer = null;

try {
// 创建连接ActiveMQ服务的连接工厂
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");

// 通过工厂,创建连接对象
connection = factory.createConnection();
// 消息的消费者必须启动连接,否则无法处理消息
connection.start();

// 通过连接对象,创建会话对象,必须绑定目的地
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建目的地。参数是目的地名称(唯一标识)
destination = session.createQueue("test-listener");

// 创建消息消费者对象,在指定目的地中获取消息
consumer = session.createConsumer(destination);

// 注册监听器。注册成功后,队列中的消息变化会自动触发监听器代码。接收消息并处理
consumer.setMessageListener(new MessageListener() {

/*
* 监听器一旦注册,永久有效(consumer线程不关闭)
* 处理消息的方式:只要有消息未处理,自动调用onMessage方法处理消息
* 监听器可以注册若干个,注册多个监听器,类似于集群
* ActiveMQ自动循环调用多个监听器,处理队列中的消息,实现并行处理
*
* 处理消息的方法,就是监听方法
* 监听的事件是:消息、消息未处理
* 要处理的具体内容:消息处理
* 参数message:未处理的消息
*/
public void onMessage(Message message) {
try {
// 确认方法,代表consumer已经收到消息。确认后,MQ删除对应的消息
message.acknowledge();

ObjectMessage om = (ObjectMessage)message;
Object data = om.getObject();

System.out.println(data);
} catch (JMSException e) {
e.printStackTrace();
}
}
});

// 阻塞当前代码
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收资源
// 回收消息消费者
if (consumer!=null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收会话对象
if (session!=null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收连接对象
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
ConsumerListener temp = new ConsumerListener();
temp.consumMessage();
}

}



消息生产者

package activemq.listener;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息生产者
*/
public class ProducerObject {

/**
* 发送消息
* @param obj
*/
public void sendMessage(Object obj) {
// 连接工厂
ConnectionFactory factory = null;
// 连接对象
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息发送者
MessageProducer producer = null;
// 消息对象
Message message = null;

try {
// 创建连接ActiveMQ服务的连接工厂
factory = new ActiveMQConnectionFactory("guest", "guest", "tcp://127.0.0.1:61616");

// 通过工厂,创建连接对象
connection = factory.createConnection();

// 通过连接对象,创建会话对象
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建目的地。参数是目的地名称(唯一标识)
destination = session.createQueue("test-listener");

// 通过会话对象,创建消息的发送者
producer = session.createProducer(destination);

connection.start();

for (int i=1; i<=100; i++) {
Integer data = i;
// 创建消息对象,消息中的数据载体是一个可序列化的对象
message = session.createObjectMessage(data);
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收资源
// 回收消息发送者
if (producer!=null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收会话对象
if (session!=null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
// 回收连接对象
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
ProducerObject temp = new ProducerObject();
temp.sendMessage(null);
}

}


回到顶部