View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelPipeline;
21  import io.netty.channel.RecvByteBufAllocator;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  /**
28   * Abstract base class for OIO which reads and writes objects from/to a Socket
29   */
30  public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
31  
32      private final List<Object> readBuf = new ArrayList<Object>();
33  
34      protected AbstractOioMessageChannel(Channel parent) {
35          super(parent);
36      }
37  
38      @Override
39      protected void doRead() {
40          if (!readPending) {
41              // We have to check readPending here because the Runnable to read could have been scheduled and later
42              // during the same read loop readPending was set to false.
43              return;
44          }
45          // In OIO we should set readPending to false even if the read was not successful so we can schedule
46          // another read on the event loop if no reads are done.
47          readPending = false;
48  
49          final ChannelConfig config = config();
50          final ChannelPipeline pipeline = pipeline();
51          final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
52          allocHandle.reset(config);
53  
54          boolean closed = false;
55          Throwable exception = null;
56          try {
57              do {
58                  // Perform a read.
59                  int localRead = doReadMessages(readBuf);
60                  if (localRead == 0) {
61                      break;
62                  }
63                  if (localRead < 0) {
64                      closed = true;
65                      break;
66                  }
67  
68                  allocHandle.incMessagesRead(localRead);
69              } while (allocHandle.continueReading());
70          } catch (Throwable t) {
71              exception = t;
72          }
73  
74          boolean readData = false;
75          int size = readBuf.size();
76          if (size > 0) {
77              readData = true;
78              for (int i = 0; i < size; i++) {
79                  readPending = false;
80                  pipeline.fireChannelRead(readBuf.get(i));
81              }
82              readBuf.clear();
83              allocHandle.readComplete();
84              pipeline.fireChannelReadComplete();
85          }
86  
87          if (exception != null) {
88              if (exception instanceof IOException) {
89                  closed = true;
90              }
91  
92              pipeline.fireExceptionCaught(exception);
93          }
94  
95          if (closed) {
96              if (isOpen()) {
97                  unsafe().close(unsafe().voidPromise());
98              }
99          } else if (readPending || config.isAutoRead() || !readData && isActive()) {
100             // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
101             // should execute read() again because no data may have been read.
102             read();
103         }
104     }
105 
106     /**
107      * Read messages into the given array and return the amount which was read.
108      */
109     protected abstract int doReadMessages(List<Object> msgs) throws Exception;
110 }