Rabbitmq系列之4--Springboot延遲隊列實現
/** * 申明死信隊列 * * @author:xiongyongjie * @date:2018年3月17日 * @description: */@Componentpublic class RabbmqDlxConfiguration { // 死信隊列 private static String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; private static String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 死信隊列 private static final String DL_QUEUE = "DL_QUEUE"; private static final String DL_EXCHANGE = "DL_EXCHANGE"; private static final String REDIRECT_QUEUE = "REDIRECT_QUEUE"; private static final String KEY_R = "KEY_R"; private static final String DL_KEY = "DL_KEY"; /** * 聲明一個死信隊列.DL_QUEUE x-dead-letter-exchange 對應 死信交換機 * x-dead-letter-routing-key 對應 死信隊列 * * @return the queue */ @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 聲明 死信交換機 args.put(DEAD_LETTER_QUEUE_KEY, DL_EXCHANGE); // x-dead-letter-routing-key 聲明 死信路由鍵 args.put(DEAD_LETTER_ROUTING_KEY, KEY_R); return QueueBuilder.durable(DL_QUEUE).withArguments(args).build(); } /** * 定義死信隊列轉發隊列. * * @return the queue */ @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable(REDIRECT_QUEUE).build(); } /** * 死信隊列跟交換機類型沒有關係 不一定為directExchange 不影響該類型交換機的特性. * * @return the exchange */ @Bean("deadLetterExchange") public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange(DL_EXCHANGE).durable(true).build(); } /** * 死信路由通過 DL_KEY 綁定鍵綁定到死信隊列上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding(DL_QUEUE, Binding.DestinationType.QUEUE, DL_EXCHANGE, DL_KEY, null); } /** * 死信路由通過 KEY_R 綁定鍵綁定到死信隊列上. * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding(REDIRECT_QUEUE, Binding.DestinationType.QUEUE, DL_EXCHANGE, KEY_R, null); }}
死信隊列數據發送
@GetMapping("dlxMsg") public void deadQueue() { for (int i = 0; i < 200; i++) { // CorrelationData correlationData = new // CorrelationData(UUID.randomUUID().toString()); // 聲明消息處理器 這個對消息進行處理 可以設置一些參數 對消息進行一些定製化處理 我們這裡 來設置消息的編碼 以及消息的過期時間 // 因為在.net 以及其他版本過期時間不一致 這裡的時間毫秒值 為字元串 DlxMsg messagePostProcessor = new DlxMsg(10000l); String p = "dlx msg:" + i; // 向DL_QUEUE 發送消息 10*1000毫秒後過期 形成死信 template.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor); } }
死信隊列消息體定義
/** * 延遲消息定義 * * @author:xiongyongjie * @date:2018年3月16日 * @description: */public class DlxMsg implements MessagePostProcessor { private final Long ttl; // 毫秒 public DlxMsg(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(ttl.toString()); // 設置per-message的失效時間 return message; }}
處理流程
1. 數據進入死信隊列中
2. 死信隊列超時後通過配置的轉發路由和路由key轉發到後續處理隊列
3. 後續處理隊列接受數據
4. 監聽後續處理隊列的消息並處理
安安熊:Rabbitmq系列之1--基礎概念安安熊:Rabbitmq系列之2--springboot整合安安熊:Rabbitmq系列3--延遲隊列安安熊:Rabbitmq系列之4--Springboot延遲隊列實現
推薦閱讀:
※Spring Cloud架構教程 (一)Hystrix監控面板
※Spring(3):AOP
※Spring 數據訪問(DAO層) 總結
※Springmvc之接受請求參數
※初識Spring ^_^ !