1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ServerChannel;
23
24 import java.io.IOException;
25 import java.net.PortUnreachableException;
26 import java.nio.channels.SelectableChannel;
27 import java.nio.channels.SelectionKey;
28 import java.util.ArrayList;
29 import java.util.List;
30
31
32
33
34 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35
36
37
38
39 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
40 super(parent, ch, readInterestOp);
41 }
42
43 @Override
44 protected AbstractNioUnsafe newUnsafe() {
45 return new NioMessageUnsafe();
46 }
47
48 private final class NioMessageUnsafe extends AbstractNioUnsafe {
49
50 private final List<Object> readBuf = new ArrayList<Object>();
51
52 @Override
53 public void read() {
54 assert eventLoop().inEventLoop();
55 final ChannelConfig config = config();
56 if (!config.isAutoRead() && !isReadPending()) {
57
58 removeReadOp();
59 return;
60 }
61
62 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
63 final ChannelPipeline pipeline = pipeline();
64 boolean closed = false;
65 Throwable exception = null;
66 try {
67 try {
68 for (;;) {
69 int localRead = doReadMessages(readBuf);
70 if (localRead == 0) {
71 break;
72 }
73 if (localRead < 0) {
74 closed = true;
75 break;
76 }
77
78
79 if (!config.isAutoRead()) {
80 break;
81 }
82
83 if (readBuf.size() >= maxMessagesPerRead) {
84 break;
85 }
86 }
87 } catch (Throwable t) {
88 exception = t;
89 }
90 setReadPending(false);
91 int size = readBuf.size();
92 for (int i = 0; i < size; i ++) {
93 pipeline.fireChannelRead(readBuf.get(i));
94 }
95
96 readBuf.clear();
97 pipeline.fireChannelReadComplete();
98
99 if (exception != null) {
100 if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
101
102
103 closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
104 }
105
106 pipeline.fireExceptionCaught(exception);
107 }
108
109 if (closed) {
110 if (isOpen()) {
111 close(voidPromise());
112 }
113 }
114 } finally {
115
116
117
118
119
120
121 if (!config.isAutoRead() && !isReadPending()) {
122 removeReadOp();
123 }
124 }
125 }
126 }
127
128 @Override
129 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
130 final SelectionKey key = selectionKey();
131 final int interestOps = key.interestOps();
132
133 for (;;) {
134 Object msg = in.current();
135 if (msg == null) {
136
137 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
138 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
139 }
140 break;
141 }
142 try {
143 boolean done = false;
144 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
145 if (doWriteMessage(msg, in)) {
146 done = true;
147 break;
148 }
149 }
150
151 if (done) {
152 in.remove();
153 } else {
154
155 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
156 key.interestOps(interestOps | SelectionKey.OP_WRITE);
157 }
158 break;
159 }
160 } catch (IOException e) {
161 if (continueOnWriteError()) {
162 in.remove(e);
163 } else {
164 throw e;
165 }
166 }
167 }
168 }
169
170
171
172
173 protected boolean continueOnWriteError() {
174 return false;
175 }
176
177
178
179
180 protected abstract int doReadMessages(List<Object> buf) throws Exception;
181
182
183
184
185
186
187 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
188 }