/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.netty.util.internal.shaded.org.jctools.queues;

import java.util.Queue;

/**
 * Contract for queues whose purpose is concurrent message passing. A subset of the {@link Queue} methods is
 * offered with unchanged semantics, alongside further operations that accommodate the concurrent use case.
 * <p>
 * Messages travelling through the queue carry happens-before semantics: writes made by the producer before
 * offering a message are visible to the consuming thread once that message has been polled out of the queue.
 *
 * @param <T> the event/message type
 */
public interface MessagePassingQueue<T>
{
    /**
     * Value reported by {@link #capacity()} when the queue is not bounded.
     */
    int UNBOUNDED_CAPACITY = -1;

    /**
     * Source of the elements written by the {@code fill} methods.
     *
     * @param <T> the type of element supplied
     */
    interface Supplier<T>
    {
        /**
         * Returns the next value to be written to the queue. Once this call has been made the queue
         * implementation is committed to inserting the returned value.
         * <p>
         * Be aware that an underlying queue implementation may claim parts of the queue upfront for batch
         * operations, which affects how the queue looks from inside this method. In particular, size and any
         * offer methods may take the view that the full batch has already happened.
         * <p>
         * <b>WARNING</b>: this method is assumed to never throw. Breaking this assumption can lead to a broken
         * queue.
         * <p>
         * <b>WARNING</b>: this method is assumed to never return {@code null}. Breaking this assumption can lead
         * to a broken queue.
         *
         * @return new element, NEVER {@code null}
         */
        T get();
    }

    /**
     * Sink for the elements removed by the {@code drain} methods.
     *
     * @param <T> the type of element consumed
     */
    interface Consumer<T>
    {
        /**
         * Processes an element that has already been removed from the queue. Implementations are expected
         * to never throw an exception.
         * <p>
         * Be aware that an underlying queue implementation may claim parts of the queue upfront for batch
         * operations, which affects how the queue looks from inside this method. In particular, size and any
         * poll/peek methods may take the view that the full batch has already happened.
         * <p>
         * <b>WARNING</b>: this method is assumed to never throw. Breaking this assumption can lead to a broken
         * queue.
         *
         * @param e not {@code null}
         */
        void accept(T e);
    }

    /**
     * Backoff policy applied by the continuous {@code drain}/{@code fill} loops while they cannot make progress.
     */
    interface WaitStrategy
    {
        /**
         * Performs one idle step; the backoff may be static or dynamic. A dynamic backoff relies on the
         * counter to estimate how long the caller has been idling. Expected usage:
         * <pre>{@code
         * int ic = 0;
         * while (true) {
         *     if (!isGodotArrived()) {
         *         ic = w.idle(ic);
         *         continue;
         *     }
         *     ic = 0;
         *     // party with Godot until he goes again
         * }
         * }</pre>
         *
         * @param idleCounter idle calls counter, managed by the idle method until reset
         * @return new counter value to be used on subsequent idle cycle
         */
        int idle(int idleCounter);
    }

    /**
     * Termination signal consulted by the continuous {@code drain}/{@code fill} loops.
     */
    interface ExitCondition
    {
        /**
         * Implementations must make sure the flag read (or other determination) cannot be hoisted out of a
         * loop. That normally means a volatile load, but with JDK9 VarHandles may mean getOpaque.
         *
         * @return true as long as we should keep running
         */
        boolean keepRunning();
    }

    /**
     * Called from a producer thread, subject to the restrictions appropriate to the implementation, and
     * behaving according to the {@link Queue#offer(Object)} interface.
     *
     * @param e not {@code null}, will throw NPE if it is
     * @return true if element was inserted into the queue, false iff full
     */
    boolean offer(T e);

    /**
     * Called from the consumer thread, subject to the restrictions appropriate to the implementation, and
     * behaving according to the {@link Queue#poll()} interface.
     *
     * @return a message from the queue if one is available, {@code null} iff empty
     */
    T poll();

    /**
     * Called from the consumer thread, subject to the restrictions appropriate to the implementation, and
     * behaving according to the {@link Queue#peek()} interface.
     *
     * @return a message from the queue if one is available, {@code null} iff empty
     */
    T peek();

    /**
     * Estimates the number of messages in the queue. Because concurrent modifications may happen while the
     * size is being estimated, the result is a best effort rather than an absolute value. For some
     * implementations this method may be O(n) rather than O(1).
     *
     * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less than or equal
     * to capacity (if bounded).
     */
    int size();

    /**
     * Removes all items from the queue. Called from the consumer thread, subject to the restrictions
     * appropriate to the implementation, and behaving according to the {@link Queue#clear()} interface.
     */
    void clear();

    /**
     * Reports whether the queue is empty. The accuracy of the answer is subject to concurrent modifications
     * happening while the observation is carried out.
     *
     * @return true if empty, false otherwise
     */
    boolean isEmpty();

    /**
     * @return the capacity of this queue or {@link MessagePassingQueue#UNBOUNDED_CAPACITY} if not bounded
     */
    int capacity();

    /**
     * Called from a producer thread, subject to the restrictions appropriate to the implementation. Unlike
     * {@link Queue#offer(Object)}, this method may return false without the queue being full.
     *
     * @param e not {@code null}, will throw NPE if it is
     * @return true if element was inserted into the queue, false if unable to offer
     */
    boolean relaxedOffer(T e);

    /**
     * Called from the consumer thread, subject to the restrictions appropriate to the implementation. Unlike
     * {@link Queue#poll()}, this method may return {@code null} without the queue being empty.
     *
     * @return a message from the queue if one is available, {@code null} if unable to poll
     */
    T relaxedPoll();

