001 /* 002 @license.text@ 003 */ 004 package biz.hammurapi.util; 005 006 import java.util.LinkedList; 007 008 import biz.hammurapi.config.ComponentBase; 009 import biz.hammurapi.config.ConfigurationException; 010 import biz.hammurapi.metrics.MeasurementConsumer; 011 012 013 /** 014 * Distributes work among multiple threads. 015 * @author Pavel Vlasov 016 * @revision $Revision$ 017 */ 018 public class ThreadPool extends ComponentBase implements Worker { 019 020 private int numberOfThreads=10; 021 private int priority=Thread.NORM_PRIORITY; 022 private ExceptionSink exceptionSink; 023 private int maxQueue=10; 024 private boolean stopped; 025 026 public ThreadPool() { 027 // Default constructor 028 } 029 030 /** 031 * @param numberOfThreads Number of threads to create. 032 * @param priority Threads priority. 033 * @param maxQueue Maximum number of jobs in execution queue. When 034 * execution queue reaches its maximum post() processes job in the invoking thread. Values <1 mean no limit. 035 * @param exceptionSink 036 */ 037 public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink) { 038 super(); 039 this.numberOfThreads=numberOfThreads; 040 this.priority=priority; 041 this.exceptionSink=exceptionSink; 042 this.maxQueue=maxQueue; 043 } 044 045 public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink, String name) { 046 this(numberOfThreads, priority, maxQueue, exceptionSink); 047 if (name!=null) { 048 this.name=name; 049 } 050 } 051 052 private LinkedList jobQueue=new LinkedList(); 053 private int[] threads={0}; 054 private String name = toString(); 055 056 public boolean post(Runnable job) { 057 int queueSize; 058 synchronized (jobQueue) { 059 if (stopped) { 060 return false; 061 } 062 063 queueSize=jobQueue.size(); 064 065 if (maxQueue==0 || queueSize<maxQueue) { 066 // Add job to processing queue 067 jobQueue.add(job); 068 jobQueue.notify(); // wake up one thread. 069 070 addMeasurement("queue", queueSize, 0); 071 addMeasurement("post", 1, 0); 072 return true; 073 } 074 } 075 076 addMeasurement("queue", queueSize, 0); 077 addMeasurement("post", 1, 0); 078 079 // process in current thread. 080 process(job); 081 return true; 082 } 083 084 private class WorkerThread extends Thread { 085 086 public void run() { 087 synchronized (threads) { 088 threads[0]++; 089 } 090 091 try { 092 while (true) { 093 Runnable job; 094 int queueSize; 095 synchronized (jobQueue) { 096 while (!stopped && jobQueue.isEmpty()) { 097 try { 098 jobQueue.wait(); 099 } catch (InterruptedException e) { 100 if (exceptionSink==null) { 101 e.printStackTrace(); 102 } else { 103 exceptionSink.consume(this, e); 104 } 105 return; 106 } 107 } 108 109 if (stopped && jobQueue.isEmpty()) { 110 return; 111 } 112 113 job=(Runnable) jobQueue.removeFirst(); 114 115 queueSize=jobQueue.size(); 116 } 117 118 addMeasurement("queue", queueSize, 0); 119 120 process(job); 121 } 122 } finally { 123 synchronized (threads) { 124 threads[0]--; 125 if (threads[0]<=0) { 126 threads.notifyAll(); 127 } 128 } 129 } 130 } 131 } 132 133 public void start() throws ConfigurationException { 134 for (int i=0; i<numberOfThreads; i++) { 135 Thread th = new WorkerThread(); 136 th.setPriority(priority); 137 th.setName(this.name +"-pool-thread-"+i); 138 th.start(); 139 } 140 } 141 142 /** 143 * Creates threads to replace terminated threads. 144 * Client posting jobs to thread pool may implement liveness check routines and 145 * terminate pool threads in a job hangs. This method allows the client code 146 * to replenish the thread pool after termination of one of worker threads. 147 */ 148 public void replenish() { 149 if (!stopped) { 150 synchronized (threads) { 151 for (int i=threads[0]; i<numberOfThreads; i++) { 152 Thread th = new WorkerThread(); 153 th.setPriority(priority); 154 th.setName(this.name +"-pool-thread-"+i); 155 th.start(); 156 } 157 } 158 } 159 } 160 161 public void stop() throws ConfigurationException { 162 synchronized (threads) { 163 stopped=true; 164 synchronized (jobQueue) { 165 jobQueue.notifyAll(); 166 } 167 168 while (threads[0]>0) { 169 try { 170 threads.wait(); 171 } catch (InterruptedException e) { 172 throw new ConfigurationException("Stop() interrupted", e); 173 } 174 } 175 } 176 } 177 178 /** 179 * @param job 180 */ 181 private void process(Runnable job) { 182 long start=System.currentTimeMillis(); 183 try { 184 job.run(); 185 } catch (Exception e) { 186 if (exceptionSink==null) { 187 e.printStackTrace(); 188 } else { 189 exceptionSink.consume(job, e); 190 } 191 } finally { 192 long now = System.currentTimeMillis(); 193 long duration = now - start; 194 addMeasurement("run", duration, now); 195 196 // Jobs can collect metrics if they implement MeasurementConsumer. 197 if (job instanceof MeasurementConsumer) { 198 ((MeasurementConsumer) job).addMeasurement("run", duration, now); 199 } 200 } 201 } 202 203 public int getMaxQueue() { 204 return maxQueue; 205 } 206 207 public void setMaxQueue(int maxQueue) { 208 this.maxQueue = maxQueue; 209 } 210 211 public String getName() { 212 return name; 213 } 214 215 public void setName(String name) { 216 this.name = name; 217 } 218 219 public int getNumberOfThreads() { 220 return numberOfThreads; 221 } 222 223 public void setNumberOfThreads(int numberOfThreads) { 224 this.numberOfThreads = numberOfThreads; 225 } 226 227 public int getPriority() { 228 return priority; 229 } 230 231 public void setPriority(int priority) { 232 this.priority = priority; 233 } 234 235 public void setExceptionSink(ExceptionSink exceptionSink) { 236 this.exceptionSink = exceptionSink; 237 } 238 239 }