之前学习的Reactor模型只是懂得了服务器端如何把一个连接放到workergroup中处理,那么真正处理某一个具体的请求的过程是什么样的呢
生命周期函数
,Netty将这些生命周期函数封装在Handler
中,通过责任链的模式封装在了一个 pipeline
中。当 channel中触发了对应的IO事件,就会调用pipeline中的头或者尾
的一个handler,至于是否传递到下一级,由对应的handler判断.ChannelInBoundHandler和ChannelOutBoundHandler
,他们各自有着不同的生命周期函数inbound事件是从head节点往后传播,outbound事件是从tail节点往前传播
HandlerContext
. context允许handler在生命周期方法中直接改变数据的流向
,比如读完数据就可以让 context在把数据写出去,那么数据的流向就从 inbound变成了outboundpublic interface ChannelInboundHandler extends ChannelHandler {
# channel 注册到eventLoop
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
# channel从eventloop中取消注册
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
# channel已经建立了连接
void channelActive(ChannelHandlerContext ctx) throws Exception;
# channel结束了连接
void channelInactive(ChannelHandlerContext ctx) throws Exception;
# channel从网络中读到了数据
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
# channel读取完数据
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
# 读取数据中发生了异常调用该方法
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码
我们来看看-生命周期的调用源码。
private void register0(ChannelPromise promise) {
try {
# 这里就是让 channel注册到eventloop并且注册到selector中的方法
doRegister();
# 注册结束之后就会调用 pipeline的fireChannelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
# 如果是激活状态还会调用 fireChannelActive 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
}
}
} catch (Throwable t) {
}
}
复制代码
那么我们来看看pipeline的源码
@Override
public final ChannelPipeline fireChannelRegistered() {
# 传了一个pipeline 的头结点进去,说明是从头开始传播的
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
复制代码
pipeline调用了 context的静态方法,下面的都是context内部的调用关系
# AbstractChannelHandlerContext 的 invokeChannelRegistered 方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
# 又调用头结点的 invokeChannelRegistered 方法
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
# 拿到head节点的 handler调用handler的钩子函数
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
复制代码
最后调用到了就是我们自定义的handler的钩子函数,特别注意如果我们在这个函数里面没有调用super.channelRegistered(ctx);那么转播就会终止
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered----Inbound1");
super.channelRegistered(ctx);
}
复制代码
如果我们继续转播的话, context就会找到下一个inbound的context然后再执行方法。
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
复制代码
public interface ChannelOutboundHandler extends ChannelHandler {
# 当绑定端口成功后触发
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
# 客户端连接到服务器触发
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
# 断开连接触发
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
复制代码
outbound和inbound就是触发的时机不同,基本上用不太到,以后有用到再来补。
在 BootstrapServer中,我们往往会创建一个 ChannelInitializer的 ChildHandler。然后在 initChannel方法中对新来的 channel的pipeline中添加handler。,我们来看看这个流程
public static void main(String[] args) throws Exception {
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
# 保存childHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Inbound1());
ch.pipeline().addLast(new OutBound1());
ch.pipeline().addLast(new Inbound2());
ch.pipeline().addLast(new OutBound2());
ch.pipeline().addLast(new Inbound3());
ch.pipeline().addLast(new OutBound3());
}
}).bind(9090).sync().channel().closeFuture().sync();
}
复制代码
public void channelRead(ChannelHandlerContext ctx, Object msg) {
# 这就是客户端channel
final Channel child = (Channel) msg;
# 给客户端Channel添加了刚才写的ChannelInitializer
child.pipeline().addLast(childHandler);
}
复制代码
private void register0(ChannelPromise promise) {
try {
# 这里就是让 channel注册到eventloop并且注册到selector中的方法
doRegister();
# 调用pipeline的添加handler的方法
pipeline.invokeHandlerAddedIfNeeded();
# 注册结束之后就会调用 pipeline的fireChannelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
# 如果是激活状态还会调用 fireChannelActive 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
}
}
} catch (Throwable t) {
}
}
复制代码
callHandlerAdded0
方法,里面拿到handler执行handlerAdded方法5. pipeline中的方法
这几个流程就大体的梳理完了,总结一下
如果你觉得此文对你有一丁点帮助,点个赞。或者可以加入我的开发交流群:1025263163相互学习,我们会有专业的技术答疑解惑
如果你觉得这篇文章对你有点用的话,麻烦请给我们的开源项目点点star:http://github.crmeb.net/u/defu不胜感激 !
PHP学习手册:https://doc.crmeb.com
技术交流论坛:https://q.crmeb.com
|