001    package biz.hammurapi.jms.adapter;
002    
003    import java.util.Hashtable;
004    import java.util.logging.Level;
005    import java.util.logging.Logger;
006    
007    import javax.jms.DeliveryMode;
008    import javax.jms.Destination;
009    import javax.jms.JMSException;
010    import javax.jms.Message;
011    import javax.jms.MessageConsumer;
012    import javax.jms.MessageProducer;
013    import javax.jms.Session;
014    import javax.jms.TemporaryQueue;
015    
016    import biz.hammurapi.config.ConfigurationException;
017    import biz.hammurapi.config.ServiceBase;
018    import biz.hammurapi.jms.JMSExceptionEx;
019    import biz.hammurapi.util.ExceptionSink;
020    import biz.hammurapi.util.Worker;
021    
022    public class JmsService extends ServiceBase {
023            
024            private static final Logger logger = Logger.getLogger(JmsService.class.getName());
025    
026            private biz.hammurapi.jms.adapter.definition.JmsService definition;
027    
028            private Hashtable properties;
029            
030            public JmsService(biz.hammurapi.jms.adapter.definition.JmsService definition) throws ConfigurationException {
031                    this.definition = definition;
032                    this.properties = JmsAdapter.instantiate(definition.getPropertyArray());
033            }
034            
035            private Destination requestDestination;
036            
037            private Destination replyDestination;
038            
039            private Converter converter;
040            
041            private Worker worker;
042            
043            public Worker getWorker() {
044                    return worker;
045            }
046    
047            protected void startInternal() throws ConfigurationException {
048                    if (!definition.getQueueFromSession() && !definition.getTopicFromSession()) {           
049                            requestDestination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getRequestDestination());
050                            
051                            if (!JmsAdapter.isBlank(definition.getReplyDestination())) {
052                                    replyDestination = ((DestinationResolver) getOwner(DestinationResolver.class)).lookupDestination(definition.getReplyDestination());
053                            }
054                    }
055                    
056                    converter = (Converter) get("/bind-types/"+definition.getBindType());
057                    
058                    if (definition.getWorker()==null) {
059                            worker = (Worker) getOwner(Worker.class);
060                    } else {
061                            worker = (Worker) get("/workers/"+definition.getWorker());
062                    }               
063                    
064            }
065    
066            protected void stopInternal() throws ConfigurationException {
067                    // Nothing to do
068            }
069            
070            private Destination getRequestDestination(Session session) throws JMSException {
071                    if (definition.getQueueFromSession()) {
072                            return session.createQueue(definition.getRequestDestination());
073                    }
074                    
075                    if (definition.getTopicFromSession()) {
076                            return session.createTopic(definition.getRequestDestination());                                         
077                    }
078                    
079                    return requestDestination;
080            }
081            
082            private Destination getReplyDestination(Session session) throws JMSException {
083                    if (JmsAdapter.isBlank(definition.getReplyDestination())) {
084                            return session.createTemporaryQueue();
085                    }
086                    
087                    if (definition.getQueueFromSession()) {
088                            return session.createQueue(definition.getReplyDestination());
089                    }
090                    
091                    if (definition.getTopicFromSession()) {
092                            return session.createTopic(definition.getReplyDestination());                                           
093                    }
094                    
095                    return replyDestination;
096            }
097            
098            /**
099             * Sends object to JMS destination.
100             * @param obj Object to send
101             * @param async If true all operations are performed in a worker thread.
102             * @throws JMSException
103             */
104            public void send(final Object obj, boolean async) throws JMSException {
105                    if (async) {
106                            worker.post(new Runnable() {
107    
108                                    public void run() {
109                                            try {
110                                                    send(obj, false);
111                                            } catch (JMSException e) {
112                                                    logger.log(Level.SEVERE, "send() failed: "+e, e);
113                                            }                                       
114                                    }
115                                    
116                            });
117                    } else {
118                            long start = System.currentTimeMillis();
119                            JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class);
120                            Session session = connection.borrowSession();
121                            try {
122                                    try {
123                                            Message message = converter.convert(obj, session, properties, null);
124                                            MessageProducer producer = session.createProducer(getRequestDestination(session));
125                                            
126                                            if (definition.getTimeToLive()>0) {
127                                                    producer.setTimeToLive(definition.getTimeToLive());
128                                            }
129                                            
130                                            producer.setDeliveryMode(definition.getPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
131                                            
132                                            if (definition.getPriority()!=null) {
133                                                    producer.setPriority(definition.getPriority().intValue());
134                                            }
135                                            
136                                            try {
137                                                    producer.send(message);
138                                            } finally {
139                                                    producer.close();
140                                            }
141                                    } finally {
142                                            connection.returnSession(session);
143                                    }
144                            } catch (Exception e) {
145                                    addMeasurement("error", 1, System.currentTimeMillis());
146                                    connection.invalidateSession(session);
147                                    if (e instanceof RuntimeException) {
148                                            throw (RuntimeException) e;
149                                    }
150                                    throw e instanceof JMSException ? (JMSException) e : new JMSExceptionEx(e);
151                            } finally {
152                                    long now = System.currentTimeMillis();
153                                    addMeasurement("send", now - start, now);
154                            }
155                    }
156            }
157            
158            /**
159             * Sends object JMS destination, waits for reply, receives reply converts to Object
160             * and returns. Reply message is correlated on send message ID if reply destination is set.
161             * If reply destination is not specified in configuration then temporary reply destination is 
162             * used. If wait expires this method returns null.
163             * @param obj Object to send.
164             * @return Reply object.
165             * @throws JMSException
166             */
167            public Object request(Object obj) throws JMSException {
168                    long start = System.currentTimeMillis();
169                    JmsConnection connection = (JmsConnection) getOwner(JmsConnection.class);
170                    Session session = connection.borrowSession();
171                    try {
172                            try {
173                                    Message message = converter.convert(obj, session, properties, null);
174                                    MessageProducer producer = session.createProducer(getRequestDestination(session));
175                                    
176                                    if (definition.getTimeToLive()>0) {
177                                            producer.setTimeToLive(definition.getTimeToLive());
178                                    }
179                                    
180                                    message.setJMSReplyTo(getReplyDestination(session));
181                                                            
182                                    producer.setDeliveryMode(definition.getPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
183                                    
184                                    if (definition.getPriority()!=null) {
185                                            producer.setPriority(definition.getPriority().intValue());
186                                    }
187                                                                    
188                                    try {
189                                            producer.send(message);
190                                    } finally {
191                                            producer.close();
192                                    }
193                                    
194                                    long waitStart = System.currentTimeMillis();
195                                    
196                                    MessageConsumer consumer;
197            
198                                    if (message.getJMSReplyTo() instanceof TemporaryQueue) {
199                                            consumer = session.createConsumer(message.getJMSReplyTo());
200                                    } else {
201                                            String selector = "JMSCorrelationID='"+message.getJMSMessageID()+"'";
202                                            consumer = session.createConsumer(message.getJMSReplyTo(), selector);
203                                    }
204                                    
205                                    try {
206                                            Message reply = consumer.receive(definition.getTimeout());
207                                            if (reply == null) {
208                                                    addMeasurement("timeout", 1, System.currentTimeMillis());
209                                                    throw new JMSException("Waiting for reply timed out");
210                                            }
211                                            
212                                            long waitEnd = System.currentTimeMillis();
213                                            addMeasurement("wait", waitEnd - waitStart, waitEnd);
214                                            
215                                            return converter.convert(reply, properties);
216                                    } finally {
217                                            consumer.close();
218                                            if (message.getJMSReplyTo() instanceof TemporaryQueue) {
219                                                    ((TemporaryQueue) message.getJMSReplyTo()).delete();
220                                            }
221                                    }               
222                            } catch (Exception e) {
223                                    addMeasurement("error", 1, System.currentTimeMillis());
224                                    connection.invalidateSession(session);
225                                    if (e instanceof RuntimeException) {
226                                            throw (RuntimeException) e;
227                                    }
228                                    throw e instanceof JMSException ? (JMSException) e : new JMSExceptionEx(e);
229                            } finally {
230                                    connection.returnSession(session);
231                            }
232                    } finally {
233                            long now = System.currentTimeMillis();
234                            addMeasurement("request", now - start, now);
235                    }
236            }
237            
238            /**
239             * Asynchronously performs request/reply operation in worker thread.
240             * @param obj Object to send
241             * @param receptor Processor for reply. In the case of failure, exception is passed to the processor.
242             */
243            public void request(final Object obj, final Processor receptor) {
244                    worker.post(new Runnable() {
245    
246                            public void run() {
247                                    try {
248                                            receptor.process(request(obj));
249                                    } catch (Exception e) {
250                                            if (receptor instanceof ExceptionSink) {
251                                                    ((ExceptionSink) receptor).consume(obj, e);
252                                            } else {
253                                                    logger.log(Level.SEVERE, "Request processing failed: "+e, e);
254                                            }
255                                    }                               
256                            }
257                            
258                    });
259            }
260    
261    }