    /**
     * Called from the consumer thread, subject to the restrictions appropriate to the implementation. Unlike
     * {@link Queue#peek()}, this method may return {@code null} without the queue being empty.
     *
     * @return a message from the queue if one is available, {@code null} if unable to peek
     */
    T relaxedPeek();

    /**
     * Removes up to <i>limit</i> elements from the queue and hands each one to the consumer. This should be
     * semantically similar to:
     * <pre>{@code
     * T m;
     * int i = 0;
     * for (; i < limit && (m = relaxedPoll()) != null; i++) {
     *     c.accept(m);
     * }
     * return i;
     * }</pre>
     * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer
     * thread, subject to the restrictions appropriate to the implementation.
     * <p>
     * <b>WARNING</b>: Explicit assumptions are made with regards to {@link Consumer#accept}; make sure you have
     * read and understood these before using this method.
     *
     * @param c consumer that is handed each removed element
     * @param limit maximum number of elements to remove
     * @return the number of polled elements
     * @throws IllegalArgumentException if c is {@code null}
     * @throws IllegalArgumentException if limit is negative
     */
    int drain(Consumer<T> c, int limit);

    /**
     * Stuffs the queue with up to <i>limit</i> elements taken from the supplier. Semantically similar to:
     * <pre>{@code
     * for (int i = 0; i < limit && relaxedOffer(s.get()); i++);
     * }</pre>
     * There's no strong commitment to the queue being full at the end of a fill. Called from a producer
     * thread, subject to the restrictions appropriate to the implementation.
     * <p>
     * <b>WARNING</b>: Explicit assumptions are made with regards to {@link Supplier#get}; make sure you have
     * read and understood these before using this method.
     *
     * @param s supplier of the elements to insert
     * @param limit maximum number of elements to insert
     * @return the number of offered elements
     * @throws IllegalArgumentException if s is {@code null}
     * @throws IllegalArgumentException if limit is negative
     */
    int fill(Supplier<T> s, int limit);

    /**
     * Removes all available items from the queue and hands each one to the consumer. This should be
     * semantically similar to:
     * <pre>{@code
     * T m;
     * while ((m = relaxedPoll()) != null) {
     *     c.accept(m);
     * }
     * }</pre>
     * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer
     * thread, subject to the restrictions appropriate to the implementation.
     * <p>
     * <b>WARNING</b>: Explicit assumptions are made with regards to {@link Consumer#accept}; make sure you have
     * read and understood these before using this method.
     *
     * @param c consumer that is handed each removed element
     * @return the number of polled elements
     * @throws IllegalArgumentException if c is {@code null}
     */
    int drain(Consumer<T> c);

    /**
     * Stuffs the queue with elements taken from the supplier. Semantically similar to:
     * <pre>{@code
     * while (relaxedOffer(s.get()));
     * }</pre>
     * There's no strong commitment to the queue being full at the end of a fill. Called from a producer
     * thread, subject to the restrictions appropriate to the implementation.
     * <p>
     * Unbounded queues will fill up the queue with a fixed amount rather than fill up to oblivion.
     * <p>
     * <b>WARNING</b>: Explicit assumptions are made with regards to {@link Supplier#get}; make sure you have
     * read and understood these before using this method.
     *
     * @param s supplier of the elements to insert
     * @return the number of offered elements
     * @throws IllegalArgumentException if s is {@code null}
     */
    int fill(Supplier<T> s);

    /**
     * Removes elements from the queue and hands them to the consumer forever. Semantically similar to:
     * <pre>{@code
     * int idleCounter = 0;
     * while (exit.keepRunning()) {
     *     T e = relaxedPoll();
     *     if (e == null) {
     *         idleCounter = wait.idle(idleCounter);
     *         continue;
     *     }
     *     idleCounter = 0;
     *     c.accept(e);
     * }
     * }</pre>
     * Called from a consumer thread, subject to the restrictions appropriate to the implementation.
     * <p>
     * <b>WARNING</b>: Explicit assumptions are made with regards to {@link Consumer#accept}; make sure you have
     * read and understood these before using this method.
     *
     * @param c consumer that is handed each removed element
     * @param wait strategy invoked whenever no element could be polled
     * @param exit condition checked on every iteration; the loop runs while it reports true
     * @throws IllegalArgumentException if c OR wait OR exit are {@code null}
     */
    void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit);

    /**
     * Stuffs the queue with elements taken from the supplier forever. Semantically similar to:
     * <pre>{@code
     * int idleCounter = 0;
     * while (exit.keepRunning()) {
     *     T e = s.get();
     *     while (!relaxedOffer(e)) {
     *         idleCounter = wait.idle(idleCounter);
     *     }
     *     idleCounter = 0;
     * }
     * }</pre>
     * Called from a producer thread, subject to the restrictions appropriate to the implementation. The main
     * difference from the snippet above is that implementors MUST assure room in the queue is available
     * BEFORE calling {@link Supplier#get}.
     * <p>
     * <b>WARNING</b>: Explicit assumptions are made with regards to {@link Supplier#get}; make sure you have
     * read and understood these before using this method.
     *
     * @param s supplier of the elements to insert
     * @param wait strategy invoked whenever an element could not be offered
     * @param exit condition checked on every iteration; the loop runs while it reports true
     * @throws IllegalArgumentException if s OR wait OR exit are {@code null}
     */
    void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit);
}