標籤:

以溯源為目的蜜罐系統建設

0x00、前言&思路

一直以來,蜜罐系統建立對溯源系統來說有雞肋之嫌,基於公網的蜜罐系統最早的產品形式是: wooyun當時做的一套蜜罐系統,主要是模擬底層服務弱口令(ssh、rdp、telnet)、資料庫服務弱口令(mysql、postgresql、redis)、web應用(nginx、apache、tomcat等)、應用漏洞(OpenSSL心臟出血、Java RMI 反序列化代碼執行、Struts2遠程代碼執行等)但是,這種主要是針對具有掃描器功能漏洞滲透自動化程序其作用,獲取攻擊源IP地址,獲取植入惡意樣本,然後沙盒運行樣本做C&C地址分析。但是為了獲取蜜罐樣本的全面性,我們需要投入大量的基礎設施建設成本,同時針對,我們需要DDoS定向溯源的地址還不一定有幫助。

那有沒有更好的方式方法,可以第一時間發現DDoS呢?我們能不能通過一種更簡便更輕資產的辦法獲得目前公網存活的惡意樣本?根據我們的DDoS溯源的以往的經驗上看,很多黑客喜歡通過http共享惡意文件方式,HFS是他們首選,,那我們是不是可以把全網HFS掃描出來,然後把這些惡意程序放到一個養雞場,當說發起DDoS攻擊的時候,我們養雞場中的Agent也接到攻擊命令,我們就知道哪個C&C Server發起功能,攻擊的目標是誰。在這裡我們引入一個概念:養雞場,就是把認為是惡意樣本都放到一起,正常飼養起來。

0x01、概念驗證&PoC

完成以上想法需要業務模塊:

1. 快速識別全國HFS伺服器

(1)現有網路空間搜索引擎系統調研

① shodan

a. 免費版只能翻兩頁 20條記錄。

② censys

a. 是免費也提供API操作,但是準確性存疑,免費的東西。。。

③ zoomeye

a. 要求實名註冊,國內的限制,這個。。。

④ FOFA

a. 專業版1個月3000提供API,在線查詢10000條/次,基礎版在線1000條。窮。。。

(2)掃描技術原理調研

① masscan

a. banner識別有限

② zmap+grab

a. censys就使用這種方式,結果不準確

③ nmap

a. 服務指紋識別準確但是速度是短板

④ Zoomeye

a. 前端:vue

b. 後端:Django

c. 資料庫:MongoDB+Elasticsearch

d. 掃描Worker/調度:Lucifer,

e. 組件指紋識別引擎Wmap/Xmap

⑤ 某朋友的掃描系統

a. 客戶端:masscan+nmap+nginx+redis+haproxy

b. 伺服器端:Elasticsearch+ celery + django+ spark

(3)我的解決方案

① 目前系統處在驗證階段先簡單架構,先掃描一下國內IP端。

② 埠掃描:masscan

③ 服務指紋識別:nmap

④ 資料庫:postgresql

⑤ 惡意文件驗證:下載HFS伺服器文件通過API發送virustotal校驗。

2. 養雞場部署

(1)蜜罐系統選擇

① 更少的資源跑更多的蜜罐(杜鵑)

(2)外網監控

① IDS監控IP和DNS對外連接情況(Suricata+ES)

(3)DDoS監控

① 外網發包pps監控告警

0x02、代碼實現

在這之前寫過一篇有關掃描器的文章(Freebuf bt0sea),當時的需求只是監控本公司IP段高危埠連接。沒幾個B段。但是現在監控的是全中國IP地址段,這有點多,直接nmap猴年馬月也掃不完呀。

Problem 1:如果準確的拿到國內IP段

主要第一次做先把範圍縮小一點。

Apanic提供了每日更新的亞太地區IPv4,IPv6,AS號分配的信息表,訪問url是ftp.apnic.net/apnic/sta我們過濾其中的ipv4數據即可。

apnic|CN|ipv4|42.156.64.0|16384|20110412|allocatedapnic|CN|ipv4|42.156.128.0|32768|20110316|allocatedapnic|CN|ipv4|42.157.0.0|65536|20110316|allocatedapnic|CN|ipv4|42.158.0.0|65536|20110224|allocatedapnic|CN|ipv4|42.159.0.0|65536|20110224|allocatedapnic|CN|ipv4|42.160.0.0|1048576|20110216|allocatedapnic|CN|ipv4|42.176.0.0|524288|20110222|allocatedapnic|CN|ipv4|42.184.0.0|131072|20110228|allocatedapnic|CN|ipv4|42.186.0.0|65536|20110316|allocatedapnic|CN|ipv4|42.187.0.0|16384|20110412|allocateddef getip(f,ipinfo=None): if ipinfo is None: ipinfo = [] for i in (f): if i.find("apnic") > -1 and i.find("ipv4") > -1 and i.find("summary") == -1: iplist = i.split("|") country = iplist[1] if country == CN: ipinfo.append(i) return(ipinfo)

然後入庫sqlite。

Problem 2:埠掃描

啟動masscan 30個進程

def Scan(): try: global g_queue while not g_queue.empty(): item = g_queue.get() result = "result"+item+".json" p = subprocess.Popen("/root/masscan/masscan "+item+" -p80 -oJ "+result, shell=True) p.wait() if g_queue.qsize() == 0: print (u公網IP埠掃描結束) return "ok" except Exception,e: print e return e if __name__ == __main__: reload(sys) sys.setdefaultencoding(utf-8) listThread = [] ### 獲取ip列表加入全局變數中 conn = sqlite3.connect("ipwhois.db") cur = conn.cursor() print "Opened database successfully" cursor = cur.execute("SELECT inetnum from ipwhois") for row in cursor: tmpstring = re.split( - ,row[0]) # print tmpstring[0], tmpstring[1], tmp=tmpstring[0]+-+tmpstring[1] g_queue.put(tmp) print tmp conn.close() for i in xrange(g_threadNum): thread = ScanThread(Scan) thread.start() listThread.append(thread) for thread in listThread: thread.join() print thread

