001 package biz.hammurapi.jms.adapter; 002 003 import java.util.Hashtable; 004 import java.util.logging.Level; 005 import java.util.logging.Logger; 006 007 import javax.jms.Destination; 008 import javax.jms.JMSException; 009 import javax.jms.Message; 010 import javax.jms.MessageConsumer; 011 import javax.jms.MessageListener; 012 import javax.jms.MessageProducer; 013 import javax.jms.Session; 014 015 import org.apache.commons.pool.ObjectPool; 016 import org.apache.commons.pool.PoolableObjectFactory; 017 import org.apache.commons.pool.impl.SoftReferenceObjectPool; 018 019 import biz.hammurapi.config.Component; 020 import biz.hammurapi.config.ConfigurationException; 021 import biz.hammurapi.config.PoolableComponent; 022 import biz.hammurapi.config.ServiceBase; 023 import biz.hammurapi.config.Wrapper; 024 import biz.hammurapi.metrics.MeasurementCollector; 025 import biz.hammurapi.metrics.MeasurementConsumer; 026 import biz.hammurapi.util.ExceptionSink; 027 import biz.hammurapi.util.Worker; 028 029 /** 030 * Listens for JMS messages, converts them to objects using Converter and processes 031 * them using processor. Processors can implement Observer/Observable pattern and pass received 032 * objects to registered observers/listeners. 033 * @author Pavel 034 */ 035 public class Listener extends ServiceBase implements MessageListener, Wrapper { 036 037 private Logger logger = Logger.getLogger(Listener.class.getName()); 038 039 private biz.hammurapi.jms.adapter.definition.Listener definition; 040 private Hashtable properties; 041 private Processor processor; 042 private ObjectPool pool; 043 044 public Listener(final biz.hammurapi.jms.adapter.definition.Listener definition) throws ConfigurationException { 045 this.definition = definition; 046 properties = JmsAdapter.instantiate(definition.getPropertyArray()); 047 if (definition.getPooled()) { 048 PoolableObjectFactory objectFactory = new PoolableObjectFactory() { 049 050 public void activateObject(Object obj) throws Exception { 051 if (obj instanceof PoolableComponent) { 052 ((PoolableComponent) obj).activate(); 053 } 054 } 055 056 public void destroyObject(Object obj) throws Exception { 057 if (obj instanceof Component) { 058 ((Component) obj).stop(); 059 } 060 } 061 062 public Object makeObject() throws Exception { 063 Object ret = (Processor) JmsAdapter.instantiate(definition.getProcessor()); 064 if (ret instanceof Component) { 065 ((Component) ret).setOwner(this); 066 ((Component) ret).start(); 067 } 068 if (ret instanceof MeasurementCollector) { 069 MeasurementConsumer cmc = new MeasurementConsumer() { 070 public void addMeasurement(String mName, double value, long time) { 071 MeasurementConsumer measurementConsumer = getMeasurementConsumer(); 072 if (measurementConsumer != null) { 073 measurementConsumer.addMeasurement("processor."+mName, value, time==0 ? System.currentTimeMillis() : time); 074 } 075 } 076 }; 077 ((MeasurementCollector) ret).setMeasurementConsumer(cmc); 078 } 079 return ret; 080 } 081 082 public void passivateObject(Object obj) throws Exception { 083 if (obj instanceof PoolableComponent) { 084 ((PoolableComponent) obj).passivate(); 085 } 086 } 087 088 public boolean validateObject(Object obj) { 089 if (obj instanceof PoolableComponent) { 090 return ((PoolableComponent) obj).validate(); 091 } 092 093 return true; 094 } 095 096 }; 097 pool = new SoftReferenceObjectPool(objectFactory); 098 } else { 099 processor = (Processor) JmsAdapter.instantiate(definition.getProcessor()); 100 if (processor instanceof Component) { 101 ((Component) processor).setOwner(this); 102 } 103 if (processor instanceof MeasurementCollector) { 104 MeasurementConsumer cmc = new MeasurementConsumer() { 105 public void addMeasurement(String mName, double value, long time) { 106 MeasurementConsumer measurementConsumer = getMeasurementConsumer(); 107 if (measurementConsumer != null) { 108 measurementConsumer.addMeasurement("processor."+mName, value, time==0 ? System.currentTimeMillis() : time); 109 } 110 } 111 }; 112 ((MeasurementCollector) processor).setMeasurementConsumer(cmc); 113 } 114 } 115 } 116 117 private Destination getDestination(Session session) throws JMSException { 118 if (definition.getQueueFromSession()) { 119 return session.createQueue(definition.getDestination()); 120 } 121 122 if (definition.getTopicFromSession()) { 123 return session.createTopic(definition.getDestination()); 124 } 125 126 return destination; 127 } 128 129 private Destination getReplyDestination(Session session) throws JMSException { 130 if (definition.getQueueFromSession()) { 131 return session.createQueue(definition.getReplyDestination()); 132 } 133 134 if (definition.getTopicFromSession()) { 135 return session.createTopic(definition.getReplyDestination()); 136 } 137 138 return replyDestination; 139 } 140 141 private Destination destination; 142 private Destination replyDestination; 143 private Converter converter; 144 private Worker worker; 145 146 protected void stopInternal() throws ConfigurationException { 147 if (processor instanceof Component) { 148 ((Component) processor).stop(); 149 } 150 if (pool!=null) { 151 try { 152 pool.close(); 153 } catch (Exception e) { 154 throw new ConfigurationException("Could not close pool: "+e, e); 155 } 156 } 157 } 158 159 public void startInternal() throws ConfigurationException { 160 if (!definition.getQueueFromSession() && !definition.getTopicFromSession()) { 161 destination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getDestination()); 162 if (!JmsAdapter.isBlank(definition.getReplyDestination())) { 163 replyDestination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getReplyDestination()); 164 } 165 } 166 167 converter = (Converter) get("/bind-types/"+definition.getBindType()); 168 if (processor instanceof Component) { 169 ((Component) processor).start(); 170 } 171 172 if (definition.getWorker()!=null) { 173 worker = (Worker) get("/workers/"+definition.getWorker()); 174 } 175 } 176 177 /** 178 * Adds listener to JMS session 179 * @param session 180 * @throws JMSException 181 */ 182 public void enroll(Session session) throws JMSException { 183 MessageConsumer consumer = session.createConsumer(getDestination(session), definition.getMessageSelector(), definition.getNoLocal()); 184 consumer.setMessageListener(this); 185 } 186 187 private Processor borrowProcessor() throws Exception { 188 if (definition.getPooled()) { 189 return (Processor) pool.borrowObject(); 190 } 191 192 return processor; 193 } 194 195 private void returnProcessor(Processor processor) throws Exception { 196 if (definition.getPooled()) { 197 pool.returnObject(processor); 198 } 199 } 200 201 /** 202 * Converts message to object and passes to processor. 203 */ 204 public void onMessage(final Message message) { 205 long start = System.currentTimeMillis(); 206 try { 207 final Processor prc = borrowProcessor(); 208 final Destination replyTo = message.getJMSReplyTo(); 209 final int deliveryMode = message.getJMSDeliveryMode(); 210 final int priority = message.getJMSPriority(); 211 212 try { 213 final Object obj = converter.convert(message, properties); 214 215 if (worker==null) { 216 try { 217 try { 218 Object ret; 219 long prcStart = System.currentTimeMillis(); 220 try { 221 ret = prc.process(obj); 222 } finally { 223 long now = System.currentTimeMillis(); 224 addMeasurement("message.process", now - prcStart, now); 225 } 226 if (ret!=null) { 227 reply(replyTo, deliveryMode, priority, message, ret); 228 } 229 } finally { 230 returnProcessor(prc); 231 } 232 } catch (Exception e) { 233 if (prc instanceof ExceptionSink) { 234 ((ExceptionSink) prc).consume(message, e); 235 } else { 236 logger.log(Level.SEVERE, "Processing error, sending error: "+e, e); 237 replyError(replyTo, deliveryMode, priority, message, e); 238 } 239 } 240 } else { 241 Runnable job = new Runnable() { 242 243 public void run() { 244 try { 245 try { 246 Object ret; 247 long prcStart = System.currentTimeMillis(); 248 try { 249 ret = prc.process(obj); 250 } finally { 251 long now = System.currentTimeMillis(); 252 addMeasurement("message.process", now - prcStart, now); 253 } 254 if (ret!=null) { 255 reply(replyTo, deliveryMode, priority, message, ret); 256 } 257 } finally { 258 returnProcessor(prc); 259 } 260 } catch (Exception e) { 261 if (prc instanceof ExceptionSink) { 262 ((ExceptionSink) prc).consume(Listener.this, e); 263 } else { 264 logger.log(Level.SEVERE, "Processing error, sending error: "+e, e); 265 replyError(replyTo, deliveryMode, priority, message, e); 266 } 267 } 268 } 269 270 }; 271 272 // Execute in the current thread if worker doesn't accept job. 273 if (!worker.post(job)) { 274 job.run(); 275 } 276 } 277 } catch (Exception e) { 278 if (prc instanceof ExceptionSink) { 279 ((ExceptionSink) prc).consume(message, e); 280 } else { 281 logger.log(Level.SEVERE, "Processing error: "+e, e); 282 replyError(replyTo, deliveryMode, priority, message, e); 283 } 284 } 285 } catch (Exception e) { 286 logger.log(Level.SEVERE, "Processing error: "+e, e); 287 } finally { 288 long now = System.currentTimeMillis(); 289 addMeasurement("message", now - start, now); 290 } 291 } 292 293 private void replyError(final Destination replyTo, final int deliveryMode, final int priority, Message request, Exception e) { 294 try { 295 JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class); 296 Session session = connection.borrowSession(); 297 try { 298 Message message = converter.convert(e, session, properties, request); 299 sendReply( 300 session, 301 message, 302 replyTo, 303 deliveryMode, 304 priority); 305 } finally { 306 connection.returnSession(session); 307 } 308 } catch (Exception ex) { 309 logger.log(Level.SEVERE, "Could not send error: "+e, e); 310 } 311 } 312 313 private void reply(final Destination replyTo, final int deliveryMode, final int priority, Message request, Object ret) throws Exception { 314 JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class); 315 Session session = connection.borrowSession(); 316 try { 317 Message message = converter.convert(ret, session, properties, request); 318 sendReply( 319 session, 320 message, 321 replyTo, 322 deliveryMode, 323 priority); 324 } finally { 325 connection.returnSession(session); 326 } 327 } 328 329 private void sendReply(Session session, Message message, Destination replyTo, int deliveryMode, int priority) throws JMSException { 330 Destination finalDestination = replyTo==null ? getReplyDestination(session) : replyTo; 331 if (finalDestination != null) { 332 MessageProducer producer = session.createProducer(finalDestination); 333 334 if (definition.getReplyTimeToLive()>0) { 335 producer.setTimeToLive(definition.getReplyTimeToLive()); 336 } 337 338 producer.setDeliveryMode(deliveryMode); 339 producer.setPriority(priority); 340 341 try { 342 producer.send(message); 343 } finally { 344 producer.close(); 345 } 346 } 347 } 348 349 /** 350 * Returns processor. 351 */ 352 public Object getMaster() { 353 return processor; 354 } 355 356 }