1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.channel.Channel.Unsafe;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.ResourceLeakDetector;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.concurrent.EventExecutorGroup;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.OneTimeTask;
25 import io.netty.util.internal.PlatformDependent;
26 import io.netty.util.internal.StringUtil;
27 import io.netty.util.internal.logging.InternalLogger;
28 import io.netty.util.internal.logging.InternalLoggerFactory;
29
30 import java.net.SocketAddress;
31 import java.util.ArrayList;
32 import java.util.IdentityHashMap;
33 import java.util.Iterator;
34 import java.util.LinkedHashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.NoSuchElementException;
38 import java.util.WeakHashMap;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.RejectedExecutionException;
42
43
44
45
46
47 final class DefaultChannelPipeline implements ChannelPipeline {
48
/** Logger shared by the pipeline and the nested sentinel contexts. */
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);

/**
 * Thread-local cache of generated default handler names keyed by handler class.
 * Each thread lazily gets its own {@link WeakHashMap}; see {@code generateName}.
 */
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

/** The channel this pipeline belongs to; never {@code null} (checked by the constructor). */
final AbstractChannel channel;

/** Sentinel contexts delimiting the doubly linked list of handler contexts. */
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

/** Captured once at construction: whether {@code touch(Object, ...)} forwards to ReferenceCountUtil. */
private final boolean touch = ResourceLeakDetector.isEnabled();

/**
 * Lazily created map from an {@link EventExecutorGroup} to the invoker chosen for it,
 * populated by {@code findInvoker}.
 */
private Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;

/**
 * Head of the singly linked list of handlerAdded/handlerRemoved callbacks that were deferred
 * by {@code callHandlerCallbackLater}; drained by {@code callHandlerAddedForAllHandlers}.
 */
private PendingHandlerCallback pendingHandlerCallbackHead;

/** Set to {@code true} by {@code callHandlerAddedForAllHandlers}; never reset in the visible code. */
private boolean registered;
86
/**
 * Creates a pipeline for {@code channel} that initially contains only the head and tail sentinels.
 *
 * @param channel the owning channel
 * @throws NullPointerException if {@code channel} is {@code null}
 */