一共大約3億2000萬IP,當然沒計算已經取消分配的IP段。大約掃描了3天時間。需要優化,我發現有的一個進程掃描一個/8的ip段需要一天的時間,2000多萬IP,所以可以切割成多個B段進行掃描。

Problem 3:服務指紋識別掃描

(1)提取IP

def store(result): fw = open(iplist.txt, a) with open(result,r) as f: for line in f: if line.startswith({ }: try: temp = json.loads(line[:-2]) fw.writelines(temp["ip"]+n) print temp["ip"] except: continue def list_dir_names(urPath): dirNames = os.listdir(urPath) for ones in dirNames: ones.decode(gbk) return dirNames if __name__ == __main__: reload(sys) sys.setdefaultencoding(utf-8) filelist=[] filelist=glob.glob("*.json") for item in filelist: if os.path.getsize(item)!=0: print item store(item) else: continue

提取後大約開80埠IP為4685409。

def Scan(): try: global g_queue while not g_queue.empty(): item = g_queue.get() result = "result"+item.strip()+".xml" p = subprocess.Popen("/usr/bin/nmap -oX "+result+" -sV -p80 "+item, shell=True) p.wait() print "size = ", g_queue.qsize() if g_queue.qsize() == 0: print (u公網IP掃描結束) return "ok" except Exception,e: print e return e def ImportiIPlist(): storejson_file=open(iplist.txt,rb) lines = storejson_file.readlines() for line in lines: print line g_queue.put(line) g_totalsize = g_queue.qsize() print g_totalsize if __name__ == __main__: reload(sys) sys.setdefaultencoding(utf-8) listThread = [] ImportiIPlist() for i in xrange(g_threadNum): thread = ScanThread(Scan) thread.start() listThread.append(thread) for thread in listThread: thread.join() print thread

掃描大約5-6天時間。

Problem 4:掃描結果入庫

def Scan(): try: timeout = 300 tableName = "%s_%s" % ("scanresult", time.strftime("%Y%m%d")) conn1 = psycopg2.connect(database="postgres", user="postgres", password="Qwe123!@#", host="127.0.0.1", port="5432") cur1 = conn1.cursor() g_totalsize = g_queue.qsize() while not g_queue.empty(): item = g_queue.get() print item ctime = strftime("%Y-%m-%d %H:%M:%S", gmtime()) print ctime nmap_report = NmapParser.parse_fromfile(item) for scanned_hosts in nmap_report.hosts: print scanned_hosts.address for serv in scanned_hosts.services: if serv.state == "open": m = serv.service_dict.get(extrainfo, ) print m if m.find()!=-1: pass else: sql = "insert into %s (task_id,ctime, address,port,service,product,product_version,product_extrainfo,os) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)" sqlCmd = sql%(tableName,g_task_id,ctime,scanned_hosts.address,str(serv.port),serv.service,serv.service_dict.get(product, ),serv.service_dict.get(version, ),serv.service_dict.get(extrainfo, ),NULL) print sqlCmd cur1.execute(sqlCmd) conn1.commit() print "size = ", g_queue.qsize() g_size = g_queue.qsize() num = "%.2f" %(float(g_size)/float(g_totalsize)) if g_queue.qsize() == 0: print (u公網IP掃描結束) return "ok" except Exception,e: print e pass def CreateTable(): curC = connC.cursor() sqlCreate = "create table if not exists %s ( task_id TEXT, ctime TEXT, address TEXT, port TEXT, service TEXT, product TEXT , product_version TEXT, product_extrainfo TEXT, os TEXT )" tableName = "%s_%s"%("scanresult", time.strftime("%Y%m%d")) sqlCmd = sqlCreate%tableName curC.execute(sqlCmd) curC.close() connC.commit() connC.close() def SetTask(): filelist = [] filelist = glob.glob("*.xml") for item in filelist: if os.path.getsize(item) != 0: print item g_queue.put(item) else: continue def main(): listThread = [] SetTask() CreateTable() for i in xrange(g_threadNum): thread = ScanThread(Scan) thread.start() listThread.append(thread) for thread in listThread: thread.join() print thread if __name__ == __main__: reload(sys) sys.setdefaultencoding(utf-8) main()

SELECT DISTINCT(address),port,product,product_version,product_extrainfo FROM scanresult_20171231 WHERE product LIKE %HttpFileServer%

以下是部分結果

Virustotal

威脅情報查詢結果(360):

對外發包監控用開源IDS就好。

0x02、總結

通過以上的概念驗證,我們可以獲得我們自己想到的樣本。只要稍微整理一下驗證的代碼。就可以快速建立起一個養雞場,實時監控DDOS趨勢,為DDoS溯源添磚加瓦。

本文為 bt0sea 原創稿件,授權嘶吼獨家發布,未經許可禁止轉載,如若轉載,請聯繫嘶吼編輯:4hou.com/technology/968 更多內容請關注「嘶吼專業版」——Pro4hou

推薦閱讀:

記一枚可能被誇大的「數百萬物聯網設備遠程劫持」漏洞
解密:全球超過400個知名網站正在記錄你的每一個擊鍵過程
凱悅酒店再次泄露用戶數據,中國區域酒店受傷最深
參加 DEF CON、Black Hat 這樣的大會是一種怎樣的體驗?
簡單的事情,你都不如別人用心,最後拿何來競爭?

TAG:信息安全 |