1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.traffic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.util.internal.OneTimeTask;
22
23 import java.util.ArrayDeque;
24 import java.util.concurrent.TimeUnit;
25
26 /**
 * <p>This implementation of the {@link AbstractTrafficShapingHandler} is for channel
28 * traffic shaping, that is to say a per channel limitation of the bandwidth.</p>
 * <p>Note the index used in {@code OutboundBuffer.setUserDefinedWritability(index, boolean)} is <b>1</b>.</p>
30 *
 * <p>The general use should be as follows:</p>
32 * <ul>
33 * <li><p>Add in your pipeline a new ChannelTrafficShapingHandler.</p>
34 * <p><tt>ChannelTrafficShapingHandler myHandler = new ChannelTrafficShapingHandler();</tt></p>
35 * <p><tt>pipeline.addLast(myHandler);</tt></p>
36 *
37 * <p><b>Note that this handler has a Pipeline Coverage of "one" which means a new handler must be created
38 * for each new channel as the counter cannot be shared among all channels.</b>.</p>
39 *
40 * <p>Other arguments can be passed like write or read limitation (in bytes/s where 0 means no limitation)
41 * or the check interval (in millisecond) that represents the delay between two computations of the
42 * bandwidth and so the call back of the doAccounting method (0 means no accounting at all).</p>
43 *
 * <p>A value of 0 means no accounting for checkInterval. If you need traffic shaping but no such accounting,
 * it is recommended to set a positive value, even if it is high, since the precision of the
 * Traffic Shaping depends on the period where the traffic is computed. The higher the interval,
 * the less precise the traffic shaping will be. As an upper value, something close
 * to 5 or 10 minutes is suggested.</p>
49 *
 * <p>maxTimeToWait, by default set to 15s, allows specifying an upper bound for the shaping delay.</p>
51 * </li>
 * <li>In your handler, you should consider using {@code channel.isWritable()} and
 * {@code channelWritabilityChanged(ctx)} to handle writability, or doing so through
 * {@code future.addListener(new GenericFutureListener())} on the future returned by
 * {@code ctx.write()}.</li>
 * <li><p>You should also consider keeping the object size in read or write operations reasonably adapted to
 * the bandwidth you require: for instance having 10 MB objects for 10KB/s will lead to a burst effect,
 * while having 100 KB objects for 1 MB/s should be handled smoothly by this TrafficShaping handler.</p></li>
 * <li><p>Some configuration methods will be taken as best effort, meaning
 * that traffic already scheduled will not be
 * changed; the new settings only apply to new traffic.</p>
 * <p>So those methods are expected to be used not too often,
 * in accordance with the traffic shaping configuration.</p></li>
64 * </ul>
65 */
66 public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
67 private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<ToSend>();
68 private long queueSize;
69
70 /**
71 * Create a new instance.
72 *
73 * @param writeLimit
74 * 0 or a limit in bytes/s
75 * @param readLimit
76 * 0 or a limit in bytes/s
77 * @param checkInterval
78 * The delay between two computations of performances for
79 * channels or 0 if no stats are to be computed.
80 * @param maxTime
81 * The maximum delay to wait in case of traffic excess.
82 */
83 public ChannelTrafficShapingHandler(long writeLimit, long readLimit,
84 long checkInterval, long maxTime) {
85 super(writeLimit, readLimit, checkInterval, maxTime);
86 }
87
88 /**
89 * Create a new instance using default
90 * max time as delay allowed value of 15000 ms.
91 *
92 * @param writeLimit
93 * 0 or a limit in bytes/s
94 * @param readLimit
95 * 0 or a limit in bytes/s
96 * @param checkInterval
97 * The delay between two computations of performances for
98 * channels or 0 if no stats are to be computed.
99 */
100 public ChannelTrafficShapingHandler(long writeLimit,
101 long readLimit, long checkInterval) {
102 super(writeLimit, readLimit, checkInterval);
103 }
104
105 /**
106 * Create a new instance using default Check Interval value of 1000 ms and
107 * max time as delay allowed value of 15000 ms.
108 *
109 * @param writeLimit
110 * 0 or a limit in bytes/s
111 * @param readLimit
112 * 0 or a limit in bytes/s
113 */
114 public ChannelTrafficShapingHandler(long writeLimit,
115 long readLimit) {
116 super(writeLimit, readLimit);
117 }
118
119 /**
120 * Create a new instance using
121 * default max time as delay allowed value of 15000 ms and no limit.
122 *
123 * @param checkInterval
124 * The delay between two computations of performances for
125 * channels or 0 if no stats are to be computed.
126 */
127 public ChannelTrafficShapingHandler(long checkInterval) {
128 super(checkInterval);
129 }
130
131 @Override
132 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
133 TrafficCounter trafficCounter = new TrafficCounter(this, ctx.executor(), "ChannelTC" +
134 ctx.channel().hashCode(), checkInterval);
135 setTrafficCounter(trafficCounter);
136 trafficCounter.start();
137 super.handlerAdded(ctx);
138 }
139
140 @Override
141 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
142 trafficCounter.stop();
143 // write order control
144 synchronized (this) {
145 if (ctx.channel().isActive()) {
146 for (ToSend toSend : messagesQueue) {
147 long size = calculateSize(toSend.toSend);
148 trafficCounter.bytesRealWriteFlowControl(size);
149 queueSize -= size;
150 ctx.write(toSend.toSend, toSend.promise);
151 }
152 } else {
153 for (ToSend toSend : messagesQueue) {
154 if (toSend.toSend instanceof ByteBuf) {
155 ((ByteBuf) toSend.toSend).release();
156 }
157 }
158 }
159 messagesQueue.clear();
160 }
161 releaseWriteSuspended(ctx);
162 releaseReadSuspended(ctx);
163 super.handlerRemoved(ctx);
164 }
165
166 private static final class ToSend {
167 final long relativeTimeAction;
168 final Object toSend;
169 final ChannelPromise promise;
170
171 private ToSend(final long delay, final Object toSend, final ChannelPromise promise) {
172 relativeTimeAction = delay;
173 this.toSend = toSend;
174 this.promise = promise;
175 }
176 }
177
178 @Override
179 void submitWrite(final ChannelHandlerContext ctx, final Object msg,
180 final long size, final long delay, final long now,
181 final ChannelPromise promise) {
182 final ToSend newToSend;
183 // write order control
184 synchronized (this) {
185 if (delay == 0 && messagesQueue.isEmpty()) {
186 trafficCounter.bytesRealWriteFlowControl(size);
187 ctx.write(msg, promise);
188 return;
189 }
190 newToSend = new ToSend(delay + now, msg, promise);
191 messagesQueue.addLast(newToSend);
192 queueSize += size;
193 checkWriteSuspend(ctx, delay, queueSize);
194 }
195 final long futureNow = newToSend.relativeTimeAction;
196 ctx.executor().schedule(new OneTimeTask() {
197 @Override
198 public void run() {
199 sendAllValid(ctx, futureNow);
200 }
201 }, delay, TimeUnit.MILLISECONDS);
202 }
203
204 private void sendAllValid(final ChannelHandlerContext ctx, final long now) {
205 // write order control
206 synchronized (this) {
207 ToSend newToSend = messagesQueue.pollFirst();
208 for (; newToSend != null; newToSend = messagesQueue.pollFirst()) {
209 if (newToSend.relativeTimeAction <= now) {
210 long size = calculateSize(newToSend.toSend);
211 trafficCounter.bytesRealWriteFlowControl(size);
212 queueSize -= size;
213 ctx.write(newToSend.toSend, newToSend.promise);
214 } else {
215 messagesQueue.addFirst(newToSend);
216 break;
217 }
218 }
219 if (messagesQueue.isEmpty()) {
220 releaseWriteSuspended(ctx);
221 }
222 }
223 ctx.flush();
224 }
225
226 /**
227 * @return current size in bytes of the write buffer.
228 */
229 public long queueSize() {
230 return queueSize;
231 }
232 }