標籤:

構建一個比較滿意的分散式掃描平台(上)

構建一個比較滿意的分散式掃描平台(上)

來自專欄嘶吼RoarTalk

0x00、前言

在雲安全內部安全能力建設中,對雲資產的埠掃描是一個必須要做的事情,因為開放一個埠對外提供一個服務都是擴大了您在雲上攻擊面。對於這種危險需要儘早的通知雲上用戶。那麼如何對幾萬甚至幾十萬雲主機做有效的埠掃描和精確的服務識別?這需要一套分散式的掃描系統來支撐。

0x01、實踐的認知

在此之前,做了一些的小實踐

方向3的代碼如下:

# coding=utf-8#!/usr/env/bin python//存儲到redisdef store(result): r=redis.Redis(host=127.0.0.1,port=6379,decode_responses=True,password=xxxx) with open(result,r) as f: for line in f: if line.startswith({ }: try: temp = json.loads(line[:-2]) tmp1=temp["ports"][0] r.append(temp["ip"],str(tmp1["port"])+",") except: continue return r//masscan掃描模塊def Scan(): try: global g_queue while not g_queue.empty(): item = g_queue.get() result = "result"+item+".json" p = subprocess.Popen("/root/masscan/bin/masscan "+item+" -p T:21-23,25,80,81,88,110,143,443,1080,1433,1521,1158,3306-3308,3389,3690,5432,5900,6379,7001,8000,8080,9000,9418,27017-27019,50060,111,11211,2049 -oJ "+result, shell=True) p.wait() if p.returncode==0: print (ok) if os.path.getsize(result) != 0: print item store(result) if g_queue.qsize() == 0: print (u公有雲高危埠掃描結束) return "ok" except Exception,e: print e return eif __name__ == __main__:// ip地址壓入隊列 csvfile2 = file(xxx_public_ip.csv, r) reader = csv.reader(csvfile2) for x in reader: ips = IP(x[0]) for y in ips: g_queue.put(y.strNormal(0))

Nmap掃描模塊

def NmapScan(): try: global g_queue while not g_queue.empty(): item = g_queue.get() filename = item.split( )[1]+"_"+item.split( )[0] result = "result"+filename.strip()+".xml" print result p = subprocess.Popen("/usr/bin/nmap -oX "+result+" -sV -p"+item, shell=True) p.wait() if p.returncode==0: nmap_report = NmapParser.parse_fromfile(result) for scanned_hosts in nmap_report.hosts: print scanned_hosts.address for serv in scanned_hosts.services: if serv.state == "open": m = serv.service_dict.get(extrainfo, ) print m if m.find() != -1: pass else: writer.writerow([scanned_hosts.address,str(serv.port),serv.service,serv.service_dict.get(product, ),serv.service_dict.get(version, ),serv.service_dict.get(extrainfo, )]) print "size = ", g_queue.qsize() if g_queue.qsize() == 0: print (u公網服務指紋掃描結束) return "ok" except Exception,e: print e return e

//從隊列中讀取掃描目標

pool = redis.ConnectionPool(host=127.0.0.1,port=6379,decode_responses=True,password=xxxx,db=0) r = redis.Redis(connection_pool=pool) pipe = r.pipeline() pipe_size = 100000 len = 0 key_list = [] print r.pipeline() keys = r.keys() for key in keys: key_list.append(key) pipe.get(key) if len < pipe_size: len += 1 else: for (k, v) in zip(key_list, pipe.execute()): print k, v len = 0 key_list = [] for (k, v) in zip(key_list, pipe.execute()): for sport in v.split(,): if sport!=: item = sport+" "+k print item g_queue.put(item)

還是出現阻塞的情況。

不得不要從架構上考慮一下了。

0x02 分散式掃描架構探討

思維是循序漸進的,否則你在認知的路上永遠是抄襲別人,但是不知道為什麼這麼做。其實核心需要解決的是阻塞的事情,還有通過分散式擴展計算和帶寬的能力以滿足大規模掃描的安全能力。

思考1:阻塞問題那通過非同步IO處理

1、web控制層

·使用Nginx搭建,解析VUE生成的html文件。做界面展示

·使用uWSGI搭建web中間件伺服器,解析Django開發的python程序,做OpenAPI層。

·資料庫使用postgresql。存儲掃描配置掃描任務等信息

2、Web數據展示層

·底層掃描數據直接通過logstash進入Elastic Search中,所有的報表都都通過API訪問ES,這樣方便做變化對比。

3、任務調度層

· 消息隊列使用自建的Redis做。

·非同步IO使用tornado做。

4、Worker執行者

·控制參數通過redis獲取

·掃描插件使用python編寫,那麼對於埠掃描其實就兩個一個masscan和nmapscan插件。

·掃描結果通過logstash傳入到Elastic Search

5、數據層

打算用kafka實現,這個要看會不會有突然大量數據湧入到ES中。

思考2:有沒有成熟的組件完成上述事情呢?

最近在朋友圈傳的百度陸奇《如何成為一名優秀的工程師》提到了上述觀點,然後我豁然開朗,非同步IO處理,在Django生態中有celery呀。它其實就是把redis+tornado的事做了。同時也借鑒了github同行的Demo程序,確實非同步任務處理起來比較麻煩,為什麼不用成熟的組件呢?

最終的技術架構圖:

當然要考慮架構ROI投入產出比,後期會加入web漏洞掃描、web指紋識別、暴力破解、子域名暴力破解、目錄文件列舉、Web爬蟲、SQLi、XSS這些耗時長的任務。

0x03 總結

本篇主要討論了架構設計方便的問題,下篇會著重在系統搭建、代碼實現等方面說明分散式掃描系統的那些事。

本文為 bt0sea 原創稿件,授權嘶吼獨家發布, 如若轉載,請註明原文地址: 4hou.com/technology/117 更多內容請關注「嘶吼專業版」——Pro4hou

推薦閱讀:

11月安卓系統漏洞小結:31個不得不修復的漏洞
超市刷卡輸密碼時旁邊一個大叔一直盯著我看,我就看了他兩眼,他好像認為我把他當賊了,我做錯了嗎?
TRITON惡意軟體攻擊工業安全系統
農夫安全,依然在路上
「中國菜刀」出海:有人用它從澳大利亞軍方偷了30GB絕密數據

TAG:信息安全 |