DefaultChannelPipeline(AbstractChannel channel) {
    if (null == channel) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    // The tail sentinel is created before the head sentinel.
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);

    // Link the two sentinels into an empty list: head <-> tail.
    this.head.next = this.tail;
    this.tail.prev = this.head;
}
99
100 Object touch(Object msg, AbstractChannelHandlerContext next) {
101 return touch ? ReferenceCountUtil.touch(msg, next) : msg;
102 }
103
104 @Override
105 public Channel channel() {
106 return channel;
107 }
108
109 @Override
110 public ChannelPipeline addFirst(String name, ChannelHandler handler) {
111 return addFirst(null, null, name, handler);
112 }
113
114 @Override
115 public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
116 return addFirst(group, null, name, handler);
117 }
118
119 @Override
120 public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
121 return addFirst(null, invoker, name, handler);
122 }
123
124 private ChannelPipeline addFirst(
125 EventExecutorGroup group, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
126 final AbstractChannelHandlerContext newCtx;
127 final EventExecutor executor;
128 final boolean inEventLoop;
129 synchronized (this) {
130 checkMultiplicity(handler);
131
132 if (group != null) {
133 invoker = findInvoker(group);
134 }
135
136 newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
137 executor = executorSafe(invoker);
138
139
140
141
142 if (executor == null) {
143 addFirst0(newCtx);
144 callHandlerCallbackLater(newCtx, true);
145 return this;
146 }
147 inEventLoop = executor.inEventLoop();
148 if (inEventLoop) {
149 addFirst0(newCtx);
150 }
151 }
152
153 if (inEventLoop) {
154 callHandlerAdded0(newCtx);
155 } else {
156 waitForFuture(executor.submit(new OneTimeTask() {
157 @Override
158 public void run() {
159 synchronized (DefaultChannelPipeline.this) {
160 addFirst0(newCtx);
161 }
162 callHandlerAdded0(newCtx);
163 }
164 }));
165 }
166 return this;
167 }
168
169 private void addFirst0(AbstractChannelHandlerContext newCtx) {
170 AbstractChannelHandlerContext nextCtx = head.next;
171 newCtx.prev = head;
172 newCtx.next = nextCtx;
173 head.next = newCtx;
174 nextCtx.prev = newCtx;
175 }
176
177 @Override
178 public ChannelPipeline addLast(String name, ChannelHandler handler) {
179 return addLast(null, null, name, handler);
180 }
181
182 @Override
183 public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
184 return addLast(group, null, name, handler);
185 }
186
187 @Override
188 public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
189 return addLast(null, invoker, name, handler);
190 }
191
192 private ChannelPipeline addLast(EventExecutorGroup group, ChannelHandlerInvoker invoker,
193 String name, ChannelHandler handler) {
194 assertGroupAndInvoker(group, invoker);
195
196 final EventExecutor executor;
197 final AbstractChannelHandlerContext newCtx;
198 final boolean inEventLoop;
199 synchronized (this) {
200 checkMultiplicity(handler);
201
202 if (group != null) {
203 invoker = findInvoker(group);
204 }
205
206 newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
207 executor = executorSafe(invoker);
208
209
210
211
212 if (executor == null) {
213 addLast0(newCtx);
214 callHandlerCallbackLater(newCtx, true);
215 return this;
216 }
217 inEventLoop = executor.inEventLoop();
218 if (inEventLoop) {
219 addLast0(newCtx);
220 }
221 }
222 if (inEventLoop) {
223 callHandlerAdded0(newCtx);
224 } else {
225 waitForFuture(executor.submit(new OneTimeTask() {
226 @Override
227 public void run() {
228 synchronized (DefaultChannelPipeline.this) {
229 addLast0(newCtx);
230 }
231 callHandlerAdded0(newCtx);
232 }
233 }));
234 }
235 return this;
236 }
237
238 private void addLast0(AbstractChannelHandlerContext newCtx) {
239 AbstractChannelHandlerContext prev = tail.prev;
240 newCtx.prev = prev;
241 newCtx.next = tail;
242 prev.next = newCtx;
243 tail.prev = newCtx;
244 }
245
246 @Override
247 public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
248 return addBefore(null, null, baseName, name, handler);
249 }
250
251 @Override
252 public ChannelPipeline addBefore(
253 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
254 return addBefore(group, null, baseName, name, handler);
255 }
256
257 @Override
258 public ChannelPipeline addBefore(
259 ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
260 return addBefore(null, invoker, baseName, name, handler);
261 }
262
263 private ChannelPipeline addBefore(EventExecutorGroup group,
264 ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
265 assertGroupAndInvoker(group, invoker);
266
267 final EventExecutor executor;
268 final AbstractChannelHandlerContext newCtx;
269 final AbstractChannelHandlerContext ctx;
270 final boolean inEventLoop;
271 synchronized (this) {
272 checkMultiplicity(handler);
273 ctx = getContextOrDie(baseName);
274
275 if (group != null) {
276 invoker = findInvoker(group);
277 }
278
279 newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
280 executor = executorSafe(invoker);
281
282
283
284
285 if (executor == null) {
286 addBefore0(ctx, newCtx);
287 callHandlerCallbackLater(newCtx, true);
288 return this;
289 }
290
291 inEventLoop = executor.inEventLoop();
292 if (inEventLoop) {
293 addBefore0(ctx, newCtx);
294 }
295 }
296
297 if (inEventLoop) {
298 callHandlerAdded0(newCtx);
299 } else {
300 waitForFuture(executor.submit(new OneTimeTask() {
301 @Override
302 public void run() {
303 synchronized (DefaultChannelPipeline.this) {
304 addBefore0(ctx, newCtx);
305 }
306 callHandlerAdded0(newCtx);
307 }
308 }));
309 }
310 return this;
311 }
312
313 private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
314 newCtx.prev = ctx.prev;
315 newCtx.next = ctx;
316 ctx.prev.next = newCtx;
317 ctx.prev = newCtx;
318 }
319
320 @Override
321 public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
322 return addAfter(null, null, baseName, name, handler);
323 }
324
325 @Override
326 public ChannelPipeline addAfter(
327 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
328 return addAfter(group, null, baseName, name, handler);
329 }
330
331 @Override
332 public ChannelPipeline addAfter(
333 ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
334 return addAfter(null, invoker, baseName, name, handler);
335 }
336
337 private ChannelPipeline addAfter(EventExecutorGroup group,
338 ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
339 assertGroupAndInvoker(group, invoker);
340
341 final EventExecutor executor;
342 final AbstractChannelHandlerContext newCtx;
343 final AbstractChannelHandlerContext ctx;
344 final boolean inEventLoop;
345
346 synchronized (this) {
347 checkMultiplicity(handler);
348 ctx = getContextOrDie(baseName);
349
350 if (group != null) {
351 invoker = findInvoker(group);
352 }
353
354 newCtx = new DefaultChannelHandlerContext(this, invoker, filterName(name, handler), handler);
355 executor = executorSafe(invoker);
356
357
358
359
360 if (executor == null) {
361 addAfter0(ctx, newCtx);
362 callHandlerCallbackLater(newCtx, true);
363 return this;
364 }
365 inEventLoop = executor.inEventLoop();
366 if (inEventLoop) {
367 addAfter0(ctx, newCtx);
368 }
369 }
370 if (inEventLoop) {
371 callHandlerAdded0(newCtx);
372 } else {
373 waitForFuture(executor.submit(new OneTimeTask() {
374 @Override
375 public void run() {
376 synchronized (DefaultChannelPipeline.this) {
377 addAfter0(ctx, newCtx);
378 }
379 callHandlerAdded0(newCtx);
380 }
381 }));
382 }
383 return this;
384 }
385
386 private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
387 newCtx.prev = ctx;
388 newCtx.next = ctx.next;
389 ctx.next.prev = newCtx;
390 ctx.next = newCtx;
391 }
392
393 @Override
394 public ChannelPipeline addFirst(ChannelHandler... handlers) {
395 return addFirst((ChannelHandlerInvoker) null, handlers);
396 }
397
398 @Override
399 public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
400 if (handlers == null) {
401 throw new NullPointerException("handlers");
402 }
403 if (handlers.length == 0 || handlers[0] == null) {
404 return this;
405 }
406
407 int size;
408 for (size = 1; size < handlers.length; size ++) {
409 if (handlers[size] == null) {
410 break;
411 }
412 }
413
414 for (int i = size - 1; i >= 0; i --) {
415 ChannelHandler h = handlers[i];
416 addFirst(group, null, h);
417 }
418
419 return this;
420 }
421
422 @Override
423 public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
424 if (handlers == null) {
425 throw new NullPointerException("handlers");
426 }
427 if (handlers.length == 0 || handlers[0] == null) {
428 return this;
429 }
430
431 int size;
432 for (size = 1; size < handlers.length; size ++) {
433 if (handlers[size] == null) {
434 break;
435 }
436 }
437
438 for (int i = size - 1; i >= 0; i --) {
439 ChannelHandler h = handlers[i];
440 addFirst(invoker, null, h);
441 }
442
443 return this;
444 }
445
446 @Override
447 public ChannelPipeline addLast(ChannelHandler... handlers) {
448 return addLast((ChannelHandlerInvoker) null, handlers);
449 }
450
451 @Override
452 public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
453 if (handlers == null) {
454 throw new NullPointerException("handlers");
455 }
456
457 for (ChannelHandler h: handlers) {
458 if (h == null) {
459 break;
460 }
461 addLast(group, null, h);
462 }
463
464 return this;
465 }
466
467 @Override
468 public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
469 if (handlers == null) {
470 throw new NullPointerException("handlers");
471 }
472
473 for (ChannelHandler h: handlers) {
474 if (h == null) {
475 break;
476 }
477 addLast(invoker, null, h);
478 }
479
480 return this;
481 }
482
483 private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
484 if (group == null) {
485 return null;
486 }
487
488
489 Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
490 if (childInvokers == null) {
491 childInvokers = this.childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>(4);
492 }
493
494
495
496 ChannelHandlerInvoker invoker = childInvokers.get(group);
497 if (invoker == null) {
498 EventExecutor executor = group.next();
499 if (executor instanceof EventLoop) {
500 invoker = ((EventLoop) executor).asInvoker();
501 } else {
502 invoker = new DefaultChannelHandlerInvoker(executor);
503 }
504 childInvokers.put(group, invoker);
505 }
506
507 return invoker;
508 }
509
510 private String generateName(ChannelHandler handler) {
511 Map<Class<?>, String> cache = nameCaches.get();
512 Class<?> handlerType = handler.getClass();
513 String name = cache.get(handlerType);
514 if (name == null) {
515 name = generateName0(handlerType);
516 cache.put(handlerType, name);
517 }
518
519
520
521 if (context0(name) != null) {
522 String baseName = name.substring(0, name.length() - 1);
523 for (int i = 1;; i ++) {
524 String newName = baseName + i;
525 if (context0(newName) == null) {
526 name = newName;
527 break;
528 }
529 }
530 }
531 return name;
532 }
533
534 private static String generateName0(Class<?> handlerType) {
535 return StringUtil.simpleClassName(handlerType) + "#0";
536 }
537
538 @Override
539 public ChannelPipeline remove(ChannelHandler handler) {
540 remove(getContextOrDie(handler));
541 return this;
542 }
543
544 @Override
545 public ChannelHandler remove(String name) {
546 return remove(getContextOrDie(name)).handler();
547 }
548
549 @SuppressWarnings("unchecked")
550 @Override
551 public <T extends ChannelHandler> T remove(Class<T> handlerType) {
552 return (T) remove(getContextOrDie(handlerType)).handler();
553 }
554
555 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
556 assert ctx != head && ctx != tail;
557
558 final EventExecutor executor;
559 final boolean inEventLoop;
560 synchronized (this) {
561 executor = executorSafe(ctx.invoker);
562
563
564
565
566 if (executor == null) {
567 remove0(ctx);
568 callHandlerCallbackLater(ctx, false);
569 return ctx;
570 }
571 inEventLoop = executor.inEventLoop();
572 if (inEventLoop) {
573 remove0(ctx);
574 }
575 }
576 if (inEventLoop) {
577 callHandlerRemoved0(ctx);
578 } else {
579 waitForFuture(executor.submit(new OneTimeTask() {
580 @Override
581 public void run() {
582 synchronized (DefaultChannelPipeline.this) {
583 remove0(ctx);
584 }
585 callHandlerRemoved0(ctx);
586 }
587 }));
588 }
589 return ctx;
590 }
591
592 private static void remove0(AbstractChannelHandlerContext ctx) {
593 AbstractChannelHandlerContext prev = ctx.prev;
594 AbstractChannelHandlerContext next = ctx.next;
595 prev.next = next;
596 next.prev = prev;
597 }
598
599 @Override
600 public ChannelHandler removeFirst() {
601 if (head.next == tail) {
602 throw new NoSuchElementException();
603 }
604 return remove(head.next).handler();
605 }
606
607 @Override
608 public ChannelHandler removeLast() {
609 if (head.next == tail) {
610 throw new NoSuchElementException();
611 }
612 return remove(tail.prev).handler();
613 }
614
615 @Override
616 public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
617 replace(getContextOrDie(oldHandler), newName, newHandler);
618 return this;
619 }
620
621 @Override
622 public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
623 return replace(getContextOrDie(oldName), newName, newHandler);
624 }
625
626 @Override
627 @SuppressWarnings("unchecked")
628 public <T extends ChannelHandler> T replace(
629 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
630 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
631 }
632
633 private ChannelHandler replace(
634 final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
635 assert ctx != head && ctx != tail;
636
637 final AbstractChannelHandlerContext newCtx;
638 final EventExecutor executor;
639 final boolean inEventLoop;
640 synchronized (this) {
641 checkMultiplicity(newHandler);
642
643 if (newName == null) {
644 newName = ctx.name();
645 } else if (!ctx.name().equals(newName)) {
646 newName = filterName(newName, newHandler);
647 }
648
649 newCtx = new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
650 executor = executorSafe(ctx.invoker);
651
652
653
654
655
656 if (executor == null) {
657 replace0(ctx, newCtx);
658 callHandlerCallbackLater(newCtx, true);
659 callHandlerCallbackLater(ctx, false);
660 return ctx.handler();
661 }
662 inEventLoop = executor.inEventLoop();
663 if (inEventLoop) {
664 replace0(ctx, newCtx);
665 }
666 }
667 if (inEventLoop) {
668
669
670
671 callHandlerAdded0(newCtx);
672 callHandlerRemoved0(ctx);
673 } else {
674 waitForFuture(executor.submit(new OneTimeTask() {
675 @Override
676 public void run() {
677 synchronized (DefaultChannelPipeline.this) {
678 replace0(ctx, newCtx);
679 }
680
681
682
683 callHandlerAdded0(newCtx);
684 callHandlerRemoved0(ctx);
685 }
686 }));
687 }
688 return ctx.handler();
689 }
690
691 private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
692 AbstractChannelHandlerContext prev = oldCtx.prev;
693 AbstractChannelHandlerContext next = oldCtx.next;
694 newCtx.prev = prev;
695 newCtx.next = next;
696
697
698
699
700
701 prev.next = newCtx;
702 next.prev = newCtx;
703
704
705 oldCtx.prev = newCtx;
706 oldCtx.next = newCtx;
707 }
708
709 private static void checkMultiplicity(ChannelHandler handler) {
710 if (handler instanceof ChannelHandlerAdapter) {
711 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
712 if (!h.isSharable() && h.added) {
713 throw new ChannelPipelineException(
714 h.getClass().getName() +
715 " is not a @Sharable handler, so can't be added or removed multiple times.");
716 }
717 h.added = true;
718 }
719 }
720
721 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
722 try {
723 ctx.handler().handlerAdded(ctx);
724 } catch (Throwable t) {
725 boolean removed = false;
726 try {
727 remove0(ctx);
728 try {
729 ctx.handler().handlerRemoved(ctx);
730 } finally {
731 ctx.setRemoved();
732 }
733 removed = true;
734 } catch (Throwable t2) {
735 if (logger.isWarnEnabled()) {
736 logger.warn("Failed to remove a handler: " + ctx.name(), t2);
737 }
738 }
739
740 if (removed) {
741 fireExceptionCaught(new ChannelPipelineException(
742 ctx.handler().getClass().getName() +
743 ".handlerAdded() has thrown an exception; removed.", t));
744 } else {
745 fireExceptionCaught(new ChannelPipelineException(
746 ctx.handler().getClass().getName() +
747 ".handlerAdded() has thrown an exception; also failed to remove.", t));
748 }
749 }
750 }
751
752 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
753
754 try {
755 try {
756 ctx.handler().handlerRemoved(ctx);
757 } finally {
758 ctx.setRemoved();
759 }
760 } catch (Throwable t) {
761 fireExceptionCaught(new ChannelPipelineException(
762 ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
763 }
764 }
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781 private static void waitForFuture(Future<?> future) {
782 try {
783 future.get();
784 } catch (ExecutionException ex) {
785
786 PlatformDependent.throwException(ex.getCause());
787 } catch (InterruptedException ex) {
788
789 Thread.currentThread().interrupt();
790 }
791 }
792
793 @Override
794 public ChannelHandler first() {
795 ChannelHandlerContext first = firstContext();
796 if (first == null) {
797 return null;
798 }
799 return first.handler();
800 }
801
802 @Override
803 public ChannelHandlerContext firstContext() {
804 AbstractChannelHandlerContext first = head.next;
805 if (first == tail) {
806 return null;
807 }
808 return head.next;
809 }
810
811 @Override
812 public ChannelHandler last() {
813 AbstractChannelHandlerContext last = tail.prev;
814 if (last == head) {
815 return null;
816 }
817 return last.handler();
818 }
819
820 @Override
821 public ChannelHandlerContext lastContext() {
822 AbstractChannelHandlerContext last = tail.prev;
823 if (last == head) {
824 return null;
825 }
826 return last;
827 }
828
829 @Override
830 public ChannelHandler get(String name) {
831 ChannelHandlerContext ctx = context(name);
832 if (ctx == null) {
833 return null;
834 } else {
835 return ctx.handler();
836 }
837 }
838
839 @SuppressWarnings("unchecked")
840 @Override
841 public <T extends ChannelHandler> T get(Class<T> handlerType) {
842 ChannelHandlerContext ctx = context(handlerType);
843 if (ctx == null) {
844 return null;
845 } else {
846 return (T) ctx.handler();
847 }
848 }
849
850 @Override
851 public ChannelHandlerContext context(String name) {
852 if (name == null) {
853 throw new NullPointerException("name");
854 }
855
856 return context0(name);
857 }
858
859 @Override
860 public ChannelHandlerContext context(ChannelHandler handler) {
861 if (handler == null) {
862 throw new NullPointerException("handler");
863 }
864
865 AbstractChannelHandlerContext ctx = head.next;
866 for (;;) {
867
868 if (ctx == null) {
869 return null;
870 }
871
872 if (ctx.handler() == handler) {
873 return ctx;
874 }
875
876 ctx = ctx.next;
877 }
878 }
879
880 @Override
881 public ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
882 if (handlerType == null) {
883 throw new NullPointerException("handlerType");
884 }
885
886 AbstractChannelHandlerContext ctx = head.next;
887 for (;;) {
888 if (ctx == null) {
889 return null;
890 }
891 if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
892 return ctx;
893 }
894 ctx = ctx.next;
895 }
896 }
897
898 @Override
899 public List<String> names() {
900 List<String> list = new ArrayList<String>();
901 AbstractChannelHandlerContext ctx = head.next;
902 for (;;) {
903 if (ctx == null) {
904 return list;
905 }
906 list.add(ctx.name());
907 ctx = ctx.next;
908 }
909 }
910
911 @Override
912 public Map<String, ChannelHandler> toMap() {
913 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
914 AbstractChannelHandlerContext ctx = head.next;
915 for (;;) {
916 if (ctx == tail) {
917 return map;
918 }
919 map.put(ctx.name(), ctx.handler());
920 ctx = ctx.next;
921 }
922 }
923
924 @Override
925 public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
926 return toMap().entrySet().iterator();
927 }
928
929
930
931
932 @Override
933 public String toString() {
934 StringBuilder buf = new StringBuilder()
935 .append(StringUtil.simpleClassName(this))
936 .append('{');
937 AbstractChannelHandlerContext ctx = head.next;
938 for (;;) {
939 if (ctx == tail) {
940 break;
941 }
942
943 buf.append('(')
944 .append(ctx.name())
945 .append(" = ")
946 .append(ctx.handler().getClass().getName())
947 .append(')');
948
949 ctx = ctx.next;
950 if (ctx == tail) {
951 break;
952 }
953
954 buf.append(", ");
955 }
956 buf.append('}');
957 return buf.toString();
958 }
959
960 @Override
961 public ChannelPipeline fireChannelRegistered() {
962 head.fireChannelRegistered();
963 return this;
964 }
965
966 @Override
967 public ChannelPipeline fireChannelUnregistered() {
968 head.fireChannelUnregistered();
969
970
971 if (!channel.isOpen()) {
972 destroy();
973 }
974 return this;
975 }
976
977
978
979
980
981
982
983
984
985
986
987 private synchronized void destroy() {
988 destroyUp(head.next, false);
989 }
990
991 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
992 final Thread currentThread = Thread.currentThread();
993 final AbstractChannelHandlerContext tail = this.tail;
994 for (;;) {
995 if (ctx == tail) {
996 destroyDown(currentThread, tail.prev, inEventLoop);
997 break;
998 }
999
1000 final EventExecutor executor = ctx.executor();
1001 if (!inEventLoop && !executor.inEventLoop(currentThread)) {
1002 final AbstractChannelHandlerContext finalCtx = ctx;
1003 executor.execute(new OneTimeTask() {
1004 @Override
1005 public void run() {
1006 destroyUp(finalCtx, true);
1007 }
1008 });
1009 break;
1010 }
1011
1012 ctx = ctx.next;
1013 inEventLoop = false;
1014 }
1015 }
1016
1017 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
1018
1019 final AbstractChannelHandlerContext head = this.head;
1020 for (;;) {
1021 if (ctx == head) {
1022 break;
1023 }
1024
1025 final EventExecutor executor = ctx.executor();
1026 if (inEventLoop || executor.inEventLoop(currentThread)) {
1027 synchronized (this) {
1028 remove0(ctx);
1029 callHandlerRemoved0(ctx);
1030 }
1031 } else {
1032 final AbstractChannelHandlerContext finalCtx = ctx;
1033 executor.execute(new OneTimeTask() {
1034 @Override
1035 public void run() {
1036 destroyDown(Thread.currentThread(), finalCtx, true);
1037 }
1038 });
1039 break;
1040 }
1041
1042 ctx = ctx.prev;
1043 inEventLoop = false;
1044 }
1045 }
1046
1047 @Override
1048 public ChannelPipeline fireChannelActive() {
1049 head.fireChannelActive();
1050
1051 if (channel.config().isAutoRead()) {
1052 channel.read();
1053 }
1054
1055 return this;
1056 }
1057
1058 @Override
1059 public ChannelPipeline fireChannelInactive() {
1060 head.fireChannelInactive();
1061 return this;
1062 }
1063
1064 @Override
1065 public ChannelPipeline fireExceptionCaught(Throwable cause) {
1066 head.fireExceptionCaught(cause);
1067 return this;
1068 }
1069
1070 @Override
1071 public ChannelPipeline fireUserEventTriggered(Object event) {
1072 head.fireUserEventTriggered(event);
1073 return this;
1074 }
1075
1076 @Override
1077 public ChannelPipeline fireChannelRead(Object msg) {
1078 head.fireChannelRead(msg);
1079 return this;
1080 }
1081
1082 @Override
1083 public ChannelPipeline fireChannelReadComplete() {
1084 head.fireChannelReadComplete();
1085 if (channel.config().isAutoRead()) {
1086 read();
1087 }
1088 return this;
1089 }
1090
1091 @Override
1092 public ChannelPipeline fireChannelWritabilityChanged() {
1093 head.fireChannelWritabilityChanged();
1094 return this;
1095 }
1096
1097 @Override
1098 public ChannelFuture bind(SocketAddress localAddress) {
1099 return tail.bind(localAddress);
1100 }
1101
1102 @Override
1103 public ChannelFuture connect(SocketAddress remoteAddress) {
1104 return tail.connect(remoteAddress);
1105 }
1106
1107 @Override
1108 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
1109 return tail.connect(remoteAddress, localAddress);
1110 }
1111
1112 @Override
1113 public ChannelFuture disconnect() {
1114 return tail.disconnect();
1115 }
1116
1117 @Override
1118 public ChannelFuture close() {
1119 return tail.close();
1120 }
1121
1122 @Override
1123 public ChannelFuture deregister() {
1124 return tail.deregister();
1125 }
1126
1127 @Override
1128 public ChannelPipeline flush() {
1129 tail.flush();
1130 return this;
1131 }
1132
1133 @Override
1134 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
1135 return tail.bind(localAddress, promise);
1136 }
1137
1138 @Override
1139 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
1140 return tail.connect(remoteAddress, promise);
1141 }
1142
1143 @Override
1144 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1145 return tail.connect(remoteAddress, localAddress, promise);
1146 }
1147
1148 @Override
1149 public ChannelFuture disconnect(ChannelPromise promise) {
1150 return tail.disconnect(promise);
1151 }
1152
1153 @Override
1154 public ChannelFuture close(ChannelPromise promise) {
1155 return tail.close(promise);
1156 }
1157
1158 @Override
1159 public ChannelFuture deregister(ChannelPromise promise) {
1160 return tail.deregister(promise);
1161 }
1162
1163 @Override
1164 public ChannelPipeline read() {
1165 tail.read();
1166 return this;
1167 }
1168
1169 @Override
1170 public ChannelFuture write(Object msg) {
1171 return tail.write(msg);
1172 }
1173
1174 @Override
1175 public ChannelFuture write(Object msg, ChannelPromise promise) {
1176 return tail.write(msg, promise);
1177 }
1178
1179 @Override
1180 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1181 return tail.writeAndFlush(msg, promise);
1182 }
1183
1184 @Override
1185 public ChannelFuture writeAndFlush(Object msg) {
1186 return tail.writeAndFlush(msg);
1187 }
1188
1189 private String filterName(String name, ChannelHandler handler) {
1190 if (name == null) {
1191 return generateName(handler);
1192 }
1193
1194 if (context0(name) == null) {
1195 return name;
1196 }
1197
1198 throw new IllegalArgumentException("Duplicate handler name: " + name);
1199 }
1200
1201 private AbstractChannelHandlerContext context0(String name) {
1202 AbstractChannelHandlerContext context = head.next;
1203 while (context != tail) {
1204 if (context.name().equals(name)) {
1205 return context;
1206 }
1207 context = context.next;
1208 }
1209 return null;
1210 }
1211
1212 private AbstractChannelHandlerContext getContextOrDie(String name) {
1213 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1214 if (ctx == null) {
1215 throw new NoSuchElementException(name);
1216 } else {
1217 return ctx;
1218 }
1219 }
1220
1221 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1222 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1223 if (ctx == null) {
1224 throw new NoSuchElementException(handler.getClass().getName());
1225 } else {
1226 return ctx;
1227 }
1228 }
1229
1230 private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1231 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1232 if (ctx == null) {
1233 throw new NoSuchElementException(handlerType.getName());
1234 } else {
1235 return ctx;
1236 }
1237 }
1238
1239
1240
1241
1242 void callHandlerAddedForAllHandlers() {
1243
1244 assert channel.eventLoop().inEventLoop();
1245
1246 final PendingHandlerCallback pendingHandlerCallbackHead;
1247 synchronized (this) {
1248 assert !registered;
1249
1250
1251 registered = true;
1252
1253 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1254
1255 this.pendingHandlerCallbackHead = null;
1256 }
1257
1258
1259
1260
1261 PendingHandlerCallback task = pendingHandlerCallbackHead;
1262 while (task != null) {
1263 task.execute();
1264 task = task.next;
1265 }
1266 }
1267
1268 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1269 assert !registered;
1270
1271 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1272 PendingHandlerCallback pending = pendingHandlerCallbackHead;
1273 if (pending == null) {
1274 pendingHandlerCallbackHead = task;
1275 } else {
1276
1277 while (pending.next != null) {
1278 pending = pending.next;
1279 }
1280 pending.next = task;
1281 }
1282 }
1283
1284 private EventExecutor executorSafe(ChannelHandlerInvoker invoker) {
1285 if (invoker == null) {
1286
1287
1288
1289 return channel.isRegistered() || registered ? channel.unsafe().invoker().executor() : null;
1290 }
1291 return invoker.executor();
1292 }
1293
1294 private static void assertGroupAndInvoker(EventExecutorGroup group, ChannelHandlerInvoker invoker) {
1295 assert group == null || invoker == null : "either group or invoker must be null";
1296 }
1297
1298
1299 static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1300
1301 private static final String TAIL_NAME = generateName0(TailContext.class);
1302
1303 TailContext(DefaultChannelPipeline pipeline) {
1304 super(pipeline, null, TAIL_NAME, true, false);
1305 }
1306
1307 @Override
1308 public ChannelHandler handler() {
1309 return this;
1310 }
1311
1312 @Override
1313 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1314
1315 @Override
1316 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1317
1318 @Override
1319 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1320
1321 @Override
1322 public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1323
1324 @Override
1325 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1326
1327 @Override
1328 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1329
1330 @Override
1331 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1332
1333 @Override
1334 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1335
1336
1337 ReferenceCountUtil.release(evt);
1338 }
1339
1340 @Override
1341 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1342 try {
1343 logger.warn(
1344 "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1345 "It usually means the last handler in the pipeline did not handle the exception.",
1346 cause);
1347 } finally {
1348 ReferenceCountUtil.release(cause);
1349 }
1350 }
1351
1352 @Override
1353 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1354 try {
1355 logger.debug(
1356 "Discarded inbound message {} that reached at the tail of the pipeline. " +
1357 "Please check your pipeline configuration.", msg);
1358 } finally {
1359 ReferenceCountUtil.release(msg);
1360 }
1361 }
1362
1363 @Override
1364 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1365 }
1366
1367 static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler {
1368
1369 private static final String HEAD_NAME = generateName0(HeadContext.class);
1370
1371 private final Unsafe unsafe;
1372
1373 HeadContext(DefaultChannelPipeline pipeline) {
1374 super(pipeline, null, HEAD_NAME, false, true);
1375 unsafe = pipeline.channel().unsafe();
1376 }
1377
1378 @Override
1379 public ChannelHandler handler() {
1380 return this;
1381 }
1382
1383 @Override
1384 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1385
1386 @Override
1387 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1388
1389 @Override
1390 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1391 ctx.fireExceptionCaught(cause);
1392 }
1393
1394 @Override
1395 public void bind(
1396 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1397 throws Exception {
1398 unsafe.bind(localAddress, promise);
1399 }
1400
1401 @Override
1402 public void connect(
1403 ChannelHandlerContext ctx,
1404 SocketAddress remoteAddress, SocketAddress localAddress,
1405 ChannelPromise promise) throws Exception {
1406 unsafe.connect(remoteAddress, localAddress, promise);
1407 }
1408
1409 @Override
1410 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1411 unsafe.disconnect(promise);
1412 }
1413
1414 @Override
1415 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1416 unsafe.close(promise);
1417 }
1418
1419 @Override
1420 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1421 unsafe.deregister(promise);
1422 }
1423
1424 @Override
1425 public void read(ChannelHandlerContext ctx) {
1426 unsafe.beginRead();
1427 }
1428
1429 @Override
1430 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1431 unsafe.write(msg, promise);
1432 }
1433
1434 @Override
1435 public void flush(ChannelHandlerContext ctx) throws Exception {
1436 unsafe.flush();
1437 }
1438 }
1439
1440 private abstract static class PendingHandlerCallback extends OneTimeTask {
1441 final AbstractChannelHandlerContext ctx;
1442 PendingHandlerCallback next;
1443
1444 PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1445 this.ctx = ctx;
1446 }
1447
1448 abstract void execute();
1449 }
1450
1451 private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1452
1453 PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1454 super(ctx);
1455 }
1456
1457 @Override
1458 public void run() {
1459 callHandlerAdded0(ctx);
1460 }
1461
1462 @Override
1463 void execute() {
1464 EventExecutor executor = ctx.executor();
1465 if (executor.inEventLoop()) {
1466 callHandlerAdded0(ctx);
1467 } else {
1468 try {
1469 executor.execute(this);
1470 } catch (RejectedExecutionException e) {
1471 if (logger.isWarnEnabled()) {
1472 logger.warn(
1473 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1474 executor, ctx.name(), e);
1475 }
1476 remove0(ctx);
1477 ctx.setRemoved();
1478 }
1479 }
1480 }
1481 }
1482
1483 private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1484
1485 PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1486 super(ctx);
1487 }
1488
1489 @Override
1490 public void run() {
1491 callHandlerRemoved0(ctx);
1492 }
1493
1494 @Override
1495 void execute() {
1496 EventExecutor executor = ctx.executor();
1497 if (executor.inEventLoop()) {
1498 callHandlerRemoved0(ctx);
1499 } else {
1500 try {
1501 executor.execute(this);
1502 } catch (RejectedExecutionException e) {
1503 if (logger.isWarnEnabled()) {
1504 logger.warn(
1505 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1506 " removing handler {}.", executor, ctx.name(), e);
1507 }
1508
1509 ctx.setRemoved();
1510 }
1511 }
1512 }
1513 }
1514 }