侧边栏壁纸
  • 累计撰写 781 篇文章
  • 累计创建 1 个标签
  • 累计收到 1 条评论
标签搜索

Java Message Service(JMS)

Dettan
2021-04-10 / 0 评论 / 0 点赞 / 115 阅读 / 7,299 字
温馨提示:
本文最后更新于 2022-04-30,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
SUN公司提出的用以统一各个MOM系统的接口.
Java Message Service(JMS)是SUN 提出的旨在统一各种MOM(Message Oriented Middleware面向消息的中间件) 系统接口的规范,它包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠消息传输、事务和消息过滤等机制。


术语
PTP:Point to Point,即点对点的消息模型;
Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型;
Topic:主题目标;
Acknowledge:签收;


ConnectionFactory 对象创建一个连接,向消息服务发送消息以及从消息服务接收消息均是通过此连接来进行。
Connection是客户端与消息服务的活动连接。创建连接时,将分配通信资源以及验证客户端。这是一个相当重要的对象,大多数客户端均使用个连接来进行所有的消息传送。连接用于创建会话。
Session 是一个用于生成和使用消息的单线程上下文。它用于创建发送的生产者和接收消息的消费者,并为所发送的消息定义发送顺序。

生产者
客户端使用 MessageProducer 向指定的物理目标(在 API 中表示为目标身份对象)发送消息。
生产者可指定一个默认传送模式(持久性消息与非持久性消息)、优先级有效期值,以控制生产者向物理目标发送的所有消息。

消费者
客户端使用 MessageConsumer 对象从指定的物理目标(在 API 中表示为目标对象)接收消息。
消费者可使用消息选择器,借助它,消息服务可以只向消费者发送与选择标准匹配的那些消息。
消费者可以支持同步或异步消息接收。异步使用可通过向消费者注册MessageListener 来实现。
消费方式:
同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。 实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。



会话通过大量确认选项或通过事务来支持可靠传送。


JMS 对象是否支持并发
Destination 是
ConnectionFactoRy 是
Connection 是
Session 否
MessageProducer 否
MessageConsumer 否
模式
PTP 点对点
多对一,一对一
目标是一个队列

Pub/Sub 发布与订阅(Topic )
多对多
目标是一个主题
支持持久订阅,即消息一定会发送到持久订阅者. 即会保存消息
topic没有订阅者在线&&没有持久订阅者是丢掉接收到的消息.

恢复和重新派送(Recovery
andRedelivery)
非持久订阅状态下,不能恢复或重新派
送一个未签收的消息。只有持久订阅才
能恢复或重新派送一个未签收的消息。


消息头(Header)
消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:
JMSDestination,JMSMessageID 等。

由谁设置
JMSDestination send方法
JMSDeliveryMode send方法
JMSExpiration send方法
JMSPriority send方法
JMSMessageID send方法
JMSTimestamp 客户端
JMSCorrelationID 客户端
JMSReplyTo 客户端
JMSType 客户端
JMSRedelivered JMS Provider
标准的 JMS 消息头包含以下属性:
消息头+描述 +分配方式
JMSDestination
消息发送的目的地:主要是指Queue和Topic。
自动
JMSDeliveryMode
传送模式有两种模式 :持久模式和非持久模式。一条持久性的消息应该被传送“一次仅仅一次”,这就意味者如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传送一次,这意味这服务器出现故障,该消息将永远丢失。
自动
JMSExpiration
消息过期时间,等于Destination 的send 方法中的timeToLive值加上发送时刻的GMT 时间值。如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
自动
JMSPriority 消息优先级,
从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是4级。
自动
JMSMessageID
唯一识别每个消息的标识,由JMS Provider 产生。
自动
JMSTimestamp
一个JMS Provider在调用send()方法时自动设置的。它是消息被发送和消费者实际接收的时间差。
自动
JMSCorrelationID
用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。
开发者设置
JMSTimestamp
一个消息被提交给JMS 自动Provider 到消息被发出的时间。
JMSReplyTo
提供本消息回复消息的目的地址。
JMSType
消息类型的识别符。
开发者设置
JMSRedelivered
如果一个客户端收到一个设置了 JMSRedelivered 属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。如果该消息被重新传送,JMSRedelivered=true 反之,JMSRedelivered =false。

消息体

JMS API 定义了5 种消息体格式,也叫消息类型,可以使用不同形式发送接收数
据并可以兼容现有的消息格式,下面描述这5 种类型:

1.
TextMessage java.lang.String 对象,如xml 文件内容。
2.
MapMessage 名/值对的集合,名是String 对象,值类型可以是Java 任何基本类型。
3.
BytesMessage 字节流。
4.
StreamMessage Java 中的输入输出流。
5.
ObjectMessage Java 中的可序列化对象。
6.
Message 没有消息体,只有消息头和属性。



消息属性
包括以下三中类型的属性
A. 应用程序特定的属性。例如:
TextMessage message=session.createTextMessage();
Message.setStringProperty(“username”,username);
还有一些自动设置的属性.


消息的确认
1.
如果会话是事务性的,那么消息确认自动由commit 处理,且恢复自动由rollback 处理。
1.创建事务createSession(paramA,paramB); paramA是设置事务的,paramB设置acknowledgment mode(应答模式) paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE其中一个。 2.事务的应答确认 A)paramA设置为true时: paramB的值忽略, acknowledgment mode被jms服务器设置 SESSION_TRANSACTED 。 当一个事务被提交的时候,消息确认就会自动发生。 B) paramA设置为false时: Session.AUTO_ACKNOWLEDGE为自动确认,当客户成功的从receive方法返回的时候,或者从 MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。 Session.CLIENT_ACKNOWLEDGE 为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的 acknowledge方法。jms服务器才会删除消息。(默认是批量确认) DUPS_OK_ACKNOWLEDGE 允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会 话对象就会确认消息的接收,而且允许重复确认。如果是重复的消息,那么JMS provider必须把消息头 的JMSRedelivered字段设置为true。
2. 如果会话不是事务性的,有三个确认选择,且手工处理恢复。
 DUPS_OK_ACKNOWLEDGE——这个选项告诉会话懒惰确认消息的传递。如果JMS失败,这很可能造成传递重复消息,因此这个选项只用于可以忍受重复消息的消费者。它的好处是减少了会话为防止重复所要做的工作。
 AUTO_ ACKNOWLEDGE——使用这个选项,当消息被成功地从调用接收返回或处理消息的MessageListener 成功返回时,会话自动确认客户端的消息接收。
 CLIENT_ ACKNOWLEDGE——使用这个选项,客户端通过调用消息的acknowledge方法来确认消息。确认一个被消费的消息会自动确认被该会话转发的所有消息。当使用CLIENT_ ACKNOWLEDGE 模式时,客户端可以在处理它们时产生大量未确认消息。JMS 提供商应当为管理员提供限制客户端超量运行的途径,以便客户端不会造成资源耗尽并保证当它们使用的资源被临时阻塞时造成失败。会话的recover 方法用于停止一个会话然后使用第一个未确认消息来重新启动它。事实上,会话的被转发消息序列被重新设置到最后一个确认消息之后。现在转发的消息序列可以与起初转发的消息序列不同,因为消息到期和收到更高优先级的消息。会话必须设置消息的redelivered 标记,表示它是由于恢复而被重新转发。

通信协议选择
1.
TCP
稳定可靠
支持各种平台
高效,用字节流传输.
2.
NIO
NIO比tcp更好的性能
适用于有大量client的时候,NIO不会新建线程,不会被操作系统的最大线程数所限制.
3.
UDP
ActiveMQ 通过防火墙时,你只能用 UDP。 
如果你想尽可能的减少传递延迟,快速的传递数据。
4.
HTTP(S)
穿透防火墙
建立在TCP之上的协议
5.
VM
虚拟机内传输,不用网络直接本地调用.
6.
SSL
基于TCP之上的安全传输协议


集群建立



二、关闭链接activeMQ发现TCP链接的关闭,最关键的代码在TcpBufferedInputStream类中的int n = in.read(buffer, position, buffer.length - position);
三、心跳 为了更好的维护TCP链路的使用,activeMQ采用了心跳机制作为判断双方链路的健康情况。activeMQ使用的是双向心跳,也就是activeMQ的Broker和Client双方都进行相互心跳,但不管是Broker或Client心跳的具体处理情况是完全一样的,都在InactivityMonitor类中实现,下面具体介绍。 心跳会产生两个线程“InactivityMonitor ReadCheck”和“InactivityMonitor WriteCheck”,它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要调用的方法是readCheck(),当在等待时间内,有消息接收到,则该方法会返回true。WriteCheck线程主要调用的方法是writeCheck(),这有个小技巧,大家可以参考一下,那就是当WriteCheck线程休眠时,有任何数据发送成功,则该线程被唤醒后,不用通过TCP向对方真的发送心跳消息,这样可以从一定程度上减少网络传输的数据量。

