1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.oio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.RecvByteBufAllocator;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 /**
28 * Abstract base class for OIO which reads and writes objects from/to a Socket
29 */
30 public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
31
32 private final List<Object> readBuf = new ArrayList<Object>();
33
34 protected AbstractOioMessageChannel(Channel parent) {
35 super(parent);
36 }
37
38 @Override
39 protected void doRead() {
40 if (!readPending) {
41 // We have to check readPending here because the Runnable to read could have been scheduled and later
42 // during the same read loop readPending was set to false.
43 return;
44 }
45 // In OIO we should set readPending to false even if the read was not successful so we can schedule
46 // another read on the event loop if no reads are done.
47 readPending = false;
48
49 final ChannelConfig config = config();
50 final ChannelPipeline pipeline = pipeline();
51 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
52 allocHandle.reset(config);
53
54 boolean closed = false;
55 Throwable exception = null;
56 try {
57 do {
58 // Perform a read.
59 int localRead = doReadMessages(readBuf);
60 if (localRead == 0) {
61 break;
62 }
63 if (localRead < 0) {
64 closed = true;
65 break;
66 }
67
68 allocHandle.incMessagesRead(localRead);
69 } while (allocHandle.continueReading());
70 } catch (Throwable t) {
71 exception = t;
72 }
73
74 boolean readData = false;
75 int size = readBuf.size();
76 if (size > 0) {
77 readData = true;
78 for (int i = 0; i < size; i++) {
79 readPending = false;
80 pipeline.fireChannelRead(readBuf.get(i));
81 }
82 readBuf.clear();
83 allocHandle.readComplete();
84 pipeline.fireChannelReadComplete();
85 }
86
87 if (exception != null) {
88 if (exception instanceof IOException) {
89 closed = true;
90 }
91
92 pipeline.fireExceptionCaught(exception);
93 }
94
95 if (closed) {
96 if (isOpen()) {
97 unsafe().close(unsafe().voidPromise());
98 }
99 } else if (readPending || config.isAutoRead() || !readData && isActive()) {
100 // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
101 // should execute read() again because no data may have been read.
102 read();
103 }
104 }
105
106 /**
107 * Read messages into the given array and return the amount which was read.
108 */
109 protected abstract int doReadMessages(List<Object> msgs) throws Exception;
110 }