001    /*
002     @license.text@
003      */
004    package biz.hammurapi.metrics;
005    
006    import java.util.Collection;
007    import java.util.HashMap;
008    import java.util.Iterator;
009    import java.util.LinkedList;
010    import java.util.Map;
011    import java.util.Timer;
012    import java.util.TimerTask;
013    
014    import biz.hammurapi.config.Component;
015    import biz.hammurapi.config.ConfigurationException;
016    
017    /**
018     * Slices metrics.
019     * @author Pavel Vlasov
020     * @version $Revision: 1.3 $
021     */
022    public class SlicingMeasurementConsumer implements MeasurementConsumer, Component {
023            
024            private class SliceEntry {
025                    String category;
026                    Slice slice;
027                    /**
028                     * @param category
029                     * @param slice
030                     */
031                    SliceEntry(String category, Slice slice) {
032                            super();
033                            this.category = category;
034                            this.slice = slice;
035                    }
036            }
037            
038            private Map slices=new HashMap();
039            private LinkedList sliceQueue=new LinkedList();
040            private boolean keepMeasurements=false;
041            private long tick=60000;
042            private int maxQueue=1000;
043            private SliceConsumer sliceConsumer=new ConsoleSliceConsumer();
044            private Timer timer;
045            private boolean isOwnTimer;
046            
047            protected SliceConsumer getSliceConsumer() {
048                    return sliceConsumer;
049            }
050            
051            /**
052             * Creates a new instance with internal timer.
053             * @param tick Slice size in milliseconds
054             * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise
055             * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than
056             * consuming ration then excessive slices will be dropped with a notice on console.
057             */
058            public SlicingMeasurementConsumer(long tick, boolean keepMeasurements, int maxQueue, SliceConsumer sliceConsumer) {
059                    this(tick, keepMeasurements, maxQueue, sliceConsumer, null);
060            }
061            
062            /**
063             * Creates a new instance with internal timer.
064             * @param tick Slice size in milliseconds
065             * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise
066             * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than
067             * consuming ration then excessive slices will be dropped with a notice on console.
068             * @param timer Timer to use for slicing metrics and passing them to slice consumer. If it is null then an internal timer is
069             * created.
070             */
071            public SlicingMeasurementConsumer(long tick, boolean keepMeasurements, int maxQueue, SliceConsumer sliceConsumer, Timer timer) {
072                    super();
073                    this.keepMeasurements = keepMeasurements;
074                    this.tick=tick;
075                    this.maxQueue=maxQueue;
076                    this.sliceConsumer=sliceConsumer;
077                    this.timer=timer;
078            }
079            
080            /**
081             * Default constructor with default settings.
082             */
083            public SlicingMeasurementConsumer() {
084                    super();
085            }
086                    
087            public void addMeasurement(String name, double value, long time) {
088                    synchronized (slices) {
089                            Slice slice=(Slice) slices.get(name);
090                            if (slice==null) {
091                                    slice=new SimpleSlice(name, keepMeasurements);
092                                    slices.put(name, slice);
093                            }
094                            slice.add(value, time);
095                            
096                            if (slice.getTo()-slice.getFrom()>=tick) {
097                                    slices.remove(name);
098                                    addSliceToQueue(null, slice);
099                            }
100                    }
101            }
102            
103            private Map instances=new HashMap();
104            
105            private class CategorizedConsumer implements MeasurementConsumer, Component {
106                    
107                    
108                    String category;
109                    Map slices=new HashMap();
110                    
111                    /**
112                     * @param category
113                     */
114                    public CategorizedConsumer(String category) {
115                            super();
116                            this.category = category;
117                    }
118                    
119                    public void addMeasurement(String name, double value, long time) {
120                            synchronized (slices) {
121                                    Slice slice=(Slice) slices.get(name);
122                                    if (slice==null) {
123                                            slice=new SimpleSlice(name, keepMeasurements);
124                                            slices.put(name, slice);
125                                    }
126                                    slice.add(value, time);
127                                    
128                                    if (slice.getTo()-slice.getFrom()>=tick) {
129                                            slices.remove(name);
130                                            addSliceToQueue(category, slice);
131                                    }
132                            }
133                    }
134    
135                    public void start() throws ConfigurationException {
136                            SlicingMeasurementConsumer.this.start();
137                    }
138    
139                    public void stop() throws ConfigurationException {
140                            SlicingMeasurementConsumer.this.stop();
141                    }
142    
143                    public void setOwner(Object owner) {
144                            // Ignore               
145                    }       
146            }
147            
148            /**
149             * @param category
150             * @return Instance for a category.
151             */
152            public MeasurementConsumer getCategoryInstance(final String category) {
153                    synchronized (instances) {
154                            MeasurementConsumer ret = (MeasurementConsumer) instances.get(category);
155                            if (ret==null) {
156                                    ret = new CategorizedConsumer(category);                                        
157                                    instances.put(category, ret);
158                            }
159                            return ret;
160                    }
161            }
162            
163            private int droppedCounter;
164            private long firstDropped;
165            private Thread slicingThread;
166            
167            public void shutdown() {
168                    to=System.currentTimeMillis();
169            
170            synchronized (slices) {
171                    Iterator it=slices.values().iterator();
172                    while (it.hasNext()) {
173                            Slice slice=(Slice) it.next();
174                            it.remove();
175                            addSliceToQueue(null, slice);
176                    }
177            }
178            
179            synchronized (instances) {
180                    Iterator iit=instances.values().iterator();
181                    while (iit.hasNext()) {
182                            CategorizedConsumer cConsumer=(CategorizedConsumer) iit.next();
183                            synchronized (cConsumer.slices) {
184                            Iterator it=cConsumer.slices.values().iterator();
185                            while (it.hasNext()) {
186                                    Slice slice=(Slice) it.next();
187                                    it.remove();
188                                    addSliceToQueue(cConsumer.category, slice);
189                            }
190                            }
191                    }
192            }                       
193                    
194    //              MeasurementCategoryFactory.unregister(this); - Unregister explicitly!!!
195                addSliceToQueue(null,null);
196                try {
197                            slicingThread.join();
198                    } catch (InterruptedException e) {
199                            throw new MetricsException(e);
200                    }
201                    
202                    // Stop the time is it is our own.
203                    if (isOwnTimer) {
204                            timer.cancel();
205                    }
206            }
207            
208            /**
209             * 
210             */
211            private void addSliceToQueue(String category, final Slice slice) {
212                    synchronized (sliceQueue) {
213                            if (slice!=null && maxQueue!=0 && sliceQueue.size()>maxQueue) {
214                                firstDropped=slice.getTo();
215                                    droppedCounter++;
216                            } else {
217                                    if (droppedCounter>0) {
218                                        sliceQueue.add(
219                                            new SliceEntry(
220                                                            "DroppedSlices",
221                                                                    new Slice() {
222    
223                                            public long getFrom() {                            
224                                                return firstDropped;
225                                            }
226                    
227                                            public long getTo() {
228                                                return slice.getTo();
229                                            }
230                    
231                                            public int getNumber() {
232                                                return droppedCounter;
233                                            }
234                    
235                                            public double getMin() {
236                                                return 0;
237                                            }
238                    
239                                            public double getMax() {
240                                                return 0;
241                                            }
242                    
243                                            public double getAvg() {
244                                                return 0;
245                                            }
246                    
247                                            public double getTotal() {
248                                                return 0;
249                                            }
250                    
251                                            public void add(double value, long time) {
252                                                throw new UnsupportedOperationException();
253                                            }
254                    
255                                            public void add(Metric metric) {
256                                                throw new UnsupportedOperationException();
257                                            }
258                    
259                                            public Collection getMeasurements() {
260                                                return null;
261                                            }
262                    
263                                            public String getName() {
264                                                return "DROPPED SLICES";
265                                            }
266                    
267                                            public double getDeviation() {
268                                                return 0;
269                                            }
270                                                            
271                                                        }));
272                                            droppedCounter=0;
273                                    }
274                                    
275                                    sliceQueue.add(slice==null ? null : new SliceEntry(category, slice));
276                                    sliceQueue.notifyAll();
277                            }
278                    }
279            }
280    
281            /**
282         * 
283         */
284        private void onTick() {
285            long now=System.currentTimeMillis();
286            
287            synchronized (slices) {
288                    Iterator it=slices.values().iterator();
289                    while (it.hasNext()) {
290                            Slice slice=(Slice) it.next();
291                            if (now-slice.getFrom()>=tick) {
292                                    it.remove();
293                                    addSliceToQueue(null, slice);
294                            }
295                    }
296            }
297            
298            synchronized (instances) {
299                    Iterator iit=instances.values().iterator();
300                    while (iit.hasNext()) {
301                            CategorizedConsumer cConsumer=(CategorizedConsumer) iit.next();
302                            synchronized (cConsumer.slices) {
303                            Iterator it=cConsumer.slices.values().iterator();
304                            while (it.hasNext()) {
305                                    Slice slice=(Slice) it.next();
306                                    if (now-slice.getFrom()>=tick) {
307                                            it.remove();
308                                            addSliceToQueue(cConsumer.category, slice);
309                                    }
310                            }
311                            }
312                    }
313            }
314            
315            if (sliceConsumer instanceof HousekeepingSliceConsumer) {
316                    ((HousekeepingSliceConsumer) sliceConsumer).onTick(now);
317            }
318        }
319        
320        {           
321                    slicingThread=new Thread() {
322                            {
323                                    setName("Slice queue processor");
324                                    setDaemon(true);
325                                    setPriority(Thread.MIN_PRIORITY);
326                                    start();
327                            }
328                            
329                            private SliceEntry getSliceEntry() throws InterruptedException {
330                                    synchronized (sliceQueue) {
331                                            while (sliceQueue.isEmpty()) {
332                                                    sliceQueue.wait();
333                                            }
334                                    
335                                            return (SliceEntry) sliceQueue.removeFirst();                                   
336                                    }
337                            }
338                            
339                            public void run() {
340                                    while (true) {
341                                            try {
342                                                    SliceEntry entry=getSliceEntry();
343                                                    if (entry==null) {
344                                                            if (sliceConsumer instanceof Component) {
345                                                                    try {
346                                                                            ((Component) sliceConsumer).stop();
347                                                                    } catch (ConfigurationException e1) {
348                                                                            e1.printStackTrace();
349                                                                    }
350                                                            }
351                                                            return;
352                                                    }
353                                                    
354                                                    // Put entry back to queue if slice consumer is unable to consume
355                                                    // and sleep.
356                                                    if (!sliceConsumer.consumeSlice(entry.category, entry.slice)) {
357                                                            synchronized (sliceQueue) {
358                                                                    sliceQueue.add(entry);
359                                                            }
360                                                            sleep(tick);
361                                                    }
362                                            } catch (InterruptedException e) {
363                                                    return;
364                                            }
365                                    }                               
366                            }
367                    };              
368            }
369    
370        private int useCounter;
371        
372        protected long from;
373        protected long to;
374        
375        private TimerTask tickTask;
376        
377        /**
378         * Increments use counter
379         */
380            public void start() throws ConfigurationException {
381                    if (timer==null) {
382                            timer=new Timer(true);
383                            isOwnTimer=true;
384                    }
385                    
386                    tickTask = new TimerTask() {
387                            public void run() {
388                                    onTick();
389                            }
390                    };
391                    
392                    timer.scheduleAtFixedRate(tickTask, tick, tick);
393                    
394                    if (useCounter==0) {
395                            from=System.currentTimeMillis();
396                            if (sliceConsumer instanceof Component) {
397                                    ((Component) sliceConsumer).start();
398                            }
399                    }
400                    
401                    ++useCounter;
402            }
403    
404            /**
405             * Decrements use counter and invokes shutdown() when counter==0
406             */
407            public void stop() throws ConfigurationException {
408                    if (--useCounter==0) {
409                            shutdown();
410                    }
411            }    
412    
413            public void setOwner(Object owner) {
414                    // Ignore               
415            }
416    
417            public int getMaxQueue() {
418                    return maxQueue;
419            }
420    
421            public void setMaxQueue(int maxQueue) {
422                    this.maxQueue = maxQueue;
423            }
424    
425            public long getTick() {
426                    return tick;
427            }
428    
429            public void setTick(long tick) {
430                    this.tick = tick;
431            }
432    
433            public void setSliceConsumer(SliceConsumer sliceConsumer) {
434                    this.sliceConsumer = sliceConsumer;
435            }
436    
437            public boolean isKeepMeasurements() {
438                    return keepMeasurements;
439            }
440    
441            public void setKeepMeasurements(boolean keepMeasurements) {
442                    this.keepMeasurements = keepMeasurements;
443            }       
444            
445            
446    }