001    package biz.hammurapi.metrics;
002    
003    import java.io.IOException;
004    import java.io.ObjectInputStream;
005    import java.net.InetAddress;
006    import java.net.ServerSocket;
007    import java.net.Socket;
008    import java.util.Iterator;
009    import java.util.List;
010    
011    import biz.hammurapi.config.Component;
012    import biz.hammurapi.config.ConfigurationException;
013    import biz.hammurapi.metrics.ConsoleSliceConsumer;
014    import biz.hammurapi.metrics.SliceConsumer;
015    import biz.hammurapi.metrics.BatchingSliceConsumer.SliceEntry;
016    
017    
018    public class SocketSliceConsumerServer implements Component {
019            
020            private SliceConsumer delegate = new ConsoleSliceConsumer();
021            
022            private int port = SocketSliceConsumer.DEFAULT_PORT;
023            
024            /**
025             * Set server listening port
026             * @param port
027             */
028            public void setPort(int port) {
029                    this.port = port;
030            }
031            
032            ServerSocket serverSocket;
033    
034            public void setOwner(Object owner) {
035                    // Nothing
036    
037            }
038    
039            public void start() throws ConfigurationException {
040                    try {
041                            serverSocket = new ServerSocket(port);
042                            Thread listeningThread = new Thread() {
043                                    public void run() {
044                                            while (true) {
045                                                    try {
046                                                            process(serverSocket.accept());
047                                                    } catch (IOException e) {
048                                                            consumeException(e);
049                                                            return;
050                                                    }
051                                            }
052                                    }
053                            };
054                            listeningThread.setDaemon(true);
055                            listeningThread.setName("Slice consumer listener");
056                            listeningThread.start();
057                    } catch (IOException e) {
058                            throw new ConfigurationException("Could not create server socket: "+e,e);
059                    }
060            }
061    
062            public void stop() throws ConfigurationException {
063                    if (serverSocket!=null) {
064                            try {
065                                    serverSocket.close();
066                            } catch (IOException e) {
067                                    throw new ConfigurationException("Could not close server socket: "+e, e);
068                            }
069                    }
070            }
071            
072            /**
073             * @return SliceConsumer to delegate processing. This class returns ConsoleSliceConsumer.
074             * Subclasses can override this method.
075             */
076            protected SliceConsumer getDelegate() {
077                    return delegate;
078            }
079            
080            /**
081             * This method iterates over slices in the batch
082             * and sends them to the delegate SliceConsumer returned
083             * by getDelegate() method.  
084             * @param slices List of slice entries obtained from clients
085             */
086            protected void processSlices(List slices, InetAddress address, String id) {
087                    Iterator it = slices.iterator();
088                    while (it.hasNext()) {
089                            SliceEntry se = (SliceEntry) it.next();
090                            getDelegate().consumeSlice("["+address+":"+id+"] "+se.getCategory(), se.getSlice());
091                    }               
092            }
093            
094            /**
095             * Spawns a new thread for each socket
096             * @param socket
097             */
098            protected void process(final Socket socket) {
099                    Runnable job = new Runnable() {
100                            public void run() {
101                                    try {
102                                            ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
103                                            try {
104                                                    String id = (String) ois.readObject();
105                                                    processSlices((List) ois.readObject(), socket.getInetAddress(), id);
106                                            } finally {
107                                                    ois.close();
108                                                    socket.close();
109                                            }
110                                    } catch (Exception e) {
111                                            consumeException(e);
112                                    }
113                            }
114                    };
115                    process(job);           
116            }
117            
118            /**
119             * Spawns a new thread for each job. 
120             * Override to use thread pools on high loads.
121             * @param job
122             */
123            protected void process(Runnable job) {
124                    new Thread(job, "Slice consumer").start();
125            }
126            
127            /**
128             * Consumes exceptions in processing threads.
129             * @param e
130             */
131            protected void consumeException(Exception e) {
132                    e.printStackTrace();            
133            }
134            
135            public static void main(String[] args) throws ConfigurationException {
136                    System.out.println("Usage: java [options] biz.hammurapi.metrics.SocketSliceConsumerServer [port]");
137                    int port = SocketSliceConsumer.DEFAULT_PORT;
138                    System.out.println("Default port: "+port);
139                    if (args.length>0) {
140                            try {
141                                    port = Integer.parseInt(args[0]);
142                            } catch (NumberFormatException e) {
143                                    System.err.println("Invalid port number: "+args[0]);
144                                    System.exit(1);
145                            }
146                    }
147                    
148                    SocketSliceConsumerServer server = new SocketSliceConsumerServer();
149                    server.setPort(port);
150                    server.start();
151            }
152    
153    }