首先编写消息驱动bean
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/myQueue") })
public class MessageConsumer implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
try {
System.out.println(tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
这个bean会监听jndi地址queue/myQueue上的消息
发送jms消息步骤:
1、首先从jndi上下文获取一个连接工厂,有TopicConnectionFactory/QueueConnectionFactory两种
2、从连接工厂获取一个连接,同理有QueueConnction/TopicConnection两种
3、通过连接建立一个会话,同理有QueueSession/TopicSession两种
4、通过session创建消息发送者,同理有QueueSender/TopicPublisher两种
5、发送jms消息,有TextMessage,MapMessage,ObjectMessage等
public void jmsQueue(){
QueueConnectionFactory factory = null ;
QueueConnection conn = null;
QueueSender sender = null;
QueueSession session = null ;
Queue queue = null;
try {
factory = (QueueConnectionFactory)this.context.lookup("ConnectionFactory");
conn = factory.createQueueConnection();
session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
queue = (Queue)this.context.lookup("queue/myQueue");
sender = session.createSender(queue);
TextMessage message = session.createTextMessage();
message.setText("hello world");
sender.send(message);
session.close();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(session!=null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/myTopic") })
public class TopicMessageConsumer1 implements MessageListener{
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage tm = (TextMessage)message;
try {
System.out.println(this.getClass().getName()+":"+tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/myTopic") })
public class TopicMessageConsumer implements MessageListener{
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage tm = (TextMessage)message;
try {
System.out.println(this.getClass().getName()+":"+tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
public void jmsTopic(){
TopicConnectionFactory factory = null ;
TopicConnection conn = null ;
TopicSession session = null ;
TopicPublisher sender = null ;
TextMessage message = null ;
Topic topic = null ;
try {
factory = (TopicConnectionFactory)this.context.lookup("ConnectionFactory");
conn = factory.createTopicConnection();
session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
topic = (Topic)this.context.lookup("topic/myTopic");
sender = session.createPublisher(topic);
message = session.createTextMessage();
message.setText("hello world");
sender.send(message);
session.close();
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
session.close();
} catch (Exception e2) {
}
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<server>
<mbean code="org.jboss.mq.server.jmx.Queue"
name="jboss.org.destination:server=Queue,name=accpQueue" >
<attribute name="JNDIName" >queue/myQueue </attribute>
<depends optional-attribute-name = "DestinationManager" >
jboss.mq:service=DestinationManager
</depends>
</mbean>
<mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.mq.destination:service=Topic,name=myTopic">
<attribute name="JNDIName" >topic/myTopic </attribute>
<depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
</mbean>
</server>
分享到:
相关推荐
前端开源库-promise-queue-plusPromise Queue Plus,基于Promise的队列。支持超时、重试等。
前端开源库-promise-queue承诺队列,基于承诺的队列
tp5.1安装使用think-queue,处理任务,在readme.md文件里有详细的步骤
Queue-Queue-Queue
Laravel开发-laravel-queue-aws-batch 用于AWS批处理的Laravel队列,允许用户将作业提交到批处理队列进行处理。
离线安装包,亲测可用
Laravel开发-laravel-async-queue Laravel的异步队列驱动程序(推到后台)
前端项目-d3-queue,使用可配置的并发性评估异步任务。
资源来自pypi官网。 资源全名:redis-queue-tool-4.1.5.tar.gz
离线安装包,亲测可用
资源来自pypi官网。 资源全名:atomic-queue-1.0.1.tar.gz
matrix-toolkits-java.zip,矩阵数据结构、线性解算器、最小二乘法的综合集合,
php-queue 是 PHP开发的磁盘存储消息队列服务,基于leveldb和swoole ,在4核机器上处理能力可以达到2.5W/s 。leveldb: ...
Amp-yii2-queue.zip,YII 2.0非阻塞队列扩展。,amp是php的一个非阻塞并发框架。它提供事件循环、承诺和流,作为异步编程的基础。
Linux Block IO Introducing Multi-queue SSD Access on Multi-core Systems
安装$ npm install p-queue用法在这里,我们当时只兑现了一个承诺。 例如,将concurrency设置为4可以concurrency运行四个promise。 import PQueue from 'p-queue' ;import got from 'got' ;const queue = new PQueue...
另一个含queue和topic的例子,接收端如果与mq服务器断开连接支持重连 博文链接:https://sosuny.iteye.com/blog/509846
如果在大型数组上执行大量Array#push()和Array#shift() ,则应使用此包而不是数组,因为Array#shift()具有O(n),而Queue#dequeue()具有O(1) 。 对于大型阵列,这有很大的不同。 是元素的有序列表,其中元素插入...
离线安装包,亲测可用