001 /*
002 * This file is part of the Jikes RVM project (http://jikesrvm.org).
003 *
004 * This file is licensed to You under the Eclipse Public License (EPL);
005 * You may not use this file except in compliance with the License. You
006 * may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/eclipse-1.0.php
009 *
010 * See the COPYRIGHT.txt file distributed with this work for information
011 * regarding copyright ownership.
012 */
013 package org.mmtk.utility.deque;
014
015 import org.mmtk.policy.RawPageSpace;
016 import org.mmtk.policy.Space;
017 import org.mmtk.utility.Constants;
018 import org.mmtk.utility.Log;
019 import org.mmtk.vm.Lock;
020 import org.mmtk.vm.VM;
021 import org.vmmagic.pragma.Entrypoint;
022 import org.vmmagic.pragma.Inline;
023 import org.vmmagic.pragma.Uninterruptible;
024 import org.vmmagic.unboxed.Address;
025 import org.vmmagic.unboxed.Offset;
026
/**
 * This supports enqueuing and dequeuing of buffers for shared use
 * <i>without external synchronization</i> by the caller: each enqueue
 * and dequeue operation acquires this deque's own lock. Buffers can be
 * added to and removed from either end of the deque.
 */
032 @Uninterruptible
033 public class SharedDeque extends Deque implements Constants {
034 private static final boolean DISABLE_WAITING = true;
035 private static final Offset NEXT_OFFSET = Offset.zero();
036 private static final Offset PREV_OFFSET = Offset.fromIntSignExtend(BYTES_IN_ADDRESS);
037
038 private static final boolean TRACE = false;
039 private static final boolean TRACE_DETAIL = false;
040 private static final boolean TRACE_BLOCKERS = false;
041
042 /****************************************************************************
043 *
044 * Public instance methods
045 */
046
/**
 * Create an empty shared deque with its completion flag cleared.
 *
 * @param name The name of this deque, used in diagnostic output
 * @param rps The raw page space from which buffers are acquired
 * @param arity The number of words per entry
 */
public SharedDeque(String name, RawPageSpace rps, int arity) {
  this.name = name;
  this.arity = arity;
  this.rps = rps;
  this.lock = VM.newLock("SharedDeque");
  clearCompletionFlag();
  this.head = HEAD_INITIAL_VALUE;
  this.tail = TAIL_INITIAL_VALUE;
}
059
060 /** Get the arity (words per entry) of this queue */
061 @Inline
062 final int getArity() { return arity; }
063
064 /**
065 * Enqueue a block on the head or tail of the shared queue
066 *
067 * @param buf
068 * @param arity
069 * @param toTail
070 */
071 final void enqueue(Address buf, int arity, boolean toTail) {
072 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
073 lock();
074 if (toTail) {
075 // Add to the tail of the queue
076 setNext(buf, Address.zero());
077 if (tail.EQ(TAIL_INITIAL_VALUE))
078 head = buf;
079 else
080 setNext(tail, buf);
081 setPrev(buf, tail);
082 tail = buf;
083 } else {
084 // Add to the head of the queue
085 setPrev(buf, Address.zero());
086 if (head.EQ(HEAD_INITIAL_VALUE))
087 tail = buf;
088 else
089 setPrev(head, buf);
090 setNext(buf, head);
091 head = buf;
092 }
093 bufsenqueued++;
094 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(checkDequeLength(bufsenqueued));
095 unlock();
096 }
097
098 public final void clearDeque(int arity) {
099 Address buf = dequeue(arity);
100 while (!buf.isZero()) {
101 free(bufferStart(buf));
102 buf = dequeue(arity);
103 }
104 setCompletionFlag();
105 }
106
107 @Inline
108 final Address dequeue(int arity) {
109 return dequeue(arity, false);
110 }
111
112 final Address dequeue(int arity, boolean fromTail) {
113 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
114 return dequeue(false, fromTail);
115 }
116
117 @Inline
118 final Address dequeueAndWait(int arity) {
119 return dequeueAndWait(arity, false);
120 }
121
122 final Address dequeueAndWait(int arity, boolean fromTail) {
123 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
124 Address buf = dequeue(false, fromTail);
125 if (buf.isZero() && (!complete())) {
126 buf = dequeue(true, fromTail); // Wait inside dequeue
127 }
128 return buf;
129 }
130
131 /**
132 * Prepare for parallel processing. All active GC threads will
133 * participate, and pop operations will block until all work
134 * is complete.
135 */
136 public final void prepare() {
137 if (DISABLE_WAITING) {
138 prepareNonBlocking();
139 } else {
140 /* This should be the normal mode of operation once performance is fixed */
141 prepare(VM.activePlan.collector().parallelWorkerCount());
142 }
143 }
144
145 /**
146 * Prepare for processing where pop operations on the deques
147 * will never block.
148 */
149 public final void prepareNonBlocking() {
150 prepare(1);
151 }
152
153 /**
154 * Prepare for parallel processing where a specific number
155 * of threads take part.
156 *
157 * @param consumers # threads taking part.
158 */
159 private void prepare(int consumers) {
160 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
161 setNumConsumers(consumers);
162 clearCompletionFlag();
163 }
164
165 public final void reset() {
166 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
167 clearCompletionFlag();
168 setNumConsumersWaiting(0);
169 assertExhausted();
170 }
171
172 public final void assertExhausted() {
173 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero() && tail.isZero());
174 }
175
176 @Inline
177 final Address alloc() {
178 Address rtn = rps.acquire(PAGES_PER_BUFFER);
179 if (rtn.isZero()) {
180 Space.printUsageMB();
181 VM.assertions.fail("Failed to allocate space for queue. Is metadata virtual memory exhausted?");
182 }
183 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(rtn.EQ(bufferStart(rtn)));
184 return rtn;
185 }
186
187 @Inline
188 final void free(Address buf) {
189 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(buf.EQ(bufferStart(buf)) && !buf.isZero());
190 rps.release(buf);
191 }
192
193 @Inline
194 public final int enqueuedPages() {
195 return bufsenqueued * PAGES_PER_BUFFER;
196 }
197
198 /****************************************************************************
199 *
200 * Private instance methods and fields
201 */
202
203 /** The name of this shared deque - for diagnostics */
204 private final String name;
205
206 /** Raw page space from which to allocate */
207 private RawPageSpace rps;
208
209 /** Number of words per entry */
210 private final int arity;
211
212 /** Completion flag - set when all consumers have arrived at the barrier */
213 @Entrypoint
214 private volatile int completionFlag;
215
216 /** # active threads - processing is complete when # waiting == this */
217 @Entrypoint
218 private volatile int numConsumers;
219
220 /** # threads waiting */
221 @Entrypoint
222 private volatile int numConsumersWaiting;
223
224 /** Head of the shared deque */
225 @Entrypoint
226 protected volatile Address head;
227
228 /** Tail of the shared deque */
229 @Entrypoint
230 protected volatile Address tail;
231 @Entrypoint
232 private volatile int bufsenqueued;
233 private Lock lock;
234
235 private static final long WARN_PERIOD = (long)(2*1E9);
236 private static final long TIMEOUT_PERIOD = 10 * WARN_PERIOD;
237
238 /**
239 * Dequeue a block from the shared pool. If 'waiting' is true, and the
240 * queue is empty, wait for either a new block to show up or all the
241 * other consumers to join us.
242 *
243 * @param waiting
244 * @param fromTail
245 * @return the Address of the block
246 */
247 private Address dequeue(boolean waiting, boolean fromTail) {
248 lock();
249 Address rtn = ((fromTail) ? tail : head);
250 if (rtn.isZero()) {
251 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero() && head.isZero());
252 // no buffers available
253 if (waiting) {
254 int ordinal = TRACE ? 0 : VM.activePlan.collector().getId();
255 setNumConsumersWaiting(numConsumersWaiting + 1);
256 while (rtn.isZero()) {
257 if (numConsumersWaiting == numConsumers)
258 setCompletionFlag();
259 if (TRACE) {
260 Log.write("-- ("); Log.write(ordinal);
261 Log.write(") joining wait queue of SharedDeque(");
262 Log.write(name); Log.write(") ");
263 Log.write(numConsumersWaiting); Log.write("/");
264 Log.write(numConsumers);
265 Log.write(" consumers waiting");
266 if (complete()) Log.write(" WAIT COMPLETE");
267 Log.writeln();
268 if (TRACE_BLOCKERS)
269 VM.assertions.dumpStack();
270 }
271 unlock();
272 // Spin and wait
273 spinWait(fromTail);
274
275 if (complete()) {
276 if (TRACE) {
277 Log.write("-- ("); Log.write(ordinal); Log.writeln(") EXITING");
278 }
279 lock();
280 setNumConsumersWaiting(numConsumersWaiting - 1);
281 unlock();
282 return Address.zero();
283 }
284 lock();
285 // Re-get the list head/tail while holding the lock
286 rtn = ((fromTail) ? tail : head);
287 }
288 setNumConsumersWaiting(numConsumersWaiting - 1);
289 if (TRACE) {
290 Log.write("-- ("); Log.write(ordinal); Log.write(") resuming work ");
291 Log.write(" n="); Log.writeln(numConsumersWaiting);
292 }
293 } else {
294 unlock();
295 return Address.zero();
296 }
297 }
298 if (fromTail) {
299 // dequeue the tail buffer
300 setTail(getPrev(tail));
301 if (head.EQ(rtn)) {
302 setHead(Address.zero());
303 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero());
304 } else {
305 setNext(tail, Address.zero());
306 }
307 } else {
308 // dequeue the head buffer
309 setHead(getNext(head));
310 if (tail.EQ(rtn)) {
311 setTail(Address.zero());
312 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero());
313 } else {
314 setPrev(head, Address.zero());
315 }
316 }
317 bufsenqueued--;
318 unlock();
319 return rtn;
320 }
321
322 /**
323 * Spinwait for GC work to arrive
324 *
325 * @param fromTail Check the head or the tail ?
326 */
327 private void spinWait(boolean fromTail) {
328 long startNano = 0;
329 long lastElapsedNano = 0;
330 while (true) {
331 long startCycles = VM.statistics.cycles();
332 long endCycles = startCycles + ((long) 1e9); // a few hundred milliseconds more or less.
333 long nowCycles;
334 do {
335 VM.memory.isync();
336 Address rtn = ((fromTail) ? tail : head);
337 if (!rtn.isZero() || complete()) return;
338 nowCycles = VM.statistics.cycles();
339 } while (startCycles < nowCycles && nowCycles < endCycles); /* check against both ends to guard against CPU migration */
340
341 /*
342 * According to the cycle counter, we've been spinning for a while.
343 * Time to check nanoTime and see if we should print a warning and/or fail.
344 * We lock the deque while doing this to avoid interleaved messages from multiple threads.
345 */
346 lock();
347 if (startNano == 0) {
348 startNano = VM.statistics.nanoTime();
349 } else {
350 long nowNano = VM.statistics.nanoTime();
351 long elapsedNano = nowNano - startNano;
352 if (elapsedNano - lastElapsedNano > WARN_PERIOD) {
353 Log.write("GC Warning: SharedDeque("); Log.write(name);
354 Log.write(") wait has reached "); Log.write(VM.statistics.nanosToSecs(elapsedNano));
355 Log.write(", "); Log.write(numConsumersWaiting); Log.write("/");
356 Log.write(numConsumers); Log.writeln(" threads waiting");
357 lastElapsedNano = elapsedNano;
358 }
359 if (elapsedNano > TIMEOUT_PERIOD) {
360 unlock(); // To allow other GC threads to die in turn
361 VM.assertions.fail("GC Error: SharedDeque Timeout");
362 }
363 }
364 unlock();
365 }
366 }
367
368 /**
369 * Set the "next" pointer in a buffer forming the linked buffer chain.
370 *
371 * @param buf The buffer whose next field is to be set.
372 * @param next The reference to which next should point.
373 */
374 private static void setNext(Address buf, Address next) {
375 buf.store(next, NEXT_OFFSET);
376 }
377
378 /**
379 * Get the "next" pointer in a buffer forming the linked buffer chain.
380 *
381 * @param buf The buffer whose next field is to be returned.
382 * @return The next field for this buffer.
383 */
384 protected final Address getNext(Address buf) {
385 return buf.loadAddress(NEXT_OFFSET);
386 }
387
388 /**
389 * Set the "prev" pointer in a buffer forming the linked buffer chain.
390 *
391 * @param buf The buffer whose next field is to be set.
392 * @param prev The reference to which prev should point.
393 */
394 private void setPrev(Address buf, Address prev) {
395 buf.store(prev, PREV_OFFSET);
396 }
397
398 /**
399 * Get the "next" pointer in a buffer forming the linked buffer chain.
400 *
401 * @param buf The buffer whose next field is to be returned.
402 * @return The next field for this buffer.
403 */
404 protected final Address getPrev(Address buf) {
405 return buf.loadAddress(PREV_OFFSET);
406 }
407
408 /**
409 * Check the number of buffers in the work queue (for debugging
410 * purposes).
411 *
412 * @param length The number of buffers believed to be in the queue.
413 * @return True if the length of the queue matches length.
414 */
415 private boolean checkDequeLength(int length) {
416 Address top = head;
417 int l = 0;
418 while (!top.isZero() && l <= length) {
419 top = getNext(top);
420 l++;
421 }
422 return l == length;
423 }
424
425 /**
426 * Lock this shared queue. We use one simple low-level lock to
427 * synchronize access to the shared queue of buffers.
428 */
429 private void lock() {
430 lock.acquire();
431 }
432
433 /**
434 * Release the lock. We use one simple low-level lock to synchronize
435 * access to the shared queue of buffers.
436 */
437 private void unlock() {
438 lock.release();
439 }
440
441 /**
442 * Is the current round of processing complete ?
443 */
444 private boolean complete() {
445 return completionFlag == 1;
446 }
447
448 /**
449 * Set the completion flag.
450 */
451 @Inline
452 private void setCompletionFlag() {
453 if (TRACE_DETAIL) {
454 Log.writeln("# setCompletionFlag: ");
455 }
456 completionFlag = 1;
457 }
458
459 /**
460 * Clear the completion flag.
461 */
462 @Inline
463 private void clearCompletionFlag() {
464 if (TRACE_DETAIL) {
465 Log.writeln("# clearCompletionFlag: ");
466 }
467 completionFlag = 0;
468 }
469
470 @Inline
471 private void setNumConsumers(int newNumConsumers) {
472 if (TRACE_DETAIL) {
473 Log.write("# Num consumers "); Log.writeln(newNumConsumers);
474 }
475 numConsumers = newNumConsumers;
476 }
477
478 @Inline
479 private void setNumConsumersWaiting(int newNCW) {
480 if (TRACE_DETAIL) {
481 Log.write("# Num consumers waiting "); Log.writeln(newNCW);
482 }
483 numConsumersWaiting = newNCW;
484 }
485
486 @Inline
487 private void setHead(Address newHead) {
488 head = newHead;
489 VM.memory.sync();
490 }
491
492 @Inline
493 private void setTail(Address newTail) {
494 tail = newTail;
495 VM.memory.sync();
496 }
497 }