ScrapyRedis源碼解析

作者:崔慶才 請勿轉載

配套免費視頻教程:Python3爬蟲三大案例實戰分享:貓眼電影、今日頭條街拍美圖、淘寶美食 Python3爬蟲三大案例實戰分享

爬取知乎所有用戶詳細信息 edu.hellobi.com/course/ 知乎源碼:Germey/Zhihu

ScrapyRedis 這個庫已經為我們提供了 Scrapy 分散式的隊列、調度器、去重等功能,其 GitHub 地址為:github.com/rmax/scrapy-

本節我們來分析一下它的源碼,深入了解一下利用 Redis 怎樣實現 Scrapy 分散式。

1. 獲取源碼

首先我們可以把源碼 Clone 下來,執行如下命令:

git clone https://github.com/rmax/scrapy-redis.git

核心源碼在 scrapy-redis/src/scrapy_redis 目錄下。

2. 爬取隊列

首先我們從爬取隊列入手,看下它的具體實現,源碼文件為 queue.py,在這裡它有三個隊列的實現,首先它實現了一個父類 Base,提供一些基本方法和屬性:

class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): if serializer is None: serializer = picklecompat if not hasattr(serializer, loads): raise TypeError("serializer does not implement loads function: %r" % serializer) if not hasattr(serializer, dumps): raise TypeError("serializer %s does not implement dumps function: %r" % serializer) self.server = server self.spider = spider self.key = key % {spider: spider.name} self.serializer = serializer def _encode_request(self, request): obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider) def __len__(self): """Return the length of the queue""" raise NotImplementedError def push(self, request): """Push a request""" raise NotImplementedError def pop(self, timeout=0): """Pop a request""" raise NotImplementedError def clear(self): """Clear queue/stack""" self.server.delete(self.key)

首先看一下 _encode_request() 和 _decode_request() 方法,因為我們需要把一 個Request 對象存儲到資料庫中,但資料庫無法直接存儲對象,所以需要將 Request 序列化轉成字元串再存儲,而這兩個方法就分別是序列化和反序列化的操作,利用 pickle 庫來實現,一般在調用 push() 將 Request 存入資料庫時會調用 _encode_request() 方法進行序列化,在調用 pop() 取出 Request 的時候會調用 _decode_request() 進行反序列化。

在父類中 len()、push() 和 pop() 方法都是未實現的,會直接拋出 NotImplementedError,因此這個類是不能直接被使用的,所以必須要實現一個子類來重寫這三個方法,而不同的子類就會有不同的實現,也就有著不同的功能。

那麼接下來就需要定義一些子類來繼承 Base 類,並重寫這幾個方法,那在源碼中就有三個子類的實現,它們分別是 FifoQueue、PriorityQueue、LifoQueue,我們分別來看下它們的實現原理。

首先是 FifoQueue:

class FifoQueue(Base): """Per-spider FIFO queue""" def __len__(self): """Return the length of the queue""" return self.server.llen(self.key) def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)

可以看到這個類繼承了Base類,並重寫了 len()、push()、pop() 這三個方法,在這三個方法中都是對 server 對象的操作,而 server 對象就是一個 Redis 連接對象,我們可以直接調用其操作 Redis 的方法對資料庫進行操作,可以看到這裡的操作方法有 llen()、lpush()、rpop() 等,那這就代表此爬取隊列是使用的 Redis的列表,序列化後的 Request 會被存入列表中,就是列表的其中一個元素,len() 方法是獲取列表的長度,push() 方法中調用了 lpush() 操作,這代表從列表左側存入數據,pop() 方法中調用了 rpop() 操作,這代表從列表右側取出數據。

所以 Request 在列表中的存取順序是左側進、右側出,所以這是有序的進出,即先進先出,英文叫做 First Input First Output,也被簡稱作 Fifo,而此類的名稱就叫做FifoQueue。

另外還有一個與之相反的實現類,叫做 LifoQueue,實現如下:

class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data)

與 FifoQueue 不同的就是它的 pop() 方法,在這裡使用的是 lpop() 操作,也就是從左側出,而 push() 方法依然是使用的 lpush() 操作,是從左側入。那麼這樣達到的效果就是先進後出、後進先出,英文叫做 Last In First Out,簡稱為 Lifo,而此類名稱就叫做 LifoQueue。同時這個存取方式類似棧的操作,所以其實也可以稱作 StackQueue。

另外在源碼中還有一個子類實現,叫做 PriorityQueue,顧名思義,它叫做優先順序隊列,實現如下:

class PriorityQueue(Base): """Per-spider priority queue abstraction using redis sorted set""" def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key) def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command(ZADD, self.key, score, data) def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])

在這裡我們可以看到 len()、push()、pop() 方法中使用了 server 對象的 zcard()、zadd()、zrange() 操作,可以知道這裡使用的存儲結果是有序集合 Sorted Set,在這個集合中每個元素都可以設置一個分數,那麼這個分數就代表優先順序。

