
An Overview of Classic Distributed Systems Papers (1)

The papers in this series come from the paper list of MIT's distributed systems course, 6.824. The emphasis of each overview may be skewed by my personal interests; the basic goal is to make clear which conceptual level each paper's ideas sit at, so some concrete details may be skipped.

PS: these overviews are just the small notes of a beginner teaching himself, so omissions and inaccuracies are quite likely; discussion is welcome :)

The papers covered in this post are the following three:

  • MapReduce: Simplified Data Processing on Large Clusters
  • The Google File System
  • The Design of a Practical System for Fault-Tolerant Virtual Machines

MapReduce

First, an overview:

Conceptual level:

A programming model & an associated implementation for processing and generating large data sets;

That is, a programming model; Google also built an associated implementation that supports this model and lets ordinary users write parallel programs against it;

Purpose:

To solve, in one general-purpose platform, the how-tos that every piece of distributed-computation code otherwise has to solve itself (hide the messy details in a generalized framework):

  • parallelize the computation
  • distribute the data
  • handle failures
  • load balancing

The user only needs to supply a map function & a reduce function; MapReduce takes care of the problems above behind the scenes;

Some details:

Map & Reduce:

Inspired by the map and reduce primitives in Lisp;

input & output: key/value pairs

  • Map: input pairs -> intermediate pairs;

- map((k1, v1)) -> list((k2, v2))

  • (the MapReduce library does the grouping & passing work in between)
  • Reduce: intermediate pairs sharing the same intermediate key -> merged together -> a smaller set of values;

- reduce((k2, list(v2))) -> list(v2)
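To make the signatures above concrete, here is a minimal word-count sketch in Go; the KeyValue type, the function names, and the tiny driver standing in for the library's grouping work are my own illustration, not the paper's actual C++ interface:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is the intermediate pair type: (k2, v2) in the paper's notation.
type KeyValue struct {
	Key   string
	Value string
}

// Map takes one input pair (document name, contents) and emits intermediate
// pairs: one ("word", "1") per occurrence.
func Map(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// Reduce takes one intermediate key and all values grouped under it
// (the library has already done the grouping) and merges them.
func Reduce(key string, values []string) string {
	sum := 0
	for _, v := range values {
		n, _ := strconv.Atoi(v)
		sum += n
	}
	return strconv.Itoa(sum)
}

func main() {
	// Tiny driver standing in for the library's grouping & passing work.
	grouped := map[string][]string{}
	for _, kv := range Map("doc1", "the quick fox and the dog") {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	for k, vs := range grouped {
		fmt.Println(k, Reduce(k, vs))
	}
}
```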

Implementation

Different implementations are possible depending on the underlying system.

Physical setting of the system implemented in the paper

Google describes its implementation for large clusters of commodity PCs connected together with switched Ethernet (characteristics: 2-4 GB of memory per machine, commodity networking, frequent machine failures, storage on inexpensive IDE disks attached directly to individual machines, file system: GFS);

Workflow

1. Split input & invoke copies of program

- The MapReduce library splits the input files into M pieces (typically 16-64 MB each);

- It then starts up many copies of the program on the cluster (fork); one special copy is the master;

2. The master assigns work to idle workers.

- M map tasks, R reduce tasks (reduce tasks are distributed by partitioning the intermediate key space)

- Map workers: read from the corresponding input split; parse k/v pairs and pass them to the map function; buffer the results in memory; periodically write the buffered pairs to local disk & pass the locations back to the master;

- Reduce workers: notified by the master about the locations of the corresponding intermediate results; use RPCs to read the data from the map workers' disks; sort by intermediate key (many keys may be assigned to the same reduce worker); pass each key & its list of values to the reduce function; append the reduce function's output to the final output file.

3. Work finished: return to user program.

- Output: R files, one output file per reduce task.
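The split into R reduce tasks is driven by the partitioning function; the paper's default is hash(key) mod R. A small sketch, with FNV-1a chosen here only as an arbitrary stand-in hash:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition decides which of the R reduce tasks an intermediate key goes to.
// The paper's default is hash(key) mod R; FNV-1a is an arbitrary choice here.
func partition(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % nReduce
}

func main() {
	const R = 5
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("key %q -> reduce task %d of %d\n", k, partition(k, R), R)
	}
}
```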

Data structure:

  • For each task: its state (idle / in-progress / completed) and the ID of the worker it is assigned to;
  • For transmitting data: the locations & sizes of the R intermediate file regions produced by each map task (possibly fewer than R if some partitions received no keys);

- Updated when a map task finishes;

- Pushed incrementally to workers that still have in-progress reduce tasks;
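A hedged sketch of roughly what this bookkeeping could look like; the type and field names are my own, only the described contents (task state, worker ID, locations & sizes of intermediate regions) come from the paper:

```go
package main

// TaskState is one of the three states the paper describes.
type TaskState int

const (
	Idle TaskState = iota
	InProgress
	Completed
)

// Task is the master's per-task record: its state plus, for non-idle tasks,
// the identity of the worker machine running it.
type Task struct {
	State  TaskState
	Worker string // worker ID; empty while the task is idle
}

// Region is the location & size of one intermediate file region.
type Region struct {
	Location string // e.g. "worker-7:/tmp/mr-3-1" (hypothetical path format)
	Size     int64
}

// MapOutput records, for one completed map task, the locations and sizes of
// its R intermediate file regions; entries may be missing if a partition
// received no keys.
type MapOutput struct {
	Regions []Region // length <= R
}

// Master holds M map tasks, R reduce tasks, and the map-output locations it
// pushes incrementally to workers with in-progress reduce tasks.
type Master struct {
	MapTasks    []Task
	ReduceTasks []Task
	MapOutputs  map[int]MapOutput // map task index -> its intermediate regions
}

func main() {}
```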

Details

  • fault tolerance:

- worker failure: the master pings workers periodically to check liveness; since map output lives only on the failed worker's local disk, all map tasks completed by that worker must be redone (see the sketch after this list);

- master failure: treated as trivial - it rarely happens, and when it does the job can simply be restarted; alternatively, the master can write periodic checkpoints of its state;

  • locality:

- when assigning tasks, the master prefers workers close to the input data;

  • task granularity:

- master overhead: it makes O(M+R) scheduling decisions and stores O(M*R) states (~one byte of data per map/reduce task pair): this argues for coarser granularity;

- load balancing & faster recovery: these argue for finer granularity;

  • backup task:

- when a machine is straggling (for whatever reason), the master can simply assign the same task to another idle worker as a backup; this can be tuned to the actual workload (the criterion for "straggling", and so on).
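As referenced above, a minimal sketch (my own simplification, not the paper's code) of the master's reaction when pings to a worker time out: completed map tasks on that worker go back to idle because their output lived on its local disk, while completed reduce tasks stay completed because their output already sits in the global file system:

```go
package main

import "fmt"

type TaskState int

const (
	Idle TaskState = iota
	InProgress
	Completed
)

type Task struct {
	State  TaskState
	Worker string
}

// handleWorkerFailure is called when periodic pings to a worker time out.
func handleWorkerFailure(failed string, mapTasks, reduceTasks []Task) {
	for i := range mapTasks {
		// Map output lives on the failed worker's local disk, so even
		// completed map tasks must be rescheduled (redone elsewhere).
		if mapTasks[i].Worker == failed {
			mapTasks[i] = Task{State: Idle}
		}
	}
	for i := range reduceTasks {
		// Reduce output is in the global file system; only in-progress
		// reduce tasks on the failed worker need rescheduling.
		if reduceTasks[i].Worker == failed && reduceTasks[i].State == InProgress {
			reduceTasks[i] = Task{State: Idle}
		}
	}
}

func main() {
	maps := []Task{{State: Completed, Worker: "w1"}, {State: InProgress, Worker: "w2"}}
	reduces := []Task{{State: InProgress, Worker: "w1"}, {State: Completed, Worker: "w1"}}
	handleWorkerFailure("w1", maps, reduces)
	// w1's map task and its in-progress reduce task are idle again.
	fmt.Println(maps, reduces)
}
```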

Refinements

  • exploit knowledge of the actual application when partitioning (a cleverly chosen hash / partitioning function)
  • sorting comes for free on the reduce side (within each partition, intermediate keys are processed in sorted order)
  • Combiner function: the map worker first "reduces" its own output as far as possible (e.g. in word counting, instead of sending a pile of ("the", 1) pairs, send a single ("the", 100)) - see the sketch after this list

- basically the reduce function, run once on the map side first;

- difference: the combiner function's output is written to an intermediate file, while the reduce function's output is written to the final output file.

  • Skipping bad records: the MapReduce library can skip records that cause deterministic crashes.
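For associative and commutative reductions such as counting, the combiner really is just the reduce logic run over each map worker's local output before it is written to the intermediate files; a sketch under that assumption:

```go
package main

import (
	"fmt"
	"strconv"
)

type KeyValue struct {
	Key   string
	Value string
}

// combine collapses a map worker's local ("word", "1") pairs into one
// ("word", "<local count>") pair per key before they are written to the
// intermediate files, cutting the traffic sent to the reduce workers.
func combine(local []KeyValue) []KeyValue {
	counts := map[string]int{}
	for _, kv := range local {
		n, _ := strconv.Atoi(kv.Value)
		counts[kv.Key] += n
	}
	out := make([]KeyValue, 0, len(counts))
	for k, n := range counts {
		out = append(out, KeyValue{Key: k, Value: strconv.Itoa(n)})
	}
	return out
}

func main() {
	local := []KeyValue{{"the", "1"}, {"the", "1"}, {"fox", "1"}}
	fmt.Println(combine(local)) // e.g. [{the 2} {fox 1}] (map iteration order is unspecified)
}
```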

The Google File System

Conceptual level

A scalable distributed file system for large distributed data-intensive applications;

Why, and what it sets out to do

* Traditional FS goals remain: performance, scalability, reliability, availability;

* What has changed:

- Component failure: from exception to norm; [We need constant monitoring, error detection, fault tolerance, automatic recovery]

- Data size: files are huge by traditional standards (multi-GB files are common); data sets grow fast, reach many TB, and comprise billions of objects, so managing them as billions of KB-sized files is unwieldy;

- File mutation: appending rather than overwriting; random writes are rare - once written, files are only read, often only sequentially;

- Examples: the intermediate data read by a MapReduce reduce task, etc.;

- appending is therefore the focus of optimization, and it needs an atomicity guarantee;


Design

# Assumptions

* Physical setting: large clusters built from inexpensive commodity machines;

* File population: a modest number of large files - a few million files, typically 100 MB or larger, with multi-GB files the common case; small files are supported but not optimized for;

* Workload types:

- large streaming reads (~1 MB or more, over a contiguous region of a file) & small random reads (~a few KB, at arbitrary offsets)

- large sequential writes that append data (~1 MB or more); files are seldom modified by small writes at arbitrary offsets

* Concurrency: many clients appending to the same file at the same time; atomicity with minimal synchronization overhead is essential.

* High sustained bandwidth > low latency; bandwidth matters more than latency;

# Interface Provided:

* FS Interface: create, delete, open, close, read, write

* More:

- snapshot: create a copy of a file / directory at low cost

- Record append: supports concurrent appends while guaranteeing the atomicity of each client's append

# Architecture:

* Single master:

- maintains the metadata: namespace, access control information, the file-to-chunk mapping, chunk locations;

- drives system-wide activities: garbage collection, load balancing, ...

- talks to clients, telling them where to fetch the data;

* Multiple chunkservers:

- Chunks: fixed-size;

- chunk handle: assigned by the master, identifies the chunk;

- chunks are stored on local disks as Linux files;

- reads & writes are specified by a chunk handle & byte range;

- each chunk is replicated on multiple chunkservers;

Read workflow

- the client translates the file name & byte offset requested by the *application* into a file name & chunk index, and sends them to the master

- the master returns the chunk handle & the locations of the replicas

- the client caches this information;

- the client contacts one of the replica chunkservers (likely the closest one);

- the chunkserver replies with the data;
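A hedged Go sketch of this client-side read path; the askMaster and readFromChunkserver stubs are placeholders for the real RPCs, and the 64 MB constant shows how a byte offset becomes a chunk index:

```go
package main

import "fmt"

const chunkSize = 64 << 20 // 64 MB fixed chunk size

// chunkInfo is what the master returns and the client caches.
type chunkInfo struct {
	Handle   uint64   // globally unique chunk handle assigned by the master
	Replicas []string // chunkserver addresses holding a replica
}

// cache maps (filename, chunk index) -> chunkInfo, so repeated reads of the
// same chunk skip the master entirely.
var cache = map[string]chunkInfo{}

// Placeholder RPCs: stand-ins for the real master / chunkserver protocol.
func askMaster(filename string, chunkIndex int64) chunkInfo {
	return chunkInfo{Handle: 42, Replicas: []string{"cs1:7001", "cs2:7001"}}
}
func readFromChunkserver(addr string, handle uint64, offsetInChunk, length int64) []byte {
	return make([]byte, length)
}

// read implements the flow: offset -> chunk index -> master (or cache) ->
// chosen replica -> data. Reads spanning a chunk boundary are omitted here.
func read(filename string, offset, length int64) []byte {
	chunkIndex := offset / chunkSize
	key := fmt.Sprintf("%s#%d", filename, chunkIndex)
	info, ok := cache[key]
	if !ok {
		info = askMaster(filename, chunkIndex)
		cache[key] = info
	}
	replica := info.Replicas[0] // ideally the closest replica
	return readFromChunkserver(replica, info.Handle, offset%chunkSize, length)
}

func main() {
	data := read("/logs/web-00", 3*chunkSize+100, 4096)
	fmt.Println("read", len(data), "bytes")
}
```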

Data layout considerations

- 64 MB chunk size: reduces the number of client-master interactions; reduces the non-data overhead between client and chunkserver; shrinks the metadata (so much so that the master can keep all metadata in memory)

- however, chunks this large bring fragmentation (and hot-spot) problems - but given the workload assumptions above, the impact is small

- when placing a new chunk: 1. prefer chunkservers whose disk utilization is below average; 2. limit the number of recent chunk creations on any single chunkserver; 3. spread replicas across racks/subnets.

The master's metadata

1. the file & chunk namespaces

2. the mapping from files to chunks *what's the difference between 1 & 2?*

3. the locations of each chunk's replicas

- 1 & 2 are kept in persistent storage - an operation log records every critical change, both to survive a master crash and to define the order of concurrent operations;

- for 3, the master simply polls the chunkservers periodically for the chunks they hold - there is no point in the master trying to maintain this view itself; keeping it consistent would be too hard;

- no directory tree structure: GFS represents its namespace as a lookup table mapping full pathnames to metadata.

- Locking: to operate on some path in the namespace, acquire read locks on all ancestor directory names and a read/write lock on the target path itself; file creation does not require a write lock on the parent directory because there is no "directory", or inode-like, data structure to be protected from modification.

- The read lock on the directory name suffices to prevent the directory from being deleted, renamed, or snapshotted.

- The write locks on file names serialize attempts to create a file with the same name twice.
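A hedged sketch of that locking discipline: read locks on every ancestor pathname plus a read or write lock on the target's full pathname; the lock-table representation is my own, only the rule itself comes from the paper:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// lockTable maps full pathnames to read-write locks, mirroring the idea that
// the namespace is a flat lookup table of full pathnames, not an inode tree.
var lockTable = struct {
	sync.Mutex
	locks map[string]*sync.RWMutex
}{locks: map[string]*sync.RWMutex{}}

func lockFor(path string) *sync.RWMutex {
	lockTable.Lock()
	defer lockTable.Unlock()
	if _, ok := lockTable.locks[path]; !ok {
		lockTable.locks[path] = &sync.RWMutex{}
	}
	return lockTable.locks[path]
}

// ancestors("/home/user/foo") -> ["/home", "/home/user"]
func ancestors(path string) []string {
	parts := strings.Split(strings.TrimPrefix(path, "/"), "/")
	out := []string{}
	for i := 1; i < len(parts); i++ {
		out = append(out, "/"+strings.Join(parts[:i], "/"))
	}
	return out
}

// createFile acquires read locks on every ancestor name and a write lock on
// the file's own full name: enough to stop the path being deleted, renamed,
// or snapshotted underneath us and to serialize two creations of the same
// name, without ever write-locking a "parent directory".
func createFile(path string) {
	for _, a := range ancestors(path) {
		lockFor(a).RLock()
		defer lockFor(a).RUnlock()
	}
	lockFor(path).Lock()
	defer lockFor(path).Unlock()
	fmt.Println("created", path)
}

func main() {
	createFile("/home/user/foo")
}
```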

Caching & prefetching

Clients cache metadata; the master also prefetches the metadata for chunks adjacent to the requested one and sends it along in the same reply;

fault tolerance

  • Data integrity:

- checksums to catch corruption: each chunk is divided into 64 KB blocks, each with a 32-bit checksum; checksums are stored separately from the data (they are metadata too); the chunkserver verifies them on reads; on a mismatch it reports to the master, which creates a new replica from a good copy and then has the corrupted replica deleted;

  • Consistency (how concurrent writes are handled): among all replicas of a chunk, one replica is the primary; it holds the "lease" on that chunk and decides the order of write operations;

- Write request flow (a client-side sketch follows at the end of this list)

1. the client wants to update a chunk and asks the master who holds the lease for that chunk (who the primary is);

2. the master tells the client the primary and the locations of the other replicas; the client caches this;

3. the client pushes the data to all replicas; there is no required order among them - the data flows from the client along a chain of chunkservers chosen from the network topology (shortest path), pipelining network transfer and storage; each chunkserver keeps the data in an LRU buffer cache;

4. once all replicas have confirmed receiving the data, the client sends a write request to the primary; the primary assigns consecutive serial numbers to the buffered data it has received (the primary assigns serial numbers to all writes, defining their order) and applies the updates in that order;

5. the primary forwards the write request to the other replicas; each replica applies the updates in the order chosen by the primary;

6. the other replicas report back to the primary;

7. the primary replies to the client; any replica error fails the whole request, and the replicas are then left in an inconsistent state (the write completed at some replicas but not others). The client retries steps 3-7 a few times; if that still fails, it has to restart the write from the beginning.

- a write that spans chunk boundaries may end up *undefined*: ordering across replicas is fine and the replicas stay identical, but writes from different applications may interleave, so nobody can say what the resulting bytes mean;

- Record append: because of possible replica failures and retries, some replicas may end up with duplicates or padding at differing offsets; but there is always at least one region that is fully consistent across replicas (same offset, same data) - the record is appended atomically at least once;
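A hedged Go sketch of the client's side of steps 1-7 above; every RPC stub is a placeholder, and retrying is reduced to a bare skeleton:

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder RPC stubs for the master and chunkservers.
func askMasterForLease(handle uint64) (primary string, secondaries []string) {
	return "cs1:7001", []string{"cs2:7001", "cs3:7001"}
}
func pushData(chain []string, data []byte) error { return nil } // step 3: pipelined along the chain
func writeRequest(primary string, handle uint64, dataID string) error {
	return nil // steps 4-7, driven by the primary
}

// write performs one write to a chunk: find the primary (lease holder), push
// the data to all replicas, then ask the primary to commit it in order.
func write(handle uint64, data []byte) error {
	primary, secondaries := askMasterForLease(handle) // steps 1-2 (cached in reality)
	chain := append([]string{primary}, secondaries...)

	const maxRetries = 3
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err := pushData(chain, data); err != nil { // step 3: data flows along the chain
			continue
		}
		// Step 4: the primary assigns a serial number and applies the write;
		// steps 5-6: it forwards the request and collects replies;
		// step 7: any replica error fails the whole request.
		if err := writeRequest(primary, handle, "data-id"); err == nil {
			return nil
		}
	}
	return errors.New("write failed after retries; caller may restart from the beginning")
}

func main() {
	fmt.Println(write(42, []byte("hello")))
}
```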


Virtual machine fault tolerance

What it does

Fault tolerance via the primary/backup approach, where the backup replicates the primary exactly;

How it is done

Ship all of the state across continuously? - not feasible; it needs far too much bandwidth;

So instead: model the servers as deterministic state machines that start from exactly the same initial state and apply exactly the same operations in exactly the same order.

However, some operations are non-deterministic (e.g. reading the clock) - the results of these non-deterministic operations must be synchronized;

Therefore, on top of the state-machine model, the primary sends the backup the deterministic operations plus the results of the non-deterministic operations;
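A toy Go sketch of that idea: the primary records the result of each non-deterministic operation (here, a clock read) and ships it in the log, and the backup consumes the recorded result instead of re-executing the non-determinism; the log format is invented for illustration:

```go
package main

import (
	"fmt"
	"time"
)

// LogEntry is one item shipped over the logging channel: either a
// deterministic input to replay, or the recorded result of a
// non-deterministic operation.
type LogEntry struct {
	Kind   string // "input" or "ndresult"
	Input  string // the deterministic input, if Kind == "input"
	Result int64  // the recorded result, if Kind == "ndresult"
}

// Machine is the deterministic state machine: same initial state + same
// log => same final state on primary and backup.
type Machine struct{ state int64 }

func (m *Machine) apply(e LogEntry) {
	switch e.Kind {
	case "input":
		m.state += int64(len(e.Input)) // some deterministic transition
	case "ndresult":
		m.state ^= e.Result // use the recorded value, never re-read the clock
	}
}

func main() {
	// Primary: executes, recording non-deterministic results into the log.
	var log []LogEntry
	primary := &Machine{}
	in := LogEntry{Kind: "input", Input: "packet-1"}
	primary.apply(in)
	log = append(log, in)
	nd := LogEntry{Kind: "ndresult", Result: time.Now().UnixNano()} // non-deterministic: record it
	primary.apply(nd)
	log = append(log, nd)

	// Backup: replays the log; it never reads its own clock.
	backup := &Machine{}
	for _, e := range log {
		backup.apply(e)
	}
	fmt.Println("states equal:", primary.state == backup.state)
}
```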

why VM?

Doing this on physical machines is too hard, especially at high processor clock rates;

But a VM can be viewed exactly as a well-defined state machine whose operations are the operations of the machine being virtualized - the hypervisor can capture all of a VM's operations and conveniently replay them on another VM;


VM deterministic replay

deterministic replay: [ReTrace: Collecting Execution Traces with Virtual Machine Deterministic Replay.]

An earlier VMware paper introducing the ReTrace / deterministic-replay technique, which can record everything one VM executes and replay it on another VM. The present paper builds on that technique, but its focus is the architecture and the protocol;

Architecture

- the two VMs run on different physical machines;

- they are kept in sync with a small time lag;

- "the two VMs are in virtual lockstep";

- shared storage (the non-shared virtual disk case is discussed later);

- only the primary VM is visible to the outside world;

- the primary and the backup communicate over a logging channel;

- liveness detection: heartbeating & monitoring of the traffic on the logging channel

Deterministic Replay implementation

  • The operations a VM may execute (and which must be replicated):

- inputs to the VM (these must be recorded and fed to the backup): incoming network packets, disk reads, input from keyboard & mouse...

- non-deterministic events / operations: virtual interrupts, reading the clock cycle counter of the processor...

  • What the deterministic-replay implementation achieves:

- it records all of a VM's inputs plus all the non-determinism related to the VM's execution into a log;

- the non-determinism includes things like the exact instruction at which each interrupt was delivered;

- from that log, every operation of the VM can be replayed exactly;

- ...feels like that says nothing at all, right? For the details you have to go read the deterministic-replay (ReTrace) paper;

FT Protocol

At a high level it is simply: ship the log produced by deterministic replay to the backup over the logging channel, and the backup just executes it - but a protocol is still needed on top (how to ack, and so on).

The goal to reach is the so-called *output requirement*: if the backup VM takes over after the primary fails, it continues executing in a way fully consistent with all the output the primary has already sent to the external world (the backup may decide future events differently from what the primary would have done, but their histories - deterministic and non-deterministic events alike - are identical).

How it is achieved: create a special log entry at each output and enforce the *output rule*: the primary may send an output to the external world only after the backup has ACKed the special log entry associated with that output;

My reading: the requirement is that every output could be reproduced identically, not that every output is produced exactly once... [the latter can hardly be guaranteed, so the special output log entry says "I produced an output at this point", which ensures that after the backup takes over, no divergent event can slip in before it reaches the point of that output - re-emitting the same output is fine. *Without resorting to two-phase-commit-style transactions, when the primary wants to send an output, the backup cannot tell whether the primary failed just before or just after sending it.* In practice lost or duplicated packets are not a big deal - the applications and network protocols involved already handle lost and duplicated packets.]

Of course this is not stop-and-wait: while the primary waits for the backup's ACK of an output log entry, it merely defers delivering that output to the external world; all other execution continues;
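A hedged sketch of the output rule: the primary logs a special entry for each pending output and releases the output only once that entry has been ACKed, while execution itself never pauses; the channels and entry IDs here are purely illustrative:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	logCh := make(chan int, 16) // special output log entries sent to the backup
	ackCh := make(chan int, 16) // ACKs from the backup, keyed by log entry id
	release := make(chan int, 16)

	// Backup: reads log entries and ACKs them (after a small stand-in delay
	// representing logging-channel latency).
	go func() {
		for id := range logCh {
			time.Sleep(10 * time.Millisecond)
			ackCh <- id
		}
	}()

	// Primary: before sending output #1 to the external world, it creates a
	// special log entry and defers only the delivery of that output; it is
	// free to keep executing other work while waiting for the ACK.
	go func() {
		logCh <- 1 // special log entry associated with output #1
		fmt.Println("primary: still executing while output #1 is deferred")
		for id := range ackCh {
			if id == 1 {
				release <- 1 // output rule satisfied: backup has ACKed entry #1
				return
			}
		}
	}()

	fmt.Println("released output:", <-release)
}
```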

In case a failure occurs

* heartbeating + monitoring of the logging traffic in both directions (log entries + acks; the stream never goes quiet, because regular timer interrupts keep generating log entries);

* split brain? solved via the shared storage: before a VM goes live, it must first win an "atomic test-and-set" on the shared storage;
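A toy sketch of that go-live arbitration: whichever VM wins an atomic test-and-set goes live and the other halts itself; the in-process int32 flag here merely stands in for the real test-and-set region on shared storage:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedFlag stands in for the atomic test-and-set region on shared storage:
// 0 = nobody has gone live yet, 1 = someone already has.
var sharedFlag int32

// tryGoLive returns true only for the single VM that wins the test-and-set.
func tryGoLive(name string) bool {
	won := atomic.CompareAndSwapInt32(&sharedFlag, 0, 1)
	if won {
		fmt.Println(name, "goes live")
	} else {
		fmt.Println(name, "loses the race and halts itself (no split brain)")
	}
	return won
}

func main() {
	// Both VMs think the other has failed (e.g. the logging channel broke),
	// and both try to go live at the same time; exactly one wins.
	var wg sync.WaitGroup
	for _, vm := range []string{"primary", "backup"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			tryGoLive(name)
		}(vm)
	}
	wg.Wait()
}
```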

Implementation details

# starting & restarting

Built on existing VMware vSphere functionality... VMware VMotion allows the migration of a running VM from one server to another (in less than a second). For FT it is revised so that: 1. the original VM is not destroyed, 2. a logging channel is created at the same time, 3.

# log buffer

Both the primary and the backup have a log buffer;

- the primary flushes its log entries out onto the logging channel as fast as it can;

- as soon as the backup has read a log entry from the network into its buffer, it sends back an ack;

- if the primary's buffer fills up, it stops responding to the outside world - this hurts clients, but there is no way around it; the only real fix is to keep the backup from falling too far behind;

- a feedback mechanism watches the lag time and adjusts the CPU limit on the host machines so that the primary and the backup run at roughly the same speed;
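A hedged sketch of this buffer-and-ack flow: the primary sends entries as fast as it can, the backup ACKs as soon as an entry lands in its buffer (before replaying it), and a full primary buffer blocks further progress; buffer sizes and types are invented:

```go
package main

import (
	"fmt"
	"time"
)

type logEntry struct{ id int }

func main() {
	// The channel capacity plays the role of the primary's log buffer: when
	// it is full, the send blocks, i.e. the primary stops making progress
	// (and stops responding to clients) until the backup catches up.
	buffer := make(chan logEntry, 4)
	acks := make(chan int, 4)

	// Backup: ACK as soon as the entry is read off the network into its own
	// buffer, then replay it at its own pace.
	go func() {
		for e := range buffer {
			acks <- e.id                     // ACK immediately on receipt
			time.Sleep(5 * time.Millisecond) // replaying the entry
		}
	}()

	// Primary: produce log entries as fast as possible.
	go func() {
		for i := 0; i < 10; i++ {
			buffer <- logEntry{id: i} // blocks here if the buffer is full
		}
		close(buffer)
	}()

	// The lag between sent and ACKed entries is what the feedback mechanism
	// watches when adjusting the primary's CPU limit.
	for i := 0; i < 10; i++ {
		fmt.Println("acked entry", <-acks)
	}
}
```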

# operation

* Control operations should also be applied to both VMs, e.g. powering off, increasing the CPU share, and so on;

* Only VMotion (VM migration) may be performed independently on the two VMs;

# About I/O

* Disk I/O:

- concurrent disk operations can be non-deterministic - solution: detect such I/O races and force the primary and the backup to execute the racing operations sequentially, in the same order;

- disk operations can also race with the VM's own memory accesses; page protection (the paper ends up using bounce buffers instead) handles this... to be honest I do not fully understand this part

# non-shared disk

When the primary & backup do not share disks:

- the disk now counts as internal state of the VM, so disk writes no longer need a special output log entry (no output-rule delay for them);

- split brain: a third party has to arbitrate (there is no shared storage to run the test-and-set on);

# Question: with a shared disk, should the backup perform disk reads itself?

* reading itself: reduces the load on the logging channel (if it does not read the data itself, someone has to ship it over);

* reading itself: slows the backup down - the backup has to wait for its own reads; a failed read on the backup must be retried; and if the read fails on the primary, the primary has to ship the contents of the affected memory to the backup to keep the two exactly consistent;

* reading itself: if the primary reads a block and then quickly overwrites it, the lagging backup would read the new data instead... avoiding that means stopping and waiting, which is a hassle;

Which option to pick depends on the actual deployment...

