1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ServerChannel;
23
24 import java.io.IOException;
25 import java.net.PortUnreachableException;
26 import java.nio.channels.SelectableChannel;
27 import java.nio.channels.SelectionKey;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 /**
32 * [email protected] AbstractNioChannel} base class for [email protected] Channel}s that operate on messages.
33 */
34 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
35
36 /**
37 * @see [email protected] AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)}
38 */
39 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
40 super(parent, ch, readInterestOp);
41 }
42
43 @Override
44 protected AbstractNioUnsafe newUnsafe() {
45 return new NioMessageUnsafe();
46 }
47
48 private final class NioMessageUnsafe extends AbstractNioUnsafe {
49
50 private final List<Object> readBuf = new ArrayList<Object>();
51
52 @Override
53 public void read() {
54 assert eventLoop().inEventLoop();
55 final ChannelConfig config = config();
56 if (!config.isAutoRead() && !isReadPending()) {
57 // ChannelConfig.setAutoRead(false) was called in the meantime
58 removeReadOp();
59 return;
60 }
61
62 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
63 final ChannelPipeline pipeline = pipeline();
64 boolean closed = false;
65 Throwable exception = null;
66 try {
67 try {
68 for (;;) {
69 int localRead = doReadMessages(readBuf);
70 if (localRead == 0) {
71 break;
72 }
73 if (localRead < 0) {
74 closed = true;
75 break;
76 }
77
78 // stop reading and remove op
79 if (!config.isAutoRead()) {
80 break;
81 }
82
83 if (readBuf.size() >= maxMessagesPerRead) {
84 break;
85 }
86 }
87 } catch (Throwable t) {
88 exception = t;
89 }
90 setReadPending(false);
91 int size = readBuf.size();
92 for (int i = 0; i < size; i ++) {
93 pipeline.fireChannelRead(readBuf.get(i));
94 }
95
96 readBuf.clear();
97 pipeline.fireChannelReadComplete();
98
99 if (exception != null) {
100 if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
101 // ServerChannel should not be closed even on IOException because it can often continue
102 // accepting incoming connections. (e.g. too many open files)
103 closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
104 }
105
106 pipeline.fireExceptionCaught(exception);
107 }
108
109 if (closed) {
110 if (isOpen()) {
111 close(voidPromise());
112 }
113 }
114 } finally {
115 // Check if there is a readPending which was not processed yet.
116 // This could be for two reasons:
117 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
118 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
119 //
120 // See https://github.com/netty/netty/issues/2254
121 if (!config.isAutoRead() && !isReadPending()) {
122 removeReadOp();
123 }
124 }
125 }
126 }
127
128 @Override
129 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
130 final SelectionKey key = selectionKey();
131 final int interestOps = key.interestOps();
132
133 for (;;) {
134 Object msg = in.current();
135 if (msg == null) {
136 // Wrote all messages.
137 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
138 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
139 }
140 break;
141 }
142 try {
143 boolean done = false;
144 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
145 if (doWriteMessage(msg, in)) {
146 done = true;
147 break;
148 }
149 }
150
151 if (done) {
152 in.remove();
153 } else {
154 // Did not write all messages.
155 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
156 key.interestOps(interestOps | SelectionKey.OP_WRITE);
157 }
158 break;
159 }
160 } catch (IOException e) {
161 if (continueOnWriteError()) {
162 in.remove(e);
163 } else {
164 throw e;
165 }
166 }
167 }
168 }
169
170 /**
171 * Returns [email protected] true} if we should continue the write loop on a write error.
172 */
173 protected boolean continueOnWriteError() {
174 return false;
175 }
176
177 /**
178 * Read messages into the given array and return the amount which was read.
179 */
180 protected abstract int doReadMessages(List<Object> buf) throws Exception;
181
182 /**
183 * Write a message to the underlying [email protected] java.nio.channels.Channel}.
184 *
185 * @return [email protected] true} if and only if the message has been written
186 */
187 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
188 }