1 /*
2 * Copyright (c) 2003-2008 by Cosylab d. d.
3 *
4 * This file is part of Java-Common.
5 *
6 * Java-Common is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * Java-Common is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Java-Common. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 package com.cosylab.util;
21
22 import java.util.List;
23 import java.util.Stack;
24
25
26 /**
27 * Scheduler implementation that prevents single tasks from locking up entire
28 * scheduler. This implementation tries to use minimal number of threads to
29 * execute given tasks.
30 *
31 * <p>
32 * Tasks are scheduled using a priority queue, sorted by next execution time.
33 * Time is measured using wait method, so it is bound to be incorrect. This
34 * implementation is not suitable for scheduling times below 10ms, but it
35 * depends on platform (limits are about 50ms for Windows9x, 10ms for Windows
36 * 2000/XP/NT and 1ms for Linux/Unix).<br>
37 * Each task will run only once at any given time (if task is scheduled to be
38 * executed every 10ms, but it takes 100ms to run, same task will not be
39 * executed 10 times in between).<br>
40 * If a task exceeds its next scheduled execution time while running, next
41 * execution time will be assigned by adding
42 * <code>SchedulerTask.getInterval()</code> after current task has completed.
43 * </p>
44 *
45 * @author <a href="mailto:ales.pucelj@cosylab.com">Ales Pucelj</a>
46 * @version $id$
47 */
48 public class Scheduler extends Thread
49 {
50 private TaskEntry first;
51 private TaskEntry last;
52
53 /** List of threads executing tasks. */
54 private Stack runners = new Stack();
55
56 /** Flag indicating that Scheduler should stop. */
57 private volatile boolean done = false;
58
59 /** Minimum number of runner threads. */
60 private int minRunners = 1;
61 private int maxRunners = 0;
62
63 // closes tisk exparation
64 private long minNextTime;
65 private Object lock = new Object();
66 private final boolean debug = false;
67
68 /**
69 * Thread that runs single task. These threads are automatically allocated
70 * and stopped based on number of tasks that run simultaneously.
71 */
72 private class SchedulerRunner extends Thread
73 {
74 /** Lock on which the thread waits until new task is assigned to it. */
75 private final Object lock = new Object();
76
77 /** Task to execute. */
78 private TaskEntry task;
79
80 /** Flag indicating this thread should stop. */
81 private volatile boolean done;
82
83 /**
84 * Default constructor for ScheduleRunner.
85 */
86 public SchedulerRunner()
87 {
88 done = false;
89 task = null;
90 }
91
92 /**
93 * Assign a task to be executed. This is the task this thread will
94 * execute. New tasks will not be accepted until current completes.
95 *
96 * @param task Task to execute.
97 */
98 public void executeTask(TaskEntry task)
99 {
100 //setName("SchedulerRunner: running '" + task.getName() + "'");
101 this.task = task;
102
103 synchronized (lock) {
104 lock.notifyAll();
105 }
106 }
107
108 /**
109 * Stop this thread. Will wait until current task completes.
110 */
111 public void cancel()
112 {
113 done = true;
114
115 synchronized (lock) {
116 lock.notifyAll();
117 }
118 }
119
120 /**
121 * Blocks until new task is assigned to this thread. Returns task that
122 * was assigned.
123 *
124 * @return Task to execute
125 *
126 * @throws InterruptedException
127 */
128 private TaskEntry getTask() throws InterruptedException
129 {
130 synchronized (lock) {
131 while (task == null) {
132 lock.wait();
133
134 if (done) {
135 return null;
136 }
137 }
138 }
139
140 return task;
141 }
142
143 /**
144 * Loop that executes and runs tasks. This method will block on
145 * getTask() until interrupted or task is assigned.
146 *
147 * @see java.lang.Runnable#run()
148 */
149 public void run()
150 {
151 try {
152 while (!done) {
153 // Make a copy of task to prevent undefined behaviour
154 // in case new task is assigned while running current
155 TaskEntry t = getTask();
156
157 if (done) {
158 return;
159 }
160
161 long delta = 0;
162
163 if (debug) {
164 delta = System.currentTimeMillis() - t.nextTime;
165 }
166
167 t.run();
168 task = null;
169
170 if (!t.isCanceled()) {
171 if (debug) {
172 System.out.println("DELTA= " + delta + " DUTY= "
173 + t.duty + " TASK= " + t.task);
174 }
175
176 returnTask(t);
177
178 // wake scheduler
179 synchronized (Scheduler.this.lock) {
180 Scheduler.this.lock.notifyAll();
181 }
182 } else {
183 if (debug) {
184 System.out.println("IN= "
185 + (System.currentTimeMillis() - t.created)
186 + " DUTY= " + t.duty + " TASK= " + t.task);
187 }
188 }
189
190 returnRunner(this);
191 }
192 } catch (InterruptedException e) {
193 // Do nothing since we exit the loop anyway.
194 }
195 }
196 }
197
198 /**
199 * This is a wrapper arround the Task interface. It allows the Task
200 * interface to be much simpler, since all scheduling related parameters
201 * are stored here. For most methods it maps to the Task interface itself.
202 */
203 private class TaskEntry implements Comparable
204 {
205 /** Next entry in linked list */
206 public TaskEntry next;
207
208 /** Next previous in linked list */
209 public TaskEntry previous;
210
211 // time of creation
212 private long created;
213
214 // time of creation
215 private long duty;
216
217 /**
218 * Next time when this task is to be executed. Can be in the past,
219 * indicating immediate execution.
220 */
221 private long nextTime;
222
223 /** Actual task to execute. */
224 private SchedulerTask task;
225
226 /** Should this task be executed only once or repeated indefinitely. */
227 private boolean repeated;
228
229 /** Is this task to be canceled on next execution. */
230 private boolean canceled = false;
231
232 /**
233 * Creates new TaskEntry, specifying task that will be executed
234 * immediately and only once.
235 *
236 * @param t Task to schedule.
237 */
238 public TaskEntry(SchedulerTask t)
239 {
240 this(t, 0, false);
241 }
242
243 /**
244 * Creates new TaskEntry, specifying the task to be executed after a
245 * given delay and whether this task should be repeated.
246 *
247 * @param t Task to schedule.
248 * @param delay Delay before first execution.
249 * @param isRepeated Should it be repeated.
250 */
251 public TaskEntry(SchedulerTask t, long delay, boolean isRepeated)
252 {
253 task = t;
254 nextTime = System.currentTimeMillis() + delay;
255 repeated = isRepeated;
256 created = System.currentTimeMillis();
257 }
258
259 /**
260 * Returns next time when this task should execute.
261 *
262 * @return Time in milliseconds.
263 */
264 public long getNextTime()
265 {
266 return nextTime;
267 }
268
269 /**
270 * Retunrs name of this task. Name is obtained from
271 * <code>SchedulerTask</code> or set to 'anonymous' if null.
272 *
273 * @return Name of this task.
274 */
275 public String getName()
276 {
277 String name = task.getTaskName();
278
279 if (name == null) {
280 name = "<anonymous>";
281 }
282
283 return name;
284 }
285
286 /**
287 * Will this task be canceled before next execution. Non repeated tasks
288 * return true after they have run once.
289 *
290 * @return True if the task should be canceled.
291 */
292 public boolean isCanceled()
293 {
294 if (repeated) {
295 return task.isCanceled();
296 } else {
297 return canceled;
298 }
299 }
300
301 /**
302 * Comparator implementation for use with priority queue. Compares
303 * scheduled execution times.
304 *
305 * @see java.lang.Comparable#compareTo(java.lang.Object)
306 */
307 public int compareTo(Object o)
308 {
309 long other = ((TaskEntry)o).getNextTime();
310
311 if (nextTime < other) {
312 return -1;
313 }
314
315 if (nextTime > other) {
316 return 1;
317 }
318
319 return 0;
320 }
321
322 /**
323 * Executes the task and sets the new execution times. Also marks
324 * non-repeated tasks for cancelation.
325 */
326 public void run()
327 {
328 duty = System.currentTimeMillis();
329 task.run();
330 duty = System.currentTimeMillis() - duty;
331 canceled = true;
332 nextTime = System.currentTimeMillis() + task.getInterval();
333 }
334
335 /**
336 * Name of the associated task.
337 *
338 * @see java.lang.Object#toString()
339 */
340 public String toString()
341 {
342 return task.toString() + " " + nextTime;
343 }
344 }
345
346 /**
347 * Creates new scheduler and starts it.
348 */
349 public Scheduler()
350 {
351 this(3);
352 }
353
354 /**
355 * Creates a new scheduler and specifies the minimum number of runner
356 * threads alive, regardless the number of tasks.
357 *
358 * @param minimumRunners Number of threads.
359 */
360 public Scheduler(int minimumRunners)
361 {
362 super();
363 minRunners = Math.max(1, minimumRunners);
364 setPriority(7);
365 start();
366
367 //timer= new Timer();
368 }
369
370 public Scheduler(int min, int max) {
371 this(min);
372 maxRunners=max;
373 }
374
375 /**
376 * Returns runner to the pool. If there are too many runners in the pool,
377 * runner will be stopped.
378 *
379 * @param runner Runner thread to return to pool.
380 */
381 protected void returnRunner(SchedulerRunner runner)
382 {
383 if (maxRunners>0) {
384 synchronized(runners) {
385 if ((runners == null) || (runners.size() >= maxRunners)) {
386 runner.cancel();
387 } else {
388 runners.push(runner);
389 }
390 runners.notify();
391 }
392 }else {
393 // runners can be null during shutdown, in which case, do no return to
394 // pool.
395 if ((runners == null) || (runners.size() > minRunners)) {
396 runner.cancel();
397 } else {
398 runners.push(runner);
399 }
400 }
401 }
402
403 /**
404 * Puts task back to the queue. If task was marked to be canceled, it will
405 * be destroyed.
406 *
407 * @param task Task to return.
408 */
409 protected synchronized void returnTask(TaskEntry task)
410 {
411 if (task.isCanceled()) {
412 return;
413 }
414
415
416 //if (debug) System.out.println("RETURNED "+task);
417 out:
418 if (first == null) {
419 first = task;
420 first.previous = null;
421 first.next = null;
422 last = task;
423 } else {
424 if (task.nextTime >= last.nextTime) {
425 last.next = task;
426 task.previous = last;
427 last = task;
428 } else {
429 TaskEntry t = last;
430
431 while (t.previous != null) {
432 if (task.nextTime >= t.previous.nextTime) {
433 t.previous.next = task;
434 task.previous = t.previous;
435
436 t.previous = task;
437 task.next = t;
438
439 break out;
440 } else {
441 t = t.previous;
442 }
443 }
444
445 first = task;
446 first.previous = null;
447 first.next = t;
448 t.previous = first;
449 }
450 }
451
452 /*TaskEntry t = first;
453
454 while(t!=null) {
455 System.out.println("> "+t);
456 t= t.next;
457 }*/
458 minNextTime = first.nextTime;
459 }
460
461 /**
462 * Schedule new task for immediate single time execution.
463 *
464 * @param task Task to schedule.
465 */
466 public void schedule(SchedulerTask task)
467 {
468 schedule(task, 0, false);
469 }
470
471 /**
472 * Schedule task for delayed execution.
473 *
474 * @param task Task to schedule
475 * @param delay Delay before initial execution.
476 * @param repeated Should this task be repeated or executed only once.
477 */
478 public void schedule(SchedulerTask task, long delay, boolean repeated)
479 {
480 if (task == null) {
481 return;
482 }
483
484
485 /* class Task extends TimerTask {
486 private boolean repeated=false;
487 private SchedulerTask task;
488 private long created;
489 private long duty;
490
491 public Task(SchedulerTask task, boolean repeated) {
492 created= System.currentTimeMillis();
493 this.task=task;
494 this.repeated=repeated;
495 }
496 public void run() {
497 if (!repeated) {
498 cancel();
499 }
500 duty= System.currentTimeMillis();
501 task.run();
502 duty= System.currentTimeMillis()-duty;
503
504 if (repeated) {
505 System.out.println("DELTA= "
506 + (System.currentTimeMillis() - scheduledExecutionTime())
507 + " DUTY= " + duty + " TASK= " + task);
508 } else {
509 System.out.println("IN= "
510 + (System.currentTimeMillis() - created)
511 + " DUTY= " + duty + " TASK= " + task);
512 }
513
514 }
515
516 }
517
518 Task t= new Task(task,repeated);
519
520 if (repeated){
521 timer.scheduleAtFixedRate(t,delay,task.getInterval());
522 } else {
523 timer.schedule(t,delay);
524 }
525 */
526 TaskEntry entry = new TaskEntry(task, delay, repeated);
527
528 returnTask(entry);
529
530 // wake scheduler
531 synchronized (lock) {
532 lock.notifyAll();
533 }
534 }
535
536 /**
537 * Gets new thread that executes tasks. If a task is available from the
538 * pool, this instance will be used, otherwise new instance will be
539 * created.
540 *
541 * @return Instance of task runner.
542 */
543 private SchedulerRunner getRunner()
544 {
545 if (maxRunners > 0) {
546 synchronized (runners) {
547 while (runners.size() >= maxRunners) {
548 try {
549 runners.wait(60000);
550 } catch (Exception e) {
551 // IGNORE
552 }
553 }
554
555 if (runners.isEmpty()) {
556 SchedulerRunner sr = new SchedulerRunner();
557 sr.start();
558
559 return sr;
560 } else {
561 return (SchedulerRunner)runners.pop();
562 }
563 }
564 }
565
566 if (runners.isEmpty()) {
567 SchedulerRunner sr = new SchedulerRunner();
568 sr.start();
569
570 return sr;
571 } else {
572 return (SchedulerRunner)runners.pop();
573 }
574 }
575
576 /**
577 * Main loop that executes tasks.
578 *
579 * @see java.lang.Runnable#run()
580 */
581 public void run()
582 {
583 long delta;
584
585 try {
586 while (!done) {
587 while (first == null && !done) {
588 try {
589 synchronized (lock) {
590 lock.wait();
591 }
592 } catch (InterruptedException e) {
593 }
594 }
595
596 if (done) {
597 return;
598 }
599
600 delta = minNextTime - System.currentTimeMillis();
601
602 //System.out.println("delta "+delta);
603 // Wait until next task is due or we are interrupted
604 // by new task being added to the queue.
605 if (delta > 0) {
606 synchronized (lock) {
607 //System.out.println("delta "+delta);
608 lock.wait(delta);
609 }
610 } else {
611 // Do not execute canceled tasks.
612 TaskEntry task = removeTask();
613
614 if (!done && task != null && !task.isCanceled()) {
615 getRunner().executeTask(task);
616 }
617 }
618 }
619 } catch (InterruptedException e) {
620 // Ignore, since we quit anyway.
621 }
622 }
623
624 /**
625 * Cancels all tasks, runners and stops the scheduler.
626 */
627 public void cancel()
628 {
629 done = true;
630
631 // Stop this thread.
632 synchronized (lock) {
633 lock.notifyAll();
634 }
635
636 // Clear runners list, so that any runners still active will not
637 // get returned to the list.
638 List r = runners;
639 runners = null;
640
641 while (!r.isEmpty()) {
642 ((SchedulerRunner)r.get(0)).cancel();
643 r.remove(0);
644 }
645
646 /**
647 * Clear all defined tasks.
648 */
649 synchronized (this) {
650 first = null;
651 }
652 }
653
654 private synchronized TaskEntry removeTask()
655 {
656 if (first == null) {
657 return null;
658 }
659
660 TaskEntry t = first;
661 first = t.next;
662
663 if (first == null) {
664 last = null;
665 minNextTime = 0;
666 } else {
667 first.previous = null;
668 minNextTime = first.nextTime;
669 }
670
671 t.next = null;
672 t.previous = null;
673
674 /*TaskEntry t1 = first;
675
676 System.out.println("REM "+t);
677 while(t1!=null) {
678 System.out.println("> "+t1);
679 t1= t1.next;
680 }*/
681 return t;
682 }
683 }
684
685 /* __oOo__ */