ActiveMQ模型分析
首先介绍该模型中每个领域类的作用,然后再介绍它们之间的关系。 Broker:activeMQ的一个整体代表 RegionBroker:负责分发broker的操作到相应的消息区域 Region:activeMQ目前有四种主要消息区域:队列域(queueRegion)、主 题域(topicRegion)、临时队列域(tempQueueRegion)、临时主题域 (tempTopicRegion) TransportConnection:代表一个通讯连接 Destination:消息的目的地,主要包括两种Queue、Topic两种 Subscription:消息的消费者、订阅者 MessageStore:消息持久化存储,象比较复杂的Kaha存储机制就放在这 PendingMessageCursor:等待发给消费者的消息分发指针 ConnectionContext:用来维护发送请求所需的连接上下文


1.配置JMS连接最大闲置时间(消息服务器无消息) jmsBrokerURL = tcp://218.241.100.165:61616?wireFormat.maxInactivityDuration=90000 该wireFormat.maxInactivityDuration = 90000的默认值是30000ms wireFormat.maxInactivityDuration=0 这样的参数, wireFormat.maxInactivityDuration是心跳参数。 避免ActiveMQ在一段时间没有消息发送时抛出 "Channel was inactive for too long"异常。 2. maxReconnectDelay 最大重连间隔 failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=10000);maxReconnectDelay=10000 failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 failover 失效备援 maxReconnectDelay=10000 最大重连间隔 3.设置异步发送消息 tcp://localhost:61616?jms.useAsyncSend=true tcp://localhost:61616?jms.prefetchPolicy.all=100&jms.redeliveryPolicy.maximumRedeliveries=5 4.客户端消息缓存的数量 tcp://localhost:61616?jms.prefetchPolicy.all=50 ##设置客户端最多缓存50条消息 5.客户端的预支取策略。 tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1


1.保障Jms连接 使用失效备援机制,和间隔自动重试机制,程序控制等方面来控制。 failover:(tcp://localhost:61616)?initialReconnectDelay=100&;maxReconnectAttempts=5 failover transport是一种重新连接机制,用于建立可靠的传输。此处配置的是一旦ActiveMQ broker中断,Listener端将每隔100ms自动尝试连接,直至成功连接或重试5次连接失败为止。 failover还支持多个borker同时提供服务,实现负载均衡的同时可增加系统容错性,格式:failover:(uri1,...,uriN)?transportOptions failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false failover:(uri1,...,uriN)?transportOptions failover:uri1,...,uriN failover:(tcp://localhost:61616) 2. JMSRedelivered如果这个值为true,表示消息是被重新发送了。因为有时消费者没有确认他已经收到消息或者JMS提供者不确定消费者是否已经收到。 3.JMSExpiration 允许消息过期, setTimeToLive()设置消息的有效期。


"failover:(tcp://IPAddress1:61616,tcp://IPAddress1:61616)?initialReconnectDelay=100&maxReconnectAttempts=5"; 后面的参数initialReconnectDelay=100&maxReconnectAttempts=5“对每一个连接URI是通用的。 如果没有指定URI的获取方式,activeMQ会自动选择其中的一个URI来尝试建立连接(randomize 指定随机),获取连接后,ActiveMQ会维护连接的暂停和恢复。 以上面的URL为例,说明failOver的重连机制: a. IPAddress1, IPAddress2上的broker1,broker2都正常运行,创建的Connection会使用IPAddress1的broker1来发送消息,这时不激活消费者。 b.关闭broker1,Connection会自动切换到broker2的URI上来发送消息。 c. 激活消费者,消费者会先尝试broker1,由于broker1不可用,使用broker2来收消息, 这时只能收到broker2上的消息。 d.再重新启动broker1,生产者,和消费者都仍然使用broker2来发送和接受消息。 e. 关闭broker2,生产者和消费者都会自动切换到broker1上,消费者就收到之前broker发送的消息了。
0

评论区