001 /* 002 @license.text@ 003 */ 004 package biz.hammurapi.metrics; 005 006 import java.util.Collection; 007 import java.util.HashMap; 008 import java.util.Iterator; 009 import java.util.LinkedList; 010 import java.util.Map; 011 import java.util.Timer; 012 import java.util.TimerTask; 013 014 import biz.hammurapi.config.Component; 015 import biz.hammurapi.config.ConfigurationException; 016 017 /** 018 * Slices metrics. 019 * @author Pavel Vlasov 020 * @version $Revision: 1.3 $ 021 */ 022 public class SlicingMeasurementConsumer implements MeasurementConsumer, Component { 023 024 private class SliceEntry { 025 String category; 026 Slice slice; 027 /** 028 * @param category 029 * @param slice 030 */ 031 SliceEntry(String category, Slice slice) { 032 super(); 033 this.category = category; 034 this.slice = slice; 035 } 036 } 037 038 private Map slices=new HashMap(); 039 private LinkedList sliceQueue=new LinkedList(); 040 private boolean keepMeasurements=false; 041 private long tick=60000; 042 private int maxQueue=1000; 043 private SliceConsumer sliceConsumer=new ConsoleSliceConsumer(); 044 private Timer timer; 045 private boolean isOwnTimer; 046 047 protected SliceConsumer getSliceConsumer() { 048 return sliceConsumer; 049 } 050 051 /** 052 * Creates a new instance with internal timer. 053 * @param tick Slice size in milliseconds 054 * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise 055 * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than 056 * consuming ration then excessive slices will be dropped with a notice on console. 057 */ 058 public SlicingMeasurementConsumer(long tick, boolean keepMeasurements, int maxQueue, SliceConsumer sliceConsumer) { 059 this(tick, keepMeasurements, maxQueue, sliceConsumer, null); 060 } 061 062 /** 063 * Creates a new instance with internal timer. 064 * @param tick Slice size in milliseconds 065 * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise 066 * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than 067 * consuming ration then excessive slices will be dropped with a notice on console. 068 * @param timer Timer to use for slicing metrics and passing them to slice consumer. If it is null then an internal timer is 069 * created. 070 */ 071 public SlicingMeasurementConsumer(long tick, boolean keepMeasurements, int maxQueue, SliceConsumer sliceConsumer, Timer timer) { 072 super(); 073 this.keepMeasurements = keepMeasurements; 074 this.tick=tick; 075 this.maxQueue=maxQueue; 076 this.sliceConsumer=sliceConsumer; 077 this.timer=timer; 078 } 079 080 /** 081 * Default constructor with default settings. 082 */ 083 public SlicingMeasurementConsumer() { 084 super(); 085 } 086 087 public void addMeasurement(String name, double value, long time) { 088 synchronized (slices) { 089 Slice slice=(Slice) slices.get(name); 090 if (slice==null) { 091 slice=new SimpleSlice(name, keepMeasurements); 092 slices.put(name, slice); 093 } 094 slice.add(value, time); 095 096 if (slice.getTo()-slice.getFrom()>=tick) { 097 slices.remove(name); 098 addSliceToQueue(null, slice); 099 } 100 } 101 } 102 103 private Map instances=new HashMap(); 104 105 private class CategorizedConsumer implements MeasurementConsumer, Component { 106 107 108 String category; 109 Map slices=new HashMap(); 110 111 /** 112 * @param category 113 */ 114 public CategorizedConsumer(String category) { 115 super(); 116 this.category = category; 117 } 118 119 public void addMeasurement(String name, double value, long time) { 120 synchronized (slices) { 121 Slice slice=(Slice) slices.get(name); 122 if (slice==null) { 123 slice=new SimpleSlice(name, keepMeasurements); 124 slices.put(name, slice); 125 } 126 slice.add(value, time); 127 128 if (slice.getTo()-slice.getFrom()>=tick) { 129 slices.remove(name); 130 addSliceToQueue(category, slice); 131 } 132 } 133 } 134 135 public void start() throws ConfigurationException { 136 SlicingMeasurementConsumer.this.start(); 137 } 138 139 public void stop() throws ConfigurationException { 140 SlicingMeasurementConsumer.this.stop(); 141 } 142 143 public void setOwner(Object owner) { 144 // Ignore 145 } 146 } 147 148 /** 149 * @param category 150 * @return Instance for a category. 151 */ 152 public MeasurementConsumer getCategoryInstance(final String category) { 153 synchronized (instances) { 154 MeasurementConsumer ret = (MeasurementConsumer) instances.get(category); 155 if (ret==null) { 156 ret = new CategorizedConsumer(category); 157 instances.put(category, ret); 158 } 159 return ret; 160 } 161 } 162 163 private int droppedCounter; 164 private long firstDropped; 165 private Thread slicingThread; 166 167 public void shutdown() { 168 to=System.currentTimeMillis(); 169 170 synchronized (slices) { 171 Iterator it=slices.values().iterator(); 172 while (it.hasNext()) { 173 Slice slice=(Slice) it.next(); 174 it.remove(); 175 addSliceToQueue(null, slice); 176 } 177 } 178 179 synchronized (instances) { 180 Iterator iit=instances.values().iterator(); 181 while (iit.hasNext()) { 182 CategorizedConsumer cConsumer=(CategorizedConsumer) iit.next(); 183 synchronized (cConsumer.slices) { 184 Iterator it=cConsumer.slices.values().iterator(); 185 while (it.hasNext()) { 186 Slice slice=(Slice) it.next(); 187 it.remove(); 188 addSliceToQueue(cConsumer.category, slice); 189 } 190 } 191 } 192 } 193 194 // MeasurementCategoryFactory.unregister(this); - Unregister explicitly!!! 195 addSliceToQueue(null,null); 196 try { 197 slicingThread.join(); 198 } catch (InterruptedException e) { 199 throw new MetricsException(e); 200 } 201 202 // Stop the time is it is our own. 203 if (isOwnTimer) { 204 timer.cancel(); 205 } 206 } 207 208 /** 209 * 210 */ 211 private void addSliceToQueue(String category, final Slice slice) { 212 synchronized (sliceQueue) { 213 if (slice!=null && maxQueue!=0 && sliceQueue.size()>maxQueue) { 214 firstDropped=slice.getTo(); 215 droppedCounter++; 216 } else { 217 if (droppedCounter>0) { 218 sliceQueue.add( 219 new SliceEntry( 220 "DroppedSlices", 221 new Slice() { 222 223 public long getFrom() { 224 return firstDropped; 225 } 226 227 public long getTo() { 228 return slice.getTo(); 229 } 230 231 public int getNumber() { 232 return droppedCounter; 233 } 234 235 public double getMin() { 236 return 0; 237 } 238 239 public double getMax() { 240 return 0; 241 } 242 243 public double getAvg() { 244 return 0; 245 } 246 247 public double getTotal() { 248 return 0; 249 } 250 251 public void add(double value, long time) { 252 throw new UnsupportedOperationException(); 253 } 254 255 public void add(Metric metric) { 256 throw new UnsupportedOperationException(); 257 } 258 259 public Collection getMeasurements() { 260 return null; 261 } 262 263 public String getName() { 264 return "DROPPED SLICES"; 265 } 266 267 public double getDeviation() { 268 return 0; 269 } 270 271 })); 272 droppedCounter=0; 273 } 274 275 sliceQueue.add(slice==null ? null : new SliceEntry(category, slice)); 276 sliceQueue.notifyAll(); 277 } 278 } 279 } 280 281 /** 282 * 283 */ 284 private void onTick() { 285 long now=System.currentTimeMillis(); 286 287 synchronized (slices) { 288 Iterator it=slices.values().iterator(); 289 while (it.hasNext()) { 290 Slice slice=(Slice) it.next(); 291 if (now-slice.getFrom()>=tick) { 292 it.remove(); 293 addSliceToQueue(null, slice); 294 } 295 } 296 } 297 298 synchronized (instances) { 299 Iterator iit=instances.values().iterator(); 300 while (iit.hasNext()) { 301 CategorizedConsumer cConsumer=(CategorizedConsumer) iit.next(); 302 synchronized (cConsumer.slices) { 303 Iterator it=cConsumer.slices.values().iterator(); 304 while (it.hasNext()) { 305 Slice slice=(Slice) it.next(); 306 if (now-slice.getFrom()>=tick) { 307 it.remove(); 308 addSliceToQueue(cConsumer.category, slice); 309 } 310 } 311 } 312 } 313 } 314 315 if (sliceConsumer instanceof HousekeepingSliceConsumer) { 316 ((HousekeepingSliceConsumer) sliceConsumer).onTick(now); 317 } 318 } 319 320 { 321 slicingThread=new Thread() { 322 { 323 setName("Slice queue processor"); 324 setDaemon(true); 325 setPriority(Thread.MIN_PRIORITY); 326 start(); 327 } 328 329 private SliceEntry getSliceEntry() throws InterruptedException { 330 synchronized (sliceQueue) { 331 while (sliceQueue.isEmpty()) { 332 sliceQueue.wait(); 333 } 334 335 return (SliceEntry) sliceQueue.removeFirst(); 336 } 337 } 338 339 public void run() { 340 while (true) { 341 try { 342 SliceEntry entry=getSliceEntry(); 343 if (entry==null) { 344 if (sliceConsumer instanceof Component) { 345 try { 346 ((Component) sliceConsumer).stop(); 347 } catch (ConfigurationException e1) { 348 e1.printStackTrace(); 349 } 350 } 351 return; 352 } 353 354 // Put entry back to queue if slice consumer is unable to consume 355 // and sleep. 356 if (!sliceConsumer.consumeSlice(entry.category, entry.slice)) { 357 synchronized (sliceQueue) { 358 sliceQueue.add(entry); 359 } 360 sleep(tick); 361 } 362 } catch (InterruptedException e) { 363 return; 364 } 365 } 366 } 367 }; 368 } 369 370 private int useCounter; 371 372 protected long from; 373 protected long to; 374 375 private TimerTask tickTask; 376 377 /** 378 * Increments use counter 379 */ 380 public void start() throws ConfigurationException { 381 if (timer==null) { 382 timer=new Timer(true); 383 isOwnTimer=true; 384 } 385 386 tickTask = new TimerTask() { 387 public void run() { 388 onTick(); 389 } 390 }; 391 392 timer.scheduleAtFixedRate(tickTask, tick, tick); 393 394 if (useCounter==0) { 395 from=System.currentTimeMillis(); 396 if (sliceConsumer instanceof Component) { 397 ((Component) sliceConsumer).start(); 398 } 399 } 400 401 ++useCounter; 402 } 403 404 /** 405 * Decrements use counter and invokes shutdown() when counter==0 406 */ 407 public void stop() throws ConfigurationException { 408 if (--useCounter==0) { 409 shutdown(); 410 } 411 } 412 413 public void setOwner(Object owner) { 414 // Ignore 415 } 416 417 public int getMaxQueue() { 418 return maxQueue; 419 } 420 421 public void setMaxQueue(int maxQueue) { 422 this.maxQueue = maxQueue; 423 } 424 425 public long getTick() { 426 return tick; 427 } 428 429 public void setTick(long tick) { 430 this.tick = tick; 431 } 432 433 public void setSliceConsumer(SliceConsumer sliceConsumer) { 434 this.sliceConsumer = sliceConsumer; 435 } 436 437 public boolean isKeepMeasurements() { 438 return keepMeasurements; 439 } 440 441 public void setKeepMeasurements(boolean keepMeasurements) { 442 this.keepMeasurements = keepMeasurements; 443 } 444 445 446 }