View Javadoc

1   /*
2    * Copyright (c) 2003-2008 by Cosylab d. d.
3    *
4    * This file is part of Java-Common.
5    *
6    * Java-Common is free software: you can redistribute it and/or modify
7    * it under the terms of the GNU General Public License as published by
8    * the Free Software Foundation, either version 3 of the License, or
9    * (at your option) any later version.
10   *
11   * Java-Common is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   * GNU General Public License for more details.
15   *
16   * You should have received a copy of the GNU General Public License
17   * along with Java-Common.  If not, see <http://www.gnu.org/licenses/>.
18   */
19  
20  package com.cosylab.util;
21  
22  import java.util.List;
23  import java.util.Stack;
24  
25  
26  /**
27   * Scheduler implementation that prevents single tasks from locking up entire
28   * scheduler. This implementation tries to use minimal number of threads to
29   * execute given tasks.
30   * 
31   * <p>
32   * Tasks are scheduled using a priority queue, sorted by next execution time.
33   * Time is measured using wait method, so it is bound to be incorrect. This
34   * implementation is not suitable for scheduling times below 10ms, but it
35   * depends on platform (limits are about 50ms for Windows9x, 10ms for Windows
36   * 2000/XP/NT and 1ms for Linux/Unix).<br>
37   * Each task will run only once at any given time (if task is scheduled to be
38   * executed every 10ms, but it takes 100ms to run, same task will not be
39   * executed 10 times in between).<br>
40   * If a task exceeds its next scheduled execution time while running, next
41   * execution time will be assigned by adding
42   * <code>SchedulerTask.getInterval()</code> after current task has completed.
43   * </p>
44   *
45   * @author <a href="mailto:ales.pucelj@cosylab.com">Ales Pucelj</a>
46   * @version $id$
47   */
48  public class Scheduler extends Thread
49  {
50  	private TaskEntry first;
51  	private TaskEntry last;
52  
53  	/** List of threads executing tasks. */
54  	private Stack runners = new Stack();
55  
56  	/** Flag indicating that Scheduler should stop. */
57  	private volatile boolean done = false;
58  
59  	/** Minimum number of runner threads. */
60  	private int minRunners = 1;
61  	private int maxRunners = 0;
62  
63  	// closes tisk exparation
64  	private long minNextTime;
65  	private Object lock = new Object();
66  	private final boolean debug = false;
67  
68  	/**
69  	 * Thread that runs single task. These threads are automatically allocated
70  	 * and stopped based on number of tasks that run simultaneously.
71  	 */
72  	private class SchedulerRunner extends Thread
73  	{
74  		/** Lock on which the thread waits until new task is assigned to it. */
75  		private final Object lock = new Object();
76  
77  		/** Task to execute. */
78  		private TaskEntry task;
79  
80  		/** Flag indicating this thread should stop. */
81  		private volatile boolean done;
82  
83  		/**
84  		 * Default constructor for ScheduleRunner.
85  		 */
86  		public SchedulerRunner()
87  		{
88  			done = false;
89  			task = null;
90  		}
91  
92  		/**
93  		 * Assign a task to be executed. This is the task this thread will
94  		 * execute. New tasks will not be accepted until current completes.
95  		 *
96  		 * @param task Task to execute.
97  		 */
98  		public void executeTask(TaskEntry task)
99  		{
100 			//setName("SchedulerRunner: running '" + task.getName() + "'");
101 			this.task = task;
102 
103 			synchronized (lock) {
104 				lock.notifyAll();
105 			}
106 		}
107 
108 		/**
109 		 * Stop this thread. Will wait until current task completes.
110 		 */
111 		public void cancel()
112 		{
113 			done = true;
114 
115 			synchronized (lock) {
116 				lock.notifyAll();
117 			}
118 		}
119 
120 		/**
121 		 * Blocks until new task is assigned to this thread. Returns task that
122 		 * was assigned.
123 		 *
124 		 * @return Task to execute
125 		 *
126 		 * @throws InterruptedException
127 		 */
128 		private TaskEntry getTask() throws InterruptedException
129 		{
130 			synchronized (lock) {
131 				while (task == null) {
132 					lock.wait();
133 
134 					if (done) {
135 						return null;
136 					}
137 				}
138 			}
139 
140 			return task;
141 		}
142 
143 		/**
144 		 * Loop that executes and runs tasks. This method will block on
145 		 * getTask() until interrupted or task is assigned.
146 		 *
147 		 * @see java.lang.Runnable#run()
148 		 */
149 		public void run()
150 		{
151 			try {
152 				while (!done) {
153 					// Make a copy of task to prevent undefined behaviour
154 					// in case new task is assigned while running current
155 					TaskEntry t = getTask();
156 
157 					if (done) {
158 						return;
159 					}
160 
161 					long delta = 0;
162 
163 					if (debug) {
164 						delta = System.currentTimeMillis() - t.nextTime;
165 					}
166 
167 					t.run();
168 					task = null;
169 
170 					if (!t.isCanceled()) {
171 						if (debug) {
172 							System.out.println("DELTA= " + delta + " DUTY= "
173 							    + t.duty + " TASK= " + t.task);
174 						}
175 
176 						returnTask(t);
177 
178 						// wake scheduler
179 						synchronized (Scheduler.this.lock) {
180 							Scheduler.this.lock.notifyAll();
181 						}
182 					} else {
183 						if (debug) {
184 							System.out.println("IN= "
185 							    + (System.currentTimeMillis() - t.created)
186 							    + " DUTY= " + t.duty + " TASK= " + t.task);
187 						}
188 					}
189 
190 					returnRunner(this);
191 				}
192 			} catch (InterruptedException e) {
193 				// Do nothing since we exit the loop anyway.
194 			}
195 		}
196 	}
197 
198 	/**
199 	 * This is a wrapper arround the Task interface. It allows the Task
200 	 * interface to be much simpler, since all scheduling related parameters
201 	 * are stored here. For most methods it maps to the Task interface itself.
202 	 */
203 	private class TaskEntry implements Comparable
204 	{
205 		/** Next entry in linked list */
206 		public TaskEntry next;
207 
208 		/** Next previous in linked list */
209 		public TaskEntry previous;
210 
211 		// time of creation
212 		private long created;
213 
214 		// time of creation
215 		private long duty;
216 
217 		/**
218 		 * Next time when this task is to be executed. Can be in the past,
219 		 * indicating immediate execution.
220 		 */
221 		private long nextTime;
222 
223 		/** Actual task to execute. */
224 		private SchedulerTask task;
225 
226 		/** Should this task be executed only once or repeated indefinitely. */
227 		private boolean repeated;
228 
229 		/** Is this task to be canceled on next execution. */
230 		private boolean canceled = false;
231 
232 		/**
233 		 * Creates new TaskEntry, specifying task that will be executed
234 		 * immediately and only once.
235 		 *
236 		 * @param t Task to schedule.
237 		 */
238 		public TaskEntry(SchedulerTask t)
239 		{
240 			this(t, 0, false);
241 		}
242 
243 		/**
244 		 * Creates new TaskEntry, specifying the task to be executed after a
245 		 * given delay and whether this task should be repeated.
246 		 *
247 		 * @param t Task to schedule.
248 		 * @param delay Delay before first execution.
249 		 * @param isRepeated Should it be repeated.
250 		 */
251 		public TaskEntry(SchedulerTask t, long delay, boolean isRepeated)
252 		{
253 			task = t;
254 			nextTime = System.currentTimeMillis() + delay;
255 			repeated = isRepeated;
256 			created = System.currentTimeMillis();
257 		}
258 
259 		/**
260 		 * Returns next time when this task should execute.
261 		 *
262 		 * @return Time in milliseconds.
263 		 */
264 		public long getNextTime()
265 		{
266 			return nextTime;
267 		}
268 
269 		/**
270 		 * Retunrs name of this task. Name is obtained from
271 		 * <code>SchedulerTask</code> or set to 'anonymous' if null.
272 		 *
273 		 * @return Name of this task.
274 		 */
275 		public String getName()
276 		{
277 			String name = task.getTaskName();
278 
279 			if (name == null) {
280 				name = "<anonymous>";
281 			}
282 
283 			return name;
284 		}
285 
286 		/**
287 		 * Will this task be canceled before next execution. Non repeated tasks
288 		 * return true after they have run once.
289 		 *
290 		 * @return True if the task should be canceled.
291 		 */
292 		public boolean isCanceled()
293 		{
294 			if (repeated) {
295 				return task.isCanceled();
296 			} else {
297 				return canceled;
298 			}
299 		}
300 
301 		/**
302 		 * Comparator implementation for use with priority queue. Compares
303 		 * scheduled execution times.
304 		 *
305 		 * @see java.lang.Comparable#compareTo(java.lang.Object)
306 		 */
307 		public int compareTo(Object o)
308 		{
309 			long other = ((TaskEntry)o).getNextTime();
310 
311 			if (nextTime < other) {
312 				return -1;
313 			}
314 
315 			if (nextTime > other) {
316 				return 1;
317 			}
318 
319 			return 0;
320 		}
321 
322 		/**
323 		 * Executes the task and sets the new execution times. Also marks
324 		 * non-repeated tasks for cancelation.
325 		 */
326 		public void run()
327 		{
328 			duty = System.currentTimeMillis();
329 			task.run();
330 			duty = System.currentTimeMillis() - duty;
331 			canceled = true;
332 			nextTime = System.currentTimeMillis() + task.getInterval();
333 		}
334 
335 		/**
336 		 * Name of the associated task.
337 		 *
338 		 * @see java.lang.Object#toString()
339 		 */
340 		public String toString()
341 		{
342 			return task.toString() + " " + nextTime;
343 		}
344 	}
345 
346 	/**
347 	 * Creates new scheduler and starts it.
348 	 */
349 	public Scheduler()
350 	{
351 		this(3);
352 	}
353 
354 	/**
355 	 * Creates a new scheduler and specifies the minimum number of runner
356 	 * threads alive, regardless the number of tasks.
357 	 *
358 	 * @param minimumRunners Number of threads.
359 	 */
360 	public Scheduler(int minimumRunners)
361 	{
362 		super();
363 		minRunners = Math.max(1, minimumRunners);
364 		setPriority(7);
365 		start();
366 
367 		//timer= new Timer();
368 	}
369 	
370 	public Scheduler(int min, int max) {
371 		this(min);
372 		maxRunners=max;
373 	}
374 
375 	/**
376 	 * Returns runner to the pool. If there are too many runners in the pool,
377 	 * runner will be stopped.
378 	 *
379 	 * @param runner Runner thread to return to pool.
380 	 */
381 	protected void returnRunner(SchedulerRunner runner)
382 	{
383 		if (maxRunners>0) {
384 			synchronized(runners) {
385 				if ((runners == null) || (runners.size() >= maxRunners)) {
386 					runner.cancel();
387 				} else {
388 					runners.push(runner);
389 				}
390 				runners.notify();
391 			}
392 		}else {
393 		// runners can be null during shutdown, in which case, do no return to
394 		// pool. 
395 		if ((runners == null) || (runners.size() > minRunners)) {
396 			runner.cancel();
397 		} else {
398 			runners.push(runner);
399 		}
400 		}
401 	}
402 
403 	/**
404 	 * Puts task back to the queue. If task was marked to be canceled, it will
405 	 * be destroyed.
406 	 *
407 	 * @param task Task to return.
408 	 */
409 	protected synchronized void returnTask(TaskEntry task)
410 	{
411 		if (task.isCanceled()) {
412 			return;
413 		}
414 
415 
416 //if (debug) System.out.println("RETURNED "+task);
417 out: 
418 		if (first == null) {
419 			first = task;
420 			first.previous = null;
421 			first.next = null;
422 			last = task;
423 		} else {
424 			if (task.nextTime >= last.nextTime) {
425 				last.next = task;
426 				task.previous = last;
427 				last = task;
428 			} else {
429 				TaskEntry t = last;
430 
431 				while (t.previous != null) {
432 					if (task.nextTime >= t.previous.nextTime) {
433 						t.previous.next = task;
434 						task.previous = t.previous;
435 
436 						t.previous = task;
437 						task.next = t;
438 
439 						break out;
440 					} else {
441 						t = t.previous;
442 					}
443 				}
444 
445 				first = task;
446 				first.previous = null;
447 				first.next = t;
448 				t.previous = first;
449 			}
450 		}
451 
452 		/*TaskEntry t = first;
453 
454 		while(t!=null) {
455 		    System.out.println("> "+t);
456 		    t= t.next;
457 		}*/
458 		minNextTime = first.nextTime;
459 	}
460 
461 	/**
462 	 * Schedule new task for immediate single time execution.
463 	 *
464 	 * @param task Task to schedule.
465 	 */
466 	public void schedule(SchedulerTask task)
467 	{
468 		schedule(task, 0, false);
469 	}
470 
471 	/**
472 	 * Schedule task for delayed execution.
473 	 *
474 	 * @param task Task to schedule
475 	 * @param delay Delay before initial execution.
476 	 * @param repeated Should this task be repeated or executed only once.
477 	 */
478 	public void schedule(SchedulerTask task, long delay, boolean repeated)
479 	{
480 		if (task == null) {
481 			return;
482 		}
483 
484 
485 		/*        class Task extends TimerTask {
486 		            private boolean repeated=false;
487 		            private SchedulerTask task;
488 		            private long created;
489 		            private long duty;
490 
491 		            public Task(SchedulerTask task, boolean repeated) {
492 		                created= System.currentTimeMillis();
493 		                this.task=task;
494 		                this.repeated=repeated;
495 		            }
496 		            public void run() {
497 		                if (!repeated) {
498 		                    cancel();
499 		                }
500 		                duty= System.currentTimeMillis();
501 		                task.run();
502 		                duty= System.currentTimeMillis()-duty;
503 
504 		                if (repeated) {
505 		                    System.out.println("DELTA= "
506 		                        + (System.currentTimeMillis() - scheduledExecutionTime())
507 		                        + " DUTY= " + duty + " TASK= " + task);
508 		                } else {
509 		                    System.out.println("IN= "
510 		                        + (System.currentTimeMillis() - created)
511 		                        + " DUTY= " + duty + " TASK= " + task);
512 		                }
513 
514 		            }
515 
516 		        }
517 
518 		        Task t= new Task(task,repeated);
519 
520 		        if (repeated){
521 		            timer.scheduleAtFixedRate(t,delay,task.getInterval());
522 		        } else {
523 		            timer.schedule(t,delay);
524 		        }
525 		*/
526 		TaskEntry entry = new TaskEntry(task, delay, repeated);
527 
528 		returnTask(entry);
529 
530 		// wake scheduler
531 		synchronized (lock) {
532 			lock.notifyAll();
533 		}
534 	}
535 
536 	/**
537 	 * Gets new thread that executes tasks. If a task is available from the
538 	 * pool, this instance will be used, otherwise new instance will be
539 	 * created.
540 	 *
541 	 * @return Instance of task runner.
542 	 */
543 	private SchedulerRunner getRunner()
544 	{
545 		if (maxRunners > 0) {
546 			synchronized (runners) {
547 				while (runners.size() >= maxRunners) {
548 					try {
549 						runners.wait(60000);
550 					} catch (Exception e) {
551 						// IGNORE
552 					}
553 				}
554 
555 				if (runners.isEmpty()) {
556 					SchedulerRunner sr = new SchedulerRunner();
557 					sr.start();
558 
559 					return sr;
560 				} else {
561 					return (SchedulerRunner)runners.pop();
562 				}
563 			}
564 		}
565 
566 		if (runners.isEmpty()) {
567 			SchedulerRunner sr = new SchedulerRunner();
568 			sr.start();
569 
570 			return sr;
571 		} else {
572 			return (SchedulerRunner)runners.pop();
573 		}
574 	}
575 
576 	/**
577 	 * Main loop that executes tasks.
578 	 *
579 	 * @see java.lang.Runnable#run()
580 	 */
581 	public void run()
582 	{
583 		long delta;
584 
585 		try {
586 			while (!done) {
587 				while (first == null && !done) {
588 					try {
589 						synchronized (lock) {
590 							lock.wait();
591 						}
592 					} catch (InterruptedException e) {
593 					}
594 				}
595 
596 				if (done) {
597 					return;
598 				}
599 
600 				delta = minNextTime - System.currentTimeMillis();
601 
602 				//System.out.println("delta "+delta);
603 				// Wait until next task is due or we are interrupted
604 				// by new task being added to the queue.
605 				if (delta > 0) {
606 					synchronized (lock) {
607 						//System.out.println("delta "+delta);
608 						lock.wait(delta);
609 					}
610 				} else {
611 					// Do not execute canceled tasks.
612 					TaskEntry task = removeTask();
613 
614 					if (!done && task != null && !task.isCanceled()) {
615 						getRunner().executeTask(task);
616 					}
617 				}
618 			}
619 		} catch (InterruptedException e) {
620 			// Ignore, since we quit anyway.
621 		}
622 	}
623 
624 	/**
625 	 * Cancels all tasks, runners and stops the scheduler.
626 	 */
627 	public void cancel()
628 	{
629 		done = true;
630 
631 		// Stop this thread.
632 		synchronized (lock) {
633 			lock.notifyAll();
634 		}
635 
636 		// Clear runners list, so that any runners still active will not 
637 		// get returned to the list.
638 		List r = runners;
639 		runners = null;
640 
641 		while (!r.isEmpty()) {
642 			((SchedulerRunner)r.get(0)).cancel();
643 			r.remove(0);
644 		}
645 
646 		/**
647 		 * Clear all defined tasks.
648 		 */
649 		synchronized (this) {
650 			first = null;
651 		}
652 	}
653 
654 	private synchronized TaskEntry removeTask()
655 	{
656 		if (first == null) {
657 			return null;
658 		}
659 
660 		TaskEntry t = first;
661 		first = t.next;
662 
663 		if (first == null) {
664 			last = null;
665 			minNextTime = 0;
666 		} else {
667 			first.previous = null;
668 			minNextTime = first.nextTime;
669 		}
670 
671 		t.next = null;
672 		t.previous = null;
673 
674 		/*TaskEntry t1 = first;
675 
676 		System.out.println("REM "+t);
677 		while(t1!=null) {
678 		    System.out.println("> "+t1);
679 		    t1= t1.next;
680 		}*/
681 		return t;
682 	}
683 }
684 
685 /* __oOo__ */