001    /*
002    @license.text@
003     */
004    package biz.hammurapi.util;
005    
006    import java.util.LinkedList;
007    
008    import biz.hammurapi.config.ComponentBase;
009    import biz.hammurapi.config.ConfigurationException;
010    import biz.hammurapi.metrics.MeasurementConsumer;
011    
012    
013    /**
014     * Distributes work among multiple threads.
015     * @author Pavel Vlasov
016     * @revision $Revision$
017     */
018    public class ThreadPool extends ComponentBase implements Worker {
019            
020            private int numberOfThreads=10;
021            private int priority=Thread.NORM_PRIORITY; 
022            private ExceptionSink exceptionSink;
023            private int maxQueue=10;
024            private boolean stopped;
025            
026            public ThreadPool() {
027                    // Default constructor
028            }
029            
030            /**
031             * @param numberOfThreads Number of threads to create.
032             * @param priority Threads priority.
033             * @param maxQueue Maximum number of jobs in execution queue. When 
034             * execution queue reaches its maximum post() processes job in the invoking thread. Values <1 mean no limit.
035             * @param exceptionSink
036             */
037            public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink) {
038                    super();
039                    this.numberOfThreads=numberOfThreads;
040                    this.priority=priority;
041                    this.exceptionSink=exceptionSink;
042                    this.maxQueue=maxQueue;
043            }
044            
045            public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink, String name) {
046                    this(numberOfThreads, priority, maxQueue, exceptionSink);
047                    if (name!=null) {
048                            this.name=name;
049                    }
050            }
051    
052            private LinkedList jobQueue=new LinkedList();
053            private int[] threads={0};
054            private String name = toString();
055            
056            public boolean post(Runnable job) {
057                    int queueSize;
058                    synchronized (jobQueue) {
059                            if (stopped) {
060                                    return false;
061                            }
062                            
063                            queueSize=jobQueue.size();
064                            
065                            if (maxQueue==0 || queueSize<maxQueue) {
066                                    // Add job to processing queue
067                                    jobQueue.add(job);                      
068                                    jobQueue.notify(); // wake up one thread.
069                                    
070                                    addMeasurement("queue", queueSize, 0);
071                                    addMeasurement("post", 1, 0);
072                                    return true;
073                            }
074                    }
075                    
076                    addMeasurement("queue", queueSize, 0);
077                    addMeasurement("post", 1, 0);
078                    
079                    // process in current thread.
080                    process(job);
081                    return true;
082            }
083            
084            private class WorkerThread extends Thread {
085                    
086                    public void run() {
087                            synchronized (threads) {
088                                    threads[0]++;
089                            }
090                            
091                            try {
092                                    while (true) {
093                                            Runnable job;
094                                            int queueSize;
095                                            synchronized (jobQueue) {
096                                                    while (!stopped && jobQueue.isEmpty()) {
097                                                            try {
098                                                                    jobQueue.wait();
099                                                            } catch (InterruptedException e) {
100                                                                    if (exceptionSink==null) {
101                                                                            e.printStackTrace();
102                                                                    } else {
103                                                                            exceptionSink.consume(this, e);
104                                                                    }
105                                                                    return;
106                                                            }                                                               
107                                                    }
108                                                    
109                                                    if (stopped && jobQueue.isEmpty()) {
110                                                            return;
111                                                    }
112                                                    
113                                                    job=(Runnable) jobQueue.removeFirst();
114                                                    
115                                                    queueSize=jobQueue.size();
116                                            }       
117                                            
118                                            addMeasurement("queue", queueSize, 0);
119                                            
120                                            process(job);
121                                    }                                                                                       
122                            } finally {
123                                    synchronized (threads) {
124                                            threads[0]--;
125                                            if (threads[0]<=0) {
126                                                    threads.notifyAll();
127                                            }
128                                    }
129                            }
130                    }
131            }
132    
133            public void start() throws ConfigurationException {
134                    for (int i=0; i<numberOfThreads; i++) {
135                            Thread th = new WorkerThread();                 
136                            th.setPriority(priority);
137                            th.setName(this.name +"-pool-thread-"+i);
138                            th.start();
139                    }
140            }
141            
142            /**
143             * Creates threads to replace terminated threads.
144             * Client posting jobs to thread pool may implement liveness check routines and
145             * terminate pool threads in a job hangs. This method allows the client code
146             * to replenish the thread pool after termination of one of worker threads.
147             */
148            public void replenish() {
149                    if (!stopped) {
150                            synchronized (threads) {
151                                    for (int i=threads[0]; i<numberOfThreads; i++) {
152                                            Thread th = new WorkerThread();                 
153                                            th.setPriority(priority);
154                                            th.setName(this.name +"-pool-thread-"+i);
155                                            th.start();
156                                    }                               
157                            }
158                    }
159            }
160    
161            public void stop() throws ConfigurationException {
162                    synchronized (threads) {                        
163                            stopped=true;
164                            synchronized (jobQueue) {
165                                    jobQueue.notifyAll();
166                            }
167                            
168                            while (threads[0]>0) {
169                                    try {
170                                            threads.wait();
171                                    } catch (InterruptedException e) {
172                                            throw new ConfigurationException("Stop() interrupted", e);
173                                    }
174                            }
175                    }
176            }
177    
178            /**
179             * @param job
180             */
181            private void process(Runnable job) {
182                    long start=System.currentTimeMillis();
183                    try {
184                            job.run();
185                    } catch (Exception e) {
186                            if (exceptionSink==null) {
187                                    e.printStackTrace();
188                            } else {
189                                    exceptionSink.consume(job, e);
190                            }
191                    } finally {
192                            long now = System.currentTimeMillis();
193                            long duration = now - start;
194                            addMeasurement("run", duration, now);
195                            
196                            // Jobs can collect metrics if they implement MeasurementConsumer.
197                            if (job instanceof MeasurementConsumer) {
198                                    ((MeasurementConsumer) job).addMeasurement("run", duration, now);
199                            }
200                    }
201            }
202    
203            public int getMaxQueue() {
204                    return maxQueue;
205            }
206    
207            public void setMaxQueue(int maxQueue) {
208                    this.maxQueue = maxQueue;
209            }
210    
211            public String getName() {
212                    return name;
213            }
214    
215            public void setName(String name) {
216                    this.name = name;
217            }
218    
219            public int getNumberOfThreads() {
220                    return numberOfThreads;
221            }
222    
223            public void setNumberOfThreads(int numberOfThreads) {
224                    this.numberOfThreads = numberOfThreads;
225            }
226    
227            public int getPriority() {
228                    return priority;
229            }
230    
231            public void setPriority(int priority) {
232                    this.priority = priority;
233            }
234            
235            public void setExceptionSink(ExceptionSink exceptionSink) {
236                    this.exceptionSink = exceptionSink;
237            }
238    
239    }