/* File: TableDataSet.java * Copyright (C) 2002-2004 The University of Iowa * * Created on June 1, 2004, 11:46 AM * by Edward West * * This file is part of the das2 library. * * das2 is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ package org.das2.system; import org.das2.util.ExceptionHandler; import org.das2.util.DasExceptionHandler; import java.lang.ref.WeakReference; import java.util.*; import java.util.Map.Entry; import java.util.logging.Level; import java.util.logging.Logger; import org.das2.DasApplication; import org.das2.graph.DasCanvasComponent; import org.das2.util.LoggerManager; /** Utility class for synchronous execution. * This class maintains a pool of threads that are used to execute arbitrary * code. This class also serves as a central place to catch and handle * unchecked exceptions. * * The {@link #invokeLater(java.lang.Runnable)} method is similar to the * SwingUtilities {@link javax.swing.SwingUtilities#invokeLater(java.lang.Runnable)} * method, except that the request is not executed on the event thread. * * The {@link #invokeLater(java.lang.Runnable, java.lang.Object)}, * the {@link #invokeAfter(java.lang.Runnable, java.lang.Object)}, * and the {@link #waitFor(java.lang.Object)} methods are designed to work * together. Both of the first two methods execute code asynchronously with * respect to the calling thread. Multiple requests made with a call to * invokeLater that specified the same lock can execute at the same time, * but not while a request made with the invokeAfter with the same lock * is processing. Any requests made before an invokeAfter request with the * same lock will finish before that invokeAfter request begins. An * invokeAfter request will finish before any requests with the same lock made * after that invokeAfter request begins. The {@link #waitFor(java.lang.Object)} * method will cause the calling thread to block until all requests with the * specified lock finish. */ public final class RequestProcessor { private static final BlockingRequestQueue queue = new BlockingRequestQueue(); private static final WeakHashMap runnableQueueMap = new WeakHashMap(); private static final Runner runner = new Runner(); private static final Runnable SHUTDOWN = new Runnable() { public void run() {} }; private static int maxThreadCount = 8; private static int threadCount = 0; private static final Object THREAD_COUNT_LOCK = new Object(); private final static Logger logger= LoggerManager.getLogger( "das2.system.requestprocessor"); private static int threadOrdinal = 0; private RequestProcessor() {} public static void printStatus() { List c= new ArrayList( queue.list ); System.err.println( String.format( "== RequestProcessor (%d jobs) ==", c.size() ) ); for ( Object t: c ) { System.err.println(t); } Set c2= new LinkedHashSet(); c2.addAll( runnableQueueMap.entrySet() ); System.err.println( String.format( "== RequestProcessor runnableQueueMap (%d) ==", c2.size() ) ); for ( Object t: c2 ) { Entry e= (Entry)t; RunnableQueue q= (RunnableQueue)e.getValue(); List c3= new ArrayList( q.list ); System.err.println( String.format( " === (%d jobs) ===", c3.size() ) ); for ( Object t3: c3 ) { System.err.println(t3); } System.err.println(t); } } private static void setJob(Runnable job) { RequestThread thread = (RequestThread)Thread.currentThread(); thread.setJob(job); } private static class RequestThread extends Thread { private WeakReference job; private RequestThread(Runnable run, String name) { super(run, name); } private void setJob(Runnable job) { this.job = new WeakReference(job); } private Runnable getJob() { return (Runnable)job.get(); } } private static void newThread() { String name = "RequestProcessor[" + (threadOrdinal++) + "]"; RequestThread t = new RequestThread(runner, name); t.setPriority(Thread.NORM_PRIORITY); t.start(); } /** Executes run.run() asynchronously on a thread from the thread pool. * @param run the task to be executed. */ public static void invokeLater(Runnable run) { logger.log(Level.FINE, "invokeLater {0}", String.valueOf(run)); synchronized (THREAD_COUNT_LOCK) { if (threadCount < maxThreadCount) { newThread(); } } queue.add(run); } /** * reset the maximum number of threads used. If there are more threads * existing already, than the extra threads will be shut down as they finish * their current jobs. * @param t */ public static void setThreadCount( int t ) { if ( t<4 ) throw new IllegalArgumentException("must be at least 1"); if ( t>32) throw new IllegalArgumentException("must be no more than 32"); maxThreadCount= t; } /** Executes run.run() asynchronously on a thread from the thread pool. * The task will not be executed until after all requests made with * {@link #invokeAfter(java.lang.Runnable, java.lang.Object)} with the same * lock have finished. * @param run the task to be executed. * @param lock associates run with other tasks. * @see #waitFor(java.lang.Object) */ public static void invokeLater(Runnable run, Object lock) { logger.log(Level.FINE, "invokeLater {0} {1}", new Object[]{String.valueOf(run), lock}); synchronized (THREAD_COUNT_LOCK) { if (threadCount < maxThreadCount) { newThread(); } } synchronized (runnableQueueMap) { RunnableQueue rq = (RunnableQueue)runnableQueueMap.get(lock); if (rq == null) { String name= ( lock instanceof DasCanvasComponent ) ? ((DasCanvasComponent)lock).getDasName() : lock.toString(); rq = new RunnableQueue("RQ_"+name ); runnableQueueMap.put(lock, rq); } rq.add(run, false); queue.add(rq); } } /** Executes run.run() asynchronously on a thread from the thread pool. * The task will not be executed until after all requests made with * {@link #invokeAfter(java.lang.Runnable, java.lang.Object)} or * {@link #invokeLater(java.lang.Runnable, java.lang.Object)} with the same * lock have finished. * @param run the task to be executed. * @param lock associates run with other tasks. * @see #waitFor(java.lang.Object) */ public static void invokeAfter(Runnable run, Object lock) { logger.log(Level.FINE, "invokeAfter {0} {1}", new Object[]{String.valueOf(run), lock}); synchronized (THREAD_COUNT_LOCK) { if (threadCount < maxThreadCount) { newThread(); } } synchronized (runnableQueueMap) { RunnableQueue rq = (RunnableQueue)runnableQueueMap.get(lock); if (rq == null) { String name= ( lock instanceof DasCanvasComponent ) ? ((DasCanvasComponent)lock).getDasName() : lock.toString(); rq = new RunnableQueue("RQ_"+name ); runnableQueueMap.put(lock, rq); } rq.add(run, true); queue.add(rq); } } /** Blocks until all tasks with the same lock have finished. * @param lock * @throws InterruptedException if the current thread is * interrupted while waiting. */ public static void waitFor(Object lock) throws InterruptedException { WaitTask wt = new WaitTask(); synchronized (wt) { while (true) { invokeLater(wt, lock); wt.wait(); return; } } } public static void shutdown() { queue.add(SHUTDOWN); } /* public static int getMaximumThreadCount(int i) { return maxThreadCount; } public static void setMaximumThreadCount(int i) { if (i < 5) { throw new IllegalArgumentException("Must be >= 5: " + i); } maxThreadCount = i; } */ private static class Runner implements Runnable { public void run() { synchronized (THREAD_COUNT_LOCK) { threadCount++; } try { while (true) { try { Runnable run = queue.remove(); if (run == SHUTDOWN) { queue.add(run); // all the runners need to see this message, requeue for them. break; } logger.log(Level.FINE, "running {0}", String.valueOf(run)); if (run != null) { setJob(run); run.run(); logger.log(Level.FINE, "completed {0}", String.valueOf(run)); run= null; // Maybe fix GC leak } synchronized (THREAD_COUNT_LOCK) { if (threadCount > maxThreadCount) { break; } } } catch (ThreadDeath td) { // See documentation for ThreadDeath. If this error is caught but not thrown, then the thread doesn't die. throw td; } catch (Throwable t) { logger.log(Level.INFO, "uncaught exception "+t.getMessage(), t); //Clear interrupted status (if set) ExceptionHandler eh= DasApplication.getDefaultApplication().getExceptionHandler(); eh.handleUncaught(t); Thread.interrupted(); } } } finally { synchronized (THREAD_COUNT_LOCK) { threadCount--; } } } } private static class WaitTask implements Runnable { public synchronized void run() { notifyAll(); } } private static class RunnableQueue implements Runnable { private LinkedList list = new LinkedList(); private int readCount = 0; private Object writer; private String name; private RunnableQueue( String name ) { this.name= name; } public void run() { Runnable run = null; RequestEntry entry = null; while (run == null) { synchronized (this) { //entry = (RequestEntry)list.removeFirst(); entry = (RequestEntry)list.getFirst(); if (entry.async && readCount == 0 && writer == null) { list.removeFirst(); writer = entry; run = entry.run; } else if (!entry.async && writer == null) { list.removeFirst(); readCount++; run = entry.run; } } } logger.log(Level.FINE, "Starting :{0}", String.valueOf(run)); assert run!=null; assert entry!=null; run.run(); logger.log(Level.FINE, "Finished :{0}", String.valueOf(run)); synchronized (this) { if (entry.async) { writer = null; } else { readCount--; } notifyAll(); } } synchronized void add(Runnable run, boolean async) { RequestEntry entry = new RequestEntry(); entry.run = run; entry.async = async; list.add(entry); } @Override public String toString() { return "RunnableQueue["+name+"]"; } } private static class RequestEntry { Runnable run; boolean async; } private static class BlockingRequestQueue { private LinkedList list; BlockingRequestQueue() { list = new LinkedList(); } synchronized void add(Runnable r) { list.add(r); notify(); } synchronized Runnable remove() { while (list.isEmpty()) { try { wait(); } catch (InterruptedException ie) {}; } return (Runnable)list.removeFirst(); } } }