Rabbitmq系列之1--基礎概念

Rabbitmq系列之1--基礎概念

一、安裝

1 下載erlang安裝包、下載rabbmq安裝包。

2、安裝erlang設置 erlang_home 和 bin

3、安裝mangement

D:Program FilesRabbitMQ Server
abbitmq_server-3.7.4sbin>rabbitmq-plugins enable rabbitmq_management

Enabling plugins on node rabbit@DESKTOP-3OB02JB:

rabbitmq_management

The following plugins have been configured:

rabbitmq_management

rabbitmq_management_agent

rabbitmq_web_dispatch

Applying plugin configuration to rabbit@DESKTOP-3OB02JB...

The following plugins have been enabled:

rabbitmq_management

rabbitmq_management_agent

rabbitmq_web_dispatch

set 3 plugins.

Offline change; changes will take effect at broker restart.

4、重啟服務

5、訪問http://localhost:15672/

輸入:guest guest

二、使用

producer:消息生產者

consumer:消息消費者

virtual host:虛擬主機,在RabbitMQ中,用戶只能在虛擬主機的層面上進行一些許可權設置,比如我可以訪問哪些隊列,我可以處理哪些請求等等;

broker:消息轉發者,也就是我們RabbitMQ服務端充當的功能了,那麼消息是按照什麼規則進行轉發的呢?需要用到下面幾個概念;

exchange:交換機,他是和producer直接進行打交道的,有點類似於路由器的功能,主要就是進行轉發操作的唄,那麼producer到底用哪個exchange進行路由呢?這個取決於routing key(路由鍵),每個消息都有這個鍵,我們也可以自己設定,其實就是一字元串;

queue:消息隊列,用於存放消息,他接收exchange路由過來的消息,我們可以對隊列內容進行持久化操作,那麼queue到底接收那個exchange路由的消息呢?這個時候就要用到binding key(綁定鍵)了,綁定鍵會將隊列和exchange進行綁定,至於綁定方式,RabbitMQ提供了多種方式,大家可以看看鴻洋大神的RabbitMQ博客系列(點擊查看);

以上就是RabbitMQ涉及到的一些概念了,用一張圖表示這些概念之間的關係就是:

三、任務分配機制

1、 Round-robin 轉發

RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的消息。這樣分發消息的方式叫做round-robin

2、 消息應答(message acknowledgments)

就是消費者在處理完消息後告訴MQ消息已經處理完成,之後MQ才將消息從隊列移除。

如果消費者被殺死而沒有發送應答,RabbitMQ會認為該信息沒有被完全的處理,然後將會重新轉發給別的消費者。通過這種方式,你可以確認信息不會被丟失,即使消者偶爾被殺死。

這種機制並沒有超時時間這麼一說,RabbitMQ只有在消費者連接斷開是重新轉發此信息。如果消費者處理一個信息需要耗費特別特別長的時間是允許的。

3、 消息持久化(Message durability)

消息持久化:將消息持久化到機器硬碟,下次mq重啟後還會重新連接上。

4、 公平轉發(Fair dispatch)

或許會發現,目前的消息轉發機制(Round-robin)並非是我們想要的。例如,這樣一種情況,對於兩個消費者,有一系列的任務,奇數任務特別耗時,而偶數任務卻很輕鬆,這樣造成一個消費者一直繁忙,另一個消費者卻很快執行完任務後等待。

造成這樣的原因是因為RabbitMQ僅僅是當消息到達隊列進行轉發消息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。

為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount = 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。換句話說,只有在消費者空閑的時候會發送下一條信息。

1. int prefetchCount = 1;

2. channel.basicQos(prefetchCount);

註:如果所有的工作者都處於繁忙狀態,你的隊列有可能被填充滿。你可能會觀察隊列的使用情況,然後增加工作者,或者使用別的什麼策略。

四、 發布/訂閱機制

1、 轉發器(Exchanges)

前面的博客中我們主要的介紹都是發送者發送消息給隊列,接收者從隊列接收消息。下面我們會引入Exchanges,展示RabbitMQ的完整的消息模型。

RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,一般的情況生產者甚至不知道消息應該發送到哪些隊列。

相反的,生產者只能發送消息給轉發器(Exchange)。轉發器是非常簡單的,一邊接收從生產者發來的消息,另一邊把消息推送到隊列中。轉發器必須清楚的知道消息如何處理它收到的每一條消息。是否應該追加到一個指定的隊列?是否應該追加到多個隊列?或者是否應該丟棄?這些規則通過轉發器的類型進行定義。

下面列出一些可用的轉發器類型:

Direct

Topic

Headers

Fanout

目前我們關注最後一個fanout,聲明轉發器類型的代碼:

channel.exchangeDeclare("logs","fanout");

fanout類型轉發器特別簡單,把所有它介紹到的消息,廣播到所有它所知道的隊列。不過這正是我們前述的日誌系統所需要的。

2、 匿名轉發器(nameless exchange)

前面說到生產者只能發送消息給轉發器(Exchange),但是我們前兩篇博客中的例子並沒有使用到轉發器,我們仍然可以發送和接收消息。這是因為我們使用了一個默認的轉發器,它的標識符為」」。之前發送消息的代碼:

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一個參數為轉發器的名稱,我們設置為」」 : 如果存在routingKey(第二個參數),消息由routingKey決定發送到哪個隊列。

現在我們可以指定消息發送到的轉發器:

channel.basicPublish( "logs","", null, message.getBytes());

3、 臨時隊列(Temporary queues)

前面的博客中我們都為隊列指定了一個特定的名稱。能夠為隊列命名對我們來說是很關鍵的,我們需要指定消費者為某個隊列。當我們希望在生產者和消費者間共享隊列時,為隊列命名是很重要的。

不過,對於我們的日誌系統我們並不關心隊列的名稱。我們想要接收到所有的消息,而且我們也只對當前正在傳遞的數據的感興趣。為了滿足我們的需求,需要做兩件事:

第一, 無論什麼時間連接到Rabbit我們都需要一個新的空的隊列。為了實現,我們可以使用隨機數創建隊列,或者更好的,讓伺服器給我們提供一個隨機的名稱。

第二, 一旦消費者與Rabbit斷開,消費者所接收的那個隊列應該被自動刪除。

Java中我們可以使用queueDeclare()方法,不傳遞任何參數,來創建一個非持久的、唯一的、自動刪除的隊列且隊列名稱由伺服器隨機產生。

String queueName = channel.queueDeclare().getQueue();

一般情況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 類似。

4、 綁定(Bindings)

我們已經創建了一個fanout轉發器和隊列,我們現在需要通過binding告訴轉發器把消息發送給我們的隊列。

channel.queueBind(queueName, 「logs」, 」」)參數1:隊列名稱 ;參數

5、 完整例子

RabbitMQ (三) 發布/訂閱 - CSDN博客

生產者將消息發送至轉發器,轉發器決定將消息發送至哪些隊列,消費者綁定隊列獲取消息。

Rabbitmq系列之1--基礎概念

Rabbitmq系列之2--springboot整合

rabbitmq系列3--延遲隊列

Rabbitmq系列之4--Springboot延遲隊列實現

推薦閱讀:

zooKeeper基礎
分散式系統設計:服務(多節點)模式
分散式系統論文筆記目錄
分散式系統設計:批處理模式之事件驅動的批處理
分散式系統測試的應用方法——場景注入測試

TAG:RabbitMQ | 分散式系統 | Redis |