爬蟲入門到精通-如何讓爬蟲更快

本文章屬於爬蟲入門到精通系統教程第十一講

在前面的教程中,我們已經學會了如何抓取一個網頁,可是,當我需要抓取的數據足夠多的時候,應該如何讓我抓取的速度更快呢?

最簡單的方法就是使用多線程

什麼是多線程

多線程(英語:multithreading),是指從軟體或者硬體上實現多個線程並發執行的技術。具有多線程能力的計算機因有硬體支持而能夠在同一時間執行多於一個線程,進而提升整體處理性能。

使用多線程的好處

  • 使用線程可以把佔據時間長的程序中的任務放到後台去處理
  • 用戶界面可以更加吸引人,這樣比如用戶點擊了一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度
  • 程序的運行速度可能加快
  • 在一些等待的任務實現上如用戶輸入、文件讀寫和網路收發數據等,線程就比較有用了。在這種情況下可以釋放一些珍貴的資源如內存佔用等等。
  • 多線程技術在IOS軟體開發中也有舉足輕重的位置。

在python中使用多線程

安裝方法:

pip install futures

注意:由於本系列教程是基於python2的,所以需要安裝,如果你使用的是python3 ,在標準庫里已經有這個模塊了。直接用就行

concurrent的介紹

concurrent.futures 模塊為非同步執行可調用的對象提供了一個高級的介面。

非同步執行可以通過線程來實現,使用 ThreadPoolExecutor 模塊,或者使用 ProcessPoolExecutor 模塊通過分離進程來實現。兩種實現都有同樣的介面,他們都是通過抽象類 Executor 來定義的。

Executor 對象

class concurrent.futures.Executor

這是一個抽象類,用來提供方法去支持非同步地執行調用,它不應該被直接調用,而是應該通過具體的子類來使用。

submit(fn, *args, **kwargs)

可調用對象的調度器,fn參數將會以fn(*args, **kwargs)的形式來調用,同時返回一個 Future 對象代表了可調用對象的執行情況。

with ThreadPoolExecutor(max_workers=1) as executor:

future = executor.submit(pow, 323, 1235)

print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

和map(func, *iterables)函數的作用基本相同,除了func是被非同步執行的,而且幾個對於func調用可能是同時執行的。這個函數返回的迭代器調用__next__()方法的時候,如果在timeout秒內結果不可用,那麼迭代器將會從原始調用的函數向Executor.map()拋出一個concurrent.futures.TimeoutError的異常。timeout既能是一個整數,也能是一個浮點數。如果timeout沒有指定的話或者等於 None 的話,那麼等待時間就沒有限制。如果調用函數拋出了一個異常,那麼當迭代器取到這個函數的時候,異常將會被拋出。

當使用ProcessPoolExecutor的時候,這個方法將iterables切成許多塊,然後將這些內容作為分離的任務提交到進程池中。每個塊的大概的尺寸能夠通過chunksize(大於0的正整數)的參數來指定。當iterables非常大的時候,和chunksize默認等於1相比,將chunksize設置為一個很大的值,將會顯著地提升性能。在使用ThreadPoolExecutor的情況下,chunksize的大小沒有影響。

Python 3.5新增功能:添加了chunksize參數

shutdown(wait=True)

告訴執行器,噹噹前阻塞的 futures 執行完了以後,它應該釋放所有它使用的資源。在shutdown函數之後再來調用Executor.submit()和Executor.map()將會拋出RuntimeError

如果wait等於 True 的話,這個方法不會立即返回,而直到所有阻塞的 futures 都返回,而且和這個執行器所有相關的資源都被釋放以後,這個函數才會返回。 如果wait設置為 False ,那麼這個方法會立刻返回,而和這個執行器所有相關的資源只有等到所有阻塞的 futures 都執行完以後才會被釋放。而無論wait參數的值是什麼,整個 Python 程序都會等到所有阻塞的 futures 執行完畢以後才會退出。

通過with語句,可以避免明確地來調用這個方法,它在執行完以後將會自動關閉Executor。(調用 Executor.shutdown() 時wait會被設置為True,這將會等待所有 future 執行完畢)

import shutil

with ThreadPoolExecutor(max_workers=4) as e:

e.submit(shutil.copy, src1.txt, dest1.txt)

