與RabbitMQ結合的「吐槽」抓取

目標:完成抓取段子吐槽的爬蟲。

首先什麼是rabbitMQ?

RabbitMQ is open source message broker software (sometimes called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP)

簡單來說,rabbitMQ實現了一個隊列可以用於模塊解耦。

為什麼要用RabbitMQ?

由於煎蛋段子的吐槽是通過ajax非同步獲取的,並非在段子頁面上直接就有。

項目里用「段子爬蟲」用來抓取段子,那麼相應的就要有「吐槽爬蟲」來爬取吐槽。雖然也可以把這兩個工作寫到一個程序里,但這樣會導致段子抓取模塊和吐槽抓取模塊耦合了,不利用項目維護和拓展。而用RabbitMQ能解耦這兩個模塊,充當兩個模塊連接的橋樑。

如何最快的搭建rabbitMQ?

用docker toolbox中的Kitematic。具體介紹和使用參見本專欄的另一篇文章async/await使用rabbitMQ。

代碼主體

  • Part I 發送部分
  • Part II 抓取吐槽的爬蟲
  • part III 統一config

Part I 發送部分

首先改造es6+promise下使用rabbitMQ一文中的new_task.js,改造的地方有兩個

  1. 模塊化封裝
  2. 用confirmChannel,列印提示成功發送信息

在utils目錄下創建send.js

export function sendToQueue(queueName, msg){n if (typeof msg !== string) {n msg = JSON.stringify(msg);n }n var amqp = require(amqplib);n amqp.connect(amqp://192.168.99.100).then(async (conn) => {n return conn.createConfirmChannel().then(async(ch) => {nn let ok = await ch.assertQueue(queueName, {durable: true});n ch.sendToQueue(queueName, new Buffer(msg), {deliveryMode: true}, (err, ok)=>{n if (err !== null)n console.log(" [x] Sent %s failed!!", msg);n elsen console.log(" [x] Sent %s", msg);n });n await ch.waitForConfirms();n return ch.close();n }).finally(function() { conn.close(); });n }).catch(console.warn);n}n

然後在index.js中,引入sendToQueue函數

import {sendToQueue} from ./utils/send;n

在duanziExtraction函數調用後,用map函數提取出duanziId列表

let duanziStore = duanziExtraction($);nlet idArray = duanziStore.map(item=>item.duanziId); // 新增nsendToQueue("duanziList", idArray); // 新增n

同時為了調試方便暫時注釋了數據存儲的部分代碼。

運行命令

babel-node index.jsn

運行結果

同時rabbitmq management system也可以看出消息成功發送了。

Part II 抓取吐槽的爬蟲

這個爬蟲基於es6+promise下使用rabbitMQ一文中的worker.js來做,改造的地方主要就是doWork函數。

首先分析煎蛋段子的吐槽是如何載入的。

用chrome打開開發者模式,選中Network,然後點擊吐槽,發現發送了一個ajax請求,url是「jandan.net/tucao/352374」。

於是猜測就是請求 "jandan.net/tucao/<編號>" 就會返回需要的吐槽內容,選一個吐槽比較多的請求結果如下

所有吐槽就在tucao這個欄位里。

介面分析完畢,爬蟲的工作流程就是

  1. duanziList這個queue里取出標號列表,
  2. 拼接請求url,用async控制並發,
  3. 保存到mongodb

tucaoCrawler.js

import mapLimit from async/mapLimit;nimport superagent from superagent;nimport mongoConnect from ./utils/mongoConnect;nnvar amqp = require(amqplib);nnamqp.connect(amqp://192.168.99.100).then((conn) => {n return conn.createChannel().then(async(ch) => {n let ok = await ch.assertQueue(duanziList, {durable: true});n ch.prefetch(1);n ch.consume(duanziList, doWork.bind(ch), {noAck: false});n console.log(" [*] Waiting for messages. To exit press CTRL+C");n });n}).catch(console.warn);nnnfunction doWork(msg) {n let duanziList = JSON.parse(msg.content.toString());n console.log(" [x] One List Received");nn // async crawln var self = this;n mapLimit(duanziList, 3,n (id, callback)=>{n console.log("current duanziId is " + id);n superagent.get("http://jandan.net/tucao/" + id)n .end((err, response) => {n if (err) {n console.log("get tucao failed, id = " + id);n console.log(err);n let delay = Math.random()*2000;n setTimeout(function(){n callback(null, {n failed: true,n duanziId: idn });n },delay);nn }else {n let res = JSON.parse(response.text);n // create a random delay and call callback functionn let delay = Math.random()*2000;n setTimeout(function(){n callback(null, {n tucao: res[tucao],n duanziId: idn });n },delay);n }n })n },n (err, result)=>{n if (err) {n console.log("error happened");n console.log(err);n }else {n console.log("mapLimit done");n console.log(" [x] One Task Done");n // filter to find whether err happenedn if (result.filter((item)=>{if (item.failed == true) return true;}).length !== 0) {n console.log("error happened when request tucao");n self.nack(msg);n }else {n self.ack(msg); // this is ch, call ack to notifyn saveToMongoDB(result);n }nn }n })n}nnfunction saveToMongoDB(result){n mongoConnect("mongodb://localhost:27017/duanzi")n .then(n async (db)=>{n try {n const collection = db.collection(tucao);n const res = await collection.insertMany(result);n console.log("save tucao success");n }n catch(err) {n console.log("save error");n db.close();n throw err;n }n db.close();n },n (err)=>{n console.log("db connection error")n throw err;n }n )n .catch((err)=>{n console.log(err);n })n}n

對於存放吐槽的tucao collection,可以通過MongoHub設置一個index,保證unique,因為段子的id是可以當作primary key的。

調整index.js,設置只抓取前3頁,然後同時運行抓取段子和抓取吐槽

babel-node index.jsnbabel-node tucaoCrawler.jsn

運行結果

至此吐槽爬蟲的原型算是完成了。

Part III 統一config

最後再做一個完善,就是添加一個配置文件,方便後面做調試,比如,最大抓取頁數,抓取速度等

config.js

export const config = {n page: 2, // 爬去頁數n interval: 2, // 請求間歇時間n concurrent: 3 // 並發數n}n

然後在index.js以及tucaoCrawler.js中引入並修改掉原先hard code的數字。

參考程序

github.com/wk633/duanzi

推薦閱讀:

beanstalk和rabbitmq區別?
RabbitMQ ACK 機制的意義是什麼?
Rabbitmq 和 Celery 是怎樣工作的?

TAG:电影 | 集體宿舍 | 宿舍生活 | 各种想法 | 异国他乡 | 海外 | 墨西哥 | 恋爱 | 恋爱心理 | 恋爱技巧 | 力量举 | 健身 | 运动 | 精酿啤酒 | WMI | Docker | 王路的粽子铺 | 美容護膚 | 祛痘治療 | Coding | 设计 | 夜间模式 | WebGL | 计算机图形学 | 前端开发 | 选址 | 认知科学 | 语言学 | 慈善 | 眼鏡行業 | 眼鏡片 | 眼鏡選購 | 平面设计 | 市场营销 | 人工智能 | 余凯 | 芯片集成电路 | 电子游戏 | 电子游戏产业 | 雅达利 | 中老年人 | 20岁的生活方式,决定30岁的打开方式书籍 | 中年危机 | 心理學 | 父親 | 母親 | 父母 | X是種怎樣的體驗 | 旅行 | 马来西亚 | 美食 | Ajax | RabbitMQ | 爬虫计算机网络 |