以生產消費者模型為基礎用原生協程的爬虎撲步行街前100頁的爬蟲
#!/usr/bin/env python# -*- coding: utf-8 -*-import uvloopimport asyncioimport aiohttpfrom pyquery import PyQuery as pqimport codecsimport jsonpost_lists = []async def get_post_url(client, main_url_queue, queue): # print(url) 得到每一頁的所有鏈接 while True: url = await main_url_queue.get() print(url) try: if url["retries"] < 3: async with client.get(url["url"]) as response: assert response.status == 200 body = await response.text(encoding="utf-8") doc = pq(body) links_item = doc(table[id="pl"]).find( tbody).find(tr[mid]) for link_item in links_item.items(): post_link = link_item.find( td.p_title).find(a).attr(href) if "html" not in post_link: continue post_link = "https://bbs.hupu.com" + post_link post_link_dict = { "url": post_link, "retries": 0 } await queue.put(post_link_dict) except AssertionError: if 200 < response.status <= 400: url["retries"] += 1 main_url_queue.put_nowait(url) finally: main_url_queue.task_done()async def get_post_info(client, queue): 根據鏈接得到標題 作者 發帖時間等內容 while True: url = await queue.get() try: if url["retries"] < 3: async with client.get(url["url"]) as response: assert response.status == 200 body = await response.text(encoding="utf-8") print(url) post_lists.append(url) post_info = paser_post(body) if post_info is not None: post_info["url"] = url["url"] print(post_info) post_lists.append(post_info) except AssertionError: if 200 < response.status <= 400: url["retries"] += 1 queue.put_nowait(url) finally: queue.task_done()def paser_post(html): 解析帖子內容 post_info = {} doc = pq(html) main_post = doc(div#tpc) post_author = main_post.find(div.author a.u).text() post_time = main_post.find(div.author span.stime).text() post_title = doc(h1#j_data).text() post_info["title"] = post_title post_info[time] = post_time post_info[author] = post_author if not post_title and not post_time and not post_author: return None return post_infoasync def run(): main_url_queue = asyncio.Queue() queue = asyncio.Queue() # 前100頁url列表 async with aiohttp.ClientSession() as client: urls = [] for i in range(1, 101): url_dict = { "url": "https://bbs.hupu.com/bxj-postdate-{}".format(i), "retries": 0 } urls.append(url_dict) await asyncio.wait( [main_url_queue.put(url) for url in urls]) # 開啟1000個協程的消費者協程 workers1 = [loop.create_task(get_post_info(client, queue)) for i in range(1000)] # 開啟得到url的生產者協程, 與消費者協程通過queue通信. workers2 = [loop.create_task(get_post_url( client, main_url_queue, queue)) for i in range(100)] # 堵塞直到所有鏈接分析完成 await main_url_queue.join() await queue.join() print(1) # 終止消費者協程的無限循環 for consumer in workers1 + workers2: consumer.cancel()loop = uvloop.new_event_loop()asyncio.set_event_loop(loop)loop.run_until_complete(run())loop.close()post_dicts = {"posts": post_lists, "lenth": len(post_lists)}with codecs.open("post.json", "w", "utf-8") as f: f.write(json.dumps(post_dicts, indent=True))
不解析和存儲的花, 用大概60秒,
增加解析和存儲用344秒.
我的能力就到這裡了.
求大神優化.
直接把網頁源碼存到redis裡面, 開始速度可以, 後面就很慢了.
推薦閱讀:
※爆款遊戲《貪吃蛇大作戰》的 Python 實現
※python高效編程實踐-如何為元組中的每個元素命名, 提高程序可讀性(2/50)
※python多進程進程間通信疑問,求大神指教?(主進程獲取不到子進程變數)
※怎樣才能寫出pythonic的代碼?
※python中通過RF預測紅酒質量初探