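1 Introduction
The MQ middleware we hear about most often includes ActiveMQ, RabbitMQ, Kafka, and so on. Redis can also be used for message pushing based on the publish/subscribe model, but what it provides is only a simple message queue. It is less reliable than dedicated message middleware (a message published while no subscriber is connected is not stored and is simply lost), and its message pushing supports neither Topic grouping nor point-to-point queues.
However, if the project already uses Redis as a data cache, the volume of messages is small, and the reliability requirements are not especially strict, then Redis is a reasonable choice for implementing the message queue.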
Notes:
- For the basic concepts of message queues, see my earlier article: https://www.zifangsky.cn/815.html
- For the basic usage of Kafka for message pushing, see this article of mine: https://www.zifangsky.cn/1231.html
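To make the fire-and-forget behaviour of the publish/subscribe model described above more concrete, here is a minimal in-memory sketch. It is not Redis and does not use any Redis API; the class InMemoryPubSub and its methods are hypothetical names chosen for illustration. It only models two properties: every current subscriber of a channel receives each message, and a message published while nobody is subscribed is dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory model of fire-and-forget publish/subscribe (not Redis itself). */
final class InMemoryPubSub {

    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a listener on the given channel. */
    void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Delivers the message to every current subscriber and returns how many received it. Nothing is stored. */
    int publish(String channel, String message) {
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(channel, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        InMemoryPubSub bus = new InMemoryPubSub();

        // No subscriber yet: the message is dropped.
        System.out.println(bus.publish("topic-test", "early")); // prints 0

        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        bus.subscribe("topic-test", first::add);
        bus.subscribe("topic-test", second::add);

        // Both subscribers receive the same message; neither ever sees "early".
        System.out.println(bus.publish("topic-test", "hello")); // prints 2
    }
}
```

In a point-to-point queue, by contrast, each message would be handed to exactly one consumer and would normally be kept until some consumer takes it.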
2 Implementing a publish/subscribe message queue with Spring Data Redis
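(1) Add the required dependency to pom.xml (on Spring Boot 2.x this starter uses Lettuce by default, so the Jedis-based configuration in step (3) also needs redis.clients:jedis on the classpath):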
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
```
(2) Add a message receiver:
The message receiver is just a simple POJO whose main job is to provide a method that handles each received message.
```java
package cn.zifangsky.model;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.MessageFormat;

/**
 * Redis message receiver
 *
 * @author zifangsky
 * @date 2018/10/15
 * @since 1.0.0
 */
public class Receiver {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    public void receiveMessage(User message) {
        logger.info(MessageFormat.format("Received Message: {0}", message));
    }

}
```
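Note that the parameter "message" of the receiveMessage method is of type cn.zifangsky.model.User. This means the messages we later send to Redis must also be of type cn.zifangsky.model.User: the two types have to match (alternatively, the parameter can be declared as Object).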
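The reason the parameter type matters is that the handler method is looked up by name and invoked reflectively with the deserialized payload. The following is a simplified sketch of that idea in plain Java, not Spring's actual implementation; ReflectiveDispatchSketch, its nested User and Receiver classes, and the dispatch helper are all hypothetical stand-ins. In this sketch a type mismatch just returns false, which may differ from how the real adapter reports the problem.

```java
import java.lang.reflect.Method;

/** Simplified, hypothetical illustration of name-based handler dispatch (not Spring code). */
final class ReflectiveDispatchSketch {

    /** Hypothetical stand-in for cn.zifangsky.model.User. */
    public static final class User {
        final String name;

        public User(String name) {
            this.name = name;
        }
    }

    /** POJO receiver: no framework interface, only a public handler method. */
    public static final class Receiver {
        User last;

        public void receiveMessage(User message) {
            last = message;
        }
    }

    /**
     * Looks for a public one-argument method with the given name whose parameter
     * type accepts the payload, and invokes it. Returns false if none matches.
     */
    static boolean dispatch(Object delegate, String methodName, Object payload) {
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals(methodName)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    method.invoke(delegate, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Handler invocation failed", e);
                }
                return true;
            }
        }
        return false; // no handler accepts this payload type
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver();
        // A User payload matches receiveMessage(User).
        System.out.println(dispatch(receiver, "receiveMessage", new User("alice"))); // prints true
        // A String payload does not match, so nothing is invoked.
        System.out.println(dispatch(receiver, "receiveMessage", "plain text")); // prints false
    }
}
```

Declaring the handler parameter as Object makes the isInstance check succeed for any payload, which is why Object works as a catch-all.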
(3) Redis-related JavaConfig configuration:
```java
package cn.zifangsky.config;

import cn.zifangsky.model.Receiver;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Arrays;

/**
 * Redis-related configuration
 *
 * @author zifangsky
 * @date 2018/7/30
 * @since 1.0.0
 */
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {

    @Value("${spring.redis.timeout}")
    private String timeOut;

    @Value("${spring.redis.cluster.nodes}")
    private String nodes;

    @Value("${spring.redis.cluster.max-redirects}")
    private int maxRedirects;

    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;

    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWait;

    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;

    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;

    @Bean
    public JedisPoolConfig jedisPoolConfig(){
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait);

        return config;
    }

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration(){
        // "nodes" is one comma-separated string, so split it into individual host:port entries
        RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes.split(",")));
        configuration.setMaxRedirects(maxRedirects);

        return configuration;
    }

    /**
     * JedisConnectionFactory
     */
    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){
        return new JedisConnectionFactory(configuration,jedisPoolConfig);
    }

    /**
     * Serialize objects with Jackson
     * @author zifangsky
     * @date 2018/7/30 16:16
     * @since 1.0.0
     * @return org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer<java.lang.Object>
     */
    @Bean
    public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embeds type information in the JSON so values can be read back as their original types
        // (deprecated since Jackson 2.10 in favor of activateDefaultTyping)
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        serializer.setObjectMapper(objectMapper);
        return serializer;
    }

    /**
     * RedisTemplate
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);

        // Serialize keys as plain strings
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);

        // Serialize values as JSON
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Redis message receiver
     */
    @Bean
    public Receiver redisMessageReceiver(){
        return new Receiver();
    }

    /**
     * Message listener
     */
    @Bean
    MessageListenerAdapter messageListenerAdapter(Receiver receiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
        // The message receiver and its default handler method
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver, "receiveMessage");
        // How received messages are deserialized
        messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

        return messageListenerAdapter;
    }

    /**
     * message listener container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
            , MessageListenerAdapter messageListenerAdapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Register the message listener
        container.addMessageListener(messageListenerAdapter, new PatternTopic("topic-test"));

        return container;
    }

}
```
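As the code above shows, we register a message listener that listens on the Topic "topic-test"; received messages are handled by the cn.zifangsky.model.Receiver class defined earlier.
Note: the properties referenced in the code above are configured as follows: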
```yaml
spring:
  #redis
  redis:
    cluster:
      nodes: namenode22:6379,datanode23:6379,datanode24:6379
      max-redirects: 6
    timeout: 300000
    jedis:
      pool:
        max-active: 8
        max-wait: 100000
        max-idle: 8
        min-idle: 0
```
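The spring.redis.cluster.nodes value is a single comma-separated string, so it has to be broken into individual host:port entries before it can serve as a list of cluster nodes. A minimal sketch of that parsing step in plain Java follows; ClusterNodesSketch, HostPort, and parse are hypothetical names used only for illustration and are not part of Spring Data Redis.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing how a comma-separated node list breaks down into host/port pairs. */
final class ClusterNodesSketch {

    /** Simple value holder for one cluster node. */
    public static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits "host1:port1,host2:port2" into individual entries and validates each one. */
    static List<HostPort> parse(String nodes) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : nodes.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing comma
            }
            int separator = trimmed.lastIndexOf(':');
            if (separator <= 0 || separator == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, separator);
            int port = Integer.parseInt(trimmed.substring(separator + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        List<HostPort> parsed = parse("namenode22:6379,datanode23:6379,datanode24:6379");
        System.out.println(parsed.size()); // prints 3
    }
}
```

Passing the unsplit string as a single list element would instead describe one node whose "port" contains the rest of the string, which cannot be parsed as a number.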
(4) Test sending and receiving messages:
```java
package cn.zifangsky.test;

import cn.zifangsky.mapper.UserMapper;
import cn.zifangsky.model.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Tests basic Redis operations
 *
 * @author zifangsky
 * @date 2018/7/30
 * @since 1.0.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TestRedis {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Tests the Redis-based message queue
     */
    @Test
    public void testMessagingwithRedis() throws InterruptedException {
        // Look up the users
        User user = userMapper.selectByPrimaryKey(1);
        User user2 = userMapper.selectByPrimaryKey(2);

        // Send the messages
        redisTemplate.convertAndSend("topic-test", user);
        redisTemplate.convertAndSend("topic-test", user2);

        // Pause for 10 seconds to observe the result
        Thread.sleep(10 * 1000);
    }

}
```
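After running the test case, the Receiver should log one "Received Message: ..." line for each of the two messages sent.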
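The fixed 10-second pause in the test is there because messages are delivered asynchronously on another thread. One possible alternative, sketched here under the assumption that the receiver can be adapted for testing, is to let the receiver count down a java.util.concurrent.CountDownLatch and have the test wait on it with a timeout. The sketch below is plain Java with simulated delivery and no Redis involved; LatchWaitSketch, CountingReceiver, and sendAndAwait are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: wait for asynchronous messages with a latch instead of a fixed sleep. */
final class LatchWaitSketch {

    /** Receiver that counts down once per message it handles. */
    public static final class CountingReceiver {
        private final CountDownLatch latch;

        CountingReceiver(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages);
        }

        public void receiveMessage(Object message) {
            latch.countDown();
        }

        /** Returns true if all expected messages arrived before the timeout elapsed. */
        boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

    /** Simulates asynchronous delivery on a separate thread, then waits for all messages or the timeout. */
    static boolean sendAndAwait(int messages, long timeoutMillis) {
        CountingReceiver receiver = new CountingReceiver(messages);
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < messages; i++) {
                final int id = i;
                listenerThread.submit(() -> receiver.receiveMessage("message-" + id));
            }
            return receiver.awaitAll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            listenerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Returns as soon as both simulated messages are handled, not after a fixed delay.
        System.out.println(sendAndAwait(2, 2000));
    }
}
```

With this approach the test finishes as soon as the expected messages arrive and can fail explicitly when they do not, rather than always waiting the full pause.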
References: