001 /*
002 * This file is part of the Jikes RVM project (http://jikesrvm.org).
003 *
004 * This file is licensed to You under the Eclipse Public License (EPL);
005 * You may not use this file except in compliance with the License. You
006 * may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/eclipse-1.0.php
009 *
010 * See the COPYRIGHT.txt file distributed with this work for information
011 * regarding copyright ownership.
012 */
013 package org.mmtk.utility.deque;
014
015 import org.mmtk.utility.Constants;
016
017 import org.mmtk.vm.VM;
018
019 import org.vmmagic.pragma.*;
020 import org.vmmagic.unboxed.*;
021
022 /**
023 * This class implements a local (<i>unsynchronized</i>) queue.
024 * A queue is strictly FIFO.<p>
025 *
026 * Each instance stores word-sized values into a local buffer. When
027 * the buffer is full, or if the <code>flushLocal()</code> method is
028 * called, the buffer is enqueued at the tail of a
029 * <code>SharedDeque</code>.
030 *
031 * The implementation is intended to be as efficient as possible, in
032 * time and space, and is the basis for the <code>TraceBuffer</code> used by
033 * heap trace generation. Each instance adds a single field to those inherited
034 * from the SSB: a bump pointer.
035 *
036 * Preconditions: Buffers are always aligned on buffer-size address
037 * boundaries.<p>
038 *
039 * Invariants: Buffers are filled such that tuples (of the specified
040 * arity) are packed to the low end of the buffer. Thus buffer
041 * underflows will always arise when the cursor is buffer-size aligned.
042 */
043 @Uninterruptible class LocalQueue extends LocalSSB implements Constants {
044
045 /**
046 * Constructor
047 *
048 * @param queue The shared queue to which this local ssb will append
049 * its buffers (when full or flushed).
050 */
051 LocalQueue(SharedDeque queue) {
052 super(queue);
053 }
054
055 /****************************************************************************
056 *
057 * Protected instance methods and fields
058 */
059
060 /** the start of the buffer */
061 @Entrypoint
062 protected Address head;
063
064 @Override
065 public void resetLocal() {
066 super.resetLocal();
067 head = Deque.HEAD_INITIAL_VALUE;
068 }
069
070 /**
071 * Check whether there are values in the buffer for a pending dequeue.
072 * If there is not data, grab the first buffer on the shared queue
073 * (after freeing the buffer).
074 *
075 * @param arity The arity of the values stored in this queue: the
076 * buffer must contain enough space for this many words.
077 */
078 @Inline
079 protected final boolean checkDequeue(int arity) {
080 if (bufferOffset(head).isZero()) {
081 return dequeueUnderflow(arity);
082 } else {
083 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Word.fromIntZeroExtend(arity).lsh(LOG_BYTES_IN_ADDRESS).toOffset()));
084 return true;
085 }
086 }
087
088 /**
089 * Dequeue a value from the buffer. This is <i>unchecked</i>. The
090 * caller must first call <code>checkDequeue()</code> to ensure the
091 * buffer has and entry to be removed.
092 *
093 * @return The first entry on the queue.
094 */
095 @Inline
096 protected final Address uncheckedDequeue(){
097 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(bufferOffset(head).sGE(Offset.fromIntZeroExtend(BYTES_IN_ADDRESS)));
098 head = head.minus(BYTES_IN_ADDRESS);
099 return head.loadAddress();
100 }
101
102 /**
103 * The head is empty (or null), and the shared queue has no buffers
104 * available. If the tail has sufficient entries, consume the tail.
105 * Otherwise try wait on the global queue until either all other
106 * clients of the queue reach exhaustion or a buffer becomes
107 * available.
108 *
109 * @param arity The arity of this buffer
110 * @return True if the consumer has eaten all the entries
111 */
112 protected final boolean headStarved(int arity) {
113 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity());
114
115 // If the tail has entries...
116 if (tail.NE(tailBufferEnd)) {
117 head = normalizeTail(arity).plus(BYTES_IN_ADDRESS);
118 tail = Deque.TAIL_INITIAL_VALUE;
119 tailBufferEnd = Deque.TAIL_INITIAL_VALUE;
120 // Return that we acquired more entries
121 return false;
122 }
123 // Wait for another entry to materialize...
124 head = queue.dequeueAndWait(arity);
125 // return true if a) there is a head buffer, and b) it is non-empty
126 return (head.EQ(Deque.HEAD_INITIAL_VALUE) || bufferOffset(head).isZero());
127 }
128
129 /****************************************************************************
130 *
131 * Private instance methods
132 */
133
134 /**
135 * There are not sufficient entries in the head buffer for a pending
136 * dequeue. Acquire a new head buffer. If the shared queue has no
137 * buffers available, consume the tail if necessary. Return false
138 * if entries cannot be acquired.
139 *
140 * @param arity The arity of this buffer (used for sanity test only).
141 * @return True if there the head buffer has been successfully
142 * replenished.
143 */
144 @NoInline
145 private boolean dequeueUnderflow(int arity) {
146 if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == queue.getArity());
147 do {
148 if (head.NE(Deque.HEAD_INITIAL_VALUE))
149 queue.free(head);
150 head = queue.dequeue(arity);
151 } while (head.NE(Deque.HEAD_INITIAL_VALUE) && bufferOffset(head).isZero());
152
153 if (head.EQ(Deque.HEAD_INITIAL_VALUE))
154 return !headStarved(arity);
155
156 return true;
157 }
158 }