十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
一、JMS基本概念
创新互联-专业网站定制、快速模板网站建设、高性价比漳平网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式漳平网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖漳平地区。费用合理售后完善,十年实体公司更值得信赖。
二、 jms的消息结构
消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如下
1.JMS Destination:由send方法设置
2.JMSDeliveryMode:由send方法设置
3.JMSExpiration:由send方法设置
4.JMSPriority:由send方法设置
5.JMSMessageID:由send方法设置
6.JMSTimestamp:由客户端设置
7.JMSCorre1ationID:由客户端设置
8.JMSReplyTo:由客户端设置
9.JMSType:由客户端设置
10.JMSRedelivered:由JMS Provider设置
标准的JMS消息头包含以下属性:
1:JMSDestination:消息发送的目的地:主要是指Queue和Topic,自动分配
2:JMSDe1iveryMode:传送模式。有两种:持久模式和非持久模式。一条持久性的消息应该被传送“一次仅仅一次”,这就意味者如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久的消息最多会传送一次,这味这服务器出现故障,该消息将永远丢失。自动分配
3:JMSExpiration:消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。自动分配
4:JMSPriority:消息优先级,从0-9十个级别,0到是普通消息,5一9是加急消息。JMS不要求JMS Provider严格接照这十个优先级发送消息,但必须保证加寻消息要先于普通消息到达。默认是4级。自动分配
5:JMSMessageID:唯一识别每个消息的标识,由JMS Provider产生。自动分配
6:JMSTimestamp:一个JMS Provider在调用send()方法时自动设置的,它是消息被发送和消费者实际接收的时间差。自动分配
7:JMSCorre1ationID:用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,JMSCorre1ationID用于将一条消息标记为对JMSMessageID标示的上条消息的应答,不过,JMSCorre1ationID可以是任何值,不仅仅是JMSMessageID。由开发者设置
8:JMSReplyTo:提供本消息回复消息的目的地。由开发者设置
9:JMSType:消息类类型的识别符。由开发者设置
10:JHSRedelivered:如果一个客户端收到一个设置了JMSRede1ivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged),如果该消息被重新传送,JMSRede1ivered=true反之,
JMSRedelivered=false。自动设置
JMS定义的属性如下:
1:JMSXUserID:发送消息的用户标识,发送时提供商设置
2:JMSXAppID:发送消息的应用标识,发送时提供商设置
3:JMSXDe1iveryCount:转发消息重试次数,第一次是1,第二次是2,...。发送时提供商没置
4:JMSXGroupID:消息所在消息组的标识,由客户端设置
5:JMSXGroupSeq::组内消息的序号,第一个小时是1,第二个是2,...。由客户端设置
6:JMSXProducerTXID:产生消息的事务的事务标识,发送时提供商设置。
7:JMSXConsumerTXID:消费消息的事务的事务标识,接收时提供商设置
8:JMSXRcvTimestamp:JMS转发消息到消费者的时间,接收时提供商设置
9:JMSXState:假定存在一个消息仓库,它存储了每个消息的的单独拷贝,且这些消息从原始消息被发送时开始。每个拷贝的状态有:1(等待),2(准备〕,3(到期)或,4(保留)。由于状态与生产者和消费者无关,所以它不是由它们来提供。它只和在仓库中查找消息相关,因此JMS没有提以这种API。由提供商设置。
三、JMS的可靠性机制
持久订阅
首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的
createDurab1eSubscriber方法来创建一个持久订阅,该方法的第一个参数必须
是一个topic。第二个参数是订阅的名称。
JMS provider会存储发布到持久订阅对应的topic上的消息。如果最初创建
持久订阅的客户或者任何其它客户,使用相同的连接工厂和连接的客户ID,相同的主题和相同的到订阅名,再次调用会话上的createDurab1eSubscriber方法,那么持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所发布的消息。
持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。
四、JMS的PTP模型
五、JMS的Pub/Sub模型
七、JMS应用开发的基本步骤
八、非持久的Topic消息示例
对于非持久的Topic消息的发送
基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创
建队列替换成创建Topic,例如:
Destination destination = session.createTopic(MyTopic");
3:由于不知道客户端发送多少信息,因此改成while循环的方式了,例如:
Message message = consumer.receive();
while(message!=null){
TextMessage txtMsg= (TextMessage)message;
System.out.println(“收到消息:”+txtMsg.getText());
message = consumer.receive(1000L);
}
完整示例
public class NoPersistenceSender {
public static void main(String[] args) throws Exception{
//连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
//带事务的session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("myTopic");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message---"+i);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
public class NoPersistenceReceive {
public static void main(String[] args) throws Exception{
//连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
//带事务的session
final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
while(message!=null){
TextMessage txtMsg= (TextMessage)message;
System.out.println("收到消息:"+txtMsg.getText());
message = consumer.receive(1000L);
}
session.commit();
session.close();
connection.close();
}
}
注意:对于非持久性的Topic消息,则需要接收者保持运行状态,不然消息发送者发出的消息会接收不到。
九、持久的Topic消息示例
1:需要在连接上设置消费者ID,用来识别消费者
2:需要创建TopicSubscriber来订阅
3:要设置好了过后再start这个connnection
4:消费者一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送消息,这个时候无论消费者是否在线,都会接收到,下次连接的时候,会把没有收过的消息都接收下来。
public class PersistenceSender {
public static void main(String[] args) throws Exception{
//连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
//带事务的session
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("myTopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message---"+i);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
public class PersistenceReceive {
public static void main(String[] args) throws Exception{
//连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.setClientID("cc1");
//带事务的session
final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("myTopic");
TopicSubscriber ts = session.createDurableSubscriber(destination,"T1");
connection.start();
Message message = ts.receive();
while(message!=null){
TextMessage txtMsg= (TextMessage)message;
System.out.println("收到消息:"+txtMsg.getText());
message = ts.receive(1000L);
}
session.commit();
session.close();
connection.close();
}
}
十、总结