您的位置:  首页 > 技术杂谈 > 正文

EventMesh源码手撕系列(二)之HTTP处理器

2021-11-16 16:00 https://my.oschina.net/webank/blog/5310226 微众开源 次阅读 条评论

源码解析

 

上一篇Http Server实现中讲到,当channel被注册之后,这个类中的initChannel方法就会被调用,同时在管道后面加入handlers。那具体的,我们的Http Server是怎样处理消息的?

【源代码片段】

     class HttpsServerInitializer extends ChannelInitializer<SocketChannel> {      ...            // 在管道后面加入handlers            pipeline.addLast(new HttpRequestDecoder(),// 这个是对http解码                    new HttpResponseEncoder(),// 这个是对http的响应编码                    new HttpConnectionHandler(),// 这个是对http连接处理                    new HttpObjectAggregator(Integer.MAX_VALUE),// 这个是http对象聚合                    new HTTPHandler());// 这个是http的Handler的具体实现        }    }}

    处理逻辑

    【源代码片段】

      class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {         @Override        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {            HttpPostRequestDecoder decoder = null;            // todo start server span, we should get channel here to put span in channel's context in async call.             try {                if (!httpRequest.decoderResult().isSuccess()) {                    sendError(ctx, HttpResponseStatus.BAD_REQUEST);                    return;                }         // 请求命令                final HttpCommand requestCommand = new HttpCommand();                // todo record command opaque in span.                        httpRequest.headers().set(ProtocolKey.ClientInstanceKey.IP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));                ...                Map<String, Object> bodyMap = new HashMap<>();         // GET请求                if (httpRequest.method() == HttpMethod.GET) {                    QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri());                    getDecoder.parameters().entrySet().forEach(entry -> {                        bodyMap.put(entry.getKey(), entry.getValue().get(0));                    });                    // POST请求                } else if (httpRequest.method() == HttpMethod.POST) {                    decoder = new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);                    // 获取参数列表                    List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();                    for (InterfaceHttpData parm : parmList) {                        if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {                            Attribute data = (Attribute) parm;                            bodyMap.put(data.getName(), data.getValue());                        }                    }                } else {                    sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);                    return;                }                 ...                String requestCode =                        (httpRequest.method() == HttpMethod.POST) ? StringUtils.deleteWhitespace(httpRequest.headers().get(ProtocolKey.REQUEST_CODE))                                : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");                 requestCommand.setHttpMethod(httpRequest.method().name());                requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName());                requestCommand.setRequestCode(requestCode);                // todo record command method, version and requestCode in span.         // 响应命令                HttpCommand responseCommand = null;               ...                 try {                    // requestCommand头部和主体                    requestCommand.setHeader(Header.buildHeader(requestCode, parseHTTPHeader(httpRequest)));                    requestCommand.setBody(Body.buildBody(requestCode, bodyMap));                } catch (Exception e) {                    responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 3));                    sendResponse(ctx, responseCommand.httpResponse());                    return;                }               ...                // 异步消息上下文                AsyncContext<HttpCommand> asyncContext = new AsyncContext<HttpCommand>(requestCommand, responseCommand, asyncContextCompleteHandler);                // 处理网络请求                processEventMeshRequest(ctx, asyncContext);            } catch (Exception ex) {                httpServerLogger.error("AbrstractHTTPServer.HTTPHandler.channelRead0 err", ex);                // todo span end with exception.            } finally {                try {                    decoder.destroy();                } catch (Exception e) {                }            }        }     // 处理网络请求        public void processEventMeshRequest(final ChannelHandlerContext ctx,                                            final AsyncContext<HttpCommand> asyncContext) {            final Pair<HttpRequestProcessor, ThreadPoolExecutor> choosed = processorTable.get(Integer.valueOf(asyncContext.getRequest().getRequestCode()));            try {                // 为任务分配一个线程                choosed.getObject2().submit(() -> {                    try {                        // 拒绝请求                        if (choosed.getObject1().rejectRequest()) {                            HttpCommand responseCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getRetCode(), EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg());                            asyncContext.onComplete(responseCommand);                            if (asyncContext.isComplete()) {                                if (httpLogger.isDebugEnabled()) {                                    httpLogger.debug("{}", asyncContext.getResponse());                                }                                sendResponse(ctx, responseCommand.httpResponse());                            }                            return;                        }             // 处理请求,对接到http的协议部分的处理器                        choosed.getObject1().processRequest(ctx, asyncContext);                        if (asyncContext == null || !asyncContext.isComplete()) {                            return;                        }                        ...              // 发送响应                        sendResponse(ctx, asyncContext.getResponse().httpResponse());                    } catch (Exception e) {                        httpServerLogger.error("process error", e);                    }                });            } catch (RejectedExecutionException re) {                ...                try {                    sendResponse(ctx, asyncContext.getResponse().httpResponse());                } catch (Exception e) {                }            }        }...    }

      处理器

      每一个处理器都会继承它的父类HttpRequestProcessor,并且重写处理请求和拒绝请求两个方法。具体业务中,会根据消息的请求码分配处理器来对消息进行处理。

      处理器逻辑,这里以SendAsyncMessageProcessor消息处理为例进行讲解,其他处理器参照即可。

        public class SendAsyncMessageProcessor implements HttpRequestProcessor {  @Override public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception { ... // 获取发送消息的请求 SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader(); SendMessageRequestBody sendMessageRequestBody = (SendMessageRequestBody) asyncContext.getRequest().getBody(); // 获取发送消息的响应 SendMessageResponseHeader sendMessageResponseHeader = SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster, IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);  //证实请求头部 if (StringUtils.isBlank(sendMessageRequestHeader.getIdc()) || StringUtils.isBlank(sendMessageRequestHeader.getPid()) || !StringUtils.isNumeric(sendMessageRequestHeader.getPid()) || StringUtils.isBlank(sendMessageRequestHeader.getSys())) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( sendMessageResponseHeader, SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())); asyncContext.onComplete(responseEventMeshCommand); return; }  //证实请求主体 if (StringUtils.isBlank(sendMessageRequestBody.getBizSeqNo()) || StringUtils.isBlank(sendMessageRequestBody.getUniqueId()) || StringUtils.isBlank(sendMessageRequestBody.getProducerGroup()) || StringUtils.isBlank(sendMessageRequestBody.getTopic()) || StringUtils.isBlank(sendMessageRequestBody.getContent()) || (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) { //sync message TTL can't be empty responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( sendMessageResponseHeader, SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg())); asyncContext.onComplete(responseEventMeshCommand); return; }  //检查acl if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); String user = sendMessageRequestHeader.getUsername(); String pass = sendMessageRequestHeader.getPasswd(); String subsystem = sendMessageRequestHeader.getSys(); int requestCode = Integer.valueOf(sendMessageRequestHeader.getCode()); String topic = sendMessageRequestBody.getTopic(); try { Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode); }catch (Exception e){ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);  ... } } // 分配生产者组 String producerGroup = sendMessageRequestBody.getProducerGroup(); EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);  ...  String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);  // 消息 Message omsMsg = new Message(); try { // body omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET)); // topic omsMsg.setTopic(sendMessageRequestBody.getTopic()); omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION, sendMessageRequestBody.getTopic());  ... } // ttl omsMsg.putUserProperties(Constants.PROPERTY_MESSAGE_TIMEOUT, ttl); // bizNo omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS, sendMessageRequestBody.getBizSeqNo()); omsMsg.putUserProperties("msgType", "persistent"); omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())); omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId()); omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));  // new rocketmq client can't support put DeFiBusConstant.PROPERTY_MESSAGE_TTL// rocketMQMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_TTL, ttl);   } catch (Exception e) { ... asyncContext.onComplete(responseEventMeshCommand); return; } // 发送消息的上下文 final SendMessageContext sendMessageContext = new SendMessageContext(sendMessageRequestBody.getBizSeqNo(), omsMsg, eventMeshProducer, eventMeshHTTPServer); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsg();  final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() { @Override // 响应 public void onResponse(HttpCommand httpCommand) { try { if (httpLogger.isDebugEnabled()) { httpLogger.debug("{}", httpCommand); } eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse()); eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime()); } catch (Exception ex) { } } };   try { sendMessageContext.getMsg().getUserProperties().put(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis())); // 生产组发送精简消息 eventMeshProducer.send(sendMessageContext, new SendCallback() { // 成功 @Override public void onSuccess(SendResult sendResult) { ... } // 错误 @Override public void onException(OnExceptionContext context) { ... } }); } catch (Exception ex) { ... }  return; }  @Override public boolean rejectRequest() { return false; } }

        最后,我推荐大家去看看官方的协议文档中关于协议这一部分的内容,可以方便我们理解:官方http协议文档

        :本文内容由社区小伙伴陈创慧提供, 

        • 0
          感动
        • 0
          路过
        • 0
          高兴
        • 0
          难过
        • 0
          搞笑
        • 0
          无聊
        • 0
          愤怒
        • 0
          同情
        热度排行
        友情链接