001 package biz.hammurapi.metrics; 002 003 import java.io.IOException; 004 import java.io.ObjectInputStream; 005 import java.net.InetAddress; 006 import java.net.ServerSocket; 007 import java.net.Socket; 008 import java.util.Iterator; 009 import java.util.List; 010 011 import biz.hammurapi.config.Component; 012 import biz.hammurapi.config.ConfigurationException; 013 import biz.hammurapi.metrics.ConsoleSliceConsumer; 014 import biz.hammurapi.metrics.SliceConsumer; 015 import biz.hammurapi.metrics.BatchingSliceConsumer.SliceEntry; 016 017 018 public class SocketSliceConsumerServer implements Component { 019 020 private SliceConsumer delegate = new ConsoleSliceConsumer(); 021 022 private int port = SocketSliceConsumer.DEFAULT_PORT; 023 024 /** 025 * Set server listening port 026 * @param port 027 */ 028 public void setPort(int port) { 029 this.port = port; 030 } 031 032 ServerSocket serverSocket; 033 034 public void setOwner(Object owner) { 035 // Nothing 036 037 } 038 039 public void start() throws ConfigurationException { 040 try { 041 serverSocket = new ServerSocket(port); 042 Thread listeningThread = new Thread() { 043 public void run() { 044 while (true) { 045 try { 046 process(serverSocket.accept()); 047 } catch (IOException e) { 048 consumeException(e); 049 return; 050 } 051 } 052 } 053 }; 054 listeningThread.setDaemon(true); 055 listeningThread.setName("Slice consumer listener"); 056 listeningThread.start(); 057 } catch (IOException e) { 058 throw new ConfigurationException("Could not create server socket: "+e,e); 059 } 060 } 061 062 public void stop() throws ConfigurationException { 063 if (serverSocket!=null) { 064 try { 065 serverSocket.close(); 066 } catch (IOException e) { 067 throw new ConfigurationException("Could not close server socket: "+e, e); 068 } 069 } 070 } 071 072 /** 073 * @return SliceConsumer to delegate processing. This class returns ConsoleSliceConsumer. 074 * Subclasses can override this method. 075 */ 076 protected SliceConsumer getDelegate() { 077 return delegate; 078 } 079 080 /** 081 * This method iterates over slices in the batch 082 * and sends them to the delegate SliceConsumer returned 083 * by getDelegate() method. 084 * @param slices List of slice entries obtained from clients 085 */ 086 protected void processSlices(List slices, InetAddress address, String id) { 087 Iterator it = slices.iterator(); 088 while (it.hasNext()) { 089 SliceEntry se = (SliceEntry) it.next(); 090 getDelegate().consumeSlice("["+address+":"+id+"] "+se.getCategory(), se.getSlice()); 091 } 092 } 093 094 /** 095 * Spawns a new thread for each socket 096 * @param socket 097 */ 098 protected void process(final Socket socket) { 099 Runnable job = new Runnable() { 100 public void run() { 101 try { 102 ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()); 103 try { 104 String id = (String) ois.readObject(); 105 processSlices((List) ois.readObject(), socket.getInetAddress(), id); 106 } finally { 107 ois.close(); 108 socket.close(); 109 } 110 } catch (Exception e) { 111 consumeException(e); 112 } 113 } 114 }; 115 process(job); 116 } 117 118 /** 119 * Spawns a new thread for each job. 120 * Override to use thread pools on high loads. 121 * @param job 122 */ 123 protected void process(Runnable job) { 124 new Thread(job, "Slice consumer").start(); 125 } 126 127 /** 128 * Consumes exceptions in processing threads. 129 * @param e 130 */ 131 protected void consumeException(Exception e) { 132 e.printStackTrace(); 133 } 134 135 public static void main(String[] args) throws ConfigurationException { 136 System.out.println("Usage: java [options] biz.hammurapi.metrics.SocketSliceConsumerServer [port]"); 137 int port = SocketSliceConsumer.DEFAULT_PORT; 138 System.out.println("Default port: "+port); 139 if (args.length>0) { 140 try { 141 port = Integer.parseInt(args[0]); 142 } catch (NumberFormatException e) { 143 System.err.println("Invalid port number: "+args[0]); 144 System.exit(1); 145 } 146 } 147 148 SocketSliceConsumerServer server = new SocketSliceConsumerServer(); 149 server.setPort(port); 150 server.start(); 151 } 152 153 }