package biz.hammurapi.metrics;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import biz.hammurapi.config.Component;
import biz.hammurapi.config.ConfigurationException;
import biz.hammurapi.metrics.SimpleSlice;
import biz.hammurapi.metrics.Slice;
import biz.hammurapi.metrics.SliceConsumer;


/**
 * Collects slices for a period of time and processes them in batches.
 * <p>
 * Consumed slices are queued in memory. After {@link #start()} a task on a
 * timer shared by all instances periodically hands the queued slices to
 * {@link #processSlices(List)}. The same task is registered as a JVM shutdown
 * hook and {@link #stop()} performs one more flush. A batch which was not
 * processed successfully is put back at the head of the queue; the queue is
 * then trimmed to the configured maximum by dropping the oldest entries.
 * @author Pavel Vlasov
 */
public abstract class BatchingSliceConsumer implements SliceConsumer, Component {
    private static final int MAX_SLICES = 10000; // maximum number of slices to keep in the send queue

    private int maxSlices = MAX_SLICES;
    private long interval = 60000; // Default batching period is one minute.

    /**
     * Sets maximum number of slices to be retained if processing fails.
     * Values below zero are treated as zero when the queue is trimmed.
     * @param maxSlices Maximum number of retained slices.
     */
    public void setMaxSlices(int maxSlices) {
        this.maxSlices = maxSlices;
    }

    /**
     * Sets batching interval. Default interval is one minute.
     * The value is validated and applied by {@link #start()}.
     * @param interval Batching interval in milliseconds, must be positive.
     */
    public void setInterval(long interval) {
        this.interval = interval;
    }

    // Pending entries. Guarded by the monitor of this consumer.
    private LinkedList slices = new LinkedList();

    // Daemon timer shared by all consumer instances.
    private static final Timer timer = new Timer(true);
    private Thread shutdownHook;

    private TimerTask task;

    /**
     * Helper class to serialize accumulated slices.
     * Holds a slice together with the category it was reported for.
     * @author Pavel Vlasov
     */
    public static class SliceEntry implements Serializable {
        String category;
        Slice slice;

        /**
         * @param category Category the slice belongs to.
         * @param slice Slice to hold. An instance of SimpleSlice is stored
         * as is, any other implementation is converted with
         * <code>new SimpleSlice(slice)</code>.
         */
        SliceEntry(String category, Slice slice) {
            this.category = category;
            if (slice instanceof SimpleSlice) {
                this.slice = slice;
            } else {
                this.slice = new SimpleSlice(slice);
            }
        }

        /**
         * @return Category the slice was consumed with.
         */
        public String getCategory() {
            return category;
        }

        /**
         * @return The held slice.
         */
        public Slice getSlice() {
            return slice;
        }
    }

    /**
     * Appends the slice to the queue of pending entries.
     * @param category Slice category.
     * @param slice Slice to queue.
     * @return Always true.
     */
    public synchronized boolean consumeSlice(String category, Slice slice) {
        slices.add(new SliceEntry(category, slice));
        return true;
    }

    /**
     * Does nothing, the owner is not used by this component.
     * @param owner Ignored.
     */
    public void setOwner(Object owner) {
        // Nothing
    }

    /**
     * Schedules periodic batch processing and registers a shutdown hook which
     * flushes the queue at JVM exit. If the consumer was already started, the
     * previous schedule and shutdown hook are cancelled first so that they
     * are not leaked.
     * @throws ConfigurationException If the interval is not positive.
     */
    public void start() throws ConfigurationException {
        if (interval <= 0) {
            throw new ConfigurationException("Invalid interval: " + interval);
        }

        // A repeated start() must not leave the earlier task scheduled and
        // the earlier hook registered with no reference left to remove them.
        cancelSchedule();

        task = new TimerTask() {

            public void run() {
                try {
                    processPendingSlices();
                } catch (RuntimeException e) {
                    // An unchecked exception escaping from a TimerTask
                    // terminates the timer thread, and the timer is shared by
                    // all consumers. Report the problem and keep the timer
                    // alive; the batch has already been put back to the queue.
                    Thread current = Thread.currentThread();
                    current.getThreadGroup().uncaughtException(current, e);
                }
            }

        };
        timer.schedule(task, interval, interval);
        shutdownHook = new Thread(task);
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Cancels periodic processing, removes the shutdown hook and processes
     * whatever is left in the queue. Does nothing if the consumer was never
     * started. An unchecked exception thrown by
     * {@link #processSlices(List)} during the final flush is propagated to
     * the caller after the batch is put back to the queue.
     */
    public void stop() throws ConfigurationException {
        if (task != null) {
            cancelSchedule();
            processPendingSlices();
        }
    }

    /**
     * Cancels the current timer task and removes the current shutdown hook,
     * if any. Safe to invoke repeatedly.
     */
    private void cancelSchedule() {
        if (task != null) {
            task.cancel();
        }
        if (shutdownHook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (RuntimeException ignored) {
                // Best effort. Removal is rejected once JVM shutdown is in
                // progress or if a security manager denies it; in both cases
                // there is nothing left to undo.
            }
        }
    }

    /**
     * Detaches the pending entries from the queue and passes them to
     * {@link #processSlices(List)} outside of the lock, so that consumers are
     * not blocked while a batch is being processed. If processing reports
     * failure or throws, the batch is put back to the queue.
     */
    private void processPendingSlices() {
        LinkedList batch;
        synchronized (this) {
            if (slices.isEmpty()) {
                return;
            }
            batch = slices;
            slices = new LinkedList();
        }

        boolean processed = false;
        try {
            processed = processSlices(batch);
        } finally {
            if (!processed) {
                requeueFailedBatch(batch);
            }
        }
    }

    /**
     * Puts a failed batch back in front of the entries which arrived while it
     * was being processed and drops the oldest entries exceeding the limit.
     * @param batch Entries which were not processed.
     */
    private synchronized void requeueFailedBatch(LinkedList batch) {
        batch.addAll(slices);
        slices = batch;
        // A negative limit would otherwise call removeFirst() on an empty list.
        int limit = Math.max(maxSlices, 0);
        while (slices.size() > limit) {
            slices.removeFirst();
        }
    }

    /**
     * Subclasses shall implement this method.
     * @param slices List of {@link SliceEntry} instances to process.
     * @return true if slices were successfully processed. If false is
     * returned or an exception is thrown, the slices are retained and
     * passed again with the next batch.
     */
    protected abstract boolean processSlices(List slices);

}