在 len() 方法里調用了 zcard() 操作,返回的就是有序集合的大小,也就是爬取隊列的長度,在 push() 方法中調用了 zadd() 操作,就是向集合中添加元素,這裡的分數指定成 Request 的優先順序的相反數,因為分數低的會排在集合的前面,所以這裡高優先順序的 Request 就會存在集合的最前面。pop() 方法是首先調用了 zrange() 操作取出了集合的第一個元素,因為最高優先順序的 Request 會存在集合最前面,所以第一個元素就是最高優先順序的 Request,然後再調用 zremrangebyrank() 操作將這個元素刪除,這樣就完成了取出並刪除的操作。

此隊列是默認使用的隊列,也就是爬取隊列默認是使用有序集合來存儲的。

3. 去重過濾

我們在前面說過 Scrapy 中的去重實現就是利用集合這個數據結構,但是在 Scrapy 分散式中去重就需要利用一個共享的集合了,那麼在這裡使用的就是 Redis 中的集合數據結構,我們來看下它的去重類是怎樣實現的,源碼文件是 dupefilter.py,其內實現了一個 RFPDupeFilter 類,實現如下:

class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapys scheduler. """ logger = logger def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {timestamp: int(time.time())} debug = settings.getbool(DUPEFILTER_DEBUG) return cls(server, key=key, debug=debug) @classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings) def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp) return added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request) def close(self, reason=): """Delete data on close. Called by Scrapys scheduler. Parameters ---------- reason : str, optional """ self.clear() def clear(self): """Clears fingerprints data.""" self.server.delete(self.key) def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {request: request}, extra={spider: spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {request: request}, extra={spider: spider}) self.logdupes = False

在這裡我們注意到同樣實現了一個 request_seen() 方法,和 Scrapy 中的 request_seen() 方法實現極其類似,不過在這裡集合使用的是 server 對象的 sadd() 操作,也就是集合不再是簡單的一個簡單數據結構了,在這裡直接換成了資料庫的存儲方式。

鑒別重複的方式還是使用指紋,而指紋的獲取同樣是使用 request_fingerprint() 方法完成的。獲取指紋之後就直接嘗試向集合中添加這個指紋,如果添加成功,那麼就代表這個指紋原本不存在於集合中,返回值就是 1,而最後的返回結果是判定添加結果是否為 0,如果為 1,那這個判定結果就是 False,也就是不重複,否則判定為重複。

這樣我們就成功利用 Redis 的集合完成了指紋的記錄和重複的驗證。

4. 調度器

ScrapyRedis 還幫我們實現了一個配合 Queue、 DupeFilter 使用的調度器 Scheduler,源文件名稱是 scheduler.py。

在這裡指定了一些配置,如 SCHEDULER_FLUSH_ON_START 即是否在爬取開始的時候清空爬取隊列,SCHEDULER_PERSIST 即是否在爬取結束後保持爬取隊列不清除,我們可以在 settings.py 裡面自由配置,而此調度器很好的實現了對接。

接下來我們再看下兩個核心的存取方法,實現如下:

def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value(scheduler/enqueued/redis, spider=self.spider) self.queue.push(request) return Truedef next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value(scheduler/dequeued/redis, spider=self.spider) return request

enqueue_request() 就是調度器向隊列中添加 Request,在這裡做的核心操作就是調用 Queue 的 push() 操作,同時還有一些統計和日誌操作,next_request() 就是從隊列中取 Request,核心操作就是調用 Queue 的 pop() 操作,那麼此時如果隊列中還有 Request,則會直接取出來,接著爬取,否則當隊列為空時,則會重新開始爬取。

5. 總結

那麼到現在為止我們就把三個分散式的問題解決了,總結如下:

  • 爬取隊列的實現,在這裡提供了三種隊列,使用了Redis的列表或有序集合來維護。
  • 去重的實現,使用了 Redis 的集合來保存 Request 的指紋來提供重複過濾。
  • 中斷後重新爬取的實現,中斷後 Redis 的隊列沒有清空,再次啟動時調度器的 next_request() 會從隊列中取到下一個 Request,繼續爬取。

6. 結語

以上便是 ScrapyRedis 的核心源碼解析,另外 ScrapyRedis 中還提供了 Spider、Item Pipeline 的實現,不過並不是必須要使用的,如有興趣可以研究。

自己動手,豐衣足食!Python3網路爬蟲實戰案例 自己動手,豐衣足食!Python3網路爬蟲實戰案例

雙11全場5折瘋搶12天 雙11全場5折瘋搶12天11.11狂歡

關鍵字:已有1200人學習 貓眼電影、今日頭條街拍、淘寶商品美食、微信文章、知乎用戶信息等案例,結合反爬策略,例例實戰 已連載完畢。

推薦閱讀:

如何用scrapy爬取搜房網上小區的坐標值?
小甲魚第63課scrapy程序代碼
[python]scrapy框架構建(2.7版本)
Scrapy爬蟲框架教程(三)-- 調試(Debugging)Spiders
Scrapy中xpath如何提取細節標籤

TAG:scrapy | Python | Python库 |