001    package biz.hammurapi.jms;
002    
003    import javax.jms.Connection;
004    import javax.jms.ConnectionFactory;
005    import javax.jms.Destination;
006    import javax.jms.JMSException;
007    import javax.jms.Message;
008    import javax.jms.MessageConsumer;
009    import javax.jms.MessageProducer;
010    import javax.jms.Session;
011    import javax.jms.TemporaryQueue;
012    import javax.naming.Context;
013    import javax.naming.NamingException;
014    
015    import biz.hammurapi.config.ConfigurationException;
016    import biz.hammurapi.config.RuntimeConfigurationException;
017    
018    /**
019     * Base class for JMS adapters which operate on two queue pairs.
020     * 
021     * @author Pavel Vlasov
022     */
023    public abstract class Adapter extends MessageProcessor {
024    
025            private static final String BACK_END_INITIAL_CONTEXT_PREFIX = "backEndInitialContext/";
026    
027            protected Context backEndInitialContext;
028    
029            private String backEndConnectionFactoryName;
030    
031            private String backEndDestinationName;
032    
033            private String backEndReplyDestinationName;
034    
035            private String replyDestinationName;
036    
037            /**
038             * Initial context to look-up back-end destinations and connection factory.
039             * If back-end initial context is not set then back-end JMS objects are
040             * looked up in the "front-end" initial context.
041             * 
042             * @param initialContext
043             */
044            public void setBackEndInitialContext(Context initialContext) {
045                    this.initialContext = initialContext;
046            }
047    
048            /**
049             * Back-end request destination (queue or topic) name.
050             * 
051             * @param destinationName
052             */
053            public void setBackEndDestination(String destinationName) {
054                    this.backEndDestinationName = destinationName;
055            }
056    
057            /**
058             * Back-end reply destination (queue or topic) name.
059             * 
060             * @param destinationName
061             */
062            public void setBackEndReplyDestination(String destinationName) {
063                    this.backEndReplyDestinationName = destinationName;
064            }
065    
066            /**
067             * Front-end reply destination (queue or topic) name.
068             * 
069             * @param destinationName
070             */
071            public void setReplyDestination(String destinationName) {
072                    this.replyDestinationName = destinationName;
073            }
074    
075            /**
076             * Back-end JMS connection name. It it is not set then the "front-end"
077             * connection factory is used.
078             * 
079             * @param connectionName
080             */
081            public void setBackEndConnectionFactory(String connectionFactoryName) {
082                    this.backEndConnectionFactoryName = connectionFactoryName;
083            }
084    
085            protected Connection backEndConnection;
086    
087            private Destination backEndDestination;
088    
089            private Destination backEndReplyDestination;
090    
091            private Destination replyDestination;
092    
093            protected boolean isBackEndTransacted;
094    
095            protected int backEndAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
096            
097            protected long replyTimeToLive;
098            
099            protected long backEndRequestTimeToLive;
100            
101            protected long backEndTimeout;
102            
103            /**
104             * Timeout for backend reply. 
105             * @param backEndTimeout
106             */
107            public void setBackEndTimeout(long backEndTimeout) {
108                    this.backEndTimeout = backEndTimeout;
109            }
110            
111            /**
112             * Time to live for reply.
113             * @param replyTimeToLive
114             */
115            public void setReplyTimeToLive(long replyTimeToLive) {
116                    this.replyTimeToLive = replyTimeToLive;
117            }
118            
119            /**
120             * Time to live for back-end request
121             * @param backEndRequestTimeToLive
122             */
123            public void setBackEndRequestTimeToLive(long backEndRequestTimeToLive) {
124                    this.backEndRequestTimeToLive = backEndRequestTimeToLive;
125            }
126    
127            /**
128             * Back-end acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
129             * 
130             * @param acknowledgeModeName
131             */
132            public void setBackEndAcknowledgeMode(String acknowledgeModeName) {
133                    try {
134                            this.backEndAcknowledgeMode = Session.class.getField(acknowledgeModeName + "_ACKNOWLEDGE").getInt(null);
135                    } catch (Exception e) {
136                            throw new IllegalArgumentException("Invalid back-end acknowledge mode '" + acknowledgeModeName + "': " + e);
137                    }
138            }
139    
140            /**
141             * Explicitly sets back-end connection. This method is useful when several
142             * components share one connection.
143             * 
144             * @param connection
145             */
146            public void setBackEndConnection(Connection connection) {
147                    this.backEndConnection = connection;
148            }
149    
150            /**
151             * Transactional attribute of back-end JMS Session.
152             * 
153             * @param isTransacted
154             */
155            public void setBackEndTransacted(boolean isTransacted) {
156                    this.isBackEndTransacted = isTransacted;
157            }
158    
159            private String backEndUser;
160    
161            private String backEndPwd;
162    
163            protected ConnectionFactory backEndConnectionFactory;
164    
165            /**
166             * Back-end JMS Connection user name. Optional.
167             * 
168             * @param user
169             */
170            public void setBackEndUser(String user) {
171                    this.backEndUser = user;
172            }
173    
174            /**
175             * Back-end JMS Connection password. Optional.
176             * 
177             * @param pwd
178             */
179            public void setPassword(String pwd) {
180                    this.backEndPwd = pwd;
181            }
182    
183            public void start() throws ConfigurationException {
184                    super.start();
185                    try {
186                            logger.info(this, "Starting ...");
187    
188                            if (backEndConnectionFactoryName == null) {
189                                    backEndConnectionFactory = connectionFactory;
190                                    backEndConnection = connection;
191                            } else {
192                                    backEndConnectionFactory = (ConnectionFactory) initialContext.lookup(backEndConnectionFactoryName);
193                                    if (backEndUser == null) {
194                                            backEndConnection = backEndConnectionFactory.createConnection();
195                                    } else {
196                                            backEndConnection = backEndConnectionFactory.createConnection(backEndUser, backEndPwd);
197                                    }
198                                    backEndConnection.start();
199                            }
200    
201                            if (backEndInitialContext == null) {
202                                    backEndInitialContext = initialContext;
203                            }
204    
205                            backEndDestination = (Destination) backEndInitialContext.lookup(backEndDestinationName);
206                            backEndReplyDestination = (Destination) backEndInitialContext.lookup(backEndReplyDestinationName);
207    
208                            if (replyDestinationName!=null) {
209                                    replyDestination = (Destination) initialContext.lookup(replyDestinationName);
210                            }
211    
212                            logger.info(this, "Started");
213                    } catch (Exception e) {
214                            if (e instanceof ConfigurationException) {
215                                    throw (ConfigurationException) e;
216                            }
217    
218                            throw new ConfigurationException("Could not start message processor: " + e, e);
219                    }
220            }
221    
222            /**
223             * Processes request message by passing to processRequest() and
224             * processResponse() methods.
225             * 
226             * @param request
227             *            Request message
228             * @param session
229             *            Session if message is processed in the message listener thread
230             *            (worker is null or cannot process jobs), null otherwise.
231             */
232            protected void processMessage(Message request, Session session) {
233                    try {
234                            Session frontEndSession = session == null ? borrowSession() : session;
235                            Destination frontEndReplyDestination = request.getJMSReplyTo() == null ? replyDestination : request.getJMSReplyTo();
236                            Message frontEndReply = null;
237                            try {
238                                    Session backEndSession = connection == backEndConnection ? session : borrowBackEndSession();
239                                    try {
240                                            long start = System.currentTimeMillis();
241                                            Message backEndRequest = processRequest(request, frontEndSession, backEndSession);
242                                            long afterProcessRequest = System.currentTimeMillis();
243                                            addMeasurement("process-request", afterProcessRequest-start, afterProcessRequest);
244                                            if (backEndRequest!=null) {                                     
245                                                    Destination brd = backEndReplyDestination;
246                                                    if (brd==null) {
247                                                            brd = backEndSession.createTemporaryQueue();
248                                                    }
249                                                    backEndRequest.setJMSReplyTo(brd);
250                                        MessageProducer backEndProducer = backEndSession.createProducer(backEndDestination); 
251                            
252                                        String messageSelector;
253                            
254                                        try {
255                                            backEndProducer.send(backEndRequest); 
256                                            messageSelector = messageSelector(backEndRequest);
257                                        } finally {
258                                            backEndProducer.close();
259                                        }
260                            
261                                        Message backEndReply;
262                                                    MessageConsumer consumer;
263                                                    if (brd instanceof TemporaryQueue) {
264                                                logger.debug(this, "Waiting on "+brd);            
265                                                            consumer = session.createConsumer(brd);
266                                                    } else {
267                                                logger.debug(this, "Waiting for "+messageSelector+" on "+brd);                  
268                                                            consumer = session.createConsumer(brd, messageSelector);
269                                                    }
270                                                            
271                                                    try {
272                                                            backEndReply = backEndTimeout == 0 ? consumer.receive() : consumer.receive(backEndTimeout);
273                                        } finally {
274                                            consumer.close();
275                                                            if (brd instanceof TemporaryQueue) {
276                                                                    ((TemporaryQueue) brd).delete();
277                                                            }
278                                        }
279                                        long afterBackEndReply = System.currentTimeMillis();
280                                        addMeasurement("back-end", afterBackEndReply-afterProcessRequest, afterBackEndReply);
281                            
282                                                    if (frontEndReplyDestination==null) {
283                                                            logger.warn(this, "Unable to send reply to front-end - reply destination is null");
284                                                    } else {
285                                                            frontEndReply = processReply(backEndReply, request, frontEndSession, backEndSession);
286                                                            long afterProcessReply = System.currentTimeMillis();
287                                                            addMeasurement("process-reply", afterProcessReply-afterBackEndReply, afterProcessReply);                                                        
288                                                    }
289                                            }
290                                    } catch (Exception e) {
291                                            frontEndReply = handleException(e, frontEndSession);
292                                    } finally {
293                                            if (backEndSession!=frontEndSession) {
294                                                    releaseBackEndSession(backEndSession);
295                                            }
296                                    }
297                                    
298                                    if (frontEndReply!=null) {
299                                            if (frontEndReply.getJMSCorrelationID()==null) {
300                                                    frontEndReply.setJMSCorrelationID(request.getJMSCorrelationID()==null ? request.getJMSMessageID() : request.getJMSCorrelationID());
301                                            }
302                                            MessageProducer replyProducer = frontEndSession.createProducer(frontEndReplyDestination);
303                                            try {
304                                                    if (replyTimeToLive!=0) {
305                                                            replyProducer.setTimeToLive(replyTimeToLive);   
306                                                    }
307                                                    replyProducer.send(frontEndReply);
308                                            } finally {
309                                                    replyProducer.close();
310                                            }
311                                    }
312                            } finally {
313                                    if (session == null) {
314                                            releaseSession(frontEndSession);
315                                    }
316                            }
317                    } catch (Exception e) {
318                            logger.error(e, e.toString());
319                    }
320            }
321    
322            /**
323             * Processes front-end request message and builds back-end request message.
324             * 
325             * @param request
326             *            Request from front-end
327             * @param frontEndSession
328             *            Front-end JMS session
329             * @param backEndSession
330             *            Back-end JMS session. Equals to the front-end session if back
331             *            end connection is not set and the front-end connection is used
332             *            to connect to both front and back end.
333             * @return Message to be sent to the back-end request queue. Null if no message shall be sent (e.g. in the case of exception).
334             */
335            protected abstract Message processRequest(Message frontEndRequest, Session frontEndSession, Session backEndSession);
336    
337            /**
338             * Processes back-end reply and forms reply to front-end. This method is
339             * responsible for setting correlation ID.
340             * 
341             * @param backEndReply
342             *            Reply message from back-end. Null if request timed out.
343             * @param frontEndRequest
344             *            Original request from front end.
345             * @param frontEndSession
346             *            Front-end JMS session.
347             * @param backEndSession
348             *            Back-end JMS session. Equals to the front-end session if back
349             *            end connection is not set and the front-end connection is used
350             *            to connect to both front and back end.
351             * @return Message to be sent to the front-end reply queue. Null if no message shall be sent (e.g. in the case of exception).
352             */
353            protected abstract Message processReply(Message backEndReply, Message frontEndRequest, Session frontEndSession, Session backEndSession);
354            
355            /**
356             * Method to report exceptions to fron-end (excluding JMS exceptions in front-end JMS objects). This implementation returns null.
357             * @param exception Exception to report
358             * @param frontEndSession Front-end JMS session
359             * @return Message to send to front-end to reply queue. Null if no message shall be sent.
360             */
361            protected Message handleException(Exception exception, Session frontEndSession) {
362                    return null;
363            }
364    
365            /**
366             * Extracts message selector for reply back-end message from the back-end
367             * request message. This implementation returns
368             * <code>"JMSCorrelationID='" +backEndRequest.getJMSMessageID()+"'"</code>
369             * 
370             * @param backEndRequest
371             * @return Message selector to retrieve back-end reply.
372             * @throws JMSException
373             */
374            protected String messageSelector(Message backEndRequest) throws JMSException {
375                    return "JMSCorrelationID='" + backEndRequest.getJMSMessageID() + "'";
376            }
377    
378            /**
379             * Stops worker (thread pool), if any, and connection.
380             */
381            public void stop() throws ConfigurationException {
382                    try {
383                            logger.info(this, "Stopping ...");
384    
385                            if (backEndConnection != null && backEndConnection != connection) {
386                                    backEndConnection.close();
387                            }
388    
389                            logger.info(this, "Stopped");
390                    } catch (Exception e) {
391                            if (e instanceof ConfigurationException) {
392                                    throw (ConfigurationException) e;
393                            }
394    
395                            throw new ConfigurationException("Could not stop message processor");
396                    }
397            }
398    
399            /**
400             * This implementation simply creates a new session. Subclasses can override
401             * this method to implement session pooling.
402             * 
403             * @return
404             * @throws JMSException
405             */
406            protected Session borrowBackEndSession() throws JMSException {
407                    return backEndConnection.createSession(isBackEndTransacted,     backEndAcknowledgeMode);
408            }
409    
410            /**
411             * This implementation simply closes the session. Subclasses can override
412             * this method to implement session pooling.
413             * 
414             * @throws JMSException
415             */
416            protected void releaseBackEndSession(Session session) throws JMSException {
417                    session.close();
418            }
419    
420            /**
421             * Provides access to back-end destination, reply destinations, initial
422             * context, and connection in addition to objects accessible through
423             * superclass getChild() method. Bridges to back-end initial context. For
424             * names in a form backEndInitialContext/&lt;name&gt; the name is looked up
425             * in initial context.
426             */
427            protected Object getChild(String name) {
428                    if ("replyDestination".equals(name)) {
429                            return replyDestination;
430                    }
431    
432                    if ("backEndReplyDestination".equals(name)) {
433                            return backEndReplyDestination;
434                    }
435    
436                    if ("backEndDestination".equals(name)) {
437                            return backEndDestination;
438                    }
439    
440                    if ("backEndInitialContext".equals(name)) {
441                            return backEndInitialContext;
442                    }
443    
444                    if (name != null && name.startsWith(BACK_END_INITIAL_CONTEXT_PREFIX)) {
445                            String jndiName = name.substring(BACK_END_INITIAL_CONTEXT_PREFIX.length());
446                            try {
447                                    return backEndInitialContext.lookup(jndiName);
448                            } catch (NamingException e) {
449                                    throw new RuntimeConfigurationException("Lookup in back-end inital context failed for JNDI name '" + jndiName + "':" + e, e);
450                            }
451                    }
452    
453                    if ("backEndConnection".equals(name)) {
454                            return backEndConnection;
455                    }
456    
457                    if ("backEndConnectionFactory".equals(name)) {
458                            return backEndConnectionFactory;
459                    }
460    
461                    return super.getChild(name);
462            }
463    
464    }