async/await使用rabbitMQ
目標:重新實現RabbitMQ官網上的「Hello World」和「work queues」兩個例子。
動機:官網上的tutorial上的兩個例子用的都是callback api,其實amqplib提供了promise api,並且提供了例子,使用promise api的例子,但是promise的用法看起來還不是很簡潔,尤其是在promise內部涉及流程式控制制時,會顯得很奇怪,比如下面的代碼。
開發環境搭建
首先安裝npm包
npm install amqplibn
然後搭建RabbitMQ環境,這裡使用docker toolbox中的Kitematic(beta),因此需要先安裝docker toolbox。
搜索官方鏡像,創建即可。
因為RabbitMQ默認跑在5672埠,但是ACCESS URL卻是192.168.99.100:32769,這個會讓後面寫程序需要額外的設置,因此,修改一下映射到的埠。點一下save容器重啟。
Hello World
send.js
var amqp = require(amqplib);namqp.connect(amqp://192.168.99.100).then(n (conn) => {n return connn .createChannel()n .then(n async(ch) => {n var q = hello;n var msg = Hello World!;n let ok = await ch.assertQueue(q, {durable: false});n console.log(ok);n ch.sendToQueue(q, new Buffer(msg));n console.log(" [x] Sent %s", msg);n return ch.close();n }n ).finally(function() { conn.close(); });n },n (err) => {n console.log("connection error")n }n).catch(console.warn);n
需要提一下的就是connection close的時機,發現寫在console.log(" [x] Sent %s", msg);之後並不能成功發送到queue。比如像這樣:
// 並不work的寫法nvar amqp = require(amqplib);namqp.connect(amqp://192.168.99.100).then(n async (conn) => {n try {n let ch = await conn.createChannel();n let q = hello, msg = Hello World!;n let queueInfo = await ch.assertQueue(q, {durable: false});n console.log(queueInfo);n await ch.sendToQueue(q, new Buffer(msg));n console.log(" [x] Sent %s", msg);n conn.close();n }n catch(err){n console.log(err);n }n }n).catch(console.warn);n
產生的原因沒有找到,猜測是因為sendToQueue沒有返回promsie因此await無法阻塞代碼執行,所以過早關閉了connection。。。所以只能妥協一下,像turtorial里一樣將close放在finally里。
receive.js
var amqp = require(amqplib);namqp.connect(amqp://192.168.99.100).then(n async (conn) => {n try {n let ch = await conn.createChannel();n let q = hello;n let ok = await ch.assertQueue(q, {durable: false});n console.log(ok);n console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);n let mm = await ch.consume(q, (msg)=>{n console.log(" [x] Received %s", msg.content.toString());n } ,{noAck: true});n console.log(mm);n conn.close();n }catch(err){n console.log(err);n conn.close();n }n})n.catch(console.warn);n
receive.js會一直處於監聽狀態,因此不涉及connection close的問題。
運行結果:
Work Queues
new_task.js
var amqp = require(amqplib);nnamqp.connect(amqp://192.168.99.100).then(async (conn) => {n return conn.createChannel().then(async(ch) => {n var q = task_queue;n let ok = await ch.assertQueue(q, {durable: true});n let msg = process.argv.slice(2).join( ) || "Hello World!";n console.log(" [x] Sent %s", msg);n ch.sendToQueue(q, new Buffer(msg), {deliveryMode: true});n return ch.close();n }).finally(function() { conn.close(); });n}).catch(console.warn);n
worker.js
var amqp = require(amqplib);nnamqp.connect(amqp://192.168.99.100).then((conn) => {n return conn.createChannel().then(async(ch) => {n let ok = await ch.assertQueue(task_queue, {durable: true});n ch.prefetch(1);n ch.consume(task_queue, doWork.bind(ch), {noAck: false});n console.log(" [*] Waiting for messages. To exit press CTRL+C");n });n}).catch(console.warn);nnfunction doWork(msg) {n var body = msg.content.toString();n console.log(" [x] Received %s", body);n var secs = body.split(.).length - 1;n console.log(" [x] Task takes %d seconds", secs);nn const doneAck = () => {n console.log(" [x] Done");n this.ack(msg); // this is chn }n setTimeout(doneAck, secs * 1000);n}n
prefetch(1)可以實現控制每次取的信息的個數,這裡設置成1。
worker.js中分離出了doWork函數,因此在ch.consume(task_queue, doWork.bind(ch), {noAck: false});需要bind一下,否則在doWork函數里無法調用channel的ack函數。分離的目的就是為了將來可以模塊化的引入處理函數,方便項目的開發。
此外doneAck函數定義的時候this就是它定義時候上下文中的this,也就是ch。推理過程如下,因為doWork.bind(ch),所以在doWork函數作用域中this即是ch;又doneAck函數定義在該作用域內,因此doneAck中this就是ch。
運行結果:
Bonus: manage system插件的使用
rabbitmq_management Management Plugin 是rabbitmq的一個插件,提供web UI,方便查看rabbitmq以及隊列信息等。
首先修改docker的埠映射,將15672埠映射出來,因為該服務默認運行在15672埠。
然後打開終端
運行以下命令以啟動插件
rabbitmq-plugins enable rabbitmq_managementn
然後瀏覽器登錄192.168.99.100:15672, 用戶名和密碼都是guest,然後就能登陸進去了。
參考程序
wk633/duanziMonitorSystem
推薦閱讀: