001    package biz.hammurapi.metrics;
002    
003    import java.io.Serializable;
004    import java.util.LinkedList;
005    import java.util.List;
006    import java.util.Timer;
007    import java.util.TimerTask;
008    
009    import biz.hammurapi.config.Component;
010    import biz.hammurapi.config.ConfigurationException;
011    import biz.hammurapi.metrics.SimpleSlice;
012    import biz.hammurapi.metrics.Slice;
013    import biz.hammurapi.metrics.SliceConsumer;
014    
015    
016    /**
017     * Collects slices for a period of time and processes them in batches.
018     * @author Pavel Vlasov
019     */
020    public abstract class BatchingSliceConsumer implements SliceConsumer, Component {
021            private static final int MAX_SLICES = 10000; // maximum number of slices to keep in the send queue
022            
023            private int maxSlices = MAX_SLICES;
024            private long interval = 60000; // Default batching period is one minute.
025            
026            /**
027             * Sets maximum number of slices to be retained if processing fails.
028             * @param maxSlices
029             */
030            public void setMaxSlices(int maxSlices) {
031                    this.maxSlices = maxSlices;
032            }
033            
034            /**
035             * Sets batching interval. Default interval is one minute.
036             * @param interval
037             */
038            public void setInterval(long interval) {
039                    this.interval = interval;
040            }
041            
042            private LinkedList slices = new LinkedList();
043            
044            private static final Timer timer = new Timer(true);
045            private Thread shutdownHook;
046    
047            private TimerTask task;
048            
049            /**
050             * Helper class to serialize accumulated slices.
051             * @author Pavel Vlasov
052             */
053            public static class SliceEntry implements Serializable {
054                    String category;
055                    Slice slice;
056                    
057                    SliceEntry(String category, Slice slice) {
058                            this.category = category;
059                            if (slice instanceof SimpleSlice) {
060                                    this.slice = slice;
061                            } else {
062                                    this.slice = new SimpleSlice(slice);
063                            }
064                    }
065                    
066                    public String getCategory() {
067                            return category;
068                    }
069                    
070                    public Slice getSlice() {
071                            return slice;
072                    }
073            }
074    
075            public synchronized boolean consumeSlice(String category, Slice slice) {
076                    slices.add(new SliceEntry(category, slice));
077                    return true;
078            }
079    
080            public void setOwner(Object owner) {
081                    // Nothing
082                    
083            }
084    
085            public void start() throws ConfigurationException {
086                    if (interval<=0) {
087                            throw new ConfigurationException("Invalid interval: "+interval);
088                    }
089                    task = new TimerTask() {
090    
091                            public void run() {
092                                    LinkedList slicesToProcess;
093                                    synchronized (BatchingSliceConsumer.this) {
094                                            if (slices.isEmpty()) {
095                                                    return;
096                                            }
097                                            slicesToProcess = slices;
098                                            slices = new LinkedList();
099                                    }
100                                    
101                                    if (!processSlices(slicesToProcess)) {
102                                            synchronized (BatchingSliceConsumer.this) {
103                                                    slicesToProcess.addAll(slices);
104                                                    slices = slicesToProcess;
105                                                    while (slicesToProcess.size()>maxSlices) {
106                                                            slicesToProcess.removeFirst();
107                                                    }
108                                            }
109                                    }                               
110                            }
111                            
112                    };
113                    timer.schedule(task, interval, interval);
114                    shutdownHook = new Thread(task);
115                    Runtime.getRuntime().addShutdownHook(shutdownHook);
116            }
117    
118            public void stop() throws ConfigurationException {
119                    if (task!=null) {
120                            task.cancel();
121                            if (shutdownHook!=null) {
122                                    try {
123                                            Runtime.getRuntime().removeShutdownHook(shutdownHook);
124                                    } catch (Exception e) {
125                                            // We don't care
126                                    }
127                            }
128                            task.run();
129                    }               
130            }
131            
132            /**
133             * Subclasses shall implement this method.
134             * @param slices
135             * @return true if slices was successfully processed.
136             */
137            protected abstract boolean processSlices(List slices);
138    
139    }