一 简介
(1)异步消息:
所谓异步消息,跟RMI远程调用、webservice调用是类似的,异步消息也是用于应用程序之间的通信。但是它们之间的区别是:
- RMI、Hession/Burlap、webservice等远程调用机制是同步的。也就是说,当客户端调用远程方法时,客户端必须等到远程方法响应后才能继续执行
- 异步消息,顾名思义消息是异步发送,消息发送者不需要等待消息消费者处理消息,甚至不需要等待消息投递完成就可以继续发送消息。这是因为消息发送者默认消息接收最终可以收到这条消息并进行处理
(2)Java消息服务(JMS):
Java消息服务(Java Message Service,即:JMS)是Java中关于面向消息中间件(MOM)的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,其作用类似于JDBC
(3)消息代理(message broker)和目的地(destination):
在异步消息中有两个重要的概念,分别是:消息代理和目的地
当我们在一个应用中发送一条消息时,会将该消息移交给一个消息代理(PS:一般是一些消息中间件,如:ActiveMQ )。在这里,消息代理就类似于邮局,消息代理可以确保消息被投递到指定的目的地,同时解放消息发送者,使其能够继续进行其他业务
同样,每条异步消息在被消息发送者发送时都要指定一个目的地(PS:用于区别不同类型的消息),然后消息接收者就可以根据自己的业务需求从指定的目的地(PS:消息还是在消息中间件存放,目的是是用于区别不同类型的消息)获取自己所需的消息并进行处理
(4)队列(queue)与主题(topic):
i)队列(queue):
队列也即是:点对点消息模型
在点对点模型中,每条消息分别只有一个发送者和接收者。也就是说,当消息代理得到发送者发送的消息时,它会将该消息放入到一个队列中。当某一个消息接收者(PS:同一目的地的消息接收者可能存在多个)请求队列中的下一条消息时,消息会从队列中取出并投递给该接收者。之后该条消息将会从队列中删除,这样就可以保证一条消息只投递给一个接收者
ii)主题(topic):
主题也即是:发布——订阅消息模型
在发布——订阅消息模型中,每条消息可以由多个接收者接收。也就是说,消息不再是只投递给一个接收者,而是主题的所有订阅者都会收到该消息的副本
(5)异步消息的优点:
i)无需等待:
在同步通信中,如果客户端与远程服务频繁通信,或者远程服务响应很慢,就会对客户端应用的性能带来负面影响
当使用JMS发送消息时,客户端不必等待消息被处理,甚至是被投递,客户端只需要将消息发送给消息代理,就可以确信消息会被投递给相应的目的地
因为不需要等待,所以客户端可以继续执行其他任务,这种方式可以有效的节省时间,客户端的性能能够得到极大的提高
ii)面向消息与解耦:
在同步通信中,客户端通过服务接口与远程服务相耦合,如果服务的接口发生变化,那么此服务的所有客户端都需要做相应的改变
当使用JMS发送消息时,发送异步消息是以数据为中心的。这意味着客户端并没有与特定的方法签名绑定,任何可以处理数据的队列或主题订阅者都可以处理由客户端发送的消息,而客户端不必了解远程服务的任何规范
iii)位置独立:
同步RPC服务通常需要网络地址来定位。这意味着客户端无法灵活地适应网络拓扑的改变。如果服务的IP地址改变了,或者服务被配置为监听其他端口,客户端必须进行相应的调整,否则无法访问服务。
与之相反,消息客户端不必知道谁会处理它们的消息,或者服务的位置在哪里。客户端只需要了解需要通过哪个队列或主题来发送消息。因此,只服务能够从队列或主题中获取即可,消息客户端根本不需要关注服务来自哪里
在点对点模型中,可以利用这种位置的独立性来创建消息服务集群。如果客户端不知道服务的位置,并且服务的唯一要求就是可以访问消息代理,那么我们就可以配置多个服务从同一个队列中接收消息。如果服务过载,处理能力不足,我们只需要添加一些新的的服务(接收者)实例来监听相同的队列即可平滑增强其处理能力
在发布一订阅模型中,位置独立性会产生另一种有趣的效应。多个服务可以订阅同一个主题,接收相同消息的副本。但是每一个服务对消息的处理方式却可能不同。例如,假设我们有一组可以共同处理描述新员工信息的消息。一个服务可能会在工资系统中增加该员工,另一个服务则会将新员工增加到公司交流群中,同时还有一个服务为新员工分配内网系统的访问权限。在这里,每一个服务都是基于相同的数据(都是从同一个主题接收而来),但是却各自对数据进行了不同的处理
在上面的内容中,我介绍了一些关于异步消息的基本概念。下面我将介绍基于ActiveMQ框架的JMS消息的发送与接收以及ActiveMQ在Spring框架中的一些常用用法
二 基于JMS的消息发送与接收
(1)ActiveMQ的下载与启动:
在正式开始编写测试实例之前,我们需要做的是ActiveMQ的下载。其官方下载地址是:http://activemq.apache.org/download.html
然后运行:apache-activemq-5.14.1/bin/win64/activemq.bat ,接着保持控制台窗口不关闭,访问:http://127.0.0.1:8161/admin/
注:默认账号密码是:admin/admin
此时,我们可以通过访问菜单中的“ Queues”或者“ Topics”来实时监控队列或者主题类型的消息使用情况
当然,此时因为没有任何消息存在,所以界面是空白的
(2)使用JMS发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | package cn.zifangsky.test.base; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * @author zifangsky * */ public class JMSProducer { //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { //连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { //连接 Connection connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //消息目的地 Destination destination = session.createQueue("hello"); //消息生产者 MessageProducer producer = session.createProducer(destination); //发送消息 for(int i=0;i<10;i++){ //创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ:这是第 " + i + " 条消息"); //生产者发送消息 producer.send(message); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } |
运行上面的代码之后可以发现ActiveMQ的队列监控界面出现了变化:
很显然,生成了10条目的地为“hello”的未被消费的消息
(3)使用JMS接收消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | package cn.zifangsky.test.base; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 * @author zifangsky * */ public class JMSConsumer { //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { //连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { //连接 Connection connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //消息目的地 Destination destination = session.createQueue("hello"); //消息消费者 MessageConsumer consumer = session.createConsumer(destination); while(true){ TextMessage message = (TextMessage) consumer.receive(); if(message != null){ System.out.println("接收到消息: " + message.getText()); }else{ break; } } session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } |
运行代码之后,输出如下:
1 2 3 4 5 6 7 8 9 10 11 | 接收到消息: ActiveMQ:这是第 0 条消息 接收到消息: ActiveMQ:这是第 1 条消息 接收到消息: ActiveMQ:这是第 2 条消息 接收到消息: ActiveMQ:这是第 3 条消息 接收到消息: ActiveMQ:这是第 4 条消息 接收到消息: ActiveMQ:这是第 5 条消息 接收到消息: ActiveMQ:这是第 6 条消息 接收到消息: ActiveMQ:这是第 7 条消息 接收到消息: ActiveMQ:这是第 8 条消息 接收到消息: ActiveMQ:这是第 9 条消息 |
当然,此时观察ActiveMQ的队列监控界面,可以发现这10条消息已经被消费了
注:上面的代码很简单,并且其思路跟JDBC很类似,因此这里就不做过多解释了
三 Spring框架整合ActiveMQ的常见用法
如果写过很多的JDBC代码的话,可以发现使用基本的JMS来发送和接收消息就跟JDBC代码一样,需要每次写很多冗长重复的代码。
针对如何消除冗长和重复的JMS代码,Spring给出的解决方案是JmsTemplate。JmsTemplate可以创建连接、获得会话以及发送和接收消息。这使得我们可以专注于构建要发送的消息或者处理接收到的消息
另外,JmsTemplate可以处理所有抛出的笨拙的JMsException异常。如果在使用JmsTemplate时抛出JMsException异常,JmsTemplate将捕获该异常,然后抛出一个非检查型异常,该异常是Spring自带的JmsException异常的子类
(1)一个简单的队列类型的消息发送和接收实例:
i)activemq.properties:
1 2 3 | activemq.ip=127.0.0.1 activemq.username=admin activemq.passwd=admin |
这个文件配置了ActiveMQ的地址以及认证的账号密码
ii)在Spring的配置文件中加载上面的配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="locations"> <list> <value>classpath:jdbc.properties</value> <value>classpath:activemq.properties</value> </list> </property> </bean> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer"> <property name="properties" ref="configProperties" /> </bean> |
iii)ActiveMQ的配置文件context_activemq.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd"> <context:component-scan base-package="cn.zifangsky.activemq" /> <!-- ActiveMQ 连接工厂 --> <!-- <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.passwd}" /> --> <bean id="amqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://${activemq.ip}:61616"/> <property name="userName" value="${activemq.username}" /> <property name="password" value="${activemq.passwd}" /> <!-- <property name="trustAllPackages" value="true"/> --> <property name="trustedPackages"> <list> <value>java.lang</value> <value>javax.security</value> <value>java.util</value> <value>org.apache.activemq</value> <value>cn.zifangsky.activemq</value> <value>cn.zifangsky.model</value> </list> </property> </bean> <!-- Spring Caching连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory" /> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- 定义Queue类型的JmsTemplate --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即:队列模型 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="testQueueReceiver1"/> <jms:listener destination="test.queue" ref="testQueueReceiver2"/> </jms:listener-container> </beans> |
当然,在web.xml中需要加载该配置文件才行:
1 2 3 4 5 6 7 | <context-param> <param-name>contextConfigLocation</param-name> <param-value> classpath:context/context.xml classpath:context/context_*.xml </param-value> </context-param> |
在上面的context_activemq.xml文件中,首先是定义了自动扫描cn.zifangsky.activemq 这个包下面的注解,在后面配置的两个接收者:testQueueReceiver1、testQueueReceiver2 的bean就是这样被加载进来的
接着,amqConnectionFactory这个bean配置了ActiveMQ的连接参数(PS:通过配置文件加载进来),以及可信任的可以被序列化的类的包路径
再往后,jmsQueueTemplate这个bean配置了一个JmsTemplate的实例,当然这里定义的是一个队列模型
最后,使用jms:listener-container配置了两个消息监听器,其监听的目的地都是“test.queue”,处理的接收者分别是:testQueueReceiver1 和 testQueueReceiver2
iv)消息发送者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | package cn.zifangsky.activemq.producer; import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; @Component("queueSender") public class QueueSender { @Resource(name="jmsQueueTemplate") private JmsTemplate jmsTemplate; /** * 发送一条消息到指定队列 * @param queueName 队列名称 * @param message 消息内容 */ public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } } |
从上面的代码可以看出,这里仅仅只是使用JmsTemplate的send( )方法创建了一条文本消息
v)两个消息接收者:
QueueReceiver1.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | package cn.zifangsky.activemq.consumer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; @Component("testQueueReceiver1") public class QueueReceiver1 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("QueueReceiver1收到消息: " + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } |
QueueReceiver2.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | package cn.zifangsky.activemq.consumer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; @Component("testQueueReceiver2") public class QueueReceiver2 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("QueueReceiver2收到消息: " + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } |
vi)测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | package cn.zifangsky.test.springjms; import javax.annotation.Resource; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import cn.zifangsky.activemq.producer.QueueSender; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"}) public class TestQueue { private final String QUEUENAME = "test.queue"; @Resource(name="queueSender") private QueueSender queueSender; @Test public void test(){ for(int i=0;i<10;i++){ queueSender.send(QUEUENAME, "Hi,这是第 " + (i+1) + " 条消息!"); } } } |
运行这个单元测试方法之后,可以发现输出结果如下:
1 2 3 4 5 6 7 8 9 10 | QueueReceiver2收到消息: Hi,这是第 1 条消息! QueueReceiver1收到消息: Hi,这是第 2 条消息! QueueReceiver2收到消息: Hi,这是第 3 条消息! QueueReceiver1收到消息: Hi,这是第 4 条消息! QueueReceiver2收到消息: Hi,这是第 5 条消息! QueueReceiver1收到消息: Hi,这是第 6 条消息! QueueReceiver2收到消息: Hi,这是第 7 条消息! QueueReceiver1收到消息: Hi,这是第 8 条消息! QueueReceiver2收到消息: Hi,这是第 9 条消息! QueueReceiver1收到消息: Hi,这是第 10 条消息! |
从上面的输出结果可以看出,队列类型的消息只能被某一个接收者接收并处理
(2)简化代码:
上面的例子很显然在发送和接收消息的时候写的代码要比纯粹的JMS要少很多,那么是不是就真的没有更简洁的代码了呢?
Pages: 1 2