e.submit(shutil.copy, src2.txt, dest2.txt)

e.submit(shutil.copy, src3.txt, dest3.txt)

e.submit(shutil.copy, src4.txt, dest4.txt)

ThreadPoolExecutor(重點)

ThreadPoolExecutor是Executor的子類,使用一個線程池去非同步地執行調用。

class concurrent.futures.ThreadPoolExecutor(max_workers=None)

一個Executor的子類,使用線程池中最多max_workers個線程去非同步地執行回調。

Python 3.5中的改變:如果max_workers參數為None或者沒有給定,那麼它將會被默認設置成為機器的CPU核數乘5。這裡假設ThreadPoolExecutor經常被用來執行IO密集型的工作而不是CPU密集型的工作,工作者的個數應該比ProcessPoolExecutor的工作者的個數要多。

ThreadPoolExecutor 例子

import concurrent.futuresnimport urllib.requestnnURLS = [Fox News,n CNN - Breaking News, U.S., World, Weather, Entertainment & Video News,n http://europe.wsj.com/,n BBC - Home,n http://some-made-up-domain.com/]nn# 獲取一個單頁,同時報告URL和內容ndef load_url(url, timeout):n with urllib.request.urlopen(url, timeout=timeout) as conn:n return conn.read()nn# 我們通過with語句來確保線程能夠被及時地清理,n# 這邊max_workers=5,表示最多同時有5個線程去執行nwith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:n # 字典生成器,使用的方法是`executor.submit()`n future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}n n 字典生成器用for循環實現的話,如下n future_to_url = {}n for url in URLS:n future = executor.submit(load_url, url, 60)n future_to_url[future] = urln n for future in concurrent.futures.as_completed(future_to_url):n url = future_to_url[future]n # 這邊的future就是 通過`Executor.submit()`函數來創建的。n #有以下常用方法方法n # future.result(),返回由相關回調產生的結果,在本案列中,返回函數`load_url`的結果n # future.exception() 返回由相關回調拋出的異常,如果沒有異常則返回`None`n # 更多future對象介紹請看下文n if future.exception() is not None:n print(%r generated an exception: %s % (url,future.exception()))n else:n print(%r page is %d bytes % (url, len(future.result())))n

Future 對象

Future 類封裝了一個可調用對象的非同步執行過程,Future 對象是通過Executor.submit()函數來創建的。

class concurrent.futures.Future

封裝了一個可調用對象的非同步執行過程。Future 實例是通過Executor.submit()方法來創建的,而且不應該被直接創建,除非用來測試。

cancel()

嘗試去取消相關回調,如果這個回調正在被執行,而且不能被取消,那麼這個方法將會返回False,否則這個方法將會取消相應的回調並且返回True

cancelled()

如果相關回調被成功取消了,那麼這個方法將會返回True

running()

如果相關回調當前正在被執行而且無法取消,那麼將會返回True

done()

如果相關的回調被成功地取消或者已經運行完畢那麼將返回True

result(timeout=None)

返回由相關回調產生的結果。如果這個回調還沒有被完成那麼這個方法將會等待timeout秒。如果這個回調在timeout秒內還沒有返回,一個concurrent.futures.TimeoutError的異常將會被拋出。timeout能夠被設置成一個整數或者一個浮點數。如果timeout沒有被設置或者其值為None,那麼等待時間將沒有限制。

如果這個 future 在完成之前被取消了,那麼將會拋出一個CancelledError的異常。

如果相關的回調拋出了一個異常,那麼這個方法也會相應地拋出這個異常。

exception(timeout=None)

返回由相關回調拋出的異常。如果相關回調還沒有被完成那麼這個方法將會等待timeout秒。如果相關回調在timeout秒內還沒有被完成,那麼將會拋出一個concurrent.futures.TimeoutError的異常。timeout能夠被設置成一個整數或者一個浮點數。如果timeout沒有被設置或者其值為None,那麼等待時間將沒有限制。

如果這個 future 在完成之前被取消了,那麼將會拋出一個CancelledError的異常。

如果相關回調被完成了且沒有拋出異常,None將會被返回。

add_done_callback(fn)

將可調用對象fn連接到這個 future 上,fn將會在 future 被取消或者結束運行時被調用,而且僅有相關 future 這一個參數。

添加的可調用對象將會以它們被添加的順序來調用,而且總是在添加它們的那個進程的所屬的線程中調用(譯者注,可以參考這段代碼)。如果相關調用fn拋出了一個Exception子類的異常,它將會被記錄和忽略。如果相關調用fn拋出了一個BaseException子類的異常,那麼行為是未定義的。

如果相關的 future 已經被完成了或者取消了,fn將會被立刻調用。

實例

import cPickle as picklenimport requestsnfrom concurrent.futures import ThreadPoolExecutornfrom concurrent import futuresnfrom pymongo import MongoClientnn# 連接資料庫nclient = MongoClient()ndb = client.locncollection = db.mobai1nndef load_url(url, params, timeout, headers=None):n return requests.get(url, params=params, timeout=timeout, headers=headers).json()nndef getloc():n u"""利用高德地圖api獲取上海所有的小區坐標n 搜索-API文檔-開發指南-Web服務 API | 高德地圖APIn """n allloc = []n # 同時最多並發請求為5n with ThreadPoolExecutor(max_workers=5) as executor:n url = http://restapi.amap.com/v3/place/textn param = {n key: 22d6f93f929728c10ed86258653ae14a,n keywords: u小區,n city: 021,n citylimit: true,n output: json,n page: ,n }n # 這邊使用的是executor.submit(),函數是load_url,參數有4個(url,param,timeout,headers)n # 說下param,這裡使用的是`merge_dicts`拼接而成n # headers默認為None,所以這邊並沒有傳n # 為啥`range(1,46)`呢,是因為我看過了所有小區列表返回一共45頁...n future_to_url = {executor.submit(load_url, url, merge_dicts(param, {page: i}), 60): page for i in range(1, 46)}n # 可能有人說,你為什麼不這樣寫呢,看著還清楚。n n def load_url(url, page, timeout, headers=None):n params = {n key: 22d6f93f929728c10ed86258653ae14a,n keywords: u小區,n city: 021,n citylimit: true,n output: json,n page: ,n }n params[page] = pagen return requests.get(url, params=params, timeout=timeout, headers=headers).json()n future_to_url = {executor.submit(load_url, url, i, 60): page for i in range(1, 46)}n 不這樣的寫的原因是因為下面還有一個函數,也使用了`load_url`n n for future in futures.as_completed(future_to_url):n if future.exception() is not None:n print future.exception()n else:n # 存入列表n data = future.result()[pois]n allloc.extend([x[location] for x in data])n # 使用pickle 保存到本地(因為這個一直要用,保存到本地後,就不用每次都重新爬取)n with open(allloc1.pk, wb) as f:n pickle.dump(allloc, f, True)nndef merge_dicts(*dict_args):n un 可以接收1個或多個字典參數n n result = {}n for dictionary in dict_args:n result.update(dictionary)n return resultnndef mobai(loc): n allmobai = []n with ThreadPoolExecutor(max_workers=5) as executor:n url = https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.don headers = {n User-Agent: Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN,n Content-Type: application/x-www-form-urlencoded,n Referer: https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html,n }n data = {n longitude: ,n latitude: ,n citycode: 021,n }n # 這邊就多了一個headers...其他的和上面的沒有區別n future_to_url = {n executor.submit(load_url, url, merge_dicts(data, {longitude: i.split(,)[0]}, {latitude: i.split(,)[1]}),n 60,headers): url for i in loc}n for future in futures.as_completed(future_to_url):n if future.exception() is not None:n print future.exception()n else:n data = future.result()[object]n allmobai.extend(data)n # 存入mongodbn result = collection.insert_many(data)nif __name__ == __main__:n # 這個運行一次就行.n getloc()n # 讀取保存到本地的值n f = open(allloc.pk,rb)n allloc = pickle.load(f)n f.close()n # 獲取mobai單車的信息.n mobai(allloc)n

總結

看完本編文章,你應該學會「如何使用多線程抓取網頁」

最後所有代碼都在github.com/jin10086/pac

歡迎關注本人的微信公眾號獲取更多Python爬蟲相關的內容

(可以直接搜索「Python爬蟲分享」)


推薦閱讀:

TAG:Python | 爬虫计算机网络 |