侧边栏壁纸
  • 累计撰写 793 篇文章
  • 累计创建 1 个标签
  • 累计收到 1 条评论
标签搜索

目 录CONTENT

文章目录

ActiveMQ

Dettan
2021-04-10 / 0 评论 / 0 点赞 / 160 阅读 / 781 字
温馨提示:
本文最后更新于 2022-07-23,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
非阻塞式的服务间项目通信.



一对一推送,一对多订阅
queue 多对一
topic 多对多
问题
MQ 在使用中意外重启或崩溃后,发送方可以自动重连,而接收方必须重启进程才能重连。解决方案:新建一个断链重连的 Listener 接口,需要重连的接收方注册监听器(在回调中重新注册 MQ 消费者);服务端启动一个守护(daemon)Monitor 线程轮询 MQ 服务器,一旦发现断链,就调用所有已注册的监听器。
<!-- Factory for JMS connections to the ActiveMQ broker at tcp://localhost:61616. -->
<bean id="mqFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg value="tcp://localhost:61616" />
</bean>
<!-- Connection created by mqFactory: started when the bean is initialized and
     closed when the bean is destroyed, so the broker connection is not leaked. -->
<bean id="mqConnection" factory-bean="mqFactory" factory-method="createConnection" init-method="start" destroy-method="close"/>
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Hello world!
*/
public class JavaClient {
public static void main(String[] args) throws Exception {
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		Thread.sleep(1000);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		Thread.sleep(1000);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldProducer(), false);
		Thread.sleep(1000);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldConsumer(), false);
		thread(new HelloWorldProducer(), false);
}
public static void thread(Runnable runnable, boolean daemon) {
		Thread brokerThread = new Thread(runnable);
		brokerThread.setDaemon(daemon);
		brokerThread.start();
}
//生产者
public static class HelloWorldProducer implements Runnable {
		public void run() {
				try {
						// Create a ConnectionFactory
						ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
						// Create a Connection
						Connection connection = connectionFactory.createConnection();
						connection.start();
						// Create a Session
						Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
						// Create the destination (Topic or Queue)
						Destination destination = session.createQueue("TEST.FOO");
						// Create a MessageProducer from the Session to the Topic or Queue
						MessageProducer producer = session.createProducer(destination);
						producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
						// Create a messages
						String text = "Hello world! From: " + Thread.currentThread().getName() + " :" + this.hashCode();
						TextMessage message = session.createTextMessage(text);
						// Tell the producer to send the message
						System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
						producer.send(message);
						// Clean up
						session.close();
						connection.close();
				}
				catch (Exception e) {
						System.out.println("Caught: " + e);
						e.printStackTrace();
				}
		}
}
//消费者
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
		public void run() {
				try {
				// Create a ConnectionFactory
				ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
				// Create a Connection
				Connection connection = connectionFactory.createConnection();
				connection.start();
				connection.setExceptionListener(this);
				// Create a Session
				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
				// Create the destination (Topic or Queue)
				Destination destination = session.createQueue("TEST.FOO");
				// Create a MessageConsumer from the Session to the Topic or Queue
				MessageConsumer consumer = session.createConsumer(destination);
				// Wait for a message
				Message message = consumer.receive(1000);
				if (message instanceof TextMessage) {
						TextMessage textMessage = (TextMessage) message;
						String text = textMessage.getText();
						System.out.println("Received: " + text);
				} else {
						System.out.println("Received: " + message);
				}
				consumer.close();
				session.close();
				connection.close();
				} catch (Exception e) {
						System.out.println("Caught: " + e);
						e.printStackTrace();
				}
		}
		public synchronized void onException(JMSException ex) {
				System.out.println("JMS Exception occured. Shutting down client.");
		}
}
0

评论区