1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 /*
17 * Licensed under the Apache License, Version 2.0 (the "License");
18 * you may not use this file except in compliance with the License.
19 * You may obtain a copy of the License at
20 *
21 * http://www.apache.org/licenses/LICENSE-2.0
22 *
23 * Unless required by applicable law or agreed to in writing, software
24 * distributed under the License is distributed on an "AS IS" BASIS,
25 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26 * See the License for the specific language governing permissions and
27 * limitations under the License.
28 */
29 package io.netty.util.internal;
30
31 import java.util.Queue;
32
33 /**
34 * Forked from <a href="https://github.com/JCTools/JCTools">JCTools</a>.
35 *
 * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue}
 * interface sufficient for concurrent message passing.<br>
38 * Message passing queues provide happens before semantics to messages passed through, namely that writes made
39 * by the producer before offering the message are visible to the consuming thread after the message has been
40 * polled out of the queue.
41 *
42 * @param <T>
43 * the event/message type
44 */
public interface MessagePassingQueue<T> {
    /** Value returned by {@link #capacity()} when the queue is not bounded. */
    int UNBOUNDED_CAPACITY = -1;

    /**
     * Source of the elements inserted by the {@code fill} methods.
     *
     * @param <T> the element type
     */
    interface Supplier<T> {
        /**
         * This method will return the next value to be written to the queue. As such the queue
         * implementations are committed to insert the value once the call is made.
         * <p>
         * Users should be aware that underlying queue implementations may upfront claim parts of the queue
         * for batch operations and this will affect the view on the queue from the supplier method. In
         * particular size and any offer methods may take the view that the full batch has already happened.
         *
         * @return new element, NEVER null
         */
        T get();
    }

    /**
     * Sink for the elements removed by the {@code drain} methods.
     *
     * @param <T> the element type
     */
    interface Consumer<T> {
        /**
         * This method will process an element already removed from the queue. This method is expected to
         * never throw an exception.
         * <p>
         * Users should be aware that underlying queue implementations may upfront claim parts of the queue
         * for batch operations and this will affect the view on the queue from the accept method. In
         * particular size and any poll/peek methods may take the view that the full batch has already
         * happened.
         *
         * @param e not null
         */
        void accept(T e);
    }

    /**
     * Back-off policy used by the looping {@code drain}/{@code fill} variants while they cannot make progress.
     */
    interface WaitStrategy {
        /**
         * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for
         * estimating how long the caller has been idling. The expected usage is:
         *
         * <pre>{@code
         * int ic = 0;
         * while (true) {
         *     if (!isGodotArrived()) {
         *         ic = w.idle(ic);
         *         continue;
         *     }
         *     ic = 0;
         *     // party with Godot until he goes again
         * }
         * }</pre>
         *
         * @param idleCounter idle calls counter, managed by the idle method until reset
         * @return new counter value to be used on subsequent idle cycle
         */
        int idle(int idleCounter);
    }

    /**
     * Termination check polled by the looping {@code drain}/{@code fill} variants.
     */
    interface ExitCondition {

        /**
         * This method should be implemented such that the flag read or determination cannot be hoisted out of
         * a loop, which normally means a volatile load, but with JDK9 VarHandles may mean getOpaque.
         *
         * @return true as long as we should keep running
         */
        boolean keepRunning();
    }

    /**
     * Called from a producer thread subject to the restrictions appropriate to the implementation and
     * according to the {@link Queue#offer(Object)} interface.
     *
     * @param e not null, will throw NPE if it is
     * @return true if element was inserted into the queue, false iff full
     */
    boolean offer(T e);

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation and
     * according to the {@link Queue#poll()} interface.
     *
     * @return a message from the queue if one is available, null iff empty
     */
    T poll();

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation and
     * according to the {@link Queue#peek()} interface.
     *
     * @return a message from the queue if one is available, null iff empty
     */
    T peek();

    /**
     * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as
     * such is a best effort rather than absolute value. For some implementations this method may be O(n)
     * rather than O(1).
     *
     * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less than or equal
     * to capacity (if bounded).
     */
    int size();

    /**
     * Removes all items from the queue. Called from the consumer thread subject to the restrictions
     * appropriate to the implementation and according to the {@link Queue#clear()} interface.
     */
    void clear();

    /**
     * This method's accuracy is subject to concurrent modifications happening as the observation is carried
     * out.
     *
     * @return true if empty, false otherwise
     */
    boolean isEmpty();

    /**
     * @return the capacity of this queue or {@link #UNBOUNDED_CAPACITY} if not bounded
     */
    int capacity();

    /**
     * Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed
     * to {@link Queue#offer(Object)} this method may return false without the queue being full.
     *
     * @param e not null, will throw NPE if it is
     * @return true if element was inserted into the queue, false if unable to offer
     */
    boolean relaxedOffer(T e);

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation. As
     * opposed to {@link Queue#poll()} this method may return null without the queue being empty.
     *
     * @return a message from the queue if one is available, null if unable to poll
     */
    T relaxedPoll();

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation. As
     * opposed to {@link Queue#peek()} this method may return null without the queue being empty.
     *
     * @return a message from the queue if one is available, null if unable to peek
     */
    T relaxedPeek();

    /**
     * Remove all available items from the queue and hand them to the consumer. This should be semantically
     * similar to:
     *
     * <pre>{@code
     * T m;
     * while ((m = relaxedPoll()) != null) {
     *     c.accept(m);
     * }
     * }</pre>
     *
     * There's no strong commitment to the queue being empty at the end of a drain. Called from a
     * consumer thread subject to the restrictions appropriate to the implementation.
     *
     * @param c the consumer each removed element is handed to
     * @return the number of polled elements
     */
    int drain(Consumer<T> c);

    /**
     * Stuff the queue with elements from the supplier. Semantically similar to:
     *
     * <pre>{@code
     * while (relaxedOffer(s.get()));
     * }</pre>
     *
     * There's no strong commitment to the queue being full at the end of a fill. Called from a
     * producer thread subject to the restrictions appropriate to the implementation.
     *
     * @param s the supplier of the elements to insert
     * @return the number of offered elements
     */
    int fill(Supplier<T> s);

    /**
     * Remove up to <i>limit</i> elements from the queue and hand them to the consumer. This should be
     * semantically similar to:
     *
     * <pre>{@code
     * T m;
     * int i = 0;
     * for (; i < limit && (m = relaxedPoll()) != null; i++) {
     *     c.accept(m);
     * }
     * return i;
     * }</pre>
     *
     * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer
     * thread subject to the restrictions appropriate to the implementation.
     *
     * @param c the consumer each removed element is handed to
     * @param limit the maximum number of elements to remove
     * @return the number of polled elements
     */
    int drain(Consumer<T> c, int limit);

    /**
     * Stuff the queue with up to <i>limit</i> elements from the supplier. Semantically similar to:
     *
     * <pre>{@code
     * for (int i = 0; i < limit && relaxedOffer(s.get()); i++);
     * }</pre>
     *
     * There's no strong commitment to the queue being full at the end of a fill. Called from a producer
     * thread subject to the restrictions appropriate to the implementation.
     *
     * @param s the supplier of the elements to insert
     * @param limit the maximum number of elements to insert
     * @return the number of offered elements
     */
    int fill(Supplier<T> s, int limit);

    /**
     * Remove elements from the queue and hand them to the consumer for as long as the exit condition allows.
     * Semantically similar to:
     *
     * <pre>{@code
     * int idleCounter = 0;
     * while (exit.keepRunning()) {
     *     T e = relaxedPoll();
     *     if (e == null) {
     *         idleCounter = wait.idle(idleCounter);
     *         continue;
     *     }
     *     idleCounter = 0;
     *     c.accept(e);
     * }
     * }</pre>
     *
     * Called from a consumer thread subject to the restrictions appropriate to the implementation.
     *
     * @param c the consumer each removed element is handed to
     * @param wait the back-off strategy applied while no element can be polled
     * @param exit the condition checked on every iteration; draining stops once it returns false
     */
    void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit);

    /**
     * Stuff the queue with elements from the supplier for as long as the exit condition allows.
     * Semantically similar to:
     *
     * <pre>{@code
     * int idleCounter = 0;
     * while (exit.keepRunning()) {
     *     T e = s.get();
     *     while (!relaxedOffer(e)) {
     *         idleCounter = wait.idle(idleCounter);
     *     }
     *     idleCounter = 0;
     * }
     * }</pre>
     *
     * Called from a producer thread subject to the restrictions appropriate to the implementation.
     *
     * @param s the supplier of the elements to insert
     * @param wait the back-off strategy applied while an element cannot be offered
     * @param exit the condition checked before each element is requested from the supplier
     */
    void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit);
}