調試 Web Hadoop Distributed File System
經常在 Machine Learning 相關的 JD 上看到過形如『有 Hadoop、Spark 等使用經驗的加分』的字眼。在如今算力和數據都急速膨脹的情況下,上集群有時候確實是繞不開的問題。
關於 Hadoop 是什麼,詳細的介紹很多,這裡從 Wikipedia 引用一小段:
Apache Hadoop 是一款支持數據密集型分散式應用程序並以 Apache 2.0 許可協議發布的開源軟體框架。它支持在商品硬體構建的大型集群上運行的應用程序。Hadoop 是根據谷歌公司發表的 MapReduce 和 Google 文件系統的論文自行實現而成。
Hadoop 中最為基礎的一個子項目是 Hadoop 分散式文件系統 (HDFS, Hadoop Distributed File System)。在使用中的一個關鍵問題是,HDFS 提供 CLI 和 Java Interface,但是我們很可能需要其他各種的語言和框架來訪問它,這就造成了困難。
為此,Hadoop 還提供了一種基於 RESTful HTTP API 的訪問方式:WebHDFS。通過 WebHDFS 的介面,我們可以方便的使用 HTTP 請求來進行 HDFS 操作,以解決跨語言的問題。像我們在 AI 開發中常用的 Python,就有很多封裝了 WebHDFS 的輪子。
按說在已經有輪子的情況下一般不需要在親手調試 HTTP API 了,然而我那好 (zuo) 學 (si) 的隊友偏偏要用 Web 來實現前端,而 WebHDFS 竟然沒有支持跨域的配置。這真的是十分坑爹的狀況了。在隊友們碼力薄弱的情況下,我實在不敢讓他們輕易更換技術棧,只好在伺服器上手寫一個簡易的代理解決跨域問題。在調試 WebHDFS 過程中也順道解決了幾個坑。
Hadoop 配置
首先,需要修改 hdfs-site.xml,啟用 WebHDFS,並確認 namenode 的地址和埠,這裡使用了默認的 50700。
<configuration>n <!-- other properties ... -->nn <property>n <name>dfs.webhdfs.enabled</name>n <value>true</value>n </property>n <property>n <name>dfs.namenode.http-address</name>n <value>ADDRESS:50700</value>n </property>nn <!-- other properties ... -->n</configuration>n
代理框架
最近在知乎上看了 @圓胖腫 幾篇 Vert.x 的介紹,十分有好感,這次寫代理拿來一用,確實十分方便。
我們首先繼承一個 AbstractVerticle,命名為 ProxyService,並在 Launcher 中 deploy,這樣架子就搭好了。本文不過多涉及 Vert.x 語法,關於框架內容,可以直接見 Vert.x 官網。
路由
WebHDFS 代理的大致思路和原則是,盡量不改變其原 API 格式的情況下,完成代理的轉發,並支持跨域。因此我們直接寫一個全匹配的路由。
@Overridenpublic void start() throws Exception {n final Router router = Router.router(vertx);n router.route().handler(this::transfer);n vertx.createHttpServer().requestHandler(router::accept).listen(10000);n}n
其中,在 transfer 中,我們去實現代理請求並返回結果。
第一步,是拼接代理需要請求的真正 URL。WebHDFS 的介面形如:
http://<HOST>:<HTTP_PORT>/webhdfs/v1/<PATH>?op=...n
我們的代理規定是,除了 Endpoint 不同,後邊的路徑,請求方法,請求 body 都保持相同。因此,只要把「webhdfs/v1/」之前的部分替換為此前在 hdfs-site.xml 中配置的地址即可。
private void transfer(RoutingContext context) {n final String uri = context.request().absoluteURI();n final String targetUrl = "http://" + ADDRESS + ":50700/" n + uri.substring(uri.indexOf("webhdfs/v1"));n final HttpServerResponse response = context.response();n final String method = context.request().method().toString();n // ...n}n
代理的 HTTP 請求我沒有用 Vert.x 來做,主要原因是不熟悉,尤其是相對於熟練多年的 Square 三件套之一 OkHttp。這裡,因為請求方法不定,我們需要手動去指定,而不同的請求方法對 RequestBody 的要求不一樣,GET 等方法要求 RequestBody 必須為 null;POST、PUT 等要求 RequestBody 必須 not null,即使是空請求。
private static final MediaType JSON = n MediaType.parse("application/json; charset=utf-8");nnprivate void transfer(RoutingContext context) {n // ...n final OkHttpClient client = new OkHttpClient.Builder().build();n final RequestBody requestBody;n if (!HttpMethod.permitsRequestBody(method)) {n requestBody = null;n } else {n requestBody = RequestBody.create(JSON, "");n }n final Request transferRequest = new Request.Builder()n .url(targetUrl)n .method(method, requestBody)n .build();n try {n final Response transferResponse =n client.newCall(transferRequest).execute();n response.headers().clear();n for (String name : transferResponse.headers().names()) {n if (!"Transfer-Encoding".equals(name)) {n final String value = transferResponse.header(name);n response.putHeader(name, value);n }n }n // CORSn response.putHeader("Access-Control-Allow-Origin", "*");n response.putHeader("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, HEAD, OPTIONS");n n final ResponseBody body = transferResponse.body();n if (body == null) {n response.end();n } else {n response.end(Buffer.buffer(body.bytes()));n }n } catch (IOException ex) {n response.setStatusCode(500);n }n}n
在得到代理請求的結果 transferResponse 後,我們則把其中內容搬運到 Vert.x 的 response 中。其中代理返回的 Transfer-Encoding 已經變化,我們直接捨棄該項 header。在搬運完所有的 header 和 body 之後,我們再額外加上 CORS 支持,這個代理就初步完成了。
Read File 和 Create File 重定向的坑
WebHDFS 的創建和讀取文件介面都涉及一個 HTTP 307 重定向。WebHDFS 特別藝 (keng) 術 (die) 的地方在於,重定向給的地址格式是:
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=...n
居然是 <DATANODE> 而不是 <ADDRESS>,也就是說這個地址根本就沒法去做 HTTP 請求,導致網路請求庫會直接報錯。這種神邏輯也真是非常優秀了,好在 OkHttp 有功能強大的 Intercepter,我們添加一個把 <DATANODE> 替換成 <ADDRESS> 的 Network Intercepter 即可。
private static final String HEADER_LOCATION = "Location";nnprivate static class RedirectInterceptor implements Interceptor {n @Overriden public Response intercept(Chain chain) throws IOException {n final Request request = chain.request();n final Response response = chain.proceed(request);n final String location = response.header(HEADER_LOCATION);n final Response newResponse;n if (location != null) {n newResponse = response.newBuilder()n .header(HEADER_LOCATION, location.replaceAll("yourandell", ADDRESS))n .build();n } else {n newResponse = response;n }n return newResponse;n }n}n
其中「yourandell」是我們的 datanode 名稱,這個視實際情況而定,也有可能有多個,那樣代碼需要做相應更改。
同時在實例化 client 時加上 Network Intercepter:
final OkHttpClient client = new OkHttpClient.Builder()n .addNetworkInterceptor(new RedirectInterceptor())n .build();n
支持文件創建介面
WebHDFS 的文件創建需要進行兩步操作,對同一個介面先不含 file body 的 PUT 一次,返回 HTTP 200 後再帶上 file body 重新 PUT 一次。據 Apache 表示這是為了避免在文件創建之前就把數據發了過來 (FYI: RFC 2616, Section 8.2.3)。
為了方 (ta) 便 (men) 隊 (bu) 友 (hui),在代理中可以把這兩步一起完成,一次返回給前端。
首先我們需要增加路由設置支持文件上傳:
router.route().handler(BodyHandler.create().setUploadsDirectory("uploads"));n
然後新增一個方法來在需要第二個請求的時候去上傳文件。這裡我們的業務只處理 geotiff 文件,因此 media type 直接設置成「image/tiff」。
private static final MediaType TIFF = MediaType.parse("image/tiff");nnprivate Response uploadFileOrNot(RoutingContext context, OkHttpClient client, n Response origin) throws IOException {n final String location = origin.header(HEADER_LOCATION);n if (location != null && location.contains("op=CREATE")) {n final FileUpload fileUpload = n context.fileUploads().iterator().next();n final byte[] fileBytes = vertx.fileSystem()n .readFileBlocking(fileUpload.uploadedFileName()).getBytes();n final RequestBody requestBody = RequestBody.create(TIFF, fileBytes);n final Request request = new Request.Builder()n .url(location)n .method(context.request().method().toString(), requestBody)n .build();n return client.newCall(request).execute();n } else {n return origin;n }n}n
之後把 transferResponse 實例化的語句更改一下,
final Response transferResponse = uploadFileOrNot(n context, client, client.newCall(transferRequest).execute());n
這樣文件上傳的兩步操作整合就完成了。
結語
WebHDFS 的介面除了少許幾個坑之外,整體而言十分清爽簡便,如果沒有跨域問題的話直接用 HTTP 請求起來也是十分方便。代理的實現的並不複雜。當然在可以用現有輪子的情況下還是可以優先用輪,避免無謂的時間浪費。
推薦閱讀:
※AI筆記 | 一張圖幫你全面掌握機器學習演算法知識體系
※基於PaddlePaddle的點擊率的深度學習方法嘗試
※有基於spark的parameter server嗎?