001    package biz.hammurapi.jms.adapter;
002    
003    import java.util.Hashtable;
004    import java.util.logging.Level;
005    import java.util.logging.Logger;
006    
007    import javax.jms.Destination;
008    import javax.jms.JMSException;
009    import javax.jms.Message;
010    import javax.jms.MessageConsumer;
011    import javax.jms.MessageListener;
012    import javax.jms.MessageProducer;
013    import javax.jms.Session;
014    
015    import org.apache.commons.pool.ObjectPool;
016    import org.apache.commons.pool.PoolableObjectFactory;
017    import org.apache.commons.pool.impl.SoftReferenceObjectPool;
018    
019    import biz.hammurapi.config.Component;
020    import biz.hammurapi.config.ConfigurationException;
021    import biz.hammurapi.config.PoolableComponent;
022    import biz.hammurapi.config.ServiceBase;
023    import biz.hammurapi.config.Wrapper;
024    import biz.hammurapi.metrics.MeasurementCollector;
025    import biz.hammurapi.metrics.MeasurementConsumer;
026    import biz.hammurapi.util.ExceptionSink;
027    import biz.hammurapi.util.Worker;
028    
029    /**
030     * Listens for JMS messages, converts them to objects using Converter and processes
031     * them using processor. Processors can implement Observer/Observable pattern and pass received
032     * objects to registered observers/listeners. 
033     * @author Pavel
034     */
035    public class Listener extends ServiceBase implements MessageListener, Wrapper {
036    
037            private Logger logger = Logger.getLogger(Listener.class.getName()); 
038            
039            private biz.hammurapi.jms.adapter.definition.Listener definition;
040            private Hashtable properties;
041            private Processor processor;
042            private ObjectPool pool;
043            
044            public Listener(final biz.hammurapi.jms.adapter.definition.Listener definition) throws ConfigurationException {
045                    this.definition = definition;
046                    properties = JmsAdapter.instantiate(definition.getPropertyArray());
047                    if (definition.getPooled()) {
048                            PoolableObjectFactory objectFactory = new PoolableObjectFactory() {
049    
050                                    public void activateObject(Object obj) throws Exception {
051                                            if (obj instanceof PoolableComponent) {
052                                                    ((PoolableComponent) obj).activate();
053                                            }                                       
054                                    }
055    
056                                    public void destroyObject(Object obj) throws Exception {
057                                            if (obj instanceof Component) {
058                                                    ((Component) obj).stop();
059                                            }                                       
060                                    }
061    
062                                    public Object makeObject() throws Exception {
063                                            Object ret = (Processor) JmsAdapter.instantiate(definition.getProcessor());
064                                            if (ret instanceof Component) {
065                                                    ((Component) ret).setOwner(this);
066                                                    ((Component) ret).start();
067                                            }
068                                            if (ret instanceof MeasurementCollector) {
069                                                    MeasurementConsumer cmc = new MeasurementConsumer() {
070                                                            public void addMeasurement(String mName, double value, long time) {
071                                                                    MeasurementConsumer measurementConsumer = getMeasurementConsumer();
072                                                                    if (measurementConsumer != null) {
073                                                                            measurementConsumer.addMeasurement("processor."+mName, value, time==0 ? System.currentTimeMillis() : time);
074                                                                    }                                               
075                                                            }
076                                                    };
077                                                    ((MeasurementCollector) ret).setMeasurementConsumer(cmc);
078                                            }                                       
079                                            return ret;
080                                    }
081    
082                                    public void passivateObject(Object obj) throws Exception {
083                                            if (obj instanceof PoolableComponent) {
084                                                    ((PoolableComponent) obj).passivate();
085                                            }                                       
086                                    }
087    
088                                    public boolean validateObject(Object obj) {
089                                            if (obj instanceof PoolableComponent) {
090                                                    return ((PoolableComponent) obj).validate();
091                                            }       
092                                            
093                                            return true;
094                                    }
095                                    
096                            };
097                            pool = new SoftReferenceObjectPool(objectFactory);
098                    } else {
099                            processor = (Processor) JmsAdapter.instantiate(definition.getProcessor());
100                            if (processor instanceof Component) {
101                                    ((Component) processor).setOwner(this);
102                            }
103                            if (processor instanceof MeasurementCollector) {
104                                    MeasurementConsumer cmc = new MeasurementConsumer() {
105                                            public void addMeasurement(String mName, double value, long time) {
106                                                    MeasurementConsumer measurementConsumer = getMeasurementConsumer();
107                                                    if (measurementConsumer != null) {
108                                                            measurementConsumer.addMeasurement("processor."+mName, value, time==0 ? System.currentTimeMillis() : time);
109                                                    }                                               
110                                            }
111                                    };
112                                    ((MeasurementCollector) processor).setMeasurementConsumer(cmc);
113                            }                                       
114                    }
115            }
116            
117            private Destination getDestination(Session session) throws JMSException {
118                    if (definition.getQueueFromSession()) {
119                            return session.createQueue(definition.getDestination());
120                    }
121                    
122                    if (definition.getTopicFromSession()) {
123                            return session.createTopic(definition.getDestination());                                                
124                    }
125                    
126                    return destination;
127            }
128            
129            private Destination getReplyDestination(Session session) throws JMSException {
130                    if (definition.getQueueFromSession()) {
131                            return session.createQueue(definition.getReplyDestination());
132                    }
133                    
134                    if (definition.getTopicFromSession()) {
135                            return session.createTopic(definition.getReplyDestination());                                           
136                    }
137                    
138                    return replyDestination;
139            }
140            
141            private Destination destination;
142            private Destination replyDestination;
143            private Converter converter;
144            private Worker worker;
145    
146            protected void stopInternal() throws ConfigurationException {
147                    if (processor instanceof Component) {
148                            ((Component) processor).stop();
149                    }
150                    if (pool!=null) {
151                            try {
152                                    pool.close();
153                            } catch (Exception e) {
154                                    throw new ConfigurationException("Could not close pool: "+e, e);
155                            }
156                    }
157            }
158    
159            public void startInternal() throws ConfigurationException {
160                    if (!definition.getQueueFromSession() && !definition.getTopicFromSession()) {           
161                            destination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getDestination());
162                            if (!JmsAdapter.isBlank(definition.getReplyDestination())) {
163                                    replyDestination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getReplyDestination());
164                            }
165                    }
166                    
167                    converter = (Converter) get("/bind-types/"+definition.getBindType());
168                    if (processor instanceof Component) {
169                            ((Component) processor).start();
170                    }
171                    
172                    if (definition.getWorker()!=null) {
173                            worker = (Worker) get("/workers/"+definition.getWorker());
174                    }
175            }
176    
177            /**
178             * Adds listener to JMS session
179             * @param session
180             * @throws JMSException
181             */
182            public void enroll(Session session) throws JMSException {
183                    MessageConsumer consumer = session.createConsumer(getDestination(session), definition.getMessageSelector(), definition.getNoLocal());
184                    consumer.setMessageListener(this);
185            }
186            
187            private Processor borrowProcessor() throws Exception {
188                    if (definition.getPooled()) {
189                            return (Processor) pool.borrowObject();
190                    }
191                    
192                    return processor; 
193            }
194            
195            private void returnProcessor(Processor processor) throws Exception {
196                    if (definition.getPooled()) {
197                            pool.returnObject(processor);
198                    }
199            }
200    
201            /**
202             * Converts message to object and passes to processor.
203             */
204            public void onMessage(final Message message) {  
205                    long start = System.currentTimeMillis();
206                    try {
207                            final Processor prc = borrowProcessor();
208                            final Destination replyTo = message.getJMSReplyTo();
209                            final int deliveryMode = message.getJMSDeliveryMode();
210                            final int priority = message.getJMSPriority();
211                            
212                            try {
213                                    final Object obj = converter.convert(message, properties);
214                                    
215                                    if (worker==null) {
216                                            try {
217                                                    try {
218                                                            Object ret;
219                                                            long prcStart = System.currentTimeMillis();
220                                                            try {
221                                                                    ret = prc.process(obj);
222                                                            } finally {
223                                                                    long now = System.currentTimeMillis();
224                                                                    addMeasurement("message.process", now - prcStart, now);
225                                                            }
226                                                            if (ret!=null) {
227                                                                    reply(replyTo, deliveryMode, priority, message, ret);
228                                                            }
229                                                    } finally {
230                                                            returnProcessor(prc);
231                                                    }
232                                            } catch (Exception e) {
233                                                    if (prc instanceof ExceptionSink) {
234                                                            ((ExceptionSink) prc).consume(message, e);
235                                                    } else {
236                                                            logger.log(Level.SEVERE, "Processing error, sending error: "+e, e);
237                                                            replyError(replyTo, deliveryMode, priority, message, e);
238                                                    }
239                                            }
240                                    } else {
241                                            Runnable job = new Runnable() {
242                    
243                                                    public void run() {
244                                                            try {
245                                                                    try {
246                                                                            Object ret;
247                                                                            long prcStart = System.currentTimeMillis();
248                                                                            try {
249                                                                                    ret = prc.process(obj);
250                                                                            } finally {
251                                                                                    long now = System.currentTimeMillis();
252                                                                                    addMeasurement("message.process", now - prcStart, now);
253                                                                            }
254                                                                            if (ret!=null) {
255                                                                                    reply(replyTo, deliveryMode, priority, message, ret);
256                                                                            }
257                                                                    } finally {
258                                                                            returnProcessor(prc);
259                                                                    }
260                                                            } catch (Exception e) {
261                                                                    if (prc instanceof ExceptionSink) {
262                                                                            ((ExceptionSink) prc).consume(Listener.this, e);
263                                                                    } else {
264                                                                            logger.log(Level.SEVERE, "Processing error, sending error: "+e, e);
265                                                                            replyError(replyTo, deliveryMode, priority, message, e);
266                                                                    }
267                                                            }
268                                                    }
269                                                    
270                                            };
271                                            
272                                            // Execute in the current thread if worker doesn't accept job.
273                                            if (!worker.post(job)) {
274                                                    job.run();
275                                            }
276                                    }
277                            } catch (Exception e) {
278                                    if (prc instanceof ExceptionSink) {
279                                            ((ExceptionSink) prc).consume(message, e);
280                                    } else {
281                                            logger.log(Level.SEVERE, "Processing error: "+e, e);
282                                            replyError(replyTo, deliveryMode, priority, message, e);
283                                    }
284                            }
285                    } catch (Exception e) {
286                            logger.log(Level.SEVERE, "Processing error: "+e, e);                    
287                    } finally {
288                            long now = System.currentTimeMillis();
289                            addMeasurement("message", now - start, now);
290                    }
291            }
292    
293            private void replyError(final Destination replyTo, final int deliveryMode, final int priority, Message request, Exception e) {
294                    try {
295                            JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class);
296                            Session session = connection.borrowSession();
297                            try {
298                                    Message message = converter.convert(e, session, properties, request);
299                                    sendReply(
300                                                    session, 
301                                                    message, 
302                                                    replyTo, 
303                                                    deliveryMode, 
304                                                    priority);
305                            } finally {
306                                    connection.returnSession(session);
307                            }                                                                       
308                    } catch (Exception ex) {
309                            logger.log(Level.SEVERE, "Could not send error: "+e, e);                                                                        
310                    }
311            }
312            
313            private void reply(final Destination replyTo, final int deliveryMode, final int priority, Message request, Object ret) throws Exception {
314                    JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class);
315                    Session session = connection.borrowSession();
316                    try {
317                            Message message = converter.convert(ret, session, properties, request);
318                            sendReply(
319                                            session, 
320                                            message, 
321                                            replyTo, 
322                                            deliveryMode, 
323                                            priority);
324                    } finally {
325                            connection.returnSession(session);
326                    }                                                                       
327            }
328            
329            private void sendReply(Session session, Message message, Destination replyTo, int deliveryMode, int priority) throws JMSException {
330                    Destination finalDestination = replyTo==null ? getReplyDestination(session) : replyTo;
331                    if (finalDestination != null) {
332                            MessageProducer producer = session.createProducer(finalDestination);
333                            
334                            if (definition.getReplyTimeToLive()>0) {
335                                    producer.setTimeToLive(definition.getReplyTimeToLive());
336                            }
337                            
338                            producer.setDeliveryMode(deliveryMode);         
339                            producer.setPriority(priority);
340                                                            
341                            try {
342                                    producer.send(message);
343                            } finally {
344                                    producer.close();
345                            }
346                    }
347            }
348    
349            /**
350             * Returns processor.
351             */
352            public Object getMaster() {
353                    return processor;
354            }
355    
356    }