View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.oio;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.channel.EventLoop;
22  import io.netty.channel.ThreadPerChannelEventLoop;
23  import io.netty.util.internal.OneTimeTask;
24  
25  import java.net.SocketAddress;
26  
27  /**
28   * Abstract base class for [email protected] Channel} implementations that use Old-Blocking-IO
29   */
30  public abstract class AbstractOioChannel extends AbstractChannel {
31  
32      protected static final int SO_TIMEOUT = 1000;
33  
34      boolean readPending;
35      private final Runnable readTask = new Runnable() {
36          @Override
37          public void run() {
38              doRead();
39          }
40      };
41      private final Runnable clearReadPendingRunnable = new Runnable() {
42          @Override
43          public void run() {
44              readPending = false;
45          }
46      };
47  
48      /**
49       * @see AbstractChannel#AbstractChannel(Channel)
50       */
51      protected AbstractOioChannel(Channel parent) {
52          super(parent);
53      }
54  
55      @Override
56      protected AbstractUnsafe newUnsafe() {
57          return new DefaultOioUnsafe();
58      }
59  
60      private final class DefaultOioUnsafe extends AbstractUnsafe {
61          @Override
62          public void connect(
63                  final SocketAddress remoteAddress,
64                  final SocketAddress localAddress, final ChannelPromise promise) {
65              if (!promise.setUncancellable() || !ensureOpen(promise)) {
66                  return;
67              }
68  
69              try {
70                  boolean wasActive = isActive();
71                  doConnect(remoteAddress, localAddress);
72                  safeSetSuccess(promise);
73                  if (!wasActive && isActive()) {
74                      pipeline().fireChannelActive();
75                  }
76              } catch (Throwable t) {
77                  safeSetFailure(promise, annotateConnectException(t, remoteAddress));
78                  closeIfClosed();
79              }
80          }
81      }
82  
83      @Override
84      protected boolean isCompatible(EventLoop loop) {
85          return loop instanceof ThreadPerChannelEventLoop;
86      }
87  
88      /**
89       * Connect to the remote peer using the given localAddress if one is specified or [email protected] null} otherwise.
90       */
91      protected abstract void doConnect(
92              SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
93  
94      @Override
95      protected void doBeginRead() throws Exception {
96          if (readPending) {
97              return;
98          }
99  
100         readPending = true;
101         eventLoop().execute(readTask);
102     }
103 
104     protected abstract void doRead();
105 
106     /**
107      * @deprecated No longer supported.
108      * No longer supported.
109      */
110     @Deprecated
111     protected boolean isReadPending() {
112         return readPending;
113     }
114 
115     /**
116      * @deprecated Use [email protected] #clearReadPending()} if appropriate instead.
117      * No longer supported.
118      */
119     @Deprecated
120     protected void setReadPending(final boolean readPending) {
121         if (isRegistered()) {
122             EventLoop eventLoop = eventLoop();
123             if (eventLoop.inEventLoop()) {
124                 this.readPending = readPending;
125             } else {
126                 eventLoop.execute(new OneTimeTask() {
127                     @Override
128                     public void run() {
129                         AbstractOioChannel.this.readPending = readPending;
130                     }
131                 });
132             }
133         } else {
134             this.readPending = readPending;
135         }
136     }
137 
138     /**
139      * Set read pending to [email protected] false}.
140      */
141     protected final void clearReadPending() {
142         if (isRegistered()) {
143             EventLoop eventLoop = eventLoop();
144             if (eventLoop.inEventLoop()) {
145                 readPending = false;
146             } else {
147                 eventLoop.execute(clearReadPendingRunnable);
148             }
149         } else {
150             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
151             readPending = false;
152         }
153     }
154 }