rabbitmq-延遲隊列
最近有一個需要實現類似微信支付的回調,即對後台通知交互時,如果微信收到商戶的應答不是成功或超時,微信認為通知失敗,微信會通過一定的策略定期重新發起通知,儘可能提高通知的成功率,但微信不保證通知最終能成功。
(通知頻率為15/15/30/180/1800/1800/1800/1800/3600,單位:秒)那麼如何做到通知頻率的呢?我們首先想到的是rabbitmq的延遲隊列,rabbitmq是個成熟的產品,而且可以做到集群,高並發,持久化等各項特點。
那麼如何實現呢?
AMQP協議,以及RabbitMQ本身沒有直接支持延遲隊列的功能,但是可以通過TTL和DLX模擬出延遲隊列的功能。
TTL(Time To Live)
RabbitMQ可以針對Queue和Message設置 x-message-tt,來控制消息的生存時間,如果超時,則消息變為dead letter。
DLX (Dead-Letter-Exchange)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由。
x-dead-letter-exchange:出現dead letter之後將dead letter重新發送到指定exchange 。
x-dead-letter-routing-key:指定routing-key發送。
注意事項:
設置了 x-dead-letter-exchange 和 x-dead-letter-routing 後的隊列是根據隊列入隊的順序進行消費,即使到了過期時間也不會觸發x-dead-letter-exchange。因為過期時間是在消息出隊列的時候進行判斷的所以當隊列沒有設過期時間時,插入一個沒有過期時間的消息會導致 x-dead-letter-exchange 隊列永遠不會被消費。
PHP代碼:(rabbitmq的具體安裝方法請移步官網)
send.php 第一次投遞,直接投遞到 x-dead-letter-exchange,不用創建一個0毫秒的延遲隊列。
<?phprequire_once __DIR__ . /vendor/autoload.php;use PhpAmqpLibConnectionAMQPStreamConnection;use PhpAmqpLibMessageAMQPMessage;use PhpAmqpLibWireAMQPTable;$connection = new AMQPStreamConnection(127.0.0.1, 5672, guest, guest);$channel = $connection->channel();$expiration = 0;$channel->exchange_declare(delay_exchange, direct,false,false,false);$channel->queue_declare(delay_queue,false,true,false,false,false);$channel->queue_bind(delay_queue, delay_exchange,delay_exchange);$msg = new AMQPMessage($expiration,array( delivery_mode => AMQPMessage::DELIVERY_MODE_PERSISTENT,));//直接投遞到delay_exchange$channel->basic_publish($msg,delay_exchange,delay_exchange);echo date(Y-m-d H:i:s)." [x] 發送一條0毫秒後執行的數據! ".PHP_EOL;$channel->close();$connection->close();
receive.php
<?phprequire_once __DIR__ . /vendor/autoload.php;use PhpAmqpLibConnectionAMQPStreamConnection;use PhpAmqpLibMessageAMQPMessage;use PhpAmqpLibWireAMQPTable;$connection = new AMQPStreamConnection(127.0.0.1, 5672, guest, guest);$channel = $connection->channel();$channel->exchange_declare(delay_exchange, direct,false,false,false);$channel->queue_declare(delay_queue,false,true,false,false,false);$channel->queue_bind(delay_queue, delay_exchange,delay_exchange);echo [*] Waiting for message. To exit press CTRL+C .PHP_EOL;//重點在callback$callback = function ($msg) { //定義時間 單位毫秒 //$second = [15000,15000,30000,180000,1800000,1800000,1800000,3600000]; $second = [0,1500,1500,3000,18000,180000,180000,180000,360000]; $msg->delivery_info[channel]->basic_ack($msg->delivery_info[delivery_tag]); $connection = new AMQPStreamConnection(127.0.0.1, 5672, guest, guest); $channel = $connection->channel(); //重發 $callback_time = $msg->body; echo date(Y-m-d H:i:s)." [x] 接收一條".$second[$callback_time]."毫秒後的數據! ".PHP_EOL; $callback_time++; if (!isset($second[$callback_time])) return; $expiration = $second[$callback_time]; $cache_exchange_name = cache_exchange_ . $expiration; $cache_queue_name = cache_queue_ . $expiration; $tale = new AMQPTable(); //設置x-dead-letter-exchange $tale->set(x-dead-letter-exchange, delay_exchange); //設置x-dead-letter-routing-key $tale->set(x-dead-letter-routing-key, delay_exchange); //設置隊列過期時間 $tale->set(x-message-ttl, $expiration); $channel->queue_declare($cache_queue_name,false,true,false,false,false,$tale); $channel->queue_bind($cache_queue_name, $cache_exchange_name,); $msg = new AMQPMessage($callback_time, array( delivery_mode => AMQPMessage::DELIVERY_MODE_PERSISTENT, )); //發送至延時隊列 $channel->basic_publish($msg, $cache_exchange_name,); echo date(Y-m-d H:i:s)." [x] 發送一條".$expiration."毫秒後執行的數據! ".PHP_EOL; $channel->close(); $connection->close();};//只有consumer已經處理並確認了上一條message時queue才分派新的message給它$channel->basic_qos(null, 1, null);$channel->basic_consume(delay_queue,,false,false,false,false,$callback);while (count($channel->callbacks)) { $channel->wait();}$channel->close();$connection->close();
推薦閱讀:
※Spring 整合JMS 基於ActiveMQ 實現消息的發送接收
※螞蟻消息中間件 (MsgBroker) 在 YGC 優化上的探索
※Kafka基礎概念