View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.traffic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.channel.ChannelPromise;
21  import io.netty.util.internal.OneTimeTask;
22  
23  import java.util.ArrayDeque;
24  import java.util.concurrent.TimeUnit;
25  
26  /**
27   * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
28   * traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
29   * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
30   *
31   * <p>The general use should be as follows:</p>
32   * <ul>
33   * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p>
34   * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p>
35   * <p><tt>pipeline.addLast(myHandler);</tt></p>
36   *
37   * <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
38   * for each new channel as the counter cannot be shared among all channels.</b>.</p>
39   *
40   * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
41   * or the check interval (in millisecond) that represents the delay between two computations of the
42   * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
43   *
44   * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
45   * it is recommended to set a positive value, even if it is high, since the precision of the
46   * Traffic Shaping depends on the period over which the traffic is computed. The higher the interval,
47   * the less precise the traffic shaping will be. A suggested upper value is something close
48   * to 5 or 10 minutes.</p>
49   *
50   * <p>maxTimeToWait, by default set to 15s, allows to specify an upper bound of time shaping.</p>
51   * </li>
52   * <li>In your handler, you should consider to use the {@code channel.isWritable()} and
53   * {@code channelWritabilityChanged(ctx)} to handle writability, or through
54   * {@code future.addListener(new GenericFutureListener())} on the future returned by
55   * {@code ctx.write()}.</li>
56   * <li><p>You shall also consider to have object size in read or write operations relatively adapted to
57   * the bandwidth you require: for instance having 10 MB objects for 10KB/s will lead to burst effect,
58   * while having 100 KB objects for 1 MB/s should be smoothly handled by this TrafficShaping handler.</p></li>
59   * <li><p>Some configuration methods will be taken as best effort, meaning
60   * that all already scheduled traffics will not be
61   * changed, but only applied to new traffics.</p>
62   * <p>So the expected usage of those methods are to be used not too often,
63   * accordingly to the traffic shaping configuration.</p></li>
64   * </ul>
65   */
66  public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
67      private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<ToSend>();
68      private long queueSize;
69  
70      /**
71       * Create a new instance.
72       *
73       * @param writeLimit
74       *            0 or a limit in bytes/s
75       * @param readLimit
76       *            0 or a limit in bytes/s
77       * @param checkInterval
78       *            The delay between two computations of performances for
79       *            channels or 0 if no stats are to be computed.
80       * @param maxTime
81       *            The maximum delay to wait in case of traffic excess.
82       */
83      public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
84              long checkInterval, long maxTime) {
85          super(writeLimit, readLimit, checkInterval, maxTime);
86      }
87  
88      /**
89       * Create a new instance using default
90       * max time as delay allowed value of 15000 ms.
91       *
92       * @param writeLimit
93       *          0 or a limit in bytes/s
94       * @param readLimit
95       *          0 or a limit in bytes/s
96       * @param checkInterval
97       *          The delay between two computations of performances for
98       *            channels or 0 if no stats are to be computed.
99       */
100     public ChannelTrafficShapingHandler(long writeLimit,
101             long readLimit, long checkInterval) {
102         super(writeLimit, readLimit, checkInterval);
103     }
104 
105     /**
106      * Create a new instance using default Check Interval value of 1000 ms and
107      * max time as delay allowed value of 15000 ms.
108      *
109      * @param writeLimit
110      *          0 or a limit in bytes/s
111      * @param readLimit
112      *          0 or a limit in bytes/s
113      */
114     public ChannelTrafficShapingHandler(long writeLimit,
115             long readLimit) {
116         super(writeLimit, readLimit);
117     }
118 
119     /**
120      * Create a new instance using
121      * default max time as delay allowed value of 15000 ms and no limit.
122      *
123      * @param checkInterval
124      *          The delay between two computations of performances for
125      *            channels or 0 if no stats are to be computed.
126      */
127     public ChannelTrafficShapingHandler(long checkInterval) {
128         super(checkInterval);
129     }
130 
131     @Override
132     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
133         TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
134                 ctx.channel().hashCode(), checkInterval);
135         setTrafficCounter(trafficCounter);
136         trafficCounter.start();
137         super.handlerAdded(ctx);
138     }
139 
140     @Override
141     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
142         trafficCounter.stop();
143         // write order control
144         synchronized (this) {
145             if (ctx.channel().isActive()) {
146                 for (ToSend toSend : messagesQueue) {
147                     long size = calculateSize(toSend.toSend);
148                     trafficCounter.bytesRealWriteFlowControl(size);
149                     queueSize -= size;
150                     ctx.write(toSend.toSend, toSend.promise);
151                 }
152             } else {
153                 for (ToSend toSend : messagesQueue) {
154                     if (toSend.toSend instanceof ByteBuf) {
155                         ((ByteBuf) toSend.toSend).release();
156                     }
157                 }
158             }
159             messagesQueue.clear();
160         }
161         releaseWriteSuspended(ctx);
162         releaseReadSuspended(ctx);
163         super.handlerRemoved(ctx);
164     }
165 
166     private static final class ToSend {
167         final long relativeTimeAction;
168         final Object toSend;
169         final ChannelPromise promise;
170 
171         private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
172             relativeTimeAction = delay;
173             this.toSend = toSend;
174             this.promise = promise;
175         }
176     }
177 
178     @Override
179     void submitWrite(final ChannelHandlerContext ctx, final Object msg,
180             final long size, final long delay, final long now,
181             final ChannelPromise promise) {
182         final ToSend newToSend;
183         // write order control
184         synchronized (this) {
185             if (delay == 0 && messagesQueue.isEmpty()) {
186                 trafficCounter.bytesRealWriteFlowControl(size);
187                 ctx.write(msg, promise);
188                 return;
189             }
190             newToSend = new ToSend(delay + now, msg, promise);
191             messagesQueue.addLast(newToSend);
192             queueSize += size;
193             checkWriteSuspend(ctx, delay, queueSize);
194         }
195         final long futureNow = newToSend.relativeTimeAction;
196         ctx.executor().schedule(new OneTimeTask() {
197             @Override
198             public void run() {
199                 sendAllValid(ctx, futureNow);
200             }
201         }, delay, TimeUnit.MILLISECONDS);
202     }
203 
204     private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
205         // write order control
206         synchronized (this) {
207             ToSend newToSend = messagesQueue.pollFirst();
208             for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
209                 if (newToSend.relativeTimeAction <= now) {
210                     long size = calculateSize(newToSend.toSend);
211                     trafficCounter.bytesRealWriteFlowControl(size);
212                     queueSize -= size;
213                     ctx.write(newToSend.toSend, newToSend.promise);
214                 } else {
215                     messagesQueue.addFirst(newToSend);
216                     break;
217                 }
218             }
219             if (messagesQueue.isEmpty()) {
220                 releaseWriteSuspended(ctx);
221             }
222         }
223         ctx.flush();
224     }
225 
226     /**
227     * @return current size in bytes of the write buffer.
228     */
229    public long queueSize() {
230        return queueSize;
231    }
232 }