Elasticsearch源碼之Http Rest請求實現

Elasticsearch源碼之Http Rest請求實現

本文主要講述HttpRest請求的過程。

Rest請求註冊

Rest請求註冊是在org.elasticsearch.action.ActionModule#initRestHandlers方法中實現的,該方法的調用在org.elasticsearch.node.Node實例化時調用。

  1. initRestHandlers

public void ActionModule#initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) { List<AbstractCatAction> catActions = new ArrayList<>(); Consumer<RestHandler> registerHandler = a -> { if (a instanceof AbstractCatAction) { catActions.add((AbstractCatAction) a); } }; registerHandler.accept(new RestMainAction(settings, restController)); registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter)); registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController)); registerHandler.accept(new RestNodesStatsAction(settings, restController)); registerHandler.accept(new RestNodesUsageAction(settings, restController)); registerHandler.accept(new RestNodesHotThreadsAction(settings, restController)); //省略}

此處實例化大量Rest*Action來註冊Rest請求。以RestMainAction為例來說明。

  1. RestMainAction 初始化

public RestMainAction(Settings settings, RestController controller) {

super(settings);

controller.registerHandler(GET, "/", this);

controller.registerHandler(HEAD, "/", this);

}

通過回調RestController.registerHandler來註冊請求URL(/)。

  1. registerHandler

public void registerHandler(RestRequest.Method method, String path, RestHandler handler) { if (handler instanceof BaseRestHandler) { usageService.addRestHandler((BaseRestHandler) handler); } handlers.insertOrUpdate(path, new MethodHandlers(path, handler, method), (mHandlers, newMHandler) -> { return mHandlers.addMethods(handler, method); }); }

此處註冊path,請求方法和path請求處理類RestMainAction到handlers。

Rest伺服器註冊

es Rest請求是通過Netty伺服器來構建的,是在org.elasticsearch.http.netty4.Netty4HttpServerTransport#doStart初始化ServerBootstrap。其中的chanel註冊在HttpChannelHandler實例中。

  1. HttpChannelHandler

protected static class HttpChannelHandler extends ChannelInitializer<Channel> {private final Netty4HttpServerTransport transport; private final Netty4HttpRequestHandler requestHandler;protected HttpChannelHandler( final Netty4HttpServerTransport transport, final boolean detailedErrorsEnabled, final ThreadContext threadContext) { this.transport = transport; this.requestHandler = new Netty4HttpRequestHandler(transport, detailedErrorsEnabled, threadContext); }@Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("openChannels", transport.serverOpenChannels); final HttpRequestDecoder decoder = new HttpRequestDecoder( Math.toIntExact(transport.maxInitialLineLength.getBytes()), Math.toIntExact(transport.maxHeaderSize.getBytes()), Math.toIntExact(transport.maxChunkSize.getBytes())); decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); ch.pipeline().addLast("decoder", decoder); ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor()); ch.pipeline().addLast("encoder", new HttpResponseEncoder()); final HttpObjectAggregator aggregator = new HttpObjectAggregator(Math.toIntExact(transport.maxContentLength.getBytes())); if (transport.maxCompositeBufferComponents != -1) { aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); } ch.pipeline().addLast("aggregator", aggregator); if (transport.compression) { ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel)); } if (SETTING_CORS_ENABLED.get(transport.settings())) { ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig())); } if (transport.pipelining) { ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents)); } ch.pipeline().addLast("handler", requestHandler); }@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Netty4Utils.maybeDie(cause); super.exceptionCaught(ctx, cause); }}核心在initChannel中註冊Netty4HttpRequestHandler這樣每個請求的處理在SimpleChannelInboundHandler#channelRead0

Http請求處理

當Http請求經過Netty channelRead0方法到達Netty4HttpRequestHandler時,又被轉發給Netty4HttpServerTransport#dispatchRequest,再被轉發給org.elasticsearch.rest.RestController#dispatchRequest。

  1. dispatchRequest

public void RestController#dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { try { tryAllHandlers(request, channel, threadContext); } catch (Exception e) { try { channel.sendResponse(new BytesRestResponse(channel, e)); } catch (Exception inner) { inner.addSuppressed(e); logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner); } } }

此處調用tryAllHandlers,獲取上面註冊的Rest請求的handleRequest方法。還是以RestMainAction為例。

2、 RestMainAction

public final void BaseRestHandler#handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { // prepare the request for execution; has the side effect of touching the request parameters final RestChannelConsumer action = prepareRequest(request, client);// validate unconsumed params, but we must exclude params used to format the response // use a sorted set so the unconsumed parameters appear in a reliable sorted order final SortedSet<String> unconsumedParams = request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));// validate the non-response params if (!unconsumedParams.isEmpty()) { final Set<String> candidateParams = new HashSet<>(); candidateParams.addAll(request.consumedParams()); candidateParams.addAll(responseParams()); throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter")); }usageCount.increment(); // execute the action action.accept(channel); }核心實現是調用子類具體實現的請求包裝prepareRequest),最終回調請求action.accept(channel);)。

3、prepareRequest

public RestChannelConsumer RestMainAction#prepareRequest(final RestRequest request, final NodeClient client) throws IOException { return channel -> client.execute(MainAction.INSTANCE, new MainRequest(), new RestBuilderListener<MainResponse>(channel) { @Override public RestResponse buildResponse(MainResponse mainResponse, XContentBuilder builder) throws Exception { return convertMainResponse(mainResponse, request, builder); } }); }

此處直接返回前面的action,回調時調用client.execute方法(此處變進入transport處理,不再贅述),當請求處理完成成功時回調RestBuilderListener的onResponse或onFailure。

推薦閱讀:

守護進程
es6常用語法筆記
es 6 export import
你應該知道的HTTP基礎知識
第一章:計算機和網際網路 |《計算機網路:自頂向下方法》

TAG:計算機科學 | Elasticsearch |