python多線程 + 批量插入 資料庫 健壯你的小爬蟲
來自專欄 一隻爬蟲的轉行之路
參考鏈接:
多線程的理解:python 多線程就這麼簡單 - 蟲師 - 博客園
隊列的理解:Python Queue模塊詳解
批量插入資料庫的操作:Python連接MySQL資料庫 - conanwang - 博客園
爬取目標網站:證券時報網-證券時報官方網站,創業板指定信息披露平台,財苑社區
獲取 url,標題以及時間。。。。(很簡單的奧,xpath一下子就可以提取到了)
(這次主要是練習多線程和批處理存入資料庫的,所以只是簡單的解析,獲取頁面元素)
重要思路: 開啟多線程時
1)首先將爬取的url 放入到數據結構的隊列里,保證數據安全。
2)將爬取到的結果,全部存入到一個結果集隊列里,進行下一步的操作。
3)隊列里的get和put方法不要混淆,put是向隊列里添加元素,get是取出或者是踢出並返回這個元素!!!!
第一步: 創建線程以及存儲隊列:
def main(): start_url = Queue.Queue() # 存放url的隊列 result_queue = Queue.Queue() # 結果集隊列 for i in range(1, 3):# 網站分頁 page_url = http://data.stcn.com/list/djsj_%s.shtml % i start_url.put(page_url) # 將值添加到start_url隊列中 # 構建線程 thread_list = [] # 存放線程的容器 for n in range(4): # 一次運行4 個線程 # 創建線程,target調用get_news_url方法,args傳入參數 t_t = threading.Thread(target=get_news_url, args=(start_url, result_queue)) thread_list.append(t_t) for t in thread_list: t.start() # 啟動線程
第二步:解析網頁,獲取目標元素(不多介紹了哈)
def get_news_url(start_url, result_queue): # 在main方法里傳入參數 result = [] while start_url.qsize(): page_url = start_url.get() # 從隊列中取出並返回這個數據 try: response = requests.get(page_url) except Exception as e: print "抓取網頁錯誤,錯誤為:%s" % e return None if response.status_code == 200: selector = etree.HTML(response.text) web_content = selector.xpath(//p[@class="tit"]) for news in web_content: item_result = {} item_result[href] = news.xpath(a/@href)[0] item_result[title] = news.xpath(a/text())[0] item_result[date_news] = news.xpath(span/text())[0] result.append(item_result) if len(result) > 0:#如果result里有數據的話, result_queue.put(result) # put是向結果集 隊列里添加元素result start_url.task_done() #是指這個任務結束 else: time.sleep(5)
第三步: 存,批量插入Mysql資料庫
到這一步我們獲取得到的數據結構是形如這樣的[{},{},{},{}]:
而插入資料庫的關鍵,就是獲取插入的值!!!這裡遍歷出是個字典格式的數據,所以需要用dict的方法獲取元素!!!
核心代碼就兩行!!!!
data = [item.values() for item in result] #遍歷得到每個{}里的values值 cur.executemany(sql2, tuple(data)) #記得一定要轉化成元組 print insert sucessful
多條記錄的插入,需要用executemany(速度哦,真的比之前快好多好多~~~)
不要忘記大前提
鏈接資料庫 和 創建資料庫 和表(這部分 可以手動創建 也可以代碼創建)
代碼鏈接資料庫 和創建 數據表
def save_news_mysql(result): con = MySQLdb.connect(host= 127.0.0.1, user= root, passwd= 123456 ,charset=utf8,port = 3306) cur = con.cursor() sql = create database if not exists cstn_database default charset utf8 cur.execute(sql) con.select_db(cstn_database) # 以上是連接和創建 資料庫 sql = create table if not exists news_cstn+"(id int auto_increment, href varchar(255), title varchar(255), date_news varchar(255), primary key(ID))" cur.execute(sql) # 創建表結構 這部分代碼以後直接在mysql里創建就可以了 sql2 = insert into news_cstn (href,title,date_news) VALUES (%s,%s,%s) data = [item.values() for item in result] cur.executemany(sql2, tuple(data)) print insert sucessful
資料庫名:cstn_database
表名:cstn
欄位:id,href,title,data_news
看似簡單,其實虐我好久丫!!!
敬上完整代碼:
# -*- coding: UTF-8 -*-import requestsfrom lxml import etreeimport csvimport MySQLdbimport xlwtimport Queueimport threadingimport timeimport sysreload(sys)sys.setdefaultencoding(utf-8)# from util.crawler import Header, Proxy 代理 請求頭 我放在另個文件夾# from database.db import Database## con = Database.getConnection() # 連接資料庫# cur = con.cursor() # 游標對象def get_news_url(start_url, result_queue): result = [] while start_url.qsize(): page_url = start_url.get() # 從隊列中移除並返回這個數據 try: response = requests.get(page_url) except Exception as e: print "抓取網頁錯誤,錯誤為:%s" % e return None if response.status_code == 200: selector = etree.HTML(response.text) web_content = selector.xpath(//p[@class="tit"]) for news in web_content: item_result = {} item_result[href] = news.xpath(a/@href)[0] item_result[title] = news.xpath(a/text())[0] item_result[date_news] = news.xpath(span/text())[0] result.append(item_result) if len(result) > 0: result_queue.put(result) # put是向結果集 隊列里添加元素 start_url.task_done() else: time.sleep(5)def save_to_excel(result): workbook = xlwt.Workbook() sheet = workbook.add_sheet(result3) title = [href,title,date] for i ,item in enumerate(title): sheet.write(0, i, item) data = [item.values() for item in result] print data for row, item in enumerate(data): for i, info in enumerate(item): print row+1, i ,info sheet.write(row+1 , i , info) workbook.save(Myresult.xls)def save_news_mysql(result): con = MySQLdb.connect(host= 127.0.0.1, user= root, passwd= 123456 ,charset=utf8,port = 3306) cur = con.cursor() sql = create database if not exists cstn_database default charset utf8 cur.execute(sql) con.select_db(cstn_database) sql = create table if not exists news_cstn+"(id int auto_increment, href varchar(255), title varchar(255), date_news varchar(255), primary key(ID))" cur.execute(sql) # 創建表結構 這部分代碼以後直接在mysql里創建就可以了 sql2 = insert into news_cstn (href,title,date_news) VALUES (%s,%s,%s) data = [item.values() for item in result] cur.executemany(sql2, tuple(data)) print insert sucessful # for elem in result: # 單條插入 # sql = insert into news_cstn (href,title,date_news) VALUES (\%s,\%s,\%s) % (elem[href],elem[title],elem[date_news]) # cur.execute(sql) con.commit() cur.close() con.close()def main(): start_url = Queue.Queue() # 存放url的隊列 result_queue = Queue.Queue() # 結果集隊列 for i in range(1, 3): page_url = http://data.stcn.com/list/djsj_%s.shtml % i start_url.put(page_url) # 將值插入隊列中 # 構建線程 thread_list = [] for n in range(4): # 創建4 個線程 t_t = threading.Thread(target=get_news_url, args=(start_url, result_queue)) # 創建線程,調用get_news_url方法,args傳入參數 thread_list.append(t_t) for t in thread_list: t.start() start_url.join() # 就是當所有的url全部獲取完,放入到結果集里才開始存入資料庫,防止出現 插入資料庫報錯的情況 while result_queue.qsize(): # 返回隊列的大小 save_news_mysql(result_queue.get()) # 將結果存入資料庫中if __name__ == "__main__": main()
推薦閱讀:
※十. 內核線程
※線程間的通訊[基礎]
※使用Spring ThreadPoolTaskExecutor實現多線程任務
※多線程效率測試