Rabbitmq系列之1--基礎概念
一、安裝
1 下載erlang安裝包、下載rabbmq安裝包。
2、安裝erlang設置 erlang_home 和 bin
3、安裝mangement
D:Program FilesRabbitMQ Server
abbitmq_server-3.7.4sbin>rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-3OB02JB:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-3OB02JB...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
4、重啟服務
5、訪問http://localhost:15672/
輸入:guest guest
二、使用
producer:消息生產者
consumer:消息消費者
virtual host:虛擬主機,在RabbitMQ中,用戶只能在虛擬主機的層面上進行一些許可權設置,比如我可以訪問哪些隊列,我可以處理哪些請求等等;
broker:消息轉發者,也就是我們RabbitMQ服務端充當的功能了,那麼消息是按照什麼規則進行轉發的呢?需要用到下面幾個概念;
exchange:交換機,他是和producer直接進行打交道的,有點類似於路由器的功能,主要就是進行轉發操作的唄,那麼producer到底用哪個exchange進行路由呢?這個取決於routing key(路由鍵),每個消息都有這個鍵,我們也可以自己設定,其實就是一字元串;
queue:消息隊列,用於存放消息,他接收exchange路由過來的消息,我們可以對隊列內容進行持久化操作,那麼queue到底接收那個exchange路由的消息呢?這個時候就要用到binding key(綁定鍵)了,綁定鍵會將隊列和exchange進行綁定,至於綁定方式,RabbitMQ提供了多種方式,大家可以看看鴻洋大神的RabbitMQ博客系列(點擊查看);
以上就是RabbitMQ涉及到的一些概念了,用一張圖表示這些概念之間的關係就是:
三、任務分配機制
1、 Round-robin 轉發
RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的消息。這樣分發消息的方式叫做round-robin
2、 消息應答(message acknowledgments)
就是消費者在處理完消息後告訴MQ消息已經處理完成,之後MQ才將消息從隊列移除。
如果消費者被殺死而沒有發送應答,RabbitMQ會認為該信息沒有被完全的處理,然後將會重新轉發給別的消費者。通過這種方式,你可以確認信息不會被丟失,即使消者偶爾被殺死。
這種機制並沒有超時時間這麼一說,RabbitMQ只有在消費者連接斷開是重新轉發此信息。如果消費者處理一個信息需要耗費特別特別長的時間是允許的。
3、 消息持久化(Message durability)
消息持久化:將消息持久化到機器硬碟,下次mq重啟後還會重新連接上。
4、 公平轉發(Fair dispatch)
或許會發現,目前的消息轉發機制(Round-robin)並非是我們想要的。例如,這樣一種情況,對於兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕鬆,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務後等待。
造成這樣的原因是因為RabbitMQ僅僅是當消息到達隊列進行轉發消息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。換句話說,只有在消費者空閑的時候會發送下一條信息。
1. int prefetchCount = 1;
2. channel.basicQos(prefetchCount);
註:如果所有的工作者都處於繁忙狀態,你的隊列有可能被填充滿。你可能會觀察隊列的使用情況,然後增加工作者,或者使用別的什麼策略。
四、 發布/訂閱機制
1、 轉發器(Exchanges)
前面的博客中我們主要的介紹都是發送者發送消息給隊列,接收者從隊列接收消息。下面我們會引入Exchanges,展示RabbitMQ的完整的消息模型。
RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,一般的情況生產者甚至不知道消息應該發送到哪些隊列。
相反的,生產者只能發送消息給轉發器(Exchange)。轉發器是非常簡單的,一邊接收從生產者發來的消息,另一邊把消息推送到隊列中。轉發器必須清楚的知道消息如何處理它收到的每一條消息。是否應該追加到一個指定的隊列?是否應該追加到多個隊列?或者是否應該丟棄?這些規則通過轉發器的類型進行定義。
下面列出一些可用的轉發器類型:
Direct
Topic
Headers
Fanout
目前我們關注最後一個fanout,聲明轉發器類型的代碼:
channel.exchangeDeclare("logs","fanout");
fanout類型轉發器特別簡單,把所有它介紹到的消息,廣播到所有它所知道的隊列。不過這正是我們前述的日誌系統所需要的。
2、 匿名轉發器(nameless exchange)
前面說到生產者只能發送消息給轉發器(Exchange),但是我們前兩篇博客中的例子並沒有使用到轉發器,我們仍然可以發送和接收消息。這是因為我們使用了一個默認的轉發器,它的標識符為」」。之前發送消息的代碼:
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
第一個參數為轉發器的名稱,我們設置為」」 : 如果存在routingKey(第二個參數),消息由routingKey決定發送到哪個隊列。
現在我們可以指定消息發送到的轉發器:
channel.basicPublish( "logs","", null, message.getBytes());
3、 臨時隊列(Temporary queues)
前面的博客中我們都為隊列指定了一個特定的名稱。能夠為隊列命名對我們來說是很關鍵的,我們需要指定消費者為某個隊列。當我們希望在生產者和消費者間共享隊列時,為隊列命名是很重要的。
不過,對於我們的日誌系統我們並不關心隊列的名稱。我們想要接收到所有的消息,而且我們也只對當前正在傳遞的數據的感興趣。為了滿足我們的需求,需要做兩件事:
第一, 無論什麼時間連接到Rabbit我們都需要一個新的空的隊列。為了實現,我們可以使用隨機數創建隊列,或者更好的,讓伺服器給我們提供一個隨機的名稱。
第二, 一旦消費者與Rabbit斷開,消費者所接收的那個隊列應該被自動刪除。
Java中我們可以使用queueDeclare()方法,不傳遞任何參數,來創建一個非持久的、唯一的、自動刪除的隊列且隊列名稱由伺服器隨機產生。
String queueName = channel.queueDeclare().getQueue();
一般情況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。
4、 綁定(Bindings)
我們已經創建了一個fanout轉發器和隊列,我們現在需要通過binding告訴轉發器把消息發送給我們的隊列。
channel.queueBind(queueName, 「logs」, 」」)參數1:隊列名稱 ;參數
5、 完整例子
RabbitMQ (三) 發布/訂閱 - CSDN博客
生產者將消息發送至轉發器,轉發器決定將消息發送至哪些隊列,消費者綁定隊列獲取消息。
Rabbitmq系列之1--基礎概念
Rabbitmq系列之2--springboot整合
rabbitmq系列3--延遲隊列
Rabbitmq系列之4--Springboot延遲隊列實現
推薦閱讀:
※zooKeeper基礎
※分散式系統設計:服務(多節點)模式
※分散式系統論文筆記目錄
※分散式系統設計:批處理模式之事件驅動的批處理
※分散式系統測試的應用方法——場景注入測試