Spring Boot使用Redis進行消息的發布訂閱

由David發表在天碼營

今天來學習如何利用Spring Data對Redis的支持來實現消息的發布訂閱機制。發布訂閱是一種典型的非同步通信模型,可以讓消息的發布者和訂閱者充分解耦。在我們的例子中,我們將使用StringRedisTemplate來發布一個字元串消息,同時基於MessageListenerAdapter使用一個POJO來訂閱和響應該消息。

提示

事實上,RedisRedis 不僅提供一個NoSQL資料庫,同時提供了一套消息系統。

環境準備

開發環境:

  • IDE+Java環境(JDK 1.7或以上版本)
  • Maven 3.0+(Eclipse和Idea IntelliJ內置,如果使用IDE並且不使用命令行工具可以不安裝)

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">n <modelVersion>4.0.0</modelVersion>nn <groupId>com.tianmaing</groupId>n <artifactId>redis-message</artifactId>n <version>0.0.1-SNAPSHOT</version>n <packaging>jar</packaging>nn <name>redis-message</name>n <description>Demo of message processing by redis</description>nn <parent>n <groupId>org.springframework.boot</groupId>n <artifactId>spring-boot-starter-parent</artifactId>n <version>1.2.5.RELEASE</version>n <relativePath/>n </parent>nn <properties>n <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>n <java.version>1.8</java.version>n </properties>nn <dependencies>n <dependency>n <groupId>org.springframework.boot</groupId>n <artifactId>spring-boot-starter</artifactId>n </dependency> n <dependency>n <groupId>org.springframework.boot</groupId>n <artifactId>spring-boot-starter-redis</artifactId>n </dependency>n </dependencies>nn <build>n <plugins>n <plugin>n <groupId>org.springframework.boot</groupId>n <artifactId>spring-boot-maven-plugin</artifactId>n </plugin>n </plugins>n </build>nn</project>n

通過配置spring-boot-starter-redis依賴,把Spring Boot對Redis的相關支持引入進來。

創建Redis消息的接收者

在任何一個基於消息的應用中,都有消息發布者和消息接收者(或者稱為消息訂閱者)。創建消息的接收者,我們只需一個普通POJO,在POJO中定義一個接收消息的方法即可:

package com.tianmaying.springboot.redisdemo;nnimport java.util.concurrent.CountDownLatch;nnimport org.slf4j.Logger;nimport org.slf4j.LoggerFactory;nimport org.springframework.beans.factory.annotation.Autowired;nnpublic class Receiver {n private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);nn private CountDownLatch latch;nn @Autowiredn public Receiver(CountDownLatch latch) {n this.latch = latch;n }nn public void receiveMessage(String message) {n LOGGER.info("Received <" + message + ">");n latch.countDown();n }n}n

這個Receiver類將會被註冊為一個消息監聽者時。處理消息的方法我們可以任意命名,我們有相當大的靈活性。

我們給Receiver的構造函數通過@AutoWired標註注入了一個CountDownLatch實例,當接收到消息時,調用cutDown()方法。

註冊監聽者和發送消息

Spring Data Redis提供基於Redis發送和接收消息的所有需要的組件,我們只需要配置好三個東西:

  • 一個連接工廠(connection factory)
  • 一個消息監聽者容器(message listener container)
  • 一個Redis的模板(redis template)

我們將通過Redis模板來發送消息,同時將Receiver註冊給消息監聽者容器。連接工廠將兩者連接起來,使得它們可以通過Redis伺服器通信。如何連接呢? 我們將連接工廠實例分別注入到監聽者容器和Redis模板中即可。

package com.tianmaying.springboot.redisdemo;nnimport java.util.concurrent.CountDownLatch;nnimport org.slf4j.Logger;nimport org.slf4j.LoggerFactory;nimport org.springframework.boot.SpringApplication;nimport org.springframework.boot.autoconfigure.SpringBootApplication;nimport org.springframework.context.ApplicationContext;nimport org.springframework.context.annotation.Bean;nimport org.springframework.data.redis.connection.RedisConnectionFactory;nimport org.springframework.data.redis.core.StringRedisTemplate;nimport org.springframework.data.redis.listener.PatternTopic;nimport org.springframework.data.redis.listener.RedisMessageListenerContainer;nimport org.springframework.data.redis.listener.adapter.MessageListenerAdapter;nn@SpringBootApplicationnpublic class App {nn private static final Logger LOGGER = LoggerFactory.getLogger(App.class);nn @Beann RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,n MessageListenerAdapter listenerAdapter) {nn RedisMessageListenerContainer container = new RedisMessageListenerContainer();n container.setConnectionFactory(connectionFactory);n container.addMessageListener(listenerAdapter, new PatternTopic("chat"));nn return container;n }nn @Beann MessageListenerAdapter listenerAdapter(Receiver receiver) {n return new MessageListenerAdapter(receiver, "receiveMessage");n }nn @Beann Receiver receiver(CountDownLatch latch) {n return new Receiver(latch);n }nn @Beann CountDownLatch latch() {n return new CountDownLatch(1);n }nn @Beann StringRedisTemplate template(RedisConnectionFactory connectionFactory) {n return new StringRedisTemplate(connectionFactory);n }nn public static void main(String[] args) throws InterruptedException {nn ApplicationContext ctx = SpringApplication.run(App.class, args);nn StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);n CountDownLatch latch = ctx.getBean(CountDownLatch.class);nn LOGGER.info("Sending message...");n template.convertAndSend("chat", "Hello from Redis!");nn latch.await();nn System.exit(0);n }n}n

連接工程我們使用Spring Boot默認的RedisConnectionFactory,是Jedis Redis庫提供的JedisConnectionFactory實現。

我們將在listenerAdapter方法中定義的Bean註冊為一個消息監聽者,它將監聽chat主題的消息。

因為Receiver類是一個POJO,要將它包裝在一個消息監聽者適配器(實現了MessageListener介面),這樣才能被監聽者容器RedisMessageListenerContainer的addMessageListener方法添加到連接工廠中。有了這個適配器,當一個消息到達時,就會調用receiveMesage()`方法進行響應。

就這麼簡單,配置好連接工廠和消息監聽者容器,你就可以監聽消息啦!

發送消息就更簡單了,我們使用StringRedisTemplate來發送鍵和值均為字元串的消息。在main()方法中我們創建一個Spring應用的Context,初始化消息監聽者容器,開始監聽消息。然後獲取StringRedisTemplate的實例,往chat主題發送一個消息。我們看到,消息可以被成功的接收到並列印出來,搞定!

歡迎關注天碼營微信公眾號: TMY-EDU

小編重點推薦:

Spring MVC實戰入門訓練

Spring Data JPA實戰入門訓練

Java Web實戰訓練

Node.js全棧開發

更多精彩內容請訪問天碼營網站
推薦閱讀:

Redis服務支持5000萬的QPS,有什麼好的思路?
如何評價360開源的pika項目?
redis+mysql有幾種用法?
redis使用消息隊列的場合?

TAG:Spring | SpringBoot | Redis |