1 package de.desy.acop.displayers.tools;
2
3 /*
4 * @(#)ArrayBlockingQueue.java 1.14 06/06/01
5 *
6 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
7 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
8 */
9
10
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.locks.*;
14 import java.util.*;
15
16 /**
17 * Implementation of BlockingQueueu with fixed capacity. If maximum capacity is reached the
18 * last item in the queue is replaced with the newly added item.
19 *
20 * @author Doug Lea
21 * @author Jaka Bobnar, Cosylab
22 * @param <E> the type of elements held in this collection
23 */
24 public class DismissableBlockingQueue<E> extends AbstractQueue<E>
25 implements BlockingQueue<E>, java.io.Serializable {
26
27 /**
28 * Serialization ID. This class relies on default serialization
29 * even for the items array, which is default-serialized, even if
30 * it is empty. Otherwise it could not be declared final, which is
31 * necessary here.
32 */
33 private static final long serialVersionUID = -817911632652898426L;
34
35 /** The queued items */
36 private final E[] items;
37 /** items index for next take, poll or remove */
38 private int takeIndex;
39 /** items index for next put, offer, or add. */
40 private int putIndex;
41 /** Number of items in the queue */
42 private int count;
43
44 /*
45 * Concurrency control uses the classic two-condition algorithm
46 * found in any textbook.
47 */
48
49 /** Main lock guarding all access */
50 private final ReentrantLock lock;
51 /** Condition for waiting takes */
52 private final Condition notEmpty;
53 /** Condition for waiting puts */
54 private final Condition notFull;
55
56 // Internal helper methods
57
58 /**
59 * Circularly increment i.
60 */
61 final int inc(int i) {
62 return (++i == items.length)? 0 : i;
63 }
64
65 final int dec(int i) {
66 return (--i == -1)? items.length -1 : i;
67 }
68
69 /**
70 * Inserts element at current put position, advances, and signals.
71 * Call only when holding lock.
72 */
73 private void insert(E x) {
74 if (count == items.length) {
75 if (putIndex == 0) {
76 items[items.length-1] = x;
77 } else {
78 items[putIndex-1] = x;
79 }
80 notEmpty.signal();
81 } else {
82 items[putIndex] = x;
83 putIndex = inc(putIndex);
84 ++count;
85 notEmpty.signal();
86 }
87 }
88
89 /**
90 * Extracts element at current take position, advances, and signals.
91 * Call only when holding lock.
92 */
93 private E extract() {
94 final E[] items = this.items;
95 E x = items[takeIndex];
96 items[takeIndex] = null;
97 takeIndex = inc(takeIndex);
98 --count;
99 notFull.signal();
100 return x;
101 }
102
103 /**
104 * Utility for remove and iterator.remove: Delete item at position i.
105 * Call only when holding lock.
106 */
107 void removeAt(int i) {
108 final E[] items = this.items;
109 // if removing front item, just advance
110 if (i == takeIndex) {
111 items[takeIndex] = null;
112 takeIndex = inc(takeIndex);
113 } else {
114 // slide over all others up through putIndex.
115 for (;;) {
116 int nexti = inc(i);
117 if (nexti != putIndex) {
118 items[i] = items[nexti];
119 i = nexti;
120 } else {
121 items[i] = null;
122 putIndex = i;
123 break;
124 }
125 }
126 }
127 --count;
128 notFull.signal();
129 }
130
131 /**
132 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
133 * capacity and default access policy.
134 *
135 * @param capacity the capacity of this queue
136 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
137 */
138 public DismissableBlockingQueue(int capacity) {
139 this(capacity, false);
140 }
141
142 /**
143 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
144 * capacity and the specified access policy.
145 *
146 * @param capacity the capacity of this queue
147 * @param fair if <tt>true</tt> then queue accesses for threads blocked
148 * on insertion or removal, are processed in FIFO order;
149 * if <tt>false</tt> the access order is unspecified.
150 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
151 */
152 @SuppressWarnings("unchecked")
153 public DismissableBlockingQueue(int capacity, boolean fair) {
154 if (capacity <= 0)
155 throw new IllegalArgumentException();
156 this.items = (E[]) new Object[capacity];
157 lock = new ReentrantLock(fair);
158 notEmpty = lock.newCondition();
159 notFull = lock.newCondition();
160 }
161
162 /**
163 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
164 * capacity, the specified access policy and initially containing the
165 * elements of the given collection,
166 * added in traversal order of the collection's iterator.
167 *
168 * @param capacity the capacity of this queue
169 * @param fair if <tt>true</tt> then queue accesses for threads blocked
170 * on insertion or removal, are processed in FIFO order;
171 * if <tt>false</tt> the access order is unspecified.
172 * @param c the collection of elements to initially contain
173 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
174 * <tt>c.size()</tt>, or less than 1.
175 * @throws NullPointerException if the specified collection or any
176 * of its elements are null
177 */
178 public DismissableBlockingQueue(int capacity, boolean fair,
179 Collection<? extends E> c) {
180 this(capacity, fair);
181 if (capacity < c.size())
182 throw new IllegalArgumentException();
183
184 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
185 add(it.next());
186 }
187
188 /**
189 * Inserts the specified element at the tail of this queue if it is
190 * possible to do so immediately without exceeding the queue's capacity,
191 * returning <tt>true</tt> upon success and throwing an
192 * <tt>IllegalStateException</tt> if this queue is full.
193 *
194 * @param e the element to add
195 * @return <tt>true</tt> (as specified by {@link Collection#add})
196 * @throws IllegalStateException if this queue is full
197 * @throws NullPointerException if the specified element is null
198 */
199 public boolean add(E e) {
200 return super.add(e);
201 }
202
203 /**
204 * Inserts the specified element at the tail of this queue if it is
205 * possible to do so immediately without exceeding the queue's capacity,
206 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
207 * is full. This method is generally preferable to method {@link #add},
208 * which can fail to insert an element only by throwing an exception.
209 *
210 * @throws NullPointerException if the specified element is null
211 */
212 public boolean offer(E e) {
213 if (e == null) throw new NullPointerException();
214 final ReentrantLock lock = this.lock;
215 lock.lock();
216 try {
217 insert(e);
218 return true;
219 } finally {
220 lock.unlock();
221 }
222 }
223
224 /**
225 * Inserts the specified element at the tail of this queue, waiting
226 * for space to become available if the queue is full.
227 *
228 * @throws InterruptedException {@inheritDoc}
229 * @throws NullPointerException {@inheritDoc}
230 */
231 public void put(E e) throws InterruptedException {
232 if (e == null) throw new NullPointerException();
233 offer(e);
234 }
235
236 /**
237 * Inserts the specified element at the tail of this queue, waiting
238 * up to the specified wait time for space to become available if
239 * the queue is full.
240 *
241 * @throws InterruptedException {@inheritDoc}
242 * @throws NullPointerException {@inheritDoc}
243 */
244 public boolean offer(E e, long timeout, TimeUnit unit)
245 throws InterruptedException {
246
247 if (e == null) throw new NullPointerException();
248 long nanos = unit.toNanos(timeout);
249 final ReentrantLock lock = this.lock;
250 lock.lockInterruptibly();
251 try {
252 for (;;) {
253 if (count != items.length) {
254 insert(e);
255 return true;
256 }
257 if (nanos <= 0)
258 return false;
259 try {
260 nanos = notFull.awaitNanos(nanos);
261 } catch (InterruptedException ie) {
262 notFull.signal(); // propagate to non-interrupted thread
263 throw ie;
264 }
265 }
266 } finally {
267 lock.unlock();
268 }
269 }
270
271 public E poll() {
272 final ReentrantLock lock = this.lock;
273 lock.lock();
274 try {
275 if (count == 0)
276 return null;
277 E x = extract();
278 return x;
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 public E take() throws InterruptedException {
285 final ReentrantLock lock = this.lock;
286 lock.lockInterruptibly();
287 try {
288 try {
289 while (count == 0)
290 notEmpty.await();
291 } catch (InterruptedException ie) {
292 notEmpty.signal(); // propagate to non-interrupted thread
293 throw ie;
294 }
295 E x = extract();
296 return x;
297 } finally {
298 lock.unlock();
299 }
300 }
301
302 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
303 long nanos = unit.toNanos(timeout);
304 final ReentrantLock lock = this.lock;
305 lock.lockInterruptibly();
306 try {
307 for (;;) {
308 if (count != 0) {
309 E x = extract();
310 return x;
311 }
312 if (nanos <= 0)
313 return null;
314 try {
315 nanos = notEmpty.awaitNanos(nanos);
316 } catch (InterruptedException ie) {
317 notEmpty.signal(); // propagate to non-interrupted thread
318 throw ie;
319 }
320
321 }
322 } finally {
323 lock.unlock();
324 }
325 }
326
327 public E peek() {
328 final ReentrantLock lock = this.lock;
329 lock.lock();
330 try {
331 return (count == 0) ? null : items[takeIndex];
332 } finally {
333 lock.unlock();
334 }
335 }
336
337 // this doc comment is overridden to remove the reference to collections
338 // greater in size than Integer.MAX_VALUE
339 /**
340 * Returns the number of elements in this queue.
341 *
342 * @return the number of elements in this queue
343 */
344 public int size() {
345 final ReentrantLock lock = this.lock;
346 lock.lock();
347 try {
348 return count;
349 } finally {
350 lock.unlock();
351 }
352 }
353
354 // this doc comment is a modified copy of the inherited doc comment,
355 // without the reference to unlimited queues.
356 /**
357 * Returns the number of additional elements that this queue can ideally
358 * (in the absence of memory or resource constraints) accept without
359 * blocking. This is always equal to the initial capacity of this queue
360 * less the current <tt>size</tt> of this queue.
361 *
362 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
363 * an element will succeed by inspecting <tt>remainingCapacity</tt>
364 * because it may be the case that another thread is about to
365 * insert or remove an element.
366 */
367 public int remainingCapacity() {
368 final ReentrantLock lock = this.lock;
369 lock.lock();
370 try {
371 return items.length - count;
372 } finally {
373 lock.unlock();
374 }
375 }
376
377 /**
378 * Removes a single instance of the specified element from this queue,
379 * if it is present. More formally, removes an element <tt>e</tt> such
380 * that <tt>o.equals(e)</tt>, if this queue contains one or more such
381 * elements.
382 * Returns <tt>true</tt> if this queue contained the specified element
383 * (or equivalently, if this queue changed as a result of the call).
384 *
385 * @param o element to be removed from this queue, if present
386 * @return <tt>true</tt> if this queue changed as a result of the call
387 */
388 public boolean remove(Object o) {
389 if (o == null) return false;
390 final E[] items = this.items;
391 final ReentrantLock lock = this.lock;
392 lock.lock();
393 try {
394 int i = takeIndex;
395 int k = 0;
396 for (;;) {
397 if (k++ >= count)
398 return false;
399 if (o.equals(items[i])) {
400 removeAt(i);
401 return true;
402 }
403 i = inc(i);
404 }
405
406 } finally {
407 lock.unlock();
408 }
409 }
410
411 /**
412 * Returns <tt>true</tt> if this queue contains the specified element.
413 * More formally, returns <tt>true</tt> if and only if this queue contains
414 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
415 *
416 * @param o object to be checked for containment in this queue
417 * @return <tt>true</tt> if this queue contains the specified element
418 */
419 public boolean contains(Object o) {
420 if (o == null) return false;
421 final E[] items = this.items;
422 final ReentrantLock lock = this.lock;
423 lock.lock();
424 try {
425 int i = takeIndex;
426 int k = 0;
427 while (k++ < count) {
428 if (o.equals(items[i]))
429 return true;
430 i = inc(i);
431 }
432 return false;
433 } finally {
434 lock.unlock();
435 }
436 }
437
438 /**
439 * Returns an array containing all of the elements in this queue, in
440 * proper sequence.
441 *
442 * <p>The returned array will be "safe" in that no references to it are
443 * maintained by this queue. (In other words, this method must allocate
444 * a new array). The caller is thus free to modify the returned array.
445 *
446 * <p>This method acts as bridge between array-based and collection-based
447 * APIs.
448 *
449 * @return an array containing all of the elements in this queue
450 */
451 public Object[] toArray() {
452 final E[] items = this.items;
453 final ReentrantLock lock = this.lock;
454 lock.lock();
455 try {
456 Object[] a = new Object[count];
457 int k = 0;
458 int i = takeIndex;
459 while (k < count) {
460 a[k++] = items[i];
461 i = inc(i);
462 }
463 return a;
464 } finally {
465 lock.unlock();
466 }
467 }
468
469 /**
470 * Returns an array containing all of the elements in this queue, in
471 * proper sequence; the runtime type of the returned array is that of
472 * the specified array. If the queue fits in the specified array, it
473 * is returned therein. Otherwise, a new array is allocated with the
474 * runtime type of the specified array and the size of this queue.
475 *
476 * <p>If this queue fits in the specified array with room to spare
477 * (i.e., the array has more elements than this queue), the element in
478 * the array immediately following the end of the queue is set to
479 * <tt>null</tt>.
480 *
481 * <p>Like the {@link #toArray()} method, this method acts as bridge between
482 * array-based and collection-based APIs. Further, this method allows
483 * precise control over the runtime type of the output array, and may,
484 * under certain circumstances, be used to save allocation costs.
485 *
486 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
487 * The following code can be used to dump the queue into a newly
488 * allocated array of <tt>String</tt>:
489 *
490 * <pre>
491 * String[] y = x.toArray(new String[0]);</pre>
492 *
493 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
494 * <tt>toArray()</tt>.
495 *
496 * @param a the array into which the elements of the queue are to
497 * be stored, if it is big enough; otherwise, a new array of the
498 * same runtime type is allocated for this purpose
499 * @return an array containing all of the elements in this queue
500 * @throws ArrayStoreException if the runtime type of the specified array
501 * is not a supertype of the runtime type of every element in
502 * this queue
503 * @throws NullPointerException if the specified array is null
504 */
505 @SuppressWarnings("unchecked")
506 public <T> T[] toArray(T[] a) {
507 final E[] items = this.items;
508 final ReentrantLock lock = this.lock;
509 lock.lock();
510 try {
511 if (a.length < count)
512 a = (T[])java.lang.reflect.Array.newInstance(
513 a.getClass().getComponentType(),
514 count
515 );
516
517 int k = 0;
518 int i = takeIndex;
519 while (k < count) {
520 a[k++] = (T)items[i];
521 i = inc(i);
522 }
523 if (a.length > count)
524 a[count] = null;
525 return a;
526 } finally {
527 lock.unlock();
528 }
529 }
530
531 public String toString() {
532 final ReentrantLock lock = this.lock;
533 lock.lock();
534 try {
535 return super.toString();
536 } finally {
537 lock.unlock();
538 }
539 }
540
541 /**
542 * Atomically removes all of the elements from this queue.
543 * The queue will be empty after this call returns.
544 */
545 public void clear() {
546 final E[] items = this.items;
547 final ReentrantLock lock = this.lock;
548 lock.lock();
549 try {
550 int i = takeIndex;
551 int k = count;
552 while (k-- > 0) {
553 items[i] = null;
554 i = inc(i);
555 }
556 count = 0;
557 putIndex = 0;
558 takeIndex = 0;
559 notFull.signalAll();
560 } finally {
561 lock.unlock();
562 }
563 }
564
565 /**
566 * @throws UnsupportedOperationException {@inheritDoc}
567 * @throws ClassCastException {@inheritDoc}
568 * @throws NullPointerException {@inheritDoc}
569 * @throws IllegalArgumentException {@inheritDoc}
570 */
571 public int drainTo(Collection<? super E> c) {
572 if (c == null)
573 throw new NullPointerException();
574 if (c == this)
575 throw new IllegalArgumentException();
576 final E[] items = this.items;
577 final ReentrantLock lock = this.lock;
578 lock.lock();
579 try {
580 int i = takeIndex;
581 int n = 0;
582 int max = count;
583 while (n < max) {
584 c.add(items[i]);
585 items[i] = null;
586 i = inc(i);
587 ++n;
588 }
589 if (n > 0) {
590 count = 0;
591 putIndex = 0;
592 takeIndex = 0;
593 notFull.signalAll();
594 }
595 return n;
596 } finally {
597 lock.unlock();
598 }
599 }
600
601 /**
602 * @throws UnsupportedOperationException {@inheritDoc}
603 * @throws ClassCastException {@inheritDoc}
604 * @throws NullPointerException {@inheritDoc}
605 * @throws IllegalArgumentException {@inheritDoc}
606 */
607 public int drainTo(Collection<? super E> c, int maxElements) {
608 if (c == null)
609 throw new NullPointerException();
610 if (c == this)
611 throw new IllegalArgumentException();
612 if (maxElements <= 0)
613 return 0;
614 final E[] items = this.items;
615 final ReentrantLock lock = this.lock;
616 lock.lock();
617 try {
618 int i = takeIndex;
619 int n = 0;
620 int max = (maxElements < count)? maxElements : count;
621 while (n < max) {
622 c.add(items[i]);
623 items[i] = null;
624 i = inc(i);
625 ++n;
626 }
627 if (n > 0) {
628 count -= n;
629 takeIndex = i;
630 notFull.signalAll();
631 }
632 return n;
633 } finally {
634 lock.unlock();
635 }
636 }
637
638
639 /**
640 * Returns an iterator over the elements in this queue in proper sequence.
641 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
642 * will never throw {@link ConcurrentModificationException},
643 * and guarantees to traverse elements as they existed upon
644 * construction of the iterator, and may (but is not guaranteed to)
645 * reflect any modifications subsequent to construction.
646 *
647 * @return an iterator over the elements in this queue in proper sequence
648 */
649 public Iterator<E> iterator() {
650 final ReentrantLock lock = this.lock;
651 lock.lock();
652 try {
653 return new Itr();
654 } finally {
655 lock.unlock();
656 }
657 }
658
659 /**
660 * Iterator for ArrayBlockingQueue
661 */
662 private class Itr implements Iterator<E> {
663 /**
664 * Index of element to be returned by next,
665 * or a negative number if no such.
666 */
667 private int nextIndex;
668
669 /**
670 * nextItem holds on to item fields because once we claim
671 * that an element exists in hasNext(), we must return it in
672 * the following next() call even if it was in the process of
673 * being removed when hasNext() was called.
674 */
675 private E nextItem;
676
677 /**
678 * Index of element returned by most recent call to next.
679 * Reset to -1 if this element is deleted by a call to remove.
680 */
681 private int lastRet;
682
683 Itr() {
684 lastRet = -1;
685 if (count == 0)
686 nextIndex = -1;
687 else {
688 nextIndex = takeIndex;
689 nextItem = items[takeIndex];
690 }
691 }
692
693 public boolean hasNext() {
694 /*
695 * No sync. We can return true by mistake here
696 * only if this iterator passed across threads,
697 * which we don't support anyway.
698 */
699 return nextIndex >= 0;
700 }
701
702 /**
703 * Checks whether nextIndex is valid; if so setting nextItem.
704 * Stops iterator when either hits putIndex or sees null item.
705 */
706 private void checkNext() {
707 if (nextIndex == putIndex) {
708 nextIndex = -1;
709 nextItem = null;
710 } else {
711 nextItem = items[nextIndex];
712 if (nextItem == null)
713 nextIndex = -1;
714 }
715 }
716
717 public E next() {
718 final ReentrantLock lock = DismissableBlockingQueue.this.lock;
719 lock.lock();
720 try {
721 if (nextIndex < 0)
722 throw new NoSuchElementException();
723 lastRet = nextIndex;
724 E x = nextItem;
725 nextIndex = inc(nextIndex);
726 checkNext();
727 return x;
728 } finally {
729 lock.unlock();
730 }
731 }
732
733 public void remove() {
734 final ReentrantLock lock = DismissableBlockingQueue.this.lock;
735 lock.lock();
736 try {
737 int i = lastRet;
738 if (i == -1)
739 throw new IllegalStateException();
740 lastRet = -1;
741
742 int ti = takeIndex;
743 removeAt(i);
744 // back up cursor (reset to front if was first element)
745 nextIndex = (i == ti) ? takeIndex : i;
746 checkNext();
747 } finally {
748 lock.unlock();
749 }
750 }
751 }
752 }