1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.RecvByteBufAllocator;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27
28
29
30 public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
31
32 private final List<Object> readBuf = new ArrayList<Object>();
33
34 protected AbstractOioMessageChannel(Channel parent) {
35 super(parent);
36 }
37
38 @Override
39 protected void doRead() {
40 if (!readPending) {
41
42
43 return;
44 }
45
46
47 readPending = false;
48
49 final ChannelConfig config = config();
50 final ChannelPipeline pipeline = pipeline();
51 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
52 allocHandle.reset(config);
53
54 boolean closed = false;
55 Throwable exception = null;
56 try {
57 do {
58
59 int localRead = doReadMessages(readBuf);
60 if (localRead == 0) {
61 break;
62 }
63 if (localRead < 0) {
64 closed = true;
65 break;
66 }
67
68 allocHandle.incMessagesRead(localRead);
69 } while (allocHandle.continueReading());
70 } catch (Throwable t) {
71 exception = t;
72 }
73
74 boolean readData = false;
75 int size = readBuf.size();
76 if (size > 0) {
77 readData = true;
78 for (int i = 0; i < size; i++) {
79 readPending = false;
80 pipeline.fireChannelRead(readBuf.get(i));
81 }
82 readBuf.clear();
83 allocHandle.readComplete();
84 pipeline.fireChannelReadComplete();
85 }
86
87 if (exception != null) {
88 if (exception instanceof IOException) {
89 closed = true;
90 }
91
92 pipeline.fireExceptionCaught(exception);
93 }
94
95 if (closed) {
96 if (isOpen()) {
97 unsafe().close(unsafe().voidPromise());
98 }
99 } else if (readPending || config.isAutoRead() || !readData && isActive()) {
100
101
102 read();
103 }
104 }
105
106
107
108
109 protected abstract int doReadMessages(List<Object> msgs) throws Exception;
110 }