View Javadoc

1   package de.desy.acop.displayers.tools;
2   
3   /*
4    * @(#)ArrayBlockingQueue.java	1.14 06/06/01
5    *
6    * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
7    * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
8    */
9   
10  
11  import java.util.concurrent.BlockingQueue;
12  import java.util.concurrent.TimeUnit;
13  import java.util.concurrent.locks.*;
14  import java.util.*;
15  
16  /**
17   * Implementation of BlockingQueueu with fixed capacity. If maximum capacity is reached the
18   * last item in the queue is replaced with the newly added item.
19   * 
20   * @author Doug Lea
21   * @author Jaka Bobnar, Cosylab
22   * @param <E> the type of elements held in this collection
23   */
24  public class DismissableBlockingQueue<E> extends AbstractQueue<E>
25          implements BlockingQueue<E>, java.io.Serializable {
26  
27      /**
28       * Serialization ID. This class relies on default serialization
29       * even for the items array, which is default-serialized, even if
30       * it is empty. Otherwise it could not be declared final, which is
31       * necessary here.
32       */
33      private static final long serialVersionUID = -817911632652898426L;
34  
35      /** The queued items  */
36      private final E[] items;
37      /** items index for next take, poll or remove */
38      private int takeIndex;
39      /** items index for next put, offer, or add. */
40      private int putIndex;
41      /** Number of items in the queue */
42      private int count;
43  
44      /*
45       * Concurrency control uses the classic two-condition algorithm
46       * found in any textbook.
47       */
48  
49      /** Main lock guarding all access */
50      private final ReentrantLock lock;
51      /** Condition for waiting takes */
52      private final Condition notEmpty;
53      /** Condition for waiting puts */
54      private final Condition notFull;
55  
56      // Internal helper methods
57  
58      /**
59       * Circularly increment i.
60       */
61      final int inc(int i) {
62          return (++i == items.length)? 0 : i;
63      }
64  
65      final int dec(int i) {
66          return (--i == -1)?  items.length -1 : i;
67      }
68      
69      /**
70       * Inserts element at current put position, advances, and signals.
71       * Call only when holding lock.
72       */
73      private void insert(E x) {
74      	if (count == items.length) {
75      		if (putIndex == 0) {
76      			items[items.length-1] = x;
77      		} else {
78      			items[putIndex-1] = x;
79      		}
80      		notEmpty.signal();
81      	} else {
82  	        items[putIndex] = x;
83  	        putIndex = inc(putIndex);
84  	        ++count;
85  	        notEmpty.signal();
86      	}
87      }
88  
89      /**
90       * Extracts element at current take position, advances, and signals.
91       * Call only when holding lock.
92       */
93      private E extract() {
94          final E[] items = this.items;
95          E x = items[takeIndex];
96          items[takeIndex] = null;
97          takeIndex = inc(takeIndex);
98          --count;
99          notFull.signal();
100         return x;
101     }
102 
103     /**
104      * Utility for remove and iterator.remove: Delete item at position i.
105      * Call only when holding lock.
106      */
107     void removeAt(int i) {
108         final E[] items = this.items;
109         // if removing front item, just advance
110         if (i == takeIndex) {
111             items[takeIndex] = null;
112             takeIndex = inc(takeIndex);
113         } else {
114             // slide over all others up through putIndex.
115             for (;;) {
116                 int nexti = inc(i);
117                 if (nexti != putIndex) {
118                     items[i] = items[nexti];
119                     i = nexti;
120                 } else {
121                     items[i] = null;
122                     putIndex = i;
123                     break;
124                 }
125             }
126         }
127         --count;
128         notFull.signal();
129     }
130 
131     /**
132      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
133      * capacity and default access policy.
134      *
135      * @param capacity the capacity of this queue
136      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
137      */
138     public DismissableBlockingQueue(int capacity) {
139         this(capacity, false);
140     }
141 
142     /**
143      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
144      * capacity and the specified access policy.
145      *
146      * @param capacity the capacity of this queue
147      * @param fair if <tt>true</tt> then queue accesses for threads blocked
148      *        on insertion or removal, are processed in FIFO order;
149      *        if <tt>false</tt> the access order is unspecified.
150      * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
151      */
152     @SuppressWarnings("unchecked")
153 	public DismissableBlockingQueue(int capacity, boolean fair) {
154         if (capacity <= 0)
155             throw new IllegalArgumentException();
156         this.items = (E[]) new Object[capacity];
157         lock = new ReentrantLock(fair);
158         notEmpty = lock.newCondition();
159         notFull =  lock.newCondition();
160     }
161 
162     /**
163      * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
164      * capacity, the specified access policy and initially containing the
165      * elements of the given collection,
166      * added in traversal order of the collection's iterator.
167      *
168      * @param capacity the capacity of this queue
169      * @param fair if <tt>true</tt> then queue accesses for threads blocked
170      *        on insertion or removal, are processed in FIFO order;
171      *        if <tt>false</tt> the access order is unspecified.
172      * @param c the collection of elements to initially contain
173      * @throws IllegalArgumentException if <tt>capacity</tt> is less than
174      *         <tt>c.size()</tt>, or less than 1.
175      * @throws NullPointerException if the specified collection or any
176      *         of its elements are null
177      */
178     public DismissableBlockingQueue(int capacity, boolean fair,
179                               Collection<? extends E> c) {
180         this(capacity, fair);
181         if (capacity < c.size())
182             throw new IllegalArgumentException();
183 
184         for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
185             add(it.next());
186     }
187 
188     /**
189      * Inserts the specified element at the tail of this queue if it is
190      * possible to do so immediately without exceeding the queue's capacity,
191      * returning <tt>true</tt> upon success and throwing an
192      * <tt>IllegalStateException</tt> if this queue is full.
193      *
194      * @param e the element to add
195      * @return <tt>true</tt> (as specified by {@link Collection#add})
196      * @throws IllegalStateException if this queue is full
197      * @throws NullPointerException if the specified element is null
198      */
199     public boolean add(E e) {
200 	return super.add(e);
201     }
202 
203     /**
204      * Inserts the specified element at the tail of this queue if it is
205      * possible to do so immediately without exceeding the queue's capacity,
206      * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
207      * is full.  This method is generally preferable to method {@link #add},
208      * which can fail to insert an element only by throwing an exception.
209      *
210      * @throws NullPointerException if the specified element is null
211      */
212     public boolean offer(E e) {
213         if (e == null) throw new NullPointerException();
214         final ReentrantLock lock = this.lock;
215         lock.lock();
216         try {
217         	insert(e);
218         	return true;
219         } finally {
220             lock.unlock();
221         }
222     }
223 
224     /**
225      * Inserts the specified element at the tail of this queue, waiting
226      * for space to become available if the queue is full.
227      *
228      * @throws InterruptedException {@inheritDoc}
229      * @throws NullPointerException {@inheritDoc}
230      */
231     public void put(E e) throws InterruptedException {
232         if (e == null) throw new NullPointerException();
233         offer(e);
234     }
235 
236     /**
237      * Inserts the specified element at the tail of this queue, waiting
238      * up to the specified wait time for space to become available if
239      * the queue is full.
240      *
241      * @throws InterruptedException {@inheritDoc}
242      * @throws NullPointerException {@inheritDoc}
243      */
244     public boolean offer(E e, long timeout, TimeUnit unit)
245         throws InterruptedException {
246 
247         if (e == null) throw new NullPointerException();
248 	long nanos = unit.toNanos(timeout);
249         final ReentrantLock lock = this.lock;
250         lock.lockInterruptibly();
251         try {
252             for (;;) {
253                 if (count != items.length) {
254                     insert(e);
255                     return true;
256                 }
257                 if (nanos <= 0)
258                     return false;
259                 try {
260                     nanos = notFull.awaitNanos(nanos);
261                 } catch (InterruptedException ie) {
262                     notFull.signal(); // propagate to non-interrupted thread
263                     throw ie;
264                 }
265             }
266         } finally {
267             lock.unlock();
268         }
269     }
270 
271     public E poll() {
272         final ReentrantLock lock = this.lock;
273         lock.lock();
274         try {
275             if (count == 0)
276                 return null;
277             E x = extract();
278             return x;
279         } finally {
280             lock.unlock();
281         }
282     }
283 
284     public E take() throws InterruptedException {
285         final ReentrantLock lock = this.lock;
286         lock.lockInterruptibly();
287         try {
288             try {
289                 while (count == 0)
290                     notEmpty.await();
291             } catch (InterruptedException ie) {
292                 notEmpty.signal(); // propagate to non-interrupted thread
293                 throw ie;
294             }
295             E x = extract();
296             return x;
297         } finally {
298             lock.unlock();
299         }
300     }
301 
302     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
303 	long nanos = unit.toNanos(timeout);
304         final ReentrantLock lock = this.lock;
305         lock.lockInterruptibly();
306         try {
307             for (;;) {
308                 if (count != 0) {
309                     E x = extract();
310                     return x;
311                 }
312                 if (nanos <= 0)
313                     return null;
314                 try {
315                     nanos = notEmpty.awaitNanos(nanos);
316                 } catch (InterruptedException ie) {
317                     notEmpty.signal(); // propagate to non-interrupted thread
318                     throw ie;
319                 }
320 
321             }
322         } finally {
323             lock.unlock();
324         }
325     }
326 
327     public E peek() {
328         final ReentrantLock lock = this.lock;
329         lock.lock();
330         try {
331             return (count == 0) ? null : items[takeIndex];
332         } finally {
333             lock.unlock();
334         }
335     }
336 
337     // this doc comment is overridden to remove the reference to collections
338     // greater in size than Integer.MAX_VALUE
339     /**
340      * Returns the number of elements in this queue.
341      *
342      * @return the number of elements in this queue
343      */
344     public int size() {
345         final ReentrantLock lock = this.lock;
346         lock.lock();
347         try {
348             return count;
349         } finally {
350             lock.unlock();
351         }
352     }
353 
354     // this doc comment is a modified copy of the inherited doc comment,
355     // without the reference to unlimited queues.
356     /**
357      * Returns the number of additional elements that this queue can ideally
358      * (in the absence of memory or resource constraints) accept without
359      * blocking. This is always equal to the initial capacity of this queue
360      * less the current <tt>size</tt> of this queue.
361      *
362      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
363      * an element will succeed by inspecting <tt>remainingCapacity</tt>
364      * because it may be the case that another thread is about to
365      * insert or remove an element.
366      */
367     public int remainingCapacity() {
368         final ReentrantLock lock = this.lock;
369         lock.lock();
370         try {
371             return items.length - count;
372         } finally {
373             lock.unlock();
374         }
375     }
376 
377     /**
378      * Removes a single instance of the specified element from this queue,
379      * if it is present.  More formally, removes an element <tt>e</tt> such
380      * that <tt>o.equals(e)</tt>, if this queue contains one or more such
381      * elements.
382      * Returns <tt>true</tt> if this queue contained the specified element
383      * (or equivalently, if this queue changed as a result of the call).
384      *
385      * @param o element to be removed from this queue, if present
386      * @return <tt>true</tt> if this queue changed as a result of the call
387      */
388     public boolean remove(Object o) {
389         if (o == null) return false;
390         final E[] items = this.items;
391         final ReentrantLock lock = this.lock;
392         lock.lock();
393         try {
394             int i = takeIndex;
395             int k = 0;
396             for (;;) {
397                 if (k++ >= count)
398                     return false;
399                 if (o.equals(items[i])) {
400                     removeAt(i);
401                     return true;
402                 }
403                 i = inc(i);
404             }
405 
406         } finally {
407             lock.unlock();
408         }
409     }
410 
411     /**
412      * Returns <tt>true</tt> if this queue contains the specified element.
413      * More formally, returns <tt>true</tt> if and only if this queue contains
414      * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
415      *
416      * @param o object to be checked for containment in this queue
417      * @return <tt>true</tt> if this queue contains the specified element
418      */
419     public boolean contains(Object o) {
420         if (o == null) return false;
421         final E[] items = this.items;
422         final ReentrantLock lock = this.lock;
423         lock.lock();
424         try {
425             int i = takeIndex;
426             int k = 0;
427             while (k++ < count) {
428                 if (o.equals(items[i]))
429                     return true;
430                 i = inc(i);
431             }
432             return false;
433         } finally {
434             lock.unlock();
435         }
436     }
437 
438     /**
439      * Returns an array containing all of the elements in this queue, in
440      * proper sequence.
441      *
442      * <p>The returned array will be "safe" in that no references to it are
443      * maintained by this queue.  (In other words, this method must allocate
444      * a new array).  The caller is thus free to modify the returned array.
445      *
446      * <p>This method acts as bridge between array-based and collection-based
447      * APIs.
448      *
449      * @return an array containing all of the elements in this queue
450      */
451     public Object[] toArray() {
452         final E[] items = this.items;
453         final ReentrantLock lock = this.lock;
454         lock.lock();
455         try {
456             Object[] a = new Object[count];
457             int k = 0;
458             int i = takeIndex;
459             while (k < count) {
460                 a[k++] = items[i];
461                 i = inc(i);
462             }
463             return a;
464         } finally {
465             lock.unlock();
466         }
467     }
468 
469     /**
470      * Returns an array containing all of the elements in this queue, in
471      * proper sequence; the runtime type of the returned array is that of
472      * the specified array.  If the queue fits in the specified array, it
473      * is returned therein.  Otherwise, a new array is allocated with the
474      * runtime type of the specified array and the size of this queue.
475      *
476      * <p>If this queue fits in the specified array with room to spare
477      * (i.e., the array has more elements than this queue), the element in
478      * the array immediately following the end of the queue is set to
479      * <tt>null</tt>.
480      *
481      * <p>Like the {@link #toArray()} method, this method acts as bridge between
482      * array-based and collection-based APIs.  Further, this method allows
483      * precise control over the runtime type of the output array, and may,
484      * under certain circumstances, be used to save allocation costs.
485      *
486      * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
487      * The following code can be used to dump the queue into a newly
488      * allocated array of <tt>String</tt>:
489      *
490      * <pre>
491      *     String[] y = x.toArray(new String[0]);</pre>
492      *
493      * Note that <tt>toArray(new Object[0])</tt> is identical in function to
494      * <tt>toArray()</tt>.
495      *
496      * @param a the array into which the elements of the queue are to
497      *          be stored, if it is big enough; otherwise, a new array of the
498      *          same runtime type is allocated for this purpose
499      * @return an array containing all of the elements in this queue
500      * @throws ArrayStoreException if the runtime type of the specified array
501      *         is not a supertype of the runtime type of every element in
502      *         this queue
503      * @throws NullPointerException if the specified array is null
504      */
505     @SuppressWarnings("unchecked")
506 	public <T> T[] toArray(T[] a) {
507         final E[] items = this.items;
508         final ReentrantLock lock = this.lock;
509         lock.lock();
510         try {
511             if (a.length < count)
512                 a = (T[])java.lang.reflect.Array.newInstance(
513                     a.getClass().getComponentType(),
514                     count
515                     );
516 
517             int k = 0;
518             int i = takeIndex;
519             while (k < count) {
520                 a[k++] = (T)items[i];
521                 i = inc(i);
522             }
523             if (a.length > count)
524                 a[count] = null;
525             return a;
526         } finally {
527             lock.unlock();
528         }
529     }
530 
531     public String toString() {
532         final ReentrantLock lock = this.lock;
533         lock.lock();
534         try {
535             return super.toString();
536         } finally {
537             lock.unlock();
538         }
539     }
540 
541     /**
542      * Atomically removes all of the elements from this queue.
543      * The queue will be empty after this call returns.
544      */
545     public void clear() {
546         final E[] items = this.items;
547         final ReentrantLock lock = this.lock;
548         lock.lock();
549         try {
550             int i = takeIndex;
551             int k = count;
552             while (k-- > 0) {
553                 items[i] = null;
554                 i = inc(i);
555             }
556             count = 0;
557             putIndex = 0;
558             takeIndex = 0;
559             notFull.signalAll();
560         } finally {
561             lock.unlock();
562         }
563     }
564 
565     /**
566      * @throws UnsupportedOperationException {@inheritDoc}
567      * @throws ClassCastException            {@inheritDoc}
568      * @throws NullPointerException          {@inheritDoc}
569      * @throws IllegalArgumentException      {@inheritDoc}
570      */
571     public int drainTo(Collection<? super E> c) {
572         if (c == null)
573             throw new NullPointerException();
574         if (c == this)
575             throw new IllegalArgumentException();
576         final E[] items = this.items;
577         final ReentrantLock lock = this.lock;
578         lock.lock();
579         try {
580             int i = takeIndex;
581             int n = 0;
582             int max = count;
583             while (n < max) {
584                 c.add(items[i]);
585                 items[i] = null;
586                 i = inc(i);
587                 ++n;
588             }
589             if (n > 0) {
590                 count = 0;
591                 putIndex = 0;
592                 takeIndex = 0;
593                 notFull.signalAll();
594             }
595             return n;
596         } finally {
597             lock.unlock();
598         }
599     }
600 
601     /**
602      * @throws UnsupportedOperationException {@inheritDoc}
603      * @throws ClassCastException            {@inheritDoc}
604      * @throws NullPointerException          {@inheritDoc}
605      * @throws IllegalArgumentException      {@inheritDoc}
606      */
607     public int drainTo(Collection<? super E> c, int maxElements) {
608         if (c == null)
609             throw new NullPointerException();
610         if (c == this)
611             throw new IllegalArgumentException();
612         if (maxElements <= 0)
613             return 0;
614         final E[] items = this.items;
615         final ReentrantLock lock = this.lock;
616         lock.lock();
617         try {
618             int i = takeIndex;
619             int n = 0;
620             int max = (maxElements < count)? maxElements : count;
621             while (n < max) {
622                 c.add(items[i]);
623                 items[i] = null;
624                 i = inc(i);
625                 ++n;
626             }
627             if (n > 0) {
628                 count -= n;
629                 takeIndex = i;
630                 notFull.signalAll();
631             }
632             return n;
633         } finally {
634             lock.unlock();
635         }
636     }
637 
638 
639     /**
640      * Returns an iterator over the elements in this queue in proper sequence.
641      * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
642      * will never throw {@link ConcurrentModificationException},
643      * and guarantees to traverse elements as they existed upon
644      * construction of the iterator, and may (but is not guaranteed to)
645      * reflect any modifications subsequent to construction.
646      *
647      * @return an iterator over the elements in this queue in proper sequence
648      */
649     public Iterator<E> iterator() {
650         final ReentrantLock lock = this.lock;
651         lock.lock();
652         try {
653             return new Itr();
654         } finally {
655             lock.unlock();
656         }
657     }
658 
659     /**
660      * Iterator for ArrayBlockingQueue
661      */
662     private class Itr implements Iterator<E> {
663         /**
664          * Index of element to be returned by next,
665          * or a negative number if no such.
666          */
667         private int nextIndex;
668 
669         /**
670          * nextItem holds on to item fields because once we claim
671          * that an element exists in hasNext(), we must return it in
672          * the following next() call even if it was in the process of
673          * being removed when hasNext() was called.
674          */
675         private E nextItem;
676 
677         /**
678          * Index of element returned by most recent call to next.
679          * Reset to -1 if this element is deleted by a call to remove.
680          */
681         private int lastRet;
682 
683         Itr() {
684             lastRet = -1;
685             if (count == 0)
686                 nextIndex = -1;
687             else {
688                 nextIndex = takeIndex;
689                 nextItem = items[takeIndex];
690             }
691         }
692 
693         public boolean hasNext() {
694             /*
695              * No sync. We can return true by mistake here
696              * only if this iterator passed across threads,
697              * which we don't support anyway.
698              */
699             return nextIndex >= 0;
700         }
701 
702         /**
703          * Checks whether nextIndex is valid; if so setting nextItem.
704          * Stops iterator when either hits putIndex or sees null item.
705          */
706         private void checkNext() {
707             if (nextIndex == putIndex) {
708                 nextIndex = -1;
709                 nextItem = null;
710             } else {
711                 nextItem = items[nextIndex];
712                 if (nextItem == null)
713                     nextIndex = -1;
714             }
715         }
716 
717         public E next() {
718             final ReentrantLock lock = DismissableBlockingQueue.this.lock;
719             lock.lock();
720             try {
721                 if (nextIndex < 0)
722                     throw new NoSuchElementException();
723                 lastRet = nextIndex;
724                 E x = nextItem;
725                 nextIndex = inc(nextIndex);
726                 checkNext();
727                 return x;
728             } finally {
729                 lock.unlock();
730             }
731         }
732 
733         public void remove() {
734             final ReentrantLock lock = DismissableBlockingQueue.this.lock;
735             lock.lock();
736             try {
737                 int i = lastRet;
738                 if (i == -1)
739                     throw new IllegalStateException();
740                 lastRet = -1;
741 
742                 int ti = takeIndex;
743                 removeAt(i);
744                 // back up cursor (reset to front if was first element)
745                 nextIndex = (i == ti) ? takeIndex : i;
746                 checkNext();
747             } finally {
748                 lock.unlock();
749             }
750         }
751     }
752 }