An Overview of Classic Distributed Systems Papers (1)
The papers in this series come from the paper list of MIT's distributed systems course, 6.824. The emphasis of each overview may be skewed by my personal interests; the basic goal is to make clear where the concepts introduced by each paper sit conceptually, while skipping over some of the finer details.
PS: these overviews are just the notes of a beginner teaching himself the material, so omissions and inaccuracies are very likely - discussion is welcome :)
This post covers the following three papers:
- MapReduce: Simplified Data Processing on Large Clusters
- The Google File System
- The Design of a Practical System for Fault-Tolerant Virtual Machines
MapReduce
First, an overview:
Concept level:
A programming model & an associated implementation for processing and generating large data sets;
In other words, a programming style, plus Google's accompanying implementation: a library that supports this style and lets users write parallel programs against it;
Purpose:
To solve, in one general-purpose platform, the how-tos that every piece of distributed-computation code has to deal with (hide the messy details in a generalized framework):
- parallelize the computation
- distribute the data
- handle failures
- load balancing
Users only provide a map function & a reduce function; MapReduce solves the problems above for them behind the scenes;
Some details:
Map & Reduce:
Inspired by the map and reduce primitives in Lisp;
input & output: key/value pairs
- Map: input pairs -> intermediate pairs;
- map((k1, v1)) -> list((k2, v2))
- (map-reduce library will do the grouping & passing work)
- Reduce: intermediate pairs with certain intermediate key value -> merge together -> smaller set of values;
- reduce((k2, list(v2))) -> list(v2)
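To make the signatures concrete, here is a minimal word-count sketch in Go (the KeyValue type and the function shapes are assumptions for illustration, loosely in the spirit of the 6.824 lab interface rather than the paper's C++ API):

```go
package main

import (
	"fmt"
	"strings"
)

// KeyValue is an assumed intermediate key/value pair type.
type KeyValue struct {
	Key   string
	Value string
}

// Map: (k1, v1) -> list((k2, v2)).
// Here k1 is a file name and v1 its contents; it emits ("word", "1") pairs.
func Map(filename string, contents string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(contents) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

// Reduce: (k2, list(v2)) -> merged value.
// The library has already grouped all values for the same key.
func Reduce(key string, values []string) string {
	return fmt.Sprintf("%d", len(values)) // word count = how many "1"s were emitted
}

func main() {
	inter := Map("doc1", "the quick the")
	// The library would shuffle/group by key across workers; done inline here for brevity.
	grouped := map[string][]string{}
	for _, kv := range inter {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	for k, vs := range grouped {
		fmt.Println(k, Reduce(k, vs))
	}
}
```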
Implementation
Different system environments call for different implementations;
Physical description of the system implemented in the paper
Google gives an implementation targeting large clusters of commodity PCs connected together with switched Ethernet (characteristics: 2-4 GB of memory per machine, commodity networking, frequent machine failures, storage on inexpensive IDE disks attached directly to individual machines, file system: GFS);
Workflow
1. Split input & invoke copies of program
- MapReduce library splits the input files into M pieces (typically 16~64MB);
- Starts up many copies of the program on the cluster (fork); one of the copies is the special master.
2. Master assigns work to idle workers.
- M map tasks, R reduce tasks (reduce tasks are distributed by partitioning the intermediate key space)
- Map workers: read from the corresponding input split; parse k/v pairs, pass them to the map function, and buffer the results in memory; periodically write the buffered pairs to local disk & pass their locations back to the master;
- Reduce workers: notified by the master of the locations of the corresponding intermediate results; use RPCs to read the data from the map workers; sort by intermediate key (many keys may be assigned to the same reduce worker); pass each key & its list of values to the Reduce function; append the output of the Reduce function to the final output file.
3. Work finished: return to user program.
- output: R files, one output file per reduce task.
Data structures:
- For each task: the state of the task; worker ID;
- For transmitting data: the locations & sizes of the R intermediate file regions produced by one map task (fewer than R if the map output contains no keys for some partitions);
- Updated when: a map task finishes;
- Pushed incrementally when: a worker still has an in-progress reduce task;
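A minimal Go sketch of what that master-side bookkeeping might look like (all type and field names are assumptions, not taken from the paper):

```go
package mapreduce

// TaskState mirrors the three task states the paper describes.
type TaskState int

const (
	Idle TaskState = iota
	InProgress
	Completed
)

// Task is the per-task bookkeeping: its state plus the worker it is assigned to.
type Task struct {
	State    TaskState
	WorkerID string
}

// MapResult describes, for one finished map task, the locations and sizes of
// its intermediate file regions, one per reduce partition (up to R of them).
type MapResult struct {
	Locations []string // where each region lives on the map worker's local disk
	Sizes     []int64  // size of each region
}

// Master holds O(M+R) task entries and O(M*R) intermediate-region records.
type Master struct {
	MapTasks    []Task
	ReduceTasks []Task
	MapResults  []MapResult // pushed incrementally to in-progress reduce workers
}
```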
Details
- fault tolerance:
- worker failure: the master pings workers periodically to check they are alive; -> because map results live only on the worker's local disk, every map task completed by a failed node must be redone;
- master failure: trivial - it rarely happens; just restart when it does; periodic checkpoints can help;
- locality:
- the master assigns tasks as close to the input data as possible;
- task granularity:
- master's burden: it makes O(M+R) scheduling decisions and stores O(M*R) states (~one byte of data per map/reduce task pair): coarser granularity is better;
- load balancing & faster recovery: finer granularity is better;
- backup tasks:
- when a machine straggles (for whatever reason), the master can simply assign the same task to another idle worker as well; the policy (e.g. the criterion for "straggler") can be tuned to actual needs
Refinements
- partitioning: pick a hash function that suits the actual application
- sorting within each reduce partition comes for free
- Combiner function: the map worker "reduces" its own results as much as possible first (e.g. for word counting, instead of sending a pile of ("我", 1) pairs, send ("我", 100)) - a small sketch follows this list
- basically the reduce function run once on the map side;
- difference: the combiner function's output is written to an intermediate file, while the reduce function's output is written to the final output file.
- skipping bad records: the MapReduce library skips records that cause deterministic crashes.
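The combiner sketch promised above, in Go (assumed types; the merging logic is just the reduce step run early on the map worker):

```go
package main

import (
	"fmt"
	"strconv"
)

// KeyValue as in the earlier word-count sketch.
type KeyValue struct{ Key, Value string }

// Combine partially merges a map worker's own output before it is written to
// the intermediate files (an assumed helper, not the paper's exact API).
func Combine(pairs []KeyValue) []KeyValue {
	counts := map[string]int{}
	for _, kv := range pairs {
		n, _ := strconv.Atoi(kv.Value)
		counts[kv.Key] += n
	}
	out := make([]KeyValue, 0, len(counts))
	for k, n := range counts {
		out = append(out, KeyValue{k, strconv.Itoa(n)})
	}
	return out
}

func main() {
	raw := []KeyValue{{"我", "1"}, {"我", "1"}, {"我", "1"}}
	fmt.Println(Combine(raw)) // [{我 3}] - far less data on the wire
}
```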
The Google File System
Concept level
A scalable distributed file system for large distributed data-intensive applications;
Why, and what it sets out to do
* Traditional FS requirements: performance, scalability, reliability, availability;
* What has changed:
- Component failure: from exception to norm; [We need constant monitoring, error detection, fault tolerance, automatic recovery]
- Data size: multi-GB files are the norm, each made up of many small (~KB) application objects; data sets grow fast to many TBs comprising billions of objects;
- File mutation: appending rather than overwriting; random writes are rare - once written, files are only read, often only sequentially;
- Examples: the input of reduce tasks in MapReduce, etc.;
- appending is the focus; it needs an atomicity guarantee;
Design
# Assumptions
* Physical makeup: large clusters built from inexpensive commodity machines;
* File population: a modest number of large files; a few million files, typically ~100 MB to multi-GB each; small files are supported but not optimized for;
* Workload types:
- large streaming reads (~1 MB, over contiguous regions of a file) & small random reads (~KB, at arbitrary offsets)
- large sequential writes (appends, ~1 MB); files are seldom modified with small arbitrary writes
* Concurrency: many clients appending to the same file concurrently; atomicity with minimal synchronization overhead is essential.
* High sustained bandwidth matters more than low latency;
# Interface Provided:
* FS Interface: create, delete, open, close, read, write
* More:
- snapshot: create a copy of a file / directory at low cost
- Record append: supports concurrent appends while guaranteeing the atomicity of each client's append
# Architecture:
* Single master:
- metadata keeper: namespace, access control, file-to-chunk mapping, chunk locations;
- system-wide activities: garbage collection, load balancing, ...
- talks to clients, telling them where to fetch the data;
* multiple chunkservers:
- Chunks: fixed-size;
- chunk handle: assigned by the master, identifies the chunk;
- chunks are stored on local disk as Linux files;
- reads & writes are specified by a chunk handle & byte range;
- each chunk is replicated on multiple chunkservers;
Read flow
- the *client* translates the (file name, byte offset) requested by the application into a (file name, chunk index) and sends it to the master
- the master returns the chunk handle & the locations of the replicas
- the client caches this information;
- the client contacts one of the replica chunkservers (likely the closest one);
- the chunkserver replies with the data;
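A rough Go sketch of that client-side read path (GFS has no public Go API; the chunk size is from the paper, but the RPC stubs, types and method names here are all invented for illustration):

```go
package main

import "fmt"

const chunkSize = 64 << 20 // 64 MB, as in the paper

// ChunkInfo is a hypothetical master reply: chunk handle + replica locations.
type ChunkInfo struct {
	Handle   uint64
	Replicas []string
}

// askMaster and readFromChunkserver stand in for real RPCs (stubbed out here).
func askMaster(file string, chunkIndex int64) ChunkInfo {
	return ChunkInfo{Handle: 42, Replicas: []string{"chunkserver-a:7070", "chunkserver-b:7070"}}
}

func readFromChunkserver(addr string, handle uint64, off, n int64) []byte {
	return make([]byte, n)
}

// Read shows the translation the client library performs before talking to anyone.
func Read(file string, offset, length int64) []byte {
	chunkIndex := offset / chunkSize    // (file, byte offset) -> (file, chunk index)
	info := askMaster(file, chunkIndex) // master: chunk handle + replica locations (client caches this)
	addr := info.Replicas[0]            // contact a replica, ideally the closest one
	return readFromChunkserver(addr, info.Handle, offset%chunkSize, length)
}

func main() {
	fmt.Println(len(Read("/logs/web.0", 100<<20, 4096))) // 4096
}
```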
Data layout considerations
- 64 MB chunk size: fewer master-client interactions; less non-data overhead between client and chunkserver; less metadata (little enough that the master can keep all of it in memory)
- a chunk size this large does cause internal fragmentation, but given the earlier assumptions the impact is small
- when placing a new chunk: 1. prefer nodes whose disk utilization is below average; 2. limit the number of recent chunk creations on any single node; 3. spread replicas across racks.
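A toy Go sketch of that placement policy (the paper only states the three criteria; the scoring, threshold and field names are my assumptions):

```go
package main

import (
	"fmt"
	"sort"
)

// Chunkserver is a hypothetical view the master keeps of each server.
type Chunkserver struct {
	Addr            string
	DiskUtilization float64 // fraction of disk used
	RecentCreations int     // chunks created "recently"
	Rack            string
}

const maxRecentCreations = 10 // assumed limit

// PlaceReplicas picks hosts for a new chunk following the three rules:
// below-average utilization first, cap recent creations, spread across racks.
func PlaceReplicas(servers []Chunkserver, nReplicas int) []Chunkserver {
	// 1. prefer below-average disk utilization (least used first)
	sort.Slice(servers, func(i, j int) bool {
		return servers[i].DiskUtilization < servers[j].DiskUtilization
	})
	var chosen []Chunkserver
	usedRacks := map[string]bool{}
	for _, s := range servers {
		if len(chosen) == nReplicas {
			break
		}
		if s.RecentCreations >= maxRecentCreations { // 2. limit creation bursts
			continue
		}
		if usedRacks[s.Rack] { // 3. spread replicas across racks
			continue
		}
		chosen = append(chosen, s)
		usedRacks[s.Rack] = true
	}
	return chosen
}

func main() {
	servers := []Chunkserver{
		{"cs1", 0.80, 0, "rack1"},
		{"cs2", 0.30, 2, "rack1"},
		{"cs3", 0.40, 1, "rack2"},
		{"cs4", 0.35, 12, "rack3"},
	}
	for _, s := range PlaceReplicas(servers, 3) {
		fmt.Println(s.Addr, s.Rack)
	}
}
```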
The master's metadata
1. file & chunk namespaces
2. mapping from files to chunks *(what's the difference between 1 & 2? 1 is the set of valid path names; 2 maps each file to its list of chunk handles)*
3. locations of each chunk's replicas
- 1 & 2 are kept in persistent storage: an operation log records every critical change, both to survive a master crash and to define the order of concurrent operations;
- the master only polls the chunkservers periodically for the chunks they hold (3); there is no point in the master trying to maintain this persistently... too hard to keep right;
- no directory tree structure: GFS represents its namespace as a lookup table mapping full pathnames to metadata.
- locking: to operate on some part of the namespace, a request takes read locks on all the ancestor directory names and a read/write lock on the target itself; file creation does not require a write lock on the parent directory because there is no "directory", or inode-like, data structure to be protected from modification.
- The read lock on the directory name suffices to prevent the directory from being deleted, renamed, or snapshotted.
- The write locks on file names serialize attempts to create a file with the same name twice.
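A minimal Go sketch of this locking scheme, assuming the flat pathname-to-lock table described above (the real master is C++; everything here is illustrative):

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Namespace is a flat lookup table: full pathname -> lock (no inodes, no real directories).
type Namespace struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func NewNamespace() *Namespace {
	return &Namespace{locks: map[string]*sync.RWMutex{}}
}

func (ns *Namespace) lockFor(path string) *sync.RWMutex {
	ns.mu.Lock()
	defer ns.mu.Unlock()
	if ns.locks[path] == nil {
		ns.locks[path] = &sync.RWMutex{}
	}
	return ns.locks[path]
}

// Create takes read locks on every ancestor path and a write lock on the full
// file name: the read locks stop ancestors from being deleted/renamed/snapshotted,
// the write lock serializes two attempts to create the same file.
func (ns *Namespace) Create(path string) {
	parts := strings.Split(strings.TrimPrefix(path, "/"), "/")
	prefix := ""
	for _, p := range parts[:len(parts)-1] {
		prefix += "/" + p
		l := ns.lockFor(prefix)
		l.RLock()
		defer l.RUnlock()
	}
	l := ns.lockFor(path)
	l.Lock()
	defer l.Unlock()
	fmt.Println("created", path) // ... actual metadata update would go here
}

func main() {
	ns := NewNamespace()
	ns.Create("/home/user/foo") // read locks on /home, /home/user; write lock on /home/user/foo
}
```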
Caching & prefetching
clients cache metadata; the master also prefetches metadata for the chunks right after the requested one and sends it along to the client;
fault tolerance
- Data integrity:
- checksums to catch corruption: each chunk is divided into 64 KB blocks, each with a 32-bit checksum; checksums are stored separately (they are metadata too...); the chunkserver verifies them on reads; on a mismatch it reports to the master, which creates another replica elsewhere and has the corrupted copy deleted; (a minimal sketch follows this list)
- Consistency (how to write concurrently): among all replicas of a chunk, one primary replica holds the "lease" for that chunk and decides the order of write operations;
- Write request flow
1. the client wants to update a chunk and asks the master who holds the lease on that chunk (who is primary);
2. the master tells the client which replica holds the lease (the primary) and where the other replicas are; the client caches this;
3. the client pushes the data to all replicas; there is no fixed order among the replicas here - the data flows from the client through the chunkservers along the shortest path suggested by the network topology, pipelined (network and storage in parallel); each chunkserver keeps the data in an LRU cache;
4. once all replicas have confirmed receiving the data, the client sends a write request to the primary; the primary assigns the previously received data a serial number (the primary assigns consecutive serial numbers to all writes to define their order) and applies the updates in that order;
5. the primary forwards the write request to the other replicas, and each replica applies the updates in the order the primary chose;
6. the other replicas report back to the primary;
7. the primary replies to the client; an error at any replica fails the request, leaving the replicas in an inconsistent state (the write completed at some and not others). The client retries steps 3-7 a few times; if that still fails, it has to start over from the beginning.
- a write that spans chunks may be undefined: ordering across replicas is fine and the replicas are identical, but writes from different applications may interleave, so it's unclear what the region ends up containing;
- Record append: because a replica may fail, the record may end up at different offsets on different replicas; but there is always at least one region that is fully consistent (same offset, same data) on all replicas - appended atomically at least once;
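The checksum sketch mentioned above, in Go: one 32-bit checksum per 64 KB block, verified on read (the block size is from the paper; the use of crc32 and the helper names are assumptions):

```go
package main

import (
	"fmt"
	"hash/crc32"
)

const blockSize = 64 * 1024 // each chunk is split into 64 KB blocks

// Checksums computes one 32-bit checksum per 64 KB block of a chunk.
func Checksums(chunk []byte) []uint32 {
	var sums []uint32
	for off := 0; off < len(chunk); off += blockSize {
		end := off + blockSize
		if end > len(chunk) {
			end = len(chunk)
		}
		sums = append(sums, crc32.ChecksumIEEE(chunk[off:end]))
	}
	return sums
}

// Verify recomputes checksums on read; a mismatch would be reported to the
// master, which re-replicates the chunk elsewhere and deletes the bad copy.
func Verify(chunk []byte, sums []uint32) bool {
	got := Checksums(chunk)
	if len(got) != len(sums) {
		return false
	}
	for i := range got {
		if got[i] != sums[i] {
			return false
		}
	}
	return true
}

func main() {
	chunk := make([]byte, 200*1024) // a 200 KB chunk -> 4 blocks
	sums := Checksums(chunk)
	fmt.Println("blocks:", len(sums), "ok:", Verify(chunk, sums))
	chunk[70*1024] ^= 0xFF // corrupt one byte in the second block
	fmt.Println("after corruption ok:", Verify(chunk, sums))
}
```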
Virtual machine fault tolerance
What it does
Fault tolerance in the primary/backup style: the backup replicates the primary exactly;
How
Ship the entire state across? Not viable - it needs far too much bandwidth;
So instead: model the servers as deterministic state machines that start from exactly the same initial state and execute exactly the same operations in exactly the same order.
But some operations are non-deterministic (e.g. reading the clock), so the results of those non-deterministic operations must be synchronized;
Hence, on top of the state-machine model, the primary sends the backup the deterministic operations + the results of the non-deterministic operations;
Why VMs?
Doing this on physical machines is too hard, especially at high processor clock rates;
But a VM is exactly a well-defined state machine whose operations are the operations of the machine being virtualized - the hypervisor can capture every operation and conveniently replay it on another VM;
VM deterministic replay
deterministic replay: [ReTrace: Collecting Execution Traces with Virtual Machine Deterministic Replay.]
An earlier VMware paper introducing the ReTrace / deterministic replay technique, which can record everything a VM executes and replay it on another VM. The present paper builds on that technique; its focus is the architecture and the protocol;
Architecture
- the two VMs run on different physical machines;
- they stay in sync with a small time lag;
- "the two VMs are in virtual lockstep";
- shared storage (the non-shared virtual disk case is discussed as well);
- only the primary VM is visible to the outside world;
- the primary and backup communicate over a logging channel;
- liveness detection: heartbeating & monitoring the traffic on the logging channel
Deterministic Replay implementation
- The operations a VM may execute (which need to be replicated):
- inputs to the VM: incoming network packets, disk reads, input from keyboard & mouse...
- non-deterministic events / operations: virtual interrupts, reading the clock cycle counter of the processor...
- What the Deterministic Replay implementation achieves:
- record all of a VM's inputs and all non-determinism related to its execution into a log;
- the non-determinism includes things like the exact instruction at which an interrupt was delivered;
- from that log, everything the VM did can be reproduced perfectly;
- ...feels like that says nothing at all? If you want the details, go read the deterministic replay paper itself;
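As a rough mental model of what travels over the logging channel, here is a purely hypothetical Go sketch of a log entry (not VMware's actual format, which the paper does not describe):

```go
package main

import "fmt"

// EntryKind distinguishes recorded inputs from recorded non-determinism.
type EntryKind int

const (
	NetworkPacket    EntryKind = iota // input delivered to the VM
	DiskRead                          // input: data returned by a disk read
	VirtualInterrupt                  // non-deterministic event
	ClockRead                         // non-deterministic operation result
)

// LogEntry is a hypothetical record on the logging channel: what happened,
// exactly where in the instruction stream it happened, and any payload needed
// to reproduce it on the backup.
type LogEntry struct {
	Kind       EntryKind
	InstrCount uint64 // instruction position at which the event must be replayed
	Payload    []byte // e.g. packet bytes, disk data, or the clock value read
}

func main() {
	e := LogEntry{Kind: ClockRead, InstrCount: 123456789, Payload: []byte{0x12, 0x34}}
	fmt.Printf("%+v\n", e)
}
```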
FT Protocol
Overall it sounds simple: send the log produced by deterministic replay to the backup over the logging channel, and have the backup replay it - but a protocol is still needed (how to ACK, and so on).
The goal it must achieve is the *output requirement*: if the backup VM takes over after the primary fails, it will continue executing in a way that is entirely consistent with every output the primary has already sent to the external world (the backup may make different choices than the primary would have in the future, but their histories, deterministic and non-deterministic events alike, are identical).
How it is achieved: create a special log entry at each output, and then enforce the *output rule*: the primary may send an output to the external world only after the backup has ACKed the special log entry associated with that output;
The feel of it is: we must make sure every output is identical, but not necessarily that every output is delivered exactly once... [that seems impossible to guarantee anyway... so the special output log entry says "I produced output at this point", ensuring that after the backup takes over, nothing unexpected happens between reaching that point and producing the output; re-emitting the same output is fine; *without a two-phase-commit-style transaction, when the primary wants to send an output, the backup cannot tell whether the primary failed just before or just after sending it*; but in practice lost/duplicated packets are not a big problem: the applications (network stacks and the like) already have mechanisms to handle them;]
Of course this is not stop-and-wait: while the primary waits for the backup's ACK of an output log entry, it merely defers releasing that output to the external world; it can keep executing everything else;
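A small Go sketch of the output rule (hypothetical primary-side logic; channels stand in for the logging-channel ACK stream and the external network, and the sequence numbers are assumed):

```go
package main

import "fmt"

// pendingOutput is an output the primary wants to release, tied to the
// sequence number of the special log entry created for it.
type pendingOutput struct {
	logSeq uint64
	data   string
}

// releaseOutputs enforces the output rule: an output goes to the external
// world only once the backup has ACKed the log entry associated with it.
// The primary is NOT stop-and-wait: execution continues; only delivery is deferred.
func releaseOutputs(acks <-chan uint64, pending <-chan pendingOutput, external chan<- string) {
	acked := uint64(0)         // highest log sequence number ACKed by the backup
	queue := []pendingOutput{} // outputs waiting for their ACK
	for {
		select {
		case a, ok := <-acks:
			if !ok {
				return
			}
			if a > acked {
				acked = a
			}
		case p := <-pending:
			queue = append(queue, p)
		}
		// release everything whose special log entry has been ACKed
		rest := queue[:0]
		for _, p := range queue {
			if p.logSeq <= acked {
				external <- p.data
			} else {
				rest = append(rest, p)
			}
		}
		queue = rest
	}
}

func main() {
	acks := make(chan uint64, 1)
	pending := make(chan pendingOutput, 1)
	external := make(chan string, 1)
	go releaseOutputs(acks, pending, external)

	pending <- pendingOutput{logSeq: 7, data: "reply to client"}
	acks <- 7 // backup ACKs the special log entry for this output
	fmt.Println(<-external)
}
```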
In case a failure occurs
* heartbeating + monitoring the logging traffic in both directions (log entries + ACKs; the stream never goes quiet, because there are regular timer interrupts);
* split brain? resolved with the shared storage: before a VM goes live, it first performs an "atomic test-and-set" on the shared storage;
Implementation details
# starting & restarting
Uses functionality built into VMware vSphere... VMware VMotion: allows the migration of a running VM from one server to another (less than a second). Revised for FT so that: 1. the original VM is not destroyed, 2. a logging channel is created at the same time, 3.
# log buffer
both the primary and the backup have a log buffer;
- the primary sends the contents of its buffer out as quickly as it can;
- the backup ACKs as soon as it has read a log entry from the network into its buffer
- when the primary's buffer is full: it stops responding to the outside world; this hurts clients, but there is no way around it; the backup simply must be able to keep up;
- a feedback mechanism on the primary watches the lag time and adjusts the CPU limit on the host so that the primary and the backup run at roughly the same speed;
# operation
* control operations should also be applied to both, e.g. power off, increasing the CPU share, and so on;
* only VMotion (VM migration) may be performed on either of the two independently;
# About I/Os
* disk I/O:
- concurrent disk reads/writes can introduce non-determinism; solution: detect the I/O races and force the primary and backup to execute the racing operations sequentially, in the same order;
- races between disk operations and memory accesses: page protection... (honestly I don't fully understand this part)
# non-shared disk
when the primary & backup do not share disks:
- first, the disks become internal state of the VM, so disk writes no longer need a special log entry (they are not external output and need not obey the output rule);
- split brain: needs a third party to arbitrate;
# Question: with shared disks, should the backup do its own disk reads?
* reading itself: reduces the load on the logging channel (if it doesn't read, someone has to ship the data to it)
* reading itself: lowers the backup's efficiency; the backup has to wait for its own reads; if the backup's read fails it has to retry; if the primary's read fails, the primary has to send the now-dirty target memory to the backup to keep them exactly identical;
* reading itself: if the primary reads and then quickly writes the same location, the lagging backup would read the new data... avoiding that means stop-and-wait, which is quite a hassle;
It comes down to what fits the specific situation...