博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty中的ChannelPipeline源码分析
阅读量:5143 次
发布时间:2019-06-13

本文共 29236 字,大约阅读时间需要 97 分钟。

ChannelPipeline在Netty中是用来处理请求的责任链,默认实现是DefaultChannelPipeline,其构造方法如下:

1 private final Channel channel; 2 private final ChannelFuture succeededFuture; 3 private final VoidChannelPromise voidPromise; 4 final AbstractChannelHandlerContext head; 5 final AbstractChannelHandlerContext tail; 6  7 protected DefaultChannelPipeline(Channel channel) { 8     this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel"); 9     this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);10     this.voidPromise = new VoidChannelPromise(channel, true);11     this.tail = new DefaultChannelPipeline.TailContext(this);12     this.head = new DefaultChannelPipeline.HeadContext(this);13     this.head.next = this.tail;14     this.tail.prev = this.head;15 }

ChannelPipeline和Channel是一一对应关系,一个Channel绑定一条ChannelPipeline责任链

succeededFuture 和voidPromise用来处理异步操作
AbstractChannelHandlerContext 是持有请求的上下文对象,其和ChannelHandler是对应关系(在使用Sharable注解的情况下,不同的AbstractChannelHandlerContext 还可以对应同一个ChannelHandler),ChannelPipeline责任链
处理的就AbstractChannelHandlerContext ,再将最后的AbstractChannelHandlerContext 交给ChannelHandler去做正真的逻辑处理

AbstractChannelHandlerContext构造方法如下:

1 private final String name; 2 private final DefaultChannelPipeline pipeline; 3 final EventExecutor executor; 4 private final boolean inbound; 5 private final boolean outbound; 6 private final boolean ordered; 7 volatile AbstractChannelHandlerContext next; 8 volatile AbstractChannelHandlerContext prev; 9 10 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {11     this.name = (String)ObjectUtil.checkNotNull(name, "name");12     this.pipeline = pipeline;13     this.executor = executor;14     this.inbound = inbound;15     this.outbound = outbound;16     this.ordered = executor == null || executor instanceof OrderedEventExecutor;17 }

name是AbstractChannelHandlerContext的名称,pipeline就是上面说的ChannelPipeline;executor是用来进行异步操作的,默认使用的是在前面博客中说过的NioEventLoop  ()

inbound 和outbound 代表两种请求处理方式,对应Netty中的I/O操作,若是inbound则处理Input操作,由ChannelPipeline从head 开始向后遍历链表,并且只处理ChannelInboundHandler类型的AbstractChannelHandlerContext;若是outbound 则处理Output操作,由ChannelPipeline从tail开始向前遍历链表,并且只处理ChannelOutboundHandler类型的AbstractChannelHandlerContext;

ordered 是判断是否需要提供executor。

由next和prev成员可以知道,ChannelPipeline维护的是一条AbstractChannelHandlerContext的双向链表

其头节点head和尾节点tail分别默认初始化了HeadContext和TailContext

HeadContext的构造:

1 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {2     private final Unsafe unsafe;3     4     HeadContext(DefaultChannelPipeline pipeline) {5     super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);6     this.unsafe = pipeline.channel().unsafe();7     this.setAddComplete();8     }9 }

其中setAddComplete是由AbstractChannelHandlerContext实现的:

1 final void setAddComplete() {2     int oldState;3     do {4         oldState = this.handlerState;5     } while(oldState != 3 && !HANDLER_STATE_UPDATER.compareAndSet(this, oldState, 2));6 7 }

handlerState表示AbstractChannelHandlerContext对应的ChannelHandler的状态,有一下几种:

1 private static final int ADD_PENDING = 1;2 private static final int ADD_COMPLETE = 2;3 private static final int REMOVE_COMPLETE = 3;4 private static final int INIT = 0;5 private volatile int handlerState = 0;

handlerState初始化默认是INIT状态。

HANDLER_STATE_UPDATER是一个原子更新器:

1 private static final AtomicIntegerFieldUpdater
HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

所以setAddComplete方法,就是通过CAS操作,将handlerState状态更新为ADD_COMPLETE

TailContext的构造:

1 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {2     TailContext(DefaultChannelPipeline pipeline) {3         super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);4         this.setAddComplete();5     }6 }

和HeadContext一样,将handlerState状态更新为ADD_COMPLETE

结合官方给出的ChannelPipeline的图示更容易理解:

1                                              I/O Request 2                                         via Channel or 3                                     ChannelHandlerContext 4                                                   | 5 +---------------------------------------------------+---------------+ 6 |                           ChannelPipeline         |               | 7 |                                                  \|/              | 8 |    +---------------------+            +-----------+----------+    | 9 |    | Inbound Handler  N  |            | Outbound Handler  1  |    |10 |    +----------+----------+            +-----------+----------+    |11 |              /|\                                  |               |12 |               |                                  \|/              |13 |    +----------+----------+            +-----------+----------+    |14 |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |15 |    +----------+----------+            +-----------+----------+    |16 |              /|\                                  .               |17 |               .                                   .               |18 | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|19 |        [ method call]                       [method call]         |20 |               .                                   .               |21 |               .                                  \|/              |22 |    +----------+----------+            +-----------+----------+    |23 |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |24 |    +----------+----------+            +-----------+----------+    |25 |              /|\                                  |               |26 |               |                                  \|/              |27 |    +----------+----------+            +-----------+----------+    |28 |    | Inbound Handler  1  |            | Outbound Handler  M  |    |29 |    +----------+----------+            +-----------+----------+    |30 |              /|\                                  |               |31 +---------------+-----------------------------------+---------------+32               |                                  \|/33 +---------------+-----------------------------------+---------------+34 |               |                                   |               |35 |       [ Socket.read() ]                    [ Socket.write() ]     |36 |                                                                   |37 |  Netty Internal I/O Threads (Transport Implementation)            |38 +-------------------------------------------------------------------+

 

下面对一些主要方法分析:

addFirst方法,有如下几种重载:

1 public final ChannelPipeline addFirst(ChannelHandler handler) { 2     return this.addFirst((String)null, (ChannelHandler)handler); 3 } 4  5 public final ChannelPipeline addFirst(String name, ChannelHandler handler) { 6     return this.addFirst((EventExecutorGroup)null, name, handler); 7 } 8  9 public final ChannelPipeline addFirst(ChannelHandler... handlers) {10     return this.addFirst((EventExecutorGroup)null, (ChannelHandler[])handlers);11 }12 13 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {14     if (handlers == null) {15         throw new NullPointerException("handlers");16     } else if (handlers.length != 0 && handlers[0] != null) {17         int size;18         for(size = 1; size < handlers.length && handlers[size] != null; ++size) {19             ;20         }21 22         for(int i = size - 1; i >= 0; --i) {23             ChannelHandler h = handlers[i];24             this.addFirst(executor, (String)null, h);25         }26 27         return this;28     } else {29         return this;30     }31 }32 33 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {34     final AbstractChannelHandlerContext newCtx;35     synchronized(this) {36         checkMultiplicity(handler);37         name = this.filterName(name, handler);38         newCtx = this.newContext(group, name, handler);39         this.addFirst0(newCtx);40         if (!this.registered) {41             newCtx.setAddPending();42             this.callHandlerCallbackLater(newCtx, true);43             return this;44         }45 46         EventExecutor executor = newCtx.executor();47         if (!executor.inEventLoop()) {48             newCtx.setAddPending();49             executor.execute(new Runnable() {50                 public void run() {51                     DefaultChannelPipeline.this.callHandlerAdded0(newCtx);52                 }53             });54             return this;55         }56     }57 58     this.callHandlerAdded0(newCtx);59     return this;60 }

前面几种都是间接调用的第四种没什么好说的,直接看第四种addFirst

首先调用checkMultiplicity,检查ChannelHandlerAdapter在不共享的情况下是否重复:

1 private static void checkMultiplicity(ChannelHandler handler) { 2     if (handler instanceof ChannelHandlerAdapter) { 3         ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler; 4         if (!h.isSharable() && h.added) { 5             throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); 6         } 7  8         h.added = true; 9     }10 11 }

isSharable方法:

1 public boolean isSharable() { 2     Class
clazz = this.getClass(); 3 Map
, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); 4 Boolean sharable = (Boolean)cache.get(clazz); 5 if (sharable == null) { 6 sharable = clazz.isAnnotationPresent(Sharable.class); 7 cache.put(clazz, sharable); 8 } 9 10 return sharable;11 }

首先尝试从当前线程的InternalThreadLocalMap中获取handlerSharableCache,(InternalThreadLocalMap是在Netty中使用高效的FastThreadLocal替代JDK的ThreadLocal使用的 )

InternalThreadLocalMap的handlerSharableCache方法:

1 public Map
, Boolean> handlerSharableCache() {2 Map
, Boolean> cache = this.handlerSharableCache;3 if (cache == null) {4 this.handlerSharableCache = (Map)(cache = new WeakHashMap(4));5 }6 7 return (Map)cache;8 }

当当前线程的InternalThreadLocalMap中没有handlerSharableCache时,直接创建一个大小为4的WeakHashMap弱引用Map;

根据clazz从map中get,若是没有,需要检测当前clazz是否有Sharable注解,添加了Sharable注解的ChannelHandlerAdapter可以在不同Channel中共享使用一个单例,前提是确保线程安全;

之后会将该clazz以及是否实现Sharable注解的情况添加在cache缓存中;
其中ChannelHandler的added是用来标识是否添加过;

回到addFirst方法:

checkMultiplicity成功结束后,调用filterName方法,给当前要产生的AbstractChannelHandlerContext对象产生一个名称,
然后调用newContext方法,产生AbstractChannelHandlerContext对象:

1 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {2     return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);3 }

这里实际上产生了一个DefaultChannelHandlerContext对象:

1 final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { 2     private final ChannelHandler handler; 3  4     DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { 5         super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); 6         if (handler == null) { 7             throw new NullPointerException("handler"); 8         } else { 9             this.handler = handler;10         }11     }12 13     public ChannelHandler handler() {14         return this.handler;15     }16 17     private static boolean isInbound(ChannelHandler handler) {18         return handler instanceof ChannelInboundHandler;19     }20 21     private static boolean isOutbound(ChannelHandler handler) {22         return handler instanceof ChannelOutboundHandler;23     }24 }

可以看到DefaultChannelHandlerContext 仅仅是将AbstractChannelHandlerContext和ChannelHandler封装了

在产生了DefaultChannelHandlerContext 对象后,调用addFirst0方法:

1 private void addFirst0(AbstractChannelHandlerContext newCtx) {2     AbstractChannelHandlerContext nextCtx = this.head.next;3     newCtx.prev = this.head;4     newCtx.next = nextCtx;5     this.head.next = newCtx;6     nextCtx.prev = newCtx;7 }

这里就是一个简单的双向链表的操作,将newCtx节点插入到了head后面

然后判断registered成员的状态:

1 private boolean registered;

在初始化时是false

registered若是false,首先调用AbstractChannelHandlerContext的setAddPending方法:

1 final void setAddPending() {2    boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, 0, 1);3 4     assert updated;5 6 }

和前面说过的setAddComplete方法同理,通过CAS操作,将handlerState状态设置为ADD_PENDING

接着调用callHandlerCallbackLater方法:

1 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { 2     assert !this.registered; 3  4     DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx); 5     DefaultChannelPipeline.PendingHandlerCallback pending = this.pendingHandlerCallbackHead; 6     if (pending == null) { 7         this.pendingHandlerCallbackHead = (DefaultChannelPipeline.PendingHandlerCallback)task; 8     } else { 9         while(pending.next != null) {10             pending = pending.next;11         }12 13         pending.next = (DefaultChannelPipeline.PendingHandlerCallback)task;14     }15 16 }

首先断言判断registered可能存在的多线程改变,然后根据added判断产生何种类型的PendingHandlerCallback

PendingHandlerCallback是用来处理ChannelHandler的两种回调,定义如下:

1 private abstract static class PendingHandlerCallback implements Runnable { 2     final AbstractChannelHandlerContext ctx; 3     DefaultChannelPipeline.PendingHandlerCallback next; 4  5     PendingHandlerCallback(AbstractChannelHandlerContext ctx) { 6         this.ctx = ctx; 7     } 8  9     abstract void execute();10 }

PendingHandlerAddedTask定义如下:

1 private final class PendingHandlerAddedTask extends DefaultChannelPipeline.PendingHandlerCallback { 2     PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { 3         super(ctx); 4     } 5  6     public void run() { 7         DefaultChannelPipeline.this.callHandlerAdded0(this.ctx); 8     } 9 10     void execute() {11         EventExecutor executor = this.ctx.executor();12         if (executor.inEventLoop()) {13             DefaultChannelPipeline.this.callHandlerAdded0(this.ctx);14         } else {15             try {16                 executor.execute(this);17             } catch (RejectedExecutionException var3) {18                 if (DefaultChannelPipeline.logger.isWarnEnabled()) {19                     DefaultChannelPipeline.logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});20                 }21 22                 DefaultChannelPipeline.remove0(this.ctx);23                 this.ctx.setRemoved();24             }25         }26 27     }28 }

除去异常处理,无论是在execute方法还是在run方法中,主要核心是异步执行callHandlerAdded0方法:

1 private void callHandlerAdded0(AbstractChannelHandlerContext ctx) { 2     try { 3         ctx.setAddComplete(); 4         ctx.handler().handlerAdded(ctx); 5     } catch (Throwable var10) { 6         boolean removed = false; 7  8         try { 9             remove0(ctx);10 11             try {12                 ctx.handler().handlerRemoved(ctx);13             } finally {14                 ctx.setRemoved();15             }16 17             removed = true;18         } catch (Throwable var9) {19             if (logger.isWarnEnabled()) {20                 logger.warn("Failed to remove a handler: " + ctx.name(), var9);21             }22         }23 24         if (removed) {25             this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", var10));26         } else {27             this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", var10));28         }29     }30 31 }

除去异常处理,主要核心就两行代码,首先通过setAddComplete方法,设置handlerState状态为ADD_COMPLETE,然后回调ChannelHandler的handlerAdded方法,这个handlerAdded方法就很熟悉了,在使用Netty处理业务逻辑时,会覆盖这个方法。

PendingHandlerRemovedTask定义如下:

1 private final class PendingHandlerRemovedTask extends DefaultChannelPipeline.PendingHandlerCallback { 2     PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) { 3         super(ctx); 4     } 5  6     public void run() { 7         DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx); 8     } 9 10     void execute() {11         EventExecutor executor = this.ctx.executor();12         if (executor.inEventLoop()) {13             DefaultChannelPipeline.this.callHandlerRemoved0(this.ctx);14         } else {15             try {16                 executor.execute(this);17             } catch (RejectedExecutionException var3) {18                 if (DefaultChannelPipeline.logger.isWarnEnabled()) {19                     DefaultChannelPipeline.logger.warn("Can't invoke handlerRemoved() as the EventExecutor {} rejected it, removing handler {}.", new Object[]{executor, this.ctx.name(), var3});20                 }21 22                 this.ctx.setRemoved();23             }24         }25 26     }27 }

和PendingHandlerAddedTask一样,主要还是异步调用callHandlerRemoved0方法:

1 private void callHandlerRemoved0(AbstractChannelHandlerContext ctx) { 2     try { 3         try { 4             ctx.handler().handlerRemoved(ctx); 5         } finally { 6             ctx.setRemoved(); 7         } 8     } catch (Throwable var6) { 9         this.fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", var6));10     }11 12 }

首先直接回调ChannelHandler的handlerRemoved方法,然后通过setRemoved方法将handlerState状态设置为REMOVE_COMPLETE

回到callHandlerCallbackLater,其中成员pendingHandlerCallbackHead定义:

1 private DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;

结合PendingHandlerCallback 可知,这个pendingHandlerCallbackHead是 DefaultChannelPipeline存储的一条PendingHandlerCallback单链表,用来处理ChannelHandler的handlerAdded和handlerRemoved的回调,在add的这些方法里调用callHandlerCallbackLater时,added参数都为true,所以add的ChannelHandler只向pendingHandlerCallbackHead添加了handlerAdded的回调。

回到addFirst方法,若是registered为true,先获取EventExecutor,判断是否处于轮询中,若不是,则需要开启轮询线程直接异步执行callHandlerAdded0方法,若处于轮询,由于ChannelPipeline的调用是发生在轮询时的,所以还是直接异步执行callHandlerAdded0方法。

addFirst方法到此结束,再来看addLast方法,同样有好几种重载:

1 public final ChannelPipeline addLast(ChannelHandler handler) { 2     return this.addLast((String)null, (ChannelHandler)handler); 3 } 4  5 public final ChannelPipeline addLast(String name, ChannelHandler handler) { 6     return this.addLast((EventExecutorGroup)null, name, handler); 7 } 8  9 public final ChannelPipeline addLast(ChannelHandler... handlers) {10     return this.addLast((EventExecutorGroup)null, (ChannelHandler[])handlers);11 }12 13 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {14     if (handlers == null) {15         throw new NullPointerException("handlers");16     } else {17         ChannelHandler[] var3 = handlers;18         int var4 = handlers.length;19 20         for(int var5 = 0; var5 < var4; ++var5) {21             ChannelHandler h = var3[var5];22             if (h == null) {23                 break;24             }25 26             this.addLast(executor, (String)null, h);27         }28 29         return this;30     }31 }32 33 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {34     final AbstractChannelHandlerContext newCtx;35     synchronized(this) {36         checkMultiplicity(handler);37         newCtx = this.newContext(group, this.filterName(name, handler), handler);38         this.addLast0(newCtx);39         if (!this.registered) {40             newCtx.setAddPending();41             this.callHandlerCallbackLater(newCtx, true);42             return this;43         }44 45         EventExecutor executor = newCtx.executor();46         if (!executor.inEventLoop()) {47             newCtx.setAddPending();48             executor.execute(new Runnable() {49                 public void run() {50                     DefaultChannelPipeline.this.callHandlerAdded0(newCtx);51                 }52             });53             return this;54         }55     }56 57     this.callHandlerAdded0(newCtx);58     return this;59 }

还是间接调用最后一种:

对比addFirst来看,只有addLast0不一样:

1 private void addLast0(AbstractChannelHandlerContext newCtx) {2     AbstractChannelHandlerContext prev = this.tail.prev;3     newCtx.prev = prev;4     newCtx.next = this.tail;5     prev.next = newCtx;6     this.tail.prev = newCtx;7 }

还是非常简单的双向链表基本操作,只不过这次,是将AbstractChannelHandlerContext插入到了tail之前

还有两个,addBefore和addAfter方法,和上述方法类似,就不再累赘

接下来看看ChannelPipeline是如何完成请求的传递的:
invokeHandlerAddedIfNeeded方法:

1 final void invokeHandlerAddedIfNeeded() {2     assert this.channel.eventLoop().inEventLoop();3 4     if (this.firstRegistration) {5         this.firstRegistration = false;6         this.callHandlerAddedForAllHandlers();7     }8 9 }

断言判断是否处于轮询线程(ChannelPipeline处理请求都是在轮询线程中,都需要异步处理)

其中firstRegistration成员在DefaultChannelPipeline初始化时为true:

1 private boolean firstRegistration = true;

此时设置为false,表示第一次调用,以后都不再调用后面的callHandlerAddedForAllHandlers:

1 private void callHandlerAddedForAllHandlers() { 2     DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead; 3     synchronized(this) { 4         assert !this.registered; 5  6         this.registered = true; 7         pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; 8         this.pendingHandlerCallbackHead = null; 9     }10 11     for(DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead; task != null; task = task.next) {12         task.execute();13     }14 15 }

刚才说过registered初始是false,在这里判断符合,之后就令其为true,然后获取处理ChannelHandler的回调链表pendingHandlerCallbackHead,并且将pendingHandlerCallbackHead置为null

然后遍历这个单链表,处理ChannelHandler的handlerAdded和handlerRemoved的回调

fireChannelRegistered方法,当Channel完成了向Selector的注册后,会由channel的Unsafe进行回调,异步处理:

1 public final ChannelPipeline fireChannelRegistered() {2     AbstractChannelHandlerContext.invokeChannelRegistered(this.head);3     return this;4 }

实际上的处理由AbstractChannelHandlerContext的静态方法invokeChannelRegistered完成,这里传递的参数head就是DefaultChannelPipeline初始化时创建的HeadContext:

1 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { 2     EventExecutor executor = next.executor(); 3     if (executor.inEventLoop()) { 4         next.invokeChannelRegistered(); 5     } else { 6         executor.execute(new Runnable() { 7             public void run() { 8                 next.invokeChannelRegistered(); 9             }10         });11     }12 13 }

可以看到实际上是异步执行head对象的invokeChannelRegistered方法:

1 private void invokeChannelRegistered() { 2     if (this.invokeHandler()) { 3         try { 4             ((ChannelInboundHandler)this.handler()).channelRegistered(this); 5         } catch (Throwable var2) { 6             this.notifyHandlerException(var2); 7         } 8     } else { 9         this.fireChannelRegistered();10     }11 12 }

其中invokeHandler是用来判断当前的handlerState状态:

1 private boolean invokeHandler() {2     int handlerState = this.handlerState;3     return handlerState == 2 || !this.ordered && handlerState == 1;4 }

若是当前handlerState状态为ADD_COMPLETE,或者不需要提供EventExecutor并且状态为ADD_PENDING时返回true,否则返回false

在成立的情况下,调用ChannelInboundHandler的channelRegistered方法,由于当前是head,所以由HeadContext实现了:

1 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {2     DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();3     ctx.fireChannelRegistered();4 }

首先调用invokeHandlerAddedIfNeeded,处理ChannelHandler的handlerAdded和handlerRemoved的回调

然后调用ctx的fireChannelRegistered方法:

1 public ChannelHandlerContext fireChannelRegistered() {2     invokeChannelRegistered(this.findContextInbound());3     return this;4 }

findContextInbound方法,用来找出下一个ChannelInboundInvoker:

1 private AbstractChannelHandlerContext findContextInbound() {2     AbstractChannelHandlerContext ctx = this;3 4     do {5         ctx = ctx.next;6     } while(!ctx.inbound);7 8     return ctx;9 }

从当前节点向后遍历,inbound之前说过,该方法就是找到下一个ChannelInboundInvoker的类型的AbstractChannelHandlerContext,然后调用静态方法invokeChannelRegistered,重复上述操作,若是在ChannelInboundHandler中没有重写channelRegistered方法,会一直执直到完所有ChannelHandler的channelRegistered方法。

ChannelInboundHandlerAdapter中的默认channelRegistered方法:

1 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {2     ctx.fireChannelRegistered();3 }

比HeadContext中的实现还简单,直接调用fireChannelRegistered向后传递

fireChannelRead方法,是在Selector轮循到读事件就绪,会由channel的Unsafe进行回调,异步处理:

1 public final ChannelPipeline fireChannelRead(Object msg) {2     AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);3     return this;4 }

还是从head开始调用AbstractChannelHandlerContext的静态方法invokeChannelRead:

1 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { 2     final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); 3     EventExecutor executor = next.executor(); 4     if (executor.inEventLoop()) { 5         next.invokeChannelRead(m); 6     } else { 7         executor.execute(new Runnable() { 8             public void run() { 9                 next.invokeChannelRead(m);10             }11         });12     }13 14 }

和上面一个逻辑异步调用AbstractChannelHandlerContext对象的invokeChannelRead方法:

1 private void invokeChannelRead(Object msg) { 2     if (this.invokeHandler()) { 3         try { 4             ((ChannelInboundHandler)this.handler()).channelRead(this, msg); 5         } catch (Throwable var3) { 6             this.notifyHandlerException(var3); 7         } 8     } else { 9         this.fireChannelRead(msg);10     }11 12 }

这里也和上面一样,调用了HeadContext的channelRead方法:

1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {2     ctx.fireChannelRead(msg);3 }

这里直接不处理,调用ChannelHandlerContext 的fireChannelRead方法:

1 public ChannelHandlerContext fireChannelRead(Object msg) {2     invokeChannelRead(this.findContextInbound(), msg);3     return this;4 }

和之前注册一样,选择下一个ChannelInboundHandler,重复执行上述操作。

再来看到writeAndFlush方法,和上面的就不太一样,这个发生在轮询前,用户通过channel来间接调用,在AbstractChannel中实现:

1 public ChannelFuture writeAndFlush(Object msg) {2     return this.pipeline.writeAndFlush(msg);3 }

实际上直接调用了DefaultChannelPipeline的writeAndFlush方法:

1 public final ChannelFuture writeAndFlush(Object msg) {2     return this.tail.writeAndFlush(msg);3 }

这里又有些不一样了,调用了tail的writeAndFlush方法,即TailContext的writeAndFlush,在AbstractChannelHandlerContext中实现:

1 public ChannelFuture writeAndFlush(Object msg) {2     return this.writeAndFlush(msg, this.newPromise());3 }

newPromise产生了一个ChannelPromise,用来处理异步事件的;实际上调用了writeAndFlush的重载:

1 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { 2     if (msg == null) { 3         throw new NullPointerException("msg"); 4     } else if (this.isNotValidPromise(promise, true)) { 5         ReferenceCountUtil.release(msg); 6         return promise; 7     } else { 8         this.write(msg, true, promise); 9         return promise;10     }11 }

继续调用write方法:

1 private void write(Object msg, boolean flush, ChannelPromise promise) { 2     AbstractChannelHandlerContext next = this.findContextOutbound(); 3     Object m = this.pipeline.touch(msg, next); 4     EventExecutor executor = next.executor(); 5     if (executor.inEventLoop()) { 6         if (flush) { 7             next.invokeWriteAndFlush(m, promise); 8         } else { 9             next.invokeWrite(m, promise);10         }11     } else {12         Object task;13         if (flush) {14             task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);15         } else {16             task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);17         }18 19         safeExecute(executor, (Runnable)task, promise, m);20     }21 22 }

还是很相似,只不过先调用findContextOutbound找到下一个ChannelOutboundInvoker类型的ChannelHandlerContext,而且这里是从尾部往前遍历的,这样来看前面所给的图是没有任何问题的

在找到ChannelOutboundInvoker后,调用invokeWriteAndFlush或者invokeWrite方法:
invokeWriteAndFlush方法:

1 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { 2     if (this.invokeHandler()) { 3         this.invokeWrite0(msg, promise); 4         this.invokeFlush0(); 5     } else { 6         this.writeAndFlush(msg, promise); 7     } 8  9 }10 11 private void invokeWrite0(Object msg, ChannelPromise promise) {12     try {13         ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);14     } catch (Throwable var4) {15         notifyOutboundHandlerException(var4, promise);16     }17 18 }19 20 private void invokeFlush0() {21     try {22         ((ChannelOutboundHandler)this.handler()).flush(this);23     } catch (Throwable var2) {24         this.notifyHandlerException(var2);25     }26 27 }

可以看到invokeWriteAndFlush回调了ChannelOutboundHandler的write和flush方法

最终会调用HeadContext的write和flush方法:

1 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {2     this.unsafe.write(msg, promise);3 }4 5 public void flush(ChannelHandlerContext ctx) throws Exception {6     this.unsafe.flush();7 }

可以看到调用了unsafe的write和flush方法,向unsafe缓冲区写入了消息,当Selector轮询到写事件就绪时,就会通过unsafe将刚才写入的内容交由JDK的SocketChannel完成最终的write操作。

ChannelPipeline的分析到此全部结束。

 

转载于:https://www.cnblogs.com/a526583280/p/10961855.html

你可能感兴趣的文章
过滤器(Filter)
查看>>
字符串的操作
查看>>
性能优化之Java(Android)代码优化
查看>>
springMVC相关—文件上传
查看>>
由Oracle 11g SYSAUX 和 SYSTEM 表空间回收引发的联想
查看>>
uva 1416 Warfare And Logistics
查看>>
欲则不达
查看>>
盒子游戏
查看>>
OpenJudgeP1.10.08:病人排队__(刷题)_水题
查看>>
观察者模式
查看>>
Hadoop分布式文件系统中架构和设计要点汇总
查看>>
cout和printf
查看>>
UVa 10088 - Trees on My Island (pick定理)
查看>>
#C++PrimerPlus# Chapter11_Exersice4_mytimeV4
查看>>
iOS8 针对开发者所拥有的新特性汇总如下
查看>>
Jmeter + Grafana搭建实时监控可视化
查看>>
uCGUI字符串显示过程分析和uCGUI字库的组建
查看>>
h5唤起app
查看>>
SQL Server 2008 /SQL Server 2008 R2 配置数据库邮件
查看>>
[转]vs2010编译金山代码
查看>>