View Javadoc

1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  /*
17   * Licensed under the Apache License, Version 2.0 (the "License");
18   * you may not use this file except in compliance with the License.
19   * You may obtain a copy of the License at
20   *
21   * http://www.apache.org/licenses/LICENSE-2.0
22   *
23   * Unless required by applicable law or agreed to in writing, software
24   * distributed under the License is distributed on an "AS IS" BASIS,
25   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26   * See the License for the specific language governing permissions and
27   * limitations under the License.
28   */
29  package io.netty.util.internal;
30  
31  import java.util.Queue;
32  
33  /**
34   * Forked from <a href="https://github.com/JCTools/JCTools">JCTools</a>.
35   *
36   * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue}
37   * interface sufficient for concurrent message passing.<br>
38   * Message passing queues provide happens before semantics to messages passed through, namely that writes made
39   * by the producer before offering the message are visible to the consuming thread after the message has been
40   * polled out of the queue.
41   *
42   * @param <T>
43   *            the event/message type
44   */
45  public interface MessagePassingQueue<T> {
46      int UNBOUNDED_CAPACITY = -1;
47  
48      interface Supplier<T> {
49          /**
50           * This method will return the next value to be written to the queue. As such the queue
51           * implementations are committed to insert the value once the call is made.
52           * <p>
53           * Users should be aware that underlying queue implementations may upfront claim parts of the queue
54           * for batch operations and this will affect the view on the queue from the supplier method. In
55           * particular size and any offer methods may take the view that the full batch has already happened.
56           *
57           * @return new element, NEVER null
58           */
59          T get();
60      }
61  
62      interface Consumer<T> {
63          /**
64           * This method will process an element already removed from the queue. This method is expected to
65           * never throw an exception.
66           * <p>
67           * Users should be aware that underlying queue implementations may upfront claim parts of the queue
68           * for batch operations and this will affect the view on the queue from the accept method. In
69           * particular size and any poll/peek methods may take the view that the full batch has already
70           * happened.
71           *
72           * @param e not null
73           */
74          void accept(T e);
75      }
76  
77      interface WaitStrategy {
78          /**
79           * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for
80           * estimating how long the caller has been idling. The expected usage is:
81           *
82           * <pre>
83           * <code>
84           * int ic = 0;
85           * while(true) {
86           *   if(!isGodotArrived()) {
87           *     ic = w.idle(ic);
88           *     continue;
89           *   }
90           *   ic = 0;
91           *   // party with Godot until he goes again
92           * }
93           * </code>
94           * </pre>
95           *
96           * @param idleCounter idle calls counter, managed by the idle method until reset
97           * @return new counter value to be used on subsequent idle cycle
98           */
99          int idle(int idleCounter);
100     }
101 
102     interface ExitCondition {
103 
104         /**
105          * This method should be implemented such that the flag read or determination cannot be hoisted out of
106          * a loop which normally means a volatile load, but with JDK9 VarHandles may mean getOpaque.
107          *
108          * @return true as long as we should keep running
109          */
110         boolean keepRunning();
111     }
112 
113     /**
114      * Called from a producer thread subject to the restrictions appropriate to the implementation and
115      * according to the {@link Queue#offer(Object)} interface.
116      *
117      * @param e not null, will throw NPE if it is
118      * @return true if element was inserted into the queue, false iff full
119      */
120     boolean offer(T e);
121 
122     /**
123      * Called from the consumer thread subject to the restrictions appropriate to the implementation and
124      * according to the {@link Queue#poll()} interface.
125      *
126      * @return a message from the queue if one is available, null iff empty
127      */
128     T poll();
129 
130     /**
131      * Called from the consumer thread subject to the restrictions appropriate to the implementation and
132      * according to the {@link Queue#peek()} interface.
133      *
134      * @return a message from the queue if one is available, null iff empty
135      */
136     T peek();
137 
138     /**
139      * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as
140      * such is a best effort rather than absolute value. For some implementations this method may be O(n)
141      * rather than O(1).
142      *
143      * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less than or equal to
144      *         capacity (if bounded).
145      */
146     int size();
147 
148     /**
149      * Removes all items from the queue. Called from the consumer thread subject to the restrictions
150      * appropriate to the implementation and according to the {@link Queue#clear()} interface.
151      */
152     void clear();
153 
154     /**
155      * This method's accuracy is subject to concurrent modifications happening as the observation is carried
156      * out.
157      *
158      * @return true if empty, false otherwise
159      */
160     boolean isEmpty();
161 
162     /**
163      * @return the capacity of this queue or {@link #UNBOUNDED_CAPACITY} if not bounded
164      */
165     int capacity();
166 
167     /**
168      * Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed
169      * to {@link Queue#offer(Object)} this method may return false without the queue being full.
170      *
171      * @param e not null, will throw NPE if it is
172      * @return true if element was inserted into the queue, false if unable to offer
173      */
174     boolean relaxedOffer(T e);
175 
176     /**
177      * Called from the consumer thread subject to the restrictions appropriate to the implementation. As
178      * opposed to {@link Queue#poll()} this method may return null without the queue being empty.
179      *
180      * @return a message from the queue if one is available, null if unable to poll
181      */
182     T relaxedPoll();
183 
184     /**
185      * Called from the consumer thread subject to the restrictions appropriate to the implementation. As
186      * opposed to {@link Queue#peek()} this method may return null without the queue being empty.
187      *
188      * @return a message from the queue if one is available, null if unable to peek
189      */
190     T relaxedPeek();
191 
192     /**
193      * Remove all available items from the queue and hand them to the consumer. This should be semantically similar to:
194      * <pre><code>
195      * T m;
196      * while((m = relaxedPoll()) != null){
197      *  c.accept(m);
198      * }
199      * </code></pre>
200      * There's no strong commitment to the queue being empty at the end of a drain. Called from a
201      * consumer thread subject to the restrictions appropriate to the implementation.
202      *
203      * @return the number of polled elements
204      */
205     int drain(Consumer<T> c);
206 
207     /**
208      * Stuff the queue with elements from the supplier. Semantically similar to:
209      * <pre><code>
210      * while(relaxedOffer(s.get()));
211      * </code></pre>
212      * There's no strong commitment to the queue being full at the end of a fill. Called from a
213      * producer thread subject to the restrictions appropriate to the implementation.
214      *
215      * @return the number of offered elements
216      */
217     int fill(Supplier<T> s);
218 
219     /**
220      * Remove up to <i>limit</i> elements from the queue and hand them to the consumer. This should be semantically
221      * similar to:
222      *
223      * <pre><code>
224      *   T m;
225      *   for(int i = 0; i < limit && (m = relaxedPoll()) != null; i++){
226      *     c.accept(m);
227      *   }
228      * </code></pre>
229      *
230      * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer
231      * thread subject to the restrictions appropriate to the implementation.
232      *
233      * @return the number of polled elements
234      */
235     int drain(Consumer<T> c, int limit);
236 
237     /**
238      * Stuff the queue with up to <i>limit</i> elements from the supplier. Semantically similar to:
239      *
240      * <pre>
241      * <code>
242      *   for(int i=0; i < limit && relaxedOffer(s.get()); i++);
243      * </code>
244      * </pre>
245      *
246      * There's no strong commitment to the queue being full at the end of a fill. Called from a producer
247      * thread subject to the restrictions appropriate to the implementation.
248      *
249      * @return the number of offered elements
250      */
251     int fill(Supplier<T> s, int limit);
252 
253     /**
254      * Remove elements from the queue and hand them to the consumer until told to exit. Semantically similar to:
255      *
256      * <pre>
257      * <code>
258      *  int idleCounter = 0;
259      *  while (exit.keepRunning()) {
260      *      T e = relaxedPoll();
261      *      if(e==null){
262      *          idleCounter = wait.idle(idleCounter);
263      *          continue;
264      *      }
265      *      idleCounter = 0;
266      *      c.accept(e);
267      *  }
268      * </code>
269      * </pre>
270      *
271      * Called from a consumer thread subject to the restrictions appropriate to the implementation.
272      *
273      */
274     void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit);
275 
276     /**
277      * Stuff the queue with elements from the supplier until told to exit. Semantically similar to:
278      *
279      * <pre>
280      * <code>
281      *  int idleCounter = 0;
282      *  while (exit.keepRunning()) {
283      *      T e = s.get();
284      *      while (!relaxedOffer(e)) {
285      *          idleCounter = wait.idle(idleCounter);
286      *          continue;
287      *      }
288      *      idleCounter = 0;
289      *  }
290      * </code>
291      * </pre>
292      *
293      * Called from a producer thread subject to the restrictions appropriate to the implementation.
294      *
295      */
296     void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit);
297 }