/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.netty.util.internal;

import java.util.Queue;

/**
 * Forked from <a href="https://github.com/JCTools/JCTools">JCTools</a>.
 *
 * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue}
 * interface sufficient for concurrent message passing.<br>
 * Message passing queues provide happens before semantics to messages passed through, namely that writes made
 * by the producer before offering the message are visible to the consuming thread after the message has been
 * polled out of the queue.
 *
 * @param <T>
 *            the event/message type
 */
public interface MessagePassingQueue<T> {
    /**
     * Value returned by {@link #capacity()} when the queue is not bounded.
     */
    int UNBOUNDED_CAPACITY = -1;

    /**
     * Source of the elements written to the queue by the {@code fill} methods.
     *
     * @param <T> the type of the supplied elements
     */
    interface Supplier<T> {
        /**
         * This method will return the next value to be written to the queue. As such the queue
         * implementations are committed to insert the value once the call is made.
         * <p>
         * Users should be aware that underlying queue implementations may upfront claim parts of the queue
         * for batch operations and this will affect the view on the queue from the supplier method. In
         * particular size and any offer methods may take the view that the full batch has already happened.
         *
         * @return new element, NEVER null
         */
        T get();
    }

    /**
     * Sink for the elements removed from the queue by the {@code drain} methods.
     *
     * @param <T> the type of the consumed elements
     */
    interface Consumer<T> {
        /**
         * This method will process an element already removed from the queue. This method is expected to
         * never throw an exception.
         * <p>
         * Users should be aware that underlying queue implementations may upfront claim parts of the queue
         * for batch operations and this will affect the view on the queue from the accept method. In
         * particular size and any poll/peek methods may take the view that the full batch has already
         * happened.
         *
         * @param e not null
         */
        void accept(T e);
    }

    /**
     * Back-off policy used by the looping {@code drain}/{@code fill} variants while they cannot make progress.
     */
    interface WaitStrategy {
        /**
         * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for
         * estimating how long the caller has been idling. The expected usage is:
         *
         * <pre>
         * <code>
         * int ic = 0;
         * while(true) {
         *   if(!isGodotArrived()) {
         *     ic = w.idle(ic);
         *     continue;
         *   }
         *   ic = 0;
         *   // party with Godot until he goes again
         * }
         * </code>
         * </pre>
         *
         * @param idleCounter idle calls counter, managed by the idle method until reset
         * @return new counter value to be used on subsequent idle cycle
         */
        int idle(int idleCounter);
    }

    /**
     * Termination signal polled by the looping {@code drain}/{@code fill} variants.
     */
    interface ExitCondition {

        /**
         * This method should be implemented such that the flag read or determination cannot be hoisted out of
         * a loop which normally means a volatile load, but with JDK9 VarHandles may mean getOpaque.
         *
         * @return true as long as we should keep running
         */
        boolean keepRunning();
    }

    /**
     * Called from a producer thread subject to the restrictions appropriate to the implementation and
     * according to the {@link Queue#offer(Object)} interface.
     *
     * @param e not null, will throw NPE if it is
     * @return true if element was inserted into the queue, false iff full
     */
    boolean offer(T e);

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation and
     * according to the {@link Queue#poll()} interface.
     *
     * @return a message from the queue if one is available, null iff empty
     */
    T poll();

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation and
     * according to the {@link Queue#peek()} interface.
     *
     * @return a message from the queue if one is available, null iff empty
     */
    T peek();

    /**
     * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as
     * such is a best effort rather than absolute value. For some implementations this method may be O(n)
     * rather than O(1).
     *
     * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or equals to
     * capacity (if bounded).
     */
    int size();

    /**
     * Removes all items from the queue. Called from the consumer thread subject to the restrictions
     * appropriate to the implementation and according to the {@link Queue#clear()} interface.
     */
    void clear();

    /**
     * This method's accuracy is subject to concurrent modifications happening as the observation is carried
     * out.
     *
     * @return true if empty, false otherwise
     */
    boolean isEmpty();

    /**
     * @return the capacity of this queue or {@link #UNBOUNDED_CAPACITY} if not bounded
     */
    int capacity();

    /**
     * Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed
     * to {@link Queue#offer(Object)} this method may return false without the queue being full.
     *
     * @param e not null, will throw NPE if it is
     * @return true if element was inserted into the queue, false if unable to offer
     */
    boolean relaxedOffer(T e);

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation. As
     * opposed to {@link Queue#poll()} this method may return null without the queue being empty.
     *
     * @return a message from the queue if one is available, null if unable to poll
     */
    T relaxedPoll();

    /**
     * Called from the consumer thread subject to the restrictions appropriate to the implementation. As
     * opposed to {@link Queue#peek()} this method may return null without the queue being empty.
     *
     * @return a message from the queue if one is available, null if unable to peek
     */
    T relaxedPeek();

    /**
     * Remove all available items from the queue and hand to consume. This should be semantically similar to:
     * <pre><code>
     * T m;
     * while((m = relaxedPoll()) != null){
     *   c.accept(m);
     * }
     * </code></pre>
     * There's no strong commitment to the queue being empty at the end of a drain. Called from a
     * consumer thread subject to the restrictions appropriate to the implementation.
     *
     * @param c consumer each removed element is handed to
     * @return the number of polled elements
     */
    int drain(Consumer<T> c);

    /**
     * Stuff the queue with elements from the supplier. Semantically similar to:
     * <pre><code>
     * while(relaxedOffer(s.get()));
     * </code></pre>
     * There's no strong commitment to the queue being full at the end of a fill. Called from a
     * producer thread subject to the restrictions appropriate to the implementation.
     *
     * @param s supplier of the elements to offer
     * @return the number of offered elements
     */
    int fill(Supplier<T> s);

    /**
     * Remove up to <i>limit</i> elements from the queue and hand to consume. This should be semantically
     * similar to:
     *
     * <pre><code>
     * T m;
     * for(int i = 0; i &lt; limit &amp;&amp; (m = relaxedPoll()) != null; i++){
     *   c.accept(m);
     * }
     * </code></pre>
     *
     * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer
     * thread subject to the restrictions appropriate to the implementation.
     *
     * @param c consumer each removed element is handed to
     * @param limit maximum number of elements to remove
     * @return the number of polled elements
     */
    int drain(Consumer<T> c, int limit);

    /**
     * Stuff the queue with up to <i>limit</i> elements from the supplier. Semantically similar to:
     *
     * <pre>
     * <code>
     * for(int i = 0; i &lt; limit &amp;&amp; relaxedOffer(s.get()); i++);
     * </code>
     * </pre>
     *
     * There's no strong commitment to the queue being full at the end of a fill. Called from a producer
     * thread subject to the restrictions appropriate to the implementation.
     *
     * @param s supplier of the elements to offer
     * @param limit maximum number of elements to offer
     * @return the number of offered elements
     */
    int fill(Supplier<T> s, int limit);

    /**
     * Remove elements from the queue and hand to consume forever. Semantically similar to:
     *
     * <pre>
     * <code>
     *  int idleCounter = 0;
     *  while (exit.keepRunning()) {
     *      T e = relaxedPoll();
     *      if(e==null){
     *          idleCounter = wait.idle(idleCounter);
     *          continue;
     *      }
     *      idleCounter = 0;
     *      c.accept(e);
     *  }
     * </code>
     * </pre>
     *
     * Called from a consumer thread subject to the restrictions appropriate to the implementation.
     *
     * @param c consumer each removed element is handed to
     * @param wait strategy invoked whenever nothing could be polled
     * @param exit condition checked before every iteration; the loop ends once it stops returning true
     */
    void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit);

    /**
     * Stuff the queue with elements from the supplier forever. Semantically similar to:
     *
     * <pre>
     * <code>
     *  int idleCounter = 0;
     *  while (exit.keepRunning()) {
     *      T e = s.get();
     *      while (!relaxedOffer(e)) {
     *          idleCounter = wait.idle(idleCounter);
     *      }
     *      idleCounter = 0;
     *  }
     * </code>
     * </pre>
     *
     * Called from a producer thread subject to the restrictions appropriate to the implementation.
     *
     * @param s supplier of the elements to offer
     * @param wait strategy invoked whenever an element could not be offered
     * @param exit condition checked before every iteration; the loop ends once it stops returning true
     */
    void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit);
}