1 package de.desy.acop.displayers.tools;
2
3
4
5
6
7
8
9
10
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.TimeUnit;
13 import java.util.concurrent.locks.*;
14 import java.util.*;
15
16
17
18
19
20
21
22
23
24 public class DismissableBlockingQueue<E> extends AbstractQueue<E>
25 implements BlockingQueue<E>, java.io.Serializable {
26
27
28
29
30
31
32
33 private static final long serialVersionUID = -817911632652898426L;
34
35
36 private final E[] items;
37
38 private int takeIndex;
39
40 private int putIndex;
41
42 private int count;
43
44
45
46
47
48
49
50 private final ReentrantLock lock;
51
52 private final Condition notEmpty;
53
54 private final Condition notFull;
55
56
57
58
59
60
61 final int inc(int i) {
62 return (++i == items.length)? 0 : i;
63 }
64
65 final int dec(int i) {
66 return (--i == -1)? items.length -1 : i;
67 }
68
69
70
71
72
73 private void insert(E x) {
74 if (count == items.length) {
75 if (putIndex == 0) {
76 items[items.length-1] = x;
77 } else {
78 items[putIndex-1] = x;
79 }
80 notEmpty.signal();
81 } else {
82 items[putIndex] = x;
83 putIndex = inc(putIndex);
84 ++count;
85 notEmpty.signal();
86 }
87 }
88
89
90
91
92
93 private E extract() {
94 final E[] items = this.items;
95 E x = items[takeIndex];
96 items[takeIndex] = null;
97 takeIndex = inc(takeIndex);
98 --count;
99 notFull.signal();
100 return x;
101 }
102
103
104
105
106
107 void removeAt(int i) {
108 final E[] items = this.items;
109
110 if (i == takeIndex) {
111 items[takeIndex] = null;
112 takeIndex = inc(takeIndex);
113 } else {
114
115 for (;;) {
116 int nexti = inc(i);
117 if (nexti != putIndex) {
118 items[i] = items[nexti];
119 i = nexti;
120 } else {
121 items[i] = null;
122 putIndex = i;
123 break;
124 }
125 }
126 }
127 --count;
128 notFull.signal();
129 }
130
131
132
133
134
135
136
137
138 public DismissableBlockingQueue(int capacity) {
139 this(capacity, false);
140 }
141
142
143
144
145
146
147
148
149
150
151
152 @SuppressWarnings("unchecked")
153 public DismissableBlockingQueue(int capacity, boolean fair) {
154 if (capacity <= 0)
155 throw new IllegalArgumentException();
156 this.items = (E[]) new Object[capacity];
157 lock = new ReentrantLock(fair);
158 notEmpty = lock.newCondition();
159 notFull = lock.newCondition();
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 public DismissableBlockingQueue(int capacity, boolean fair,
179 Collection<? extends E> c) {
180 this(capacity, fair);
181 if (capacity < c.size())
182 throw new IllegalArgumentException();
183
184 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
185 add(it.next());
186 }
187
188
189
190
191
192
193
194
195
196
197
198
199 public boolean add(E e) {
200 return super.add(e);
201 }
202
203
204
205
206
207
208
209
210
211
212 public boolean offer(E e) {
213 if (e == null) throw new NullPointerException();
214 final ReentrantLock lock = this.lock;
215 lock.lock();
216 try {
217 insert(e);
218 return true;
219 } finally {
220 lock.unlock();
221 }
222 }
223
224
225
226
227
228
229
230
231 public void put(E e) throws InterruptedException {
232 if (e == null) throw new NullPointerException();
233 offer(e);
234 }
235
236
237
238
239
240
241
242
243
244 public boolean offer(E e, long timeout, TimeUnit unit)
245 throws InterruptedException {
246
247 if (e == null) throw new NullPointerException();
248 long nanos = unit.toNanos(timeout);
249 final ReentrantLock lock = this.lock;
250 lock.lockInterruptibly();
251 try {
252 for (;;) {
253 if (count != items.length) {
254 insert(e);
255 return true;
256 }
257 if (nanos <= 0)
258 return false;
259 try {
260 nanos = notFull.awaitNanos(nanos);
261 } catch (InterruptedException ie) {
262 notFull.signal();
263 throw ie;
264 }
265 }
266 } finally {
267 lock.unlock();
268 }
269 }
270
271 public E poll() {
272 final ReentrantLock lock = this.lock;
273 lock.lock();
274 try {
275 if (count == 0)
276 return null;
277 E x = extract();
278 return x;
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 public E take() throws InterruptedException {
285 final ReentrantLock lock = this.lock;
286 lock.lockInterruptibly();
287 try {
288 try {
289 while (count == 0)
290 notEmpty.await();
291 } catch (InterruptedException ie) {
292 notEmpty.signal();
293 throw ie;
294 }
295 E x = extract();
296 return x;
297 } finally {
298 lock.unlock();
299 }
300 }
301
302 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
303 long nanos = unit.toNanos(timeout);
304 final ReentrantLock lock = this.lock;
305 lock.lockInterruptibly();
306 try {
307 for (;;) {
308 if (count != 0) {
309 E x = extract();
310 return x;
311 }
312 if (nanos <= 0)
313 return null;
314 try {
315 nanos = notEmpty.awaitNanos(nanos);
316 } catch (InterruptedException ie) {
317 notEmpty.signal();
318 throw ie;
319 }
320
321 }
322 } finally {
323 lock.unlock();
324 }
325 }
326
327 public E peek() {
328 final ReentrantLock lock = this.lock;
329 lock.lock();
330 try {
331 return (count == 0) ? null : items[takeIndex];
332 } finally {
333 lock.unlock();
334 }
335 }
336
337
338
339
340
341
342
343
344 public int size() {
345 final ReentrantLock lock = this.lock;
346 lock.lock();
347 try {
348 return count;
349 } finally {
350 lock.unlock();
351 }
352 }
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 public int remainingCapacity() {
368 final ReentrantLock lock = this.lock;
369 lock.lock();
370 try {
371 return items.length - count;
372 } finally {
373 lock.unlock();
374 }
375 }
376
377
378
379
380
381
382
383
384
385
386
387
388 public boolean remove(Object o) {
389 if (o == null) return false;
390 final E[] items = this.items;
391 final ReentrantLock lock = this.lock;
392 lock.lock();
393 try {
394 int i = takeIndex;
395 int k = 0;
396 for (;;) {
397 if (k++ >= count)
398 return false;
399 if (o.equals(items[i])) {
400 removeAt(i);
401 return true;
402 }
403 i = inc(i);
404 }
405
406 } finally {
407 lock.unlock();
408 }
409 }
410
411
412
413
414
415
416
417
418
419 public boolean contains(Object o) {
420 if (o == null) return false;
421 final E[] items = this.items;
422 final ReentrantLock lock = this.lock;
423 lock.lock();
424 try {
425 int i = takeIndex;
426 int k = 0;
427 while (k++ < count) {
428 if (o.equals(items[i]))
429 return true;
430 i = inc(i);
431 }
432 return false;
433 } finally {
434 lock.unlock();
435 }
436 }
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451 public Object[] toArray() {
452 final E[] items = this.items;
453 final ReentrantLock lock = this.lock;
454 lock.lock();
455 try {
456 Object[] a = new Object[count];
457 int k = 0;
458 int i = takeIndex;
459 while (k < count) {
460 a[k++] = items[i];
461 i = inc(i);
462 }
463 return a;
464 } finally {
465 lock.unlock();
466 }
467 }
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505 @SuppressWarnings("unchecked")
506 public <T> T[] toArray(T[] a) {
507 final E[] items = this.items;
508 final ReentrantLock lock = this.lock;
509 lock.lock();
510 try {
511 if (a.length < count)
512 a = (T[])java.lang.reflect.Array.newInstance(
513 a.getClass().getComponentType(),
514 count
515 );
516
517 int k = 0;
518 int i = takeIndex;
519 while (k < count) {
520 a[k++] = (T)items[i];
521 i = inc(i);
522 }
523 if (a.length > count)
524 a[count] = null;
525 return a;
526 } finally {
527 lock.unlock();
528 }
529 }
530
531 public String toString() {
532 final ReentrantLock lock = this.lock;
533 lock.lock();
534 try {
535 return super.toString();
536 } finally {
537 lock.unlock();
538 }
539 }
540
541
542
543
544
545 public void clear() {
546 final E[] items = this.items;
547 final ReentrantLock lock = this.lock;
548 lock.lock();
549 try {
550 int i = takeIndex;
551 int k = count;
552 while (k-- > 0) {
553 items[i] = null;
554 i = inc(i);
555 }
556 count = 0;
557 putIndex = 0;
558 takeIndex = 0;
559 notFull.signalAll();
560 } finally {
561 lock.unlock();
562 }
563 }
564
565
566
567
568
569
570
571 public int drainTo(Collection<? super E> c) {
572 if (c == null)
573 throw new NullPointerException();
574 if (c == this)
575 throw new IllegalArgumentException();
576 final E[] items = this.items;
577 final ReentrantLock lock = this.lock;
578 lock.lock();
579 try {
580 int i = takeIndex;
581 int n = 0;
582 int max = count;
583 while (n < max) {
584 c.add(items[i]);
585 items[i] = null;
586 i = inc(i);
587 ++n;
588 }
589 if (n > 0) {
590 count = 0;
591 putIndex = 0;
592 takeIndex = 0;
593 notFull.signalAll();
594 }
595 return n;
596 } finally {
597 lock.unlock();
598 }
599 }
600
601
602
603
604
605
606
607 public int drainTo(Collection<? super E> c, int maxElements) {
608 if (c == null)
609 throw new NullPointerException();
610 if (c == this)
611 throw new IllegalArgumentException();
612 if (maxElements <= 0)
613 return 0;
614 final E[] items = this.items;
615 final ReentrantLock lock = this.lock;
616 lock.lock();
617 try {
618 int i = takeIndex;
619 int n = 0;
620 int max = (maxElements < count)? maxElements : count;
621 while (n < max) {
622 c.add(items[i]);
623 items[i] = null;
624 i = inc(i);
625 ++n;
626 }
627 if (n > 0) {
628 count -= n;
629 takeIndex = i;
630 notFull.signalAll();
631 }
632 return n;
633 } finally {
634 lock.unlock();
635 }
636 }
637
638
639
640
641
642
643
644
645
646
647
648
649 public Iterator<E> iterator() {
650 final ReentrantLock lock = this.lock;
651 lock.lock();
652 try {
653 return new Itr();
654 } finally {
655 lock.unlock();
656 }
657 }
658
659
660
661
662 private class Itr implements Iterator<E> {
663
664
665
666
667 private int nextIndex;
668
669
670
671
672
673
674
675 private E nextItem;
676
677
678
679
680
681 private int lastRet;
682
683 Itr() {
684 lastRet = -1;
685 if (count == 0)
686 nextIndex = -1;
687 else {
688 nextIndex = takeIndex;
689 nextItem = items[takeIndex];
690 }
691 }
692
693 public boolean hasNext() {
694
695
696
697
698
699 return nextIndex >= 0;
700 }
701
702
703
704
705
706 private void checkNext() {
707 if (nextIndex == putIndex) {
708 nextIndex = -1;
709 nextItem = null;
710 } else {
711 nextItem = items[nextIndex];
712 if (nextItem == null)
713 nextIndex = -1;
714 }
715 }
716
717 public E next() {
718 final ReentrantLock lock = DismissableBlockingQueue.this.lock;
719 lock.lock();
720 try {
721 if (nextIndex < 0)
722 throw new NoSuchElementException();
723 lastRet = nextIndex;
724 E x = nextItem;
725 nextIndex = inc(nextIndex);
726 checkNext();
727 return x;
728 } finally {
729 lock.unlock();
730 }
731 }
732
733 public void remove() {
734 final ReentrantLock lock = DismissableBlockingQueue.this.lock;
735 lock.lock();
736 try {
737 int i = lastRet;
738 if (i == -1)
739 throw new IllegalStateException();
740 lastRet = -1;
741
742 int ti = takeIndex;
743 removeAt(i);
744
745 nextIndex = (i == ti) ? takeIndex : i;
746 checkNext();
747 } finally {
748 lock.unlock();
749 }
750 }
751 }
752 }