001 package biz.hammurapi.jms.adapter; 002 003 import java.util.Hashtable; 004 import java.util.logging.Level; 005 import java.util.logging.Logger; 006 007 import javax.jms.DeliveryMode; 008 import javax.jms.Destination; 009 import javax.jms.JMSException; 010 import javax.jms.Message; 011 import javax.jms.MessageConsumer; 012 import javax.jms.MessageProducer; 013 import javax.jms.Session; 014 import javax.jms.TemporaryQueue; 015 016 import biz.hammurapi.config.ConfigurationException; 017 import biz.hammurapi.config.ServiceBase; 018 import biz.hammurapi.jms.JMSExceptionEx; 019 import biz.hammurapi.util.ExceptionSink; 020 import biz.hammurapi.util.Worker; 021 022 public class JmsService extends ServiceBase { 023 024 private static final Logger logger = Logger.getLogger(JmsService.class.getName()); 025 026 private biz.hammurapi.jms.adapter.definition.JmsService definition; 027 028 private Hashtable properties; 029 030 public JmsService(biz.hammurapi.jms.adapter.definition.JmsService definition) throws ConfigurationException { 031 this.definition = definition; 032 this.properties = JmsAdapter.instantiate(definition.getPropertyArray()); 033 } 034 035 private Destination requestDestination; 036 037 private Destination replyDestination; 038 039 private Converter converter; 040 041 private Worker worker; 042 043 public Worker getWorker() { 044 return worker; 045 } 046 047 protected void startInternal() throws ConfigurationException { 048 if (!definition.getQueueFromSession() && !definition.getTopicFromSession()) { 049 requestDestination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getRequestDestination()); 050 051 if (!JmsAdapter.isBlank(definition.getReplyDestination())) { 052 replyDestination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getReplyDestination()); 053 } 054 } 055 056 converter = (Converter) get("/bind-types/"+definition.getBindType()); 057 058 if (definition.getWorker()==null) { 059 worker = (Worker) getOwner(Worker.class); 060 } else { 061 worker = (Worker) get("/workers/"+definition.getWorker()); 062 } 063 064 } 065 066 protected void stopInternal() throws ConfigurationException { 067 // Nothing to do 068 } 069 070 private Destination getRequestDestination(Session session) throws JMSException { 071 if (definition.getQueueFromSession()) { 072 return session.createQueue(definition.getRequestDestination()); 073 } 074 075 if (definition.getTopicFromSession()) { 076 return session.createTopic(definition.getRequestDestination()); 077 } 078 079 return requestDestination; 080 } 081 082 private Destination getReplyDestination(Session session) throws JMSException { 083 if (JmsAdapter.isBlank(definition.getReplyDestination())) { 084 return session.createTemporaryQueue(); 085 } 086 087 if (definition.getQueueFromSession()) { 088 return session.createQueue(definition.getReplyDestination()); 089 } 090 091 if (definition.getTopicFromSession()) { 092 return session.createTopic(definition.getReplyDestination()); 093 } 094 095 return replyDestination; 096 } 097 098 /** 099 * Sends object to JMS destination. 100 * @param obj Object to send 101 * @param async If true all operations are performed in a worker thread. 102 * @throws JMSException 103 */ 104 public void send(final Object obj, boolean async) throws JMSException { 105 if (async) { 106 worker.post(new Runnable() { 107 108 public void run() { 109 try { 110 send(obj, false); 111 } catch (JMSException e) { 112 logger.log(Level.SEVERE, "send() failed: "+e, e); 113 } 114 } 115 116 }); 117 } else { 118 long start = System.currentTimeMillis(); 119 JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class); 120 Session session = connection.borrowSession(); 121 try { 122 try { 123 Message message = converter.convert(obj, session, properties, null); 124 MessageProducer producer = session.createProducer(getRequestDestination(session)); 125 126 if (definition.getTimeToLive()>0) { 127 producer.setTimeToLive(definition.getTimeToLive()); 128 } 129 130 producer.setDeliveryMode(definition.getPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 131 132 if (definition.getPriority()!=null) { 133 producer.setPriority(definition.getPriority().intValue()); 134 } 135 136 try { 137 producer.send(message); 138 } finally { 139 producer.close(); 140 } 141 } finally { 142 connection.returnSession(session); 143 } 144 } catch (Exception e) { 145 addMeasurement("error", 1, System.currentTimeMillis()); 146 connection.invalidateSession(session); 147 if (e instanceof RuntimeException) { 148 throw (RuntimeException) e; 149 } 150 throw e instanceof JMSException ? (JMSException) e : new JMSExceptionEx(e); 151 } finally { 152 long now = System.currentTimeMillis(); 153 addMeasurement("send", now - start, now); 154 } 155 } 156 } 157 158 /** 159 * Sends object JMS destination, waits for reply, receives reply converts to Object 160 * and returns. Reply message is correlated on send message ID if reply destination is set. 161 * If reply destination is not specified in configuration then temporary reply destination is 162 * used. If wait expires this method returns null. 163 * @param obj Object to send. 164 * @return Reply object. 165 * @throws JMSException 166 */ 167 public Object request(Object obj) throws JMSException { 168 long start = System.currentTimeMillis(); 169 JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class); 170 Session session = connection.borrowSession(); 171 try { 172 try { 173 Message message = converter.convert(obj, session, properties, null); 174 MessageProducer producer = session.createProducer(getRequestDestination(session)); 175 176 if (definition.getTimeToLive()>0) { 177 producer.setTimeToLive(definition.getTimeToLive()); 178 } 179 180 message.setJMSReplyTo(getReplyDestination(session)); 181 182 producer.setDeliveryMode(definition.getPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 183 184 if (definition.getPriority()!=null) { 185 producer.setPriority(definition.getPriority().intValue()); 186 } 187 188 try { 189 producer.send(message); 190 } finally { 191 producer.close(); 192 } 193 194 long waitStart = System.currentTimeMillis(); 195 196 MessageConsumer consumer; 197 198 if (message.getJMSReplyTo() instanceof TemporaryQueue) { 199 consumer = session.createConsumer(message.getJMSReplyTo()); 200 } else { 201 String selector = "JMSCorrelationID='"+message.getJMSMessageID()+"'"; 202 consumer = session.createConsumer(message.getJMSReplyTo(), selector); 203 } 204 205 try { 206 Message reply = consumer.receive(definition.getTimeout()); 207 if (reply == null) { 208 addMeasurement("timeout", 1, System.currentTimeMillis()); 209 throw new JMSException("Waiting for reply timed out"); 210 } 211 212 long waitEnd = System.currentTimeMillis(); 213 addMeasurement("wait", waitEnd - waitStart, waitEnd); 214 215 return converter.convert(reply, properties); 216 } finally { 217 consumer.close(); 218 if (message.getJMSReplyTo() instanceof TemporaryQueue) { 219 ((TemporaryQueue) message.getJMSReplyTo()).delete(); 220 } 221 } 222 } catch (Exception e) { 223 addMeasurement("error", 1, System.currentTimeMillis()); 224 connection.invalidateSession(session); 225 if (e instanceof RuntimeException) { 226 throw (RuntimeException) e; 227 } 228 throw e instanceof JMSException ? (JMSException) e : new JMSExceptionEx(e); 229 } finally { 230 connection.returnSession(session); 231 } 232 } finally { 233 long now = System.currentTimeMillis(); 234 addMeasurement("request", now - start, now); 235 } 236 } 237 238 /** 239 * Asynchronously performs request/reply operation in worker thread. 240 * @param obj Object to send 241 * @param receptor Processor for reply. In the case of failure, exception is passed to the processor. 242 */ 243 public void request(final Object obj, final Processor receptor) { 244 worker.post(new Runnable() { 245 246 public void run() { 247 try { 248 receptor.process(request(obj)); 249 } catch (Exception e) { 250 if (receptor instanceof ExceptionSink) { 251 ((ExceptionSink) receptor).consume(obj, e); 252 } else { 253 logger.log(Level.SEVERE, "Request processing failed: "+e, e); 254 } 255 } 256 } 257 258 }); 259 } 260 261 }