View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.OneTimeTask;
25  import io.netty.util.internal.PlatformDependent;
26  import io.netty.util.internal.StringUtil;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayList;
32  import java.util.IdentityHashMap;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NoSuchElementException;
38  import java.util.WeakHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.Future;
41  import java.util.concurrent.RejectedExecutionException;
42  
43  /**
44   * The default [email protected] ChannelPipeline} implementation.  It is usually created
45   * by a [email protected] Channel} implementation when the [email protected] Channel} is created.
46   */
47  final class DefaultChannelPipeline implements ChannelPipeline {
48  
49      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
50  
51      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
52              new FastThreadLocal<Map<Class<?>, String>>() {
53          @Override
54          protected Map<Class<?>, String> initialValue() throws Exception {
55              return new WeakHashMap<Class<?>, String>();
56          }
57      };
58  
59      final AbstractChannel channel;
60  
61      final AbstractChannelHandlerContext head;
62      final AbstractChannelHandlerContext tail;
63  
64      private final boolean touch = ResourceLeakDetector.isEnabled();
65  
66      /**
67       * @see #findInvoker(EventExecutorGroup)
68       */
69      private Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
70  
71      /**
72       * This is the head of a linked list that is processed by [email protected] #callHandlerAddedForAllHandlers()} and so process
73       * all the pending [email protected] #callHandlerAdded0(AbstractChannelHandlerContext)}.
74       *
75       * We only keep the head because it is expected that the list is used infrequently and its size is small.
76       * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
77       * complexity.
78       */
79      private PendingHandlerCallback pendingHandlerCallbackHead;
80  
81      /**
82       * Set to [email protected] true} once the [email protected] AbstractChannel} is registered.Once set to [email protected] true} the value will never
83       * change.
84       */
85      private boolean registered;
86  
87      DefaultChannelPipeline(AbstractChannel channel) {
88          if (channel == null) {
89              throw new NullPointerException("channel");
90          }
91          this.channel = channel;
92  
93          tail = new TailContext(this);
94          head = new HeadContext(this);
95  
96          head.next = tail;
97          tail.prev = head;
98      }
99  
100     Object touch(Object msg, AbstractChannelHandlerContext next) {
101         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
102     }
103 
104     @Override
105     public Channel channel() {
106         return channel;
107     }
108 
109     @Override
110     public ChannelPipeline addFirst(String name, ChannelHandler handler) {
111         return addFirst(null, null, name, handler);
112     }
113 
114     @Override
115     public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
116         return addFirst(group, null, name, handler);
117     }
118 
119     @Override
120     public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
121         return addFirst(null, invoker, name, handler);
122     }
123 
124     private ChannelPipeline addFirst(
125             EventExecutorGroup group, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
126         final AbstractChannelHandlerContext newCtx;
127         final EventExecutor executor;
128         final boolean inEventLoop;
129         synchronized (this) {
130             checkMultiplicity(handler);
131 
132             if (group != null) {
133                 invoker = findInvoker(group);
134             }
135 
136             newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
137             executor = executorSafe(invoker);
138 
139             // If the executor is null it means that the channel was not registered on an eventloop yet.
140             // In this case we add the context to the pipeline and add a task that will call
141             // ChannelHandler.handlerAdded(...) once the channel is registered.
142             if (executor == null) {
143                 addFirst0(newCtx);
144                 callHandlerCallbackLater(newCtx, true);
145                 return this;
146             }
147             inEventLoop = executor.inEventLoop();
148             if (inEventLoop) {
149                 addFirst0(newCtx);
150             }
151         }
152 
153         if (inEventLoop) {
154             callHandlerAdded0(newCtx);
155         } else {
156             waitForFuture(executor.submit(new OneTimeTask() {
157                 @Override
158                 public void run() {
159                     synchronized (DefaultChannelPipeline.this) {
160                         addFirst0(newCtx);
161                     }
162                     callHandlerAdded0(newCtx);
163                 }
164             }));
165         }
166         return this;
167     }
168 
169     private void addFirst0(AbstractChannelHandlerContext newCtx) {
170         AbstractChannelHandlerContext nextCtx = head.next;
171         newCtx.prev = head;
172         newCtx.next = nextCtx;
173         head.next = newCtx;
174         nextCtx.prev = newCtx;
175     }
176 
177     @Override
178     public ChannelPipeline addLast(String name, ChannelHandler handler) {
179         return addLast(null, null, name, handler);
180     }
181 
182     @Override
183     public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
184         return addLast(group, null, name, handler);
185     }
186 
187     @Override
188     public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
189         return addLast(null, invoker, name, handler);
190     }
191 
192     private ChannelPipeline addLast(EventExecutorGroup group, ChannelHandlerInvoker invoker,
193                                     String name, ChannelHandler handler) {
194         assertGroupAndInvoker(group, invoker);
195 
196         final EventExecutor executor;
197         final AbstractChannelHandlerContext newCtx;
198         final boolean inEventLoop;
199         synchronized (this) {
200             checkMultiplicity(handler);
201 
202             if (group != null) {
203                 invoker = findInvoker(group);
204             }
205 
206             newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
207             executor = executorSafe(invoker);
208 
209             // If the executor is null it means that the channel was not registered on an eventloop yet.
210             // In this case we add the context to the pipeline and add a task that will call
211             // ChannelHandler.handlerAdded(...) once the channel is registered.
212             if (executor == null) {
213                 addLast0(newCtx);
214                 callHandlerCallbackLater(newCtx, true);
215                 return this;
216             }
217             inEventLoop = executor.inEventLoop();
218             if (inEventLoop) {
219                 addLast0(newCtx);
220             }
221         }
222         if (inEventLoop) {
223             callHandlerAdded0(newCtx);
224         } else {
225             waitForFuture(executor.submit(new OneTimeTask() {
226                 @Override
227                 public void run() {
228                     synchronized (DefaultChannelPipeline.this) {
229                         addLast0(newCtx);
230                     }
231                     callHandlerAdded0(newCtx);
232                 }
233             }));
234         }
235         return this;
236     }
237 
238     private void addLast0(AbstractChannelHandlerContext newCtx) {
239         AbstractChannelHandlerContext prev = tail.prev;
240         newCtx.prev = prev;
241         newCtx.next = tail;
242         prev.next = newCtx;
243         tail.prev = newCtx;
244     }
245 
246     @Override
247     public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
248         return addBefore(null, null, baseName, name, handler);
249     }
250 
251     @Override
252     public ChannelPipeline addBefore(
253             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
254         return addBefore(group, null, baseName, name, handler);
255     }
256 
257     @Override
258     public ChannelPipeline addBefore(
259             ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
260         return addBefore(null, invoker, baseName, name, handler);
261     }
262 
263     private ChannelPipeline addBefore(EventExecutorGroup group,
264             ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
265         assertGroupAndInvoker(group, invoker);
266 
267         final EventExecutor executor;
268         final AbstractChannelHandlerContext newCtx;
269         final AbstractChannelHandlerContext ctx;
270         final boolean inEventLoop;
271         synchronized (this) {
272             checkMultiplicity(handler);
273             ctx = getContextOrDie(baseName);
274 
275             if (group != null) {
276                 invoker = findInvoker(group);
277             }
278 
279             newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
280             executor = executorSafe(invoker);
281 
282             // If the executor is null it means that the channel was not registered on an eventloop yet.
283             // In this case we add the context to the pipeline and add a task that will call
284             // ChannelHandler.handlerAdded(...) once the channel is registered.
285             if (executor == null) {
286                 addBefore0(ctx, newCtx);
287                 callHandlerCallbackLater(newCtx, true);
288                 return this;
289             }
290 
291             inEventLoop = executor.inEventLoop();
292             if (inEventLoop) {
293                 addBefore0(ctx, newCtx);
294             }
295         }
296 
297         if (inEventLoop) {
298             callHandlerAdded0(newCtx);
299         } else {
300             waitForFuture(executor.submit(new OneTimeTask() {
301                 @Override
302                 public void run() {
303                     synchronized (DefaultChannelPipeline.this) {
304                         addBefore0(ctx, newCtx);
305                     }
306                     callHandlerAdded0(newCtx);
307                 }
308             }));
309         }
310         return this;
311     }
312 
313     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
314         newCtx.prev = ctx.prev;
315         newCtx.next = ctx;
316         ctx.prev.next = newCtx;
317         ctx.prev = newCtx;
318     }
319 
320     @Override
321     public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
322         return addAfter(null, null, baseName, name, handler);
323     }
324 
325     @Override
326     public ChannelPipeline addAfter(
327             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
328         return addAfter(group, null, baseName, name, handler);
329     }
330 
331     @Override
332     public ChannelPipeline addAfter(
333             ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
334         return addAfter(null, invoker, baseName, name, handler);
335     }
336 
337     private ChannelPipeline addAfter(EventExecutorGroup group,
338             ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
339         assertGroupAndInvoker(group, invoker);
340 
341         final EventExecutor executor;
342         final AbstractChannelHandlerContext newCtx;
343         final AbstractChannelHandlerContext ctx;
344         final boolean inEventLoop;
345 
346         synchronized (this) {
347             checkMultiplicity(handler);
348             ctx = getContextOrDie(baseName);
349 
350             if (group != null) {
351                 invoker = findInvoker(group);
352             }
353 
354             newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
355             executor = executorSafe(invoker);
356 
357             // If the executor is null it means that the channel was not registered on an eventloop yet.
358             // In this case we remove the context from the pipeline and add a task that will call
359             // ChannelHandler.handlerRemoved(...) once the channel is registered.
360             if (executor == null) {
361                 addAfter0(ctx, newCtx);
362                 callHandlerCallbackLater(newCtx, true);
363                 return this;
364             }
365             inEventLoop = executor.inEventLoop();
366             if (inEventLoop) {
367                 addAfter0(ctx, newCtx);
368             }
369         }
370         if (inEventLoop) {
371             callHandlerAdded0(newCtx);
372         } else {
373             waitForFuture(executor.submit(new OneTimeTask() {
374                 @Override
375                 public void run() {
376                     synchronized (DefaultChannelPipeline.this) {
377                         addAfter0(ctx, newCtx);
378                     }
379                     callHandlerAdded0(newCtx);
380                 }
381             }));
382         }
383         return this;
384     }
385 
386     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
387         newCtx.prev = ctx;
388         newCtx.next = ctx.next;
389         ctx.next.prev = newCtx;
390         ctx.next = newCtx;
391     }
392 
393     @Override
394     public ChannelPipeline addFirst(ChannelHandler... handlers) {
395         return addFirst((ChannelHandlerInvoker) null, handlers);
396     }
397 
398     @Override
399     public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
400         if (handlers == null) {
401             throw new NullPointerException("handlers");
402         }
403         if (handlers.length == 0 || handlers[0] == null) {
404             return this;
405         }
406 
407         int size;
408         for (size = 1; size < handlers.length; size ++) {
409             if (handlers[size] == null) {
410                 break;
411             }
412         }
413 
414         for (int i = size - 1; i >= 0; i --) {
415             ChannelHandler h = handlers[i];
416             addFirst(group, null, h);
417         }
418 
419         return this;
420     }
421 
422     @Override
423     public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
424         if (handlers == null) {
425             throw new NullPointerException("handlers");
426         }
427         if (handlers.length == 0 || handlers[0] == null) {
428             return this;
429         }
430 
431         int size;
432         for (size = 1; size < handlers.length; size ++) {
433             if (handlers[size] == null) {
434                 break;
435             }
436         }
437 
438         for (int i = size - 1; i >= 0; i --) {
439             ChannelHandler h = handlers[i];
440             addFirst(invoker, null, h);
441         }
442 
443         return this;
444     }
445 
446     @Override
447     public ChannelPipeline addLast(ChannelHandler... handlers) {
448         return addLast((ChannelHandlerInvoker) null, handlers);
449     }
450 
451     @Override
452     public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
453         if (handlers == null) {
454             throw new NullPointerException("handlers");
455         }
456 
457         for (ChannelHandler h: handlers) {
458             if (h == null) {
459                 break;
460             }
461             addLast(group, null, h);
462         }
463 
464         return this;
465     }
466 
467     @Override
468     public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
469         if (handlers == null) {
470             throw new NullPointerException("handlers");
471         }
472 
473         for (ChannelHandler h: handlers) {
474             if (h == null) {
475                 break;
476             }
477             addLast(invoker, null, h);
478         }
479 
480         return this;
481     }
482 
483     private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
484         if (group == null) {
485             return null;
486         }
487 
488         // Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
489         Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
490         if (childInvokers == null) {
491             childInvokers = this.childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>(4);
492         }
493 
494         // Pick one of the child executors and remember its invoker
495         // so that the same invoker is used to fire events for the same channel.
496         ChannelHandlerInvoker  invoker = childInvokers.get(group);
497         if (invoker == null) {
498             EventExecutor executor = group.next();
499             if (executor instanceof EventLoop) {
500                 invoker = ((EventLoop) executor).asInvoker();
501             } else {
502                 invoker = new DefaultChannelHandlerInvoker(executor);
503             }
504             childInvokers.put(group, invoker);
505         }
506 
507         return invoker;
508     }
509 
510     private String generateName(ChannelHandler handler) {
511         Map<Class<?>, String> cache = nameCaches.get();
512         Class<?> handlerType = handler.getClass();
513         String name = cache.get(handlerType);
514         if (name == null) {
515             name = generateName0(handlerType);
516             cache.put(handlerType, name);
517         }
518 
519         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
520         // any name conflicts.  Note that we don't cache the names generated here.
521         if (context0(name) != null) {
522             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
523             for (int i = 1;; i ++) {
524                 String newName = baseName + i;
525                 if (context0(newName) == null) {
526                     name = newName;
527                     break;
528                 }
529             }
530         }
531         return name;
532     }
533 
534     private static String generateName0(Class<?> handlerType) {
535         return StringUtil.simpleClassName(handlerType) + "#0";
536     }
537 
538     @Override
539     public ChannelPipeline remove(ChannelHandler handler) {
540         remove(getContextOrDie(handler));
541         return this;
542     }
543 
544     @Override
545     public ChannelHandler remove(String name) {
546         return remove(getContextOrDie(name)).handler();
547     }
548 
549     @SuppressWarnings("unchecked")
550     @Override
551     public <T extends ChannelHandler> T remove(Class<T> handlerType) {
552         return (T) remove(getContextOrDie(handlerType)).handler();
553     }
554 
555     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
556         assert ctx != head && ctx != tail;
557 
558         final EventExecutor executor;
559         final boolean inEventLoop;
560         synchronized (this) {
561             executor = executorSafe(ctx.invoker);
562 
563             // If the executor is null it means that the channel was not registered on an eventloop yet.
564             // In this case we remove the context from the pipeline and add a task that will call
565             // ChannelHandler.handlerRemoved(...) once the channel is registered.
566             if (executor == null) {
567                 remove0(ctx);
568                 callHandlerCallbackLater(ctx, false);
569                 return ctx;
570             }
571             inEventLoop = executor.inEventLoop();
572             if (inEventLoop) {
573                 remove0(ctx);
574             }
575         }
576         if (inEventLoop) {
577             callHandlerRemoved0(ctx);
578         } else {
579             waitForFuture(executor.submit(new OneTimeTask() {
580                 @Override
581                 public void run() {
582                     synchronized (DefaultChannelPipeline.this) {
583                         remove0(ctx);
584                     }
585                     callHandlerRemoved0(ctx);
586                 }
587             }));
588         }
589         return ctx;
590     }
591 
592     private static void remove0(AbstractChannelHandlerContext ctx) {
593         AbstractChannelHandlerContext prev = ctx.prev;
594         AbstractChannelHandlerContext next = ctx.next;
595         prev.next = next;
596         next.prev = prev;
597     }
598 
599     @Override
600     public ChannelHandler removeFirst() {
601         if (head.next == tail) {
602             throw new NoSuchElementException();
603         }
604         return remove(head.next).handler();
605     }
606 
607     @Override
608     public ChannelHandler removeLast() {
609         if (head.next == tail) {
610             throw new NoSuchElementException();
611         }
612         return remove(tail.prev).handler();
613     }
614 
615     @Override
616     public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
617         replace(getContextOrDie(oldHandler), newName, newHandler);
618         return this;
619     }
620 
621     @Override
622     public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
623         return replace(getContextOrDie(oldName), newName, newHandler);
624     }
625 
626     @Override
627     @SuppressWarnings("unchecked")
628     public <T extends ChannelHandler> T replace(
629             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
630         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
631     }
632 
633     private ChannelHandler replace(
634             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
635         assert ctx != head && ctx != tail;
636 
637         final AbstractChannelHandlerContext newCtx;
638         final EventExecutor executor;
639         final boolean inEventLoop;
640         synchronized (this) {
641             checkMultiplicity(newHandler);
642 
643             if (newName == null) {
644                 newName = ctx.name();
645             } else if (!ctx.name().equals(newName)) {
646                 newName = filterName(newName, newHandler);
647             }
648 
649             newCtx = new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
650             executor = executorSafe(ctx.invoker);
651 
652             // If the executor is null it means that the channel was not registered on an eventloop yet.
653             // In this case we replace the context in the pipeline
654             // and add a task that will call ChannelHandler.handlerAdded(...) and
655             // ChannelHandler.handlerRemoved(...) once the channel is registered.
656             if (executor == null) {
657                 replace0(ctx, newCtx);
658                 callHandlerCallbackLater(newCtx, true);
659                 callHandlerCallbackLater(ctx, false);
660                 return ctx.handler();
661             }
662             inEventLoop = executor.inEventLoop();
663             if (inEventLoop) {
664                 replace0(ctx, newCtx);
665             }
666         }
667         if (inEventLoop) {
668             // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
669             // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
670             // event handlers must be called after handlerAdded().
671             callHandlerAdded0(newCtx);
672             callHandlerRemoved0(ctx);
673         } else {
674             waitForFuture(executor.submit(new OneTimeTask() {
675                 @Override
676                 public void run() {
677                     synchronized (DefaultChannelPipeline.this) {
678                         replace0(ctx, newCtx);
679                     }
680                     // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
681                     // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
682                     // those event handlers must be called after handlerAdded().
683                     callHandlerAdded0(newCtx);
684                     callHandlerRemoved0(ctx);
685                 }
686             }));
687         }
688         return ctx.handler();
689     }
690 
691     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
692         AbstractChannelHandlerContext prev = oldCtx.prev;
693         AbstractChannelHandlerContext next = oldCtx.next;
694         newCtx.prev = prev;
695         newCtx.next = next;
696 
697         // Finish the replacement of oldCtx with newCtx in the linked list.
698         // Note that this doesn't mean events will be sent to the new handler immediately
699         // because we are currently at the event handler thread and no more than one handler methods can be invoked
700         // at the same time (we ensured that in replace().)
701         prev.next = newCtx;
702         next.prev = newCtx;
703 
704         // update the reference to the replacement so forward of buffered content will work correctly
705         oldCtx.prev = newCtx;
706         oldCtx.next = newCtx;
707     }
708 
709     private static void checkMultiplicity(ChannelHandler handler) {
710         if (handler instanceof ChannelHandlerAdapter) {
711             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
712             if (!h.isSharable() && h.added) {
713                 throw new ChannelPipelineException(
714                         h.getClass().getName() +
715                         " is not a @Sharable handler, so can't be added or removed multiple times.");
716             }
717             h.added = true;
718         }
719     }
720 
721     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
722         try {
723             ctx.handler().handlerAdded(ctx);
724         } catch (Throwable t) {
725             boolean removed = false;
726             try {
727                 remove0(ctx);
728                 try {
729                     ctx.handler().handlerRemoved(ctx);
730                 } finally {
731                     ctx.setRemoved();
732                 }
733                 removed = true;
734             } catch (Throwable t2) {
735                 if (logger.isWarnEnabled()) {
736                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
737                 }
738             }
739 
740             if (removed) {
741                 fireExceptionCaught(new ChannelPipelineException(
742                         ctx.handler().getClass().getName() +
743                         ".handlerAdded() has thrown an exception; removed.", t));
744             } else {
745                 fireExceptionCaught(new ChannelPipelineException(
746                         ctx.handler().getClass().getName() +
747                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
748             }
749         }
750     }
751 
752     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
753         // Notify the complete removal.
754         try {
755             try {
756                 ctx.handler().handlerRemoved(ctx);
757             } finally {
758                 ctx.setRemoved();
759             }
760         } catch (Throwable t) {
761             fireExceptionCaught(new ChannelPipelineException(
762                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
763         }
764     }
765 
766     /**
767      * Waits for a future to finish.  If the task is interrupted, then the current thread will be interrupted.
768      * It is expected that the task performs any appropriate locking.
769      * <p>
770      * If the internal call throws a [email protected] Throwable}, but it is not an instance of [email protected] Error} or
771      * [email protected] RuntimeException}, then it is wrapped inside a [email protected] ChannelPipelineException} and that is
772      * thrown instead.</p>
773      *
774      * @param future wait for this future
775      * @see Future#get()
776      * @throws Error if the task threw this.
777      * @throws RuntimeException if the task threw this.
778      * @throws ChannelPipelineException with a [email protected] Throwable} as a cause, if the task threw another type of
779      *         [email protected] Throwable}.
780      */
781     private static void waitForFuture(Future<?> future) {
782         try {
783             future.get();
784         } catch (ExecutionException ex) {
785             // In the arbitrary case, we can throw Error, RuntimeException, and Exception
786             PlatformDependent.throwException(ex.getCause());
787         } catch (InterruptedException ex) {
788             // Interrupt the calling thread (note that this method is not called from the event loop)
789             Thread.currentThread().interrupt();
790         }
791     }
792 
793     @Override
794     public ChannelHandler first() {
795         ChannelHandlerContext first = firstContext();
796         if (first == null) {
797             return null;
798         }
799         return first.handler();
800     }
801 
802     @Override
803     public ChannelHandlerContext firstContext() {
804         AbstractChannelHandlerContext first = head.next;
805         if (first == tail) {
806             return null;
807         }
808         return head.next;
809     }
810 
811     @Override
812     public ChannelHandler last() {
813         AbstractChannelHandlerContext last = tail.prev;
814         if (last == head) {
815             return null;
816         }
817         return last.handler();
818     }
819 
820     @Override
821     public ChannelHandlerContext lastContext() {
822         AbstractChannelHandlerContext last = tail.prev;
823         if (last == head) {
824             return null;
825         }
826         return last;
827     }
828 
829     @Override
830     public ChannelHandler get(String name) {
831         ChannelHandlerContext ctx = context(name);
832         if (ctx == null) {
833             return null;
834         } else {
835             return ctx.handler();
836         }
837     }
838 
839     @SuppressWarnings("unchecked")
840     @Override
841     public <T extends ChannelHandler> T get(Class<T> handlerType) {
842         ChannelHandlerContext ctx = context(handlerType);
843         if (ctx == null) {
844             return null;
845         } else {
846             return (T) ctx.handler();
847         }
848     }
849 
850     @Override
851     public ChannelHandlerContext context(String name) {
852         if (name == null) {
853             throw new NullPointerException("name");
854         }
855 
856         return context0(name);
857     }
858 
859     @Override
860     public ChannelHandlerContext context(ChannelHandler handler) {
861         if (handler == null) {
862             throw new NullPointerException("handler");
863         }
864 
865         AbstractChannelHandlerContext ctx = head.next;
866         for (;;) {
867 
868             if (ctx == null) {
869                 return null;
870             }
871 
872             if (ctx.handler() == handler) {
873                 return ctx;
874             }
875 
876             ctx = ctx.next;
877         }
878     }
879 
880     @Override
881     public ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
882         if (handlerType == null) {
883             throw new NullPointerException("handlerType");
884         }
885 
886         AbstractChannelHandlerContext ctx = head.next;
887         for (;;) {
888             if (ctx == null) {
889                 return null;
890             }
891             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
892                 return ctx;
893             }
894             ctx = ctx.next;
895         }
896     }
897 
898     @Override
899     public List<String> names() {
900         List<String> list = new ArrayList<String>();
901         AbstractChannelHandlerContext ctx = head.next;
902         for (;;) {
903             if (ctx == null) {
904                 return list;
905             }
906             list.add(ctx.name());
907             ctx = ctx.next;
908         }
909     }
910 
911     @Override
912     public Map<String, ChannelHandler> toMap() {
913         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
914         AbstractChannelHandlerContext ctx = head.next;
915         for (;;) {
916             if (ctx == tail) {
917                 return map;
918             }
919             map.put(ctx.name(), ctx.handler());
920             ctx = ctx.next;
921         }
922     }
923 
924     @Override
925     public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
926         return toMap().entrySet().iterator();
927     }
928 
929     /**
930      * Returns the [email protected] String} representation of this pipeline.
931      */
932     @Override
933     public String toString() {
934         StringBuilder buf = new StringBuilder()
935             .append(StringUtil.simpleClassName(this))
936             .append('{');
937         AbstractChannelHandlerContext ctx = head.next;
938         for (;;) {
939             if (ctx == tail) {
940                 break;
941             }
942 
943             buf.append('(')
944                .append(ctx.name())
945                .append(" = ")
946                .append(ctx.handler().getClass().getName())
947                .append(')');
948 
949             ctx = ctx.next;
950             if (ctx == tail) {
951                 break;
952             }
953 
954             buf.append(", ");
955         }
956         buf.append('}');
957         return buf.toString();
958     }
959 
960     @Override
961     public ChannelPipeline fireChannelRegistered() {
962         head.fireChannelRegistered();
963         return this;
964     }
965 
966     @Override
967     public ChannelPipeline fireChannelUnregistered() {
968         head.fireChannelUnregistered();
969 
970         // Remove all handlers sequentially if channel is closed and unregistered.
971         if (!channel.isOpen()) {
972             destroy();
973         }
974         return this;
975     }
976 
977     /**
978      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
979      * handlerRemoved().
980      *
981      * Note that we traverse up the pipeline ([email protected] #destroyUp(AbstractChannelHandlerContext, boolean)})
982      * before traversing down ([email protected] #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
983      * the handlers are removed after all events are handled.
984      *
985      * See: https://github.com/netty/netty/issues/3156
986      */
987     private synchronized void destroy() {
988         destroyUp(head.next, false);
989     }
990 
991     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
992         final Thread currentThread = Thread.currentThread();
993         final AbstractChannelHandlerContext tail = this.tail;
994         for (;;) {
995             if (ctx == tail) {
996                 destroyDown(currentThread, tail.prev, inEventLoop);
997                 break;
998             }
999 
1000             final EventExecutor executor = ctx.executor();
1001             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
1002                 final AbstractChannelHandlerContext finalCtx = ctx;
1003                 executor.execute(new OneTimeTask() {
1004                     @Override
1005                     public void run() {
1006                         destroyUp(finalCtx, true);
1007                     }
1008                 });
1009                 break;
1010             }
1011 
1012             ctx = ctx.next;
1013             inEventLoop = false;
1014         }
1015     }
1016 
1017     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
1018         // We have reached at tail; now traverse backwards.
1019         final AbstractChannelHandlerContext head = this.head;
1020         for (;;) {
1021             if (ctx == head) {
1022                 break;
1023             }
1024 
1025             final EventExecutor executor = ctx.executor();
1026             if (inEventLoop || executor.inEventLoop(currentThread)) {
1027                 synchronized (this) {
1028                     remove0(ctx);
1029                     callHandlerRemoved0(ctx);
1030                 }
1031             } else {
1032                 final AbstractChannelHandlerContext finalCtx = ctx;
1033                 executor.execute(new OneTimeTask() {
1034                     @Override
1035                     public void run() {
1036                         destroyDown(Thread.currentThread(), finalCtx, true);
1037                     }
1038                 });
1039                 break;
1040             }
1041 
1042             ctx = ctx.prev;
1043             inEventLoop = false;
1044         }
1045     }
1046 
1047     @Override
1048     public ChannelPipeline fireChannelActive() {
1049         head.fireChannelActive();
1050 
1051         if (channel.config().isAutoRead()) {
1052             channel.read();
1053         }
1054 
1055         return this;
1056     }
1057 
1058     @Override
1059     public ChannelPipeline fireChannelInactive() {
1060         head.fireChannelInactive();
1061         return this;
1062     }
1063 
1064     @Override
1065     public ChannelPipeline fireExceptionCaught(Throwable cause) {
1066         head.fireExceptionCaught(cause);
1067         return this;
1068     }
1069 
1070     @Override
1071     public ChannelPipeline fireUserEventTriggered(Object event) {
1072         head.fireUserEventTriggered(event);
1073         return this;
1074     }
1075 
1076     @Override
1077     public ChannelPipeline fireChannelRead(Object msg) {
1078         head.fireChannelRead(msg);
1079         return this;
1080     }
1081 
1082     @Override
1083     public ChannelPipeline fireChannelReadComplete() {
1084         head.fireChannelReadComplete();
1085         if (channel.config().isAutoRead()) {
1086             read();
1087         }
1088         return this;
1089     }
1090 
1091     @Override
1092     public ChannelPipeline fireChannelWritabilityChanged() {
1093         head.fireChannelWritabilityChanged();
1094         return this;
1095     }
1096 
1097     @Override
1098     public ChannelFuture bind(SocketAddress localAddress) {
1099         return tail.bind(localAddress);
1100     }
1101 
1102     @Override
1103     public ChannelFuture connect(SocketAddress remoteAddress) {
1104         return tail.connect(remoteAddress);
1105     }
1106 
1107     @Override
1108     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
1109         return tail.connect(remoteAddress, localAddress);
1110     }
1111 
1112     @Override
1113     public ChannelFuture disconnect() {
1114         return tail.disconnect();
1115     }
1116 
1117     @Override
1118     public ChannelFuture close() {
1119         return tail.close();
1120     }
1121 
1122     @Override
1123     public ChannelFuture deregister() {
1124         return tail.deregister();
1125     }
1126 
1127     @Override
1128     public ChannelPipeline flush() {
1129         tail.flush();
1130         return this;
1131     }
1132 
1133     @Override
1134     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
1135         return tail.bind(localAddress, promise);
1136     }
1137 
1138     @Override
1139     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
1140         return tail.connect(remoteAddress, promise);
1141     }
1142 
1143     @Override
1144     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1145         return tail.connect(remoteAddress, localAddress, promise);
1146     }
1147 
1148     @Override
1149     public ChannelFuture disconnect(ChannelPromise promise) {
1150         return tail.disconnect(promise);
1151     }
1152 
1153     @Override
1154     public ChannelFuture close(ChannelPromise promise) {
1155         return tail.close(promise);
1156     }
1157 
1158     @Override
1159     public ChannelFuture deregister(ChannelPromise promise) {
1160         return tail.deregister(promise);
1161     }
1162 
1163     @Override
1164     public ChannelPipeline read() {
1165         tail.read();
1166         return this;
1167     }
1168 
1169     @Override
1170     public ChannelFuture write(Object msg) {
1171         return tail.write(msg);
1172     }
1173 
1174     @Override
1175     public ChannelFuture write(Object msg, ChannelPromise promise) {
1176         return tail.write(msg, promise);
1177     }
1178 
1179     @Override
1180     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1181         return tail.writeAndFlush(msg, promise);
1182     }
1183 
1184     @Override
1185     public ChannelFuture writeAndFlush(Object msg) {
1186         return tail.writeAndFlush(msg);
1187     }
1188 
1189     private String filterName(String name, ChannelHandler handler) {
1190         if (name == null) {
1191             return generateName(handler);
1192         }
1193 
1194         if (context0(name) == null) {
1195             return name;
1196         }
1197 
1198         throw new IllegalArgumentException("Duplicate handler name: " + name);
1199     }
1200 
1201     private AbstractChannelHandlerContext context0(String name) {
1202         AbstractChannelHandlerContext context = head.next;
1203         while (context != tail) {
1204             if (context.name().equals(name)) {
1205                 return context;
1206             }
1207             context = context.next;
1208         }
1209         return null;
1210     }
1211 
1212     private AbstractChannelHandlerContext getContextOrDie(String name) {
1213         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1214         if (ctx == null) {
1215             throw new NoSuchElementException(name);
1216         } else {
1217             return ctx;
1218         }
1219     }
1220 
1221     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1222         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1223         if (ctx == null) {
1224             throw new NoSuchElementException(handler.getClass().getName());
1225         } else {
1226             return ctx;
1227         }
1228     }
1229 
1230     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1231         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1232         if (ctx == null) {
1233             throw new NoSuchElementException(handlerType.getName());
1234         } else {
1235             return ctx;
1236         }
1237     }
1238 
1239     /**
1240      * Should be called before [email protected] #fireChannelRegistered()} is called the first time.
1241      */
1242     void callHandlerAddedForAllHandlers() {
1243         // This should only called from within the EventLoop.
1244         assert channel.eventLoop().inEventLoop();
1245 
1246         final PendingHandlerCallback pendingHandlerCallbackHead;
1247         synchronized (this) {
1248             assert !registered;
1249 
1250             // This Channel itself was registered.
1251             registered = true;
1252 
1253             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1254             // Null out so it can be GC'ed.
1255             this.pendingHandlerCallbackHead = null;
1256         }
1257 
1258         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1259         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1260         // the EventLoop.
1261         PendingHandlerCallback task = pendingHandlerCallbackHead;
1262         while (task != null) {
1263             task.execute();
1264             task = task.next;
1265         }
1266     }
1267 
1268     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1269         assert !registered;
1270 
1271         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1272         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1273         if (pending == null) {
1274             pendingHandlerCallbackHead = task;
1275         } else {
1276             // Find the tail of the linked-list.
1277             while (pending.next != null) {
1278                 pending = pending.next;
1279             }
1280             pending.next = task;
1281         }
1282     }
1283 
1284     private EventExecutor executorSafe(ChannelHandlerInvoker invoker) {
1285         if (invoker == null) {
1286             // We check for channel().isRegistered and handlerAdded because even if isRegistered() is false we
1287             // can safely access the invoker() if handlerAdded is true. This is because in this case the Channel
1288             // was previously registered and so we can still access the old EventLoop to dispatch things.
1289             return channel.isRegistered() || registered ? channel.unsafe().invoker().executor() : null;
1290         }
1291         return invoker.executor();
1292     }
1293 
1294     private static void assertGroupAndInvoker(EventExecutorGroup group, ChannelHandlerInvoker invoker) {
1295         assert group == null || invoker == null : "either group or invoker must be null";
1296     }
1297 
1298     // A special catch-all handler that handles both bytes and messages.
1299     static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1300 
1301         private static final String TAIL_NAME = generateName0(TailContext.class);
1302 
1303         TailContext(DefaultChannelPipeline pipeline) {
1304             super(pipeline, null, TAIL_NAME, true, false);
1305         }
1306 
1307         @Override
1308         public ChannelHandler handler() {
1309             return this;
1310         }
1311 
1312         @Override
1313         public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1314 
1315         @Override
1316         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1317 
1318         @Override
1319         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1320 
1321         @Override
1322         public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1323 
1324         @Override
1325         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1326 
1327         @Override
1328         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1329 
1330         @Override
1331         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1332 
1333         @Override
1334         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1335             // This may not be a configuration error and so don't log anything.
1336             // The event may be superfluous for the current pipeline configuration.
1337             ReferenceCountUtil.release(evt);
1338         }
1339 
1340         @Override
1341         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1342             try {
1343                 logger.warn(
1344                         "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1345                                 "It usually means the last handler in the pipeline did not handle the exception.",
1346                                 cause);
1347             } finally {
1348                 ReferenceCountUtil.release(cause);
1349             }
1350         }
1351 
1352         @Override
1353         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1354             try {
1355                 logger.debug(
1356                         "Discarded inbound message {} that reached at the tail of the pipeline. " +
1357                                 "Please check your pipeline configuration.", msg);
1358             } finally {
1359                 ReferenceCountUtil.release(msg);
1360             }
1361         }
1362 
1363         @Override
1364         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1365     }
1366 
1367     static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
1368 
1369         private static final String HEAD_NAME = generateName0(HeadContext.class);
1370 
1371         private final Unsafe unsafe;
1372 
1373         HeadContext(DefaultChannelPipeline pipeline) {
1374             super(pipeline, null, HEAD_NAME, false, true);
1375             unsafe = pipeline.channel().unsafe();
1376         }
1377 
1378         @Override
1379         public ChannelHandler handler() {
1380             return this;
1381         }
1382 
1383         @Override
1384         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1385 
1386         @Override
1387         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1388 
1389         @Override
1390         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1391             ctx.fireExceptionCaught(cause);
1392         }
1393 
1394         @Override
1395         public void bind(
1396                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1397                 throws Exception {
1398             unsafe.bind(localAddress, promise);
1399         }
1400 
1401         @Override
1402         public void connect(
1403                 ChannelHandlerContext ctx,
1404                 SocketAddress remoteAddress, SocketAddress localAddress,
1405                 ChannelPromise promise) throws Exception {
1406             unsafe.connect(remoteAddress, localAddress, promise);
1407         }
1408 
1409         @Override
1410         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1411             unsafe.disconnect(promise);
1412         }
1413 
1414         @Override
1415         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1416             unsafe.close(promise);
1417         }
1418 
1419         @Override
1420         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1421             unsafe.deregister(promise);
1422         }
1423 
1424         @Override
1425         public void read(ChannelHandlerContext ctx) {
1426             unsafe.beginRead();
1427         }
1428 
1429         @Override
1430         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1431             unsafe.write(msg, promise);
1432         }
1433 
1434         @Override
1435         public void flush(ChannelHandlerContext ctx) throws Exception {
1436             unsafe.flush();
1437         }
1438     }
1439 
1440     private abstract static class PendingHandlerCallback extends OneTimeTask {
1441         final AbstractChannelHandlerContext ctx;
1442         PendingHandlerCallback next;
1443 
1444         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1445             this.ctx = ctx;
1446         }
1447 
1448         abstract void execute();
1449     }
1450 
1451     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1452 
1453         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1454             super(ctx);
1455         }
1456 
1457         @Override
1458         public void run() {
1459             callHandlerAdded0(ctx);
1460         }
1461 
1462         @Override
1463         void execute() {
1464             EventExecutor executor = ctx.executor();
1465             if (executor.inEventLoop()) {
1466                 callHandlerAdded0(ctx);
1467             } else {
1468                 try {
1469                     executor.execute(this);
1470                 } catch (RejectedExecutionException e) {
1471                     if (logger.isWarnEnabled()) {
1472                         logger.warn(
1473                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1474                                 executor, ctx.name(), e);
1475                     }
1476                     remove0(ctx);
1477                     ctx.setRemoved();
1478                 }
1479             }
1480         }
1481     }
1482 
1483     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1484 
1485         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1486             super(ctx);
1487         }
1488 
1489         @Override
1490         public void run() {
1491             callHandlerRemoved0(ctx);
1492         }
1493 
1494         @Override
1495         void execute() {
1496             EventExecutor executor = ctx.executor();
1497             if (executor.inEventLoop()) {
1498                 callHandlerRemoved0(ctx);
1499             } else {
1500                 try {
1501                     executor.execute(this);
1502                 } catch (RejectedExecutionException e) {
1503                     if (logger.isWarnEnabled()) {
1504                         logger.warn(
1505                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1506                                         " removing handler {}.", executor, ctx.name(), e);
1507                     }
1508                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1509                     ctx.setRemoved();
1510                 }
1511             }
1512         }
1513     }
1514 }