package biz.hammurapi.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.naming.Context;
import javax.naming.NamingException;

import biz.hammurapi.config.ConfigurationException;
import biz.hammurapi.config.RuntimeConfigurationException;

/**
 * Base class for JMS adapters which operate on two queue pairs.
 * <p>
 * A front-end request is converted to a back-end request by
 * {@link #processRequest(Message, Session, Session)}, sent to the back-end
 * destination, and the back-end reply is converted to a front-end reply by
 * {@link #processReply(Message, Message, Session, Session)} and sent to the
 * front-end reply destination.
 *
 * @author Pavel Vlasov
 */
public abstract class Adapter extends MessageProcessor {

    private static final String BACK_END_INITIAL_CONTEXT_PREFIX = "backEndInitialContext/";

    protected Context backEndInitialContext;

    protected Connection backEndConnection;

    protected ConnectionFactory backEndConnectionFactory;

    protected boolean isBackEndTransacted;

    protected int backEndAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;

    protected long replyTimeToLive;

    protected long backEndRequestTimeToLive;

    protected long backEndTimeout;

    private String backEndConnectionFactoryName;

    private String backEndDestinationName;

    private String backEndReplyDestinationName;

    private String replyDestinationName;

    private Destination backEndDestination;

    private Destination backEndReplyDestination;

    private Destination replyDestination;

    private String backEndUser;

    private String backEndPwd;

    /** True when the back-end connection was supplied through {@link #setBackEndConnection(Connection)}. */
    private boolean backEndConnectionExplicit;

    /**
     * Initial context to look-up back-end destinations and connection factory.
     * If back-end initial context is not set then back-end JMS objects are
     * looked up in the "front-end" initial context.
     *
     * @param initialContext
     *            Back-end initial context.
     */
    public void setBackEndInitialContext(Context initialContext) {
        // Must target the back-end field; assigning the inherited initialContext
        // would silently replace the front-end context instead.
        this.backEndInitialContext = initialContext;
    }

    /**
     * Back-end request destination (queue or topic) name.
     *
     * @param destinationName
     *            JNDI name of the back-end request destination.
     */
    public void setBackEndDestination(String destinationName) {
        this.backEndDestinationName = destinationName;
    }

    /**
     * Back-end reply destination (queue or topic) name. If it is not set then
     * a temporary queue is created for every back-end request.
     *
     * @param destinationName
     *            JNDI name of the back-end reply destination.
     */
    public void setBackEndReplyDestination(String destinationName) {
        this.backEndReplyDestinationName = destinationName;
    }

    /**
     * Front-end reply destination (queue or topic) name. It is used only for
     * requests which do not carry their own JMSReplyTo destination.
     *
     * @param destinationName
     *            JNDI name of the front-end reply destination.
     */
    public void setReplyDestination(String destinationName) {
        this.replyDestinationName = destinationName;
    }

    /**
     * Back-end JMS connection factory name. If it is not set then the
     * "front-end" connection factory and connection are used.
     *
     * @param connectionFactoryName
     *            JNDI name of the back-end connection factory.
     */
    public void setBackEndConnectionFactory(String connectionFactoryName) {
        this.backEndConnectionFactoryName = connectionFactoryName;
    }

    /**
     * Timeout for backend reply. Zero (default) means wait until a reply
     * arrives.
     *
     * @param backEndTimeout
     *            Value passed to <code>MessageConsumer.receive(long)</code>.
     */
    public void setBackEndTimeout(long backEndTimeout) {
        this.backEndTimeout = backEndTimeout;
    }

    /**
     * Time to live for reply. Zero (default) leaves the producer's time to
     * live untouched.
     *
     * @param replyTimeToLive
     *            Value passed to <code>MessageProducer.setTimeToLive(long)</code>.
     */
    public void setReplyTimeToLive(long replyTimeToLive) {
        this.replyTimeToLive = replyTimeToLive;
    }

    /**
     * Time to live for back-end request. Zero (default) leaves the producer's
     * time to live untouched.
     *
     * @param backEndRequestTimeToLive
     *            Value passed to <code>MessageProducer.setTimeToLive(long)</code>.
     */
    public void setBackEndRequestTimeToLive(long backEndRequestTimeToLive) {
        this.backEndRequestTimeToLive = backEndRequestTimeToLive;
    }

    /**
     * Back-end acknowledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
     * The value is resolved to the <code>Session</code> constant named
     * <code>&lt;value&gt;_ACKNOWLEDGE</code>.
     *
     * @param acknowledgeModeName
     *            Acknowledge mode name without the "_ACKNOWLEDGE" suffix.
     * @throws IllegalArgumentException
     *             If there is no such constant in <code>Session</code>.
     */
    public void setBackEndAcknowledgeMode(String acknowledgeModeName) {
        try {
            this.backEndAcknowledgeMode = Session.class.getField(acknowledgeModeName + "_ACKNOWLEDGE").getInt(null);
        } catch (Exception e) {
            IllegalArgumentException failure = new IllegalArgumentException("Invalid back-end acknowledge mode '" + acknowledgeModeName + "': " + e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Explicitly sets back-end connection. This method is useful when several
     * components share one connection. An explicitly set connection is neither
     * replaced by {@link #start()} nor closed by {@link #stop()}.
     *
     * @param connection
     *            Back-end connection, or null to let {@link #start()} obtain one.
     */
    public void setBackEndConnection(Connection connection) {
        this.backEndConnection = connection;
        this.backEndConnectionExplicit = connection != null;
    }

    /**
     * Transactional attribute of back-end JMS Session.
     *
     * @param isTransacted
     *            Passed to <code>Connection.createSession()</code> by
     *            {@link #borrowBackEndSession()}.
     */
    public void setBackEndTransacted(boolean isTransacted) {
        this.isBackEndTransacted = isTransacted;
    }

    /**
     * Back-end JMS Connection user name. Optional. If it is not set the
     * back-end connection is created without credentials.
     *
     * @param user
     *            Back-end user name.
     */
    public void setBackEndUser(String user) {
        this.backEndUser = user;
    }

    /**
     * Back-end JMS Connection password. Optional.
     *
     * @param pwd
     *            Back-end password.
     */
    public void setBackEndPassword(String pwd) {
        this.backEndPwd = pwd;
    }

    /**
     * Back-end JMS Connection password. Optional. Kept for backward
     * compatibility; {@link #setBackEndPassword(String)} follows the naming of
     * the other back-end setters.
     * <p>
     * NOTE(review): if the superclass declares <code>setPassword</code> for the
     * front-end connection, this method overrides it - confirm against
     * MessageProcessor.
     *
     * @param pwd
     *            Back-end password.
     */
    public void setPassword(String pwd) {
        this.backEndPwd = pwd;
    }

    /**
     * Starts the superclass, then resolves back-end JMS objects and the
     * optional front-end reply destination.
     *
     * @throws ConfigurationException
     *             If the back-end destination name is not set, or a lookup or
     *             connection creation fails.
     */
    public void start() throws ConfigurationException {
        super.start();
        try {
            logger.info(this, "Starting ...");

            if (backEndDestinationName == null) {
                throw new ConfigurationException("Back-end destination name is not set");
            }

            // Default first: the connection factory below is a back-end object too.
            if (backEndInitialContext == null) {
                backEndInitialContext = initialContext;
            }

            // All JNDI look-ups happen before a connection is created, so a
            // failed look-up cannot leave a freshly opened connection behind.
            backEndDestination = (Destination) backEndInitialContext.lookup(backEndDestinationName);

            // No configured reply destination means "use a temporary queue per request".
            backEndReplyDestination = backEndReplyDestinationName == null
                    ? null
                    : (Destination) backEndInitialContext.lookup(backEndReplyDestinationName);

            if (replyDestinationName != null) {
                replyDestination = (Destination) initialContext.lookup(replyDestinationName);
            }

            if (backEndConnectionFactoryName == null) {
                backEndConnectionFactory = connectionFactory;
                if (!backEndConnectionExplicit) {
                    backEndConnection = connection;
                }
            } else {
                backEndConnectionFactory = (ConnectionFactory) backEndInitialContext.lookup(backEndConnectionFactoryName);
                if (!backEndConnectionExplicit) {
                    if (backEndUser == null) {
                        backEndConnection = backEndConnectionFactory.createConnection();
                    } else {
                        backEndConnection = backEndConnectionFactory.createConnection(backEndUser, backEndPwd);
                    }
                    backEndConnection.start();
                }
            }

            logger.info(this, "Started");
        } catch (Exception e) {
            if (e instanceof ConfigurationException) {
                throw (ConfigurationException) e;
            }

            throw new ConfigurationException("Could not start message processor: " + e, e);
        }
    }

    /**
     * Processes request message by passing to processRequest() and
     * processReply() methods. Exceptions raised while talking to the back-end
     * are passed to {@link #handleException(Exception, Session)}; any other
     * exception is logged and not propagated.
     *
     * @param request
     *            Request message
     * @param session
     *            Session if message is processed in the message listener thread
     *            (worker is null or cannot process jobs), null otherwise.
     */
    protected void processMessage(Message request, Session session) {
        try {
            // Resolved before a session is borrowed so a failure here cannot leak one.
            Destination frontEndReplyDestination = request.getJMSReplyTo();
            if (frontEndReplyDestination == null) {
                frontEndReplyDestination = replyDestination;
            }

            Session frontEndSession = session == null ? borrowSession() : session;
            try {
                Message frontEndReply = null;

                // Same connection on both ends: reuse the front-end session.
                // frontEndSession (not session) is used because session is null
                // when the message is processed by a worker thread.
                Session backEndSession = connection == backEndConnection ? frontEndSession : borrowBackEndSession();
                try {
                    long start = System.currentTimeMillis();
                    Message backEndRequest = processRequest(request, frontEndSession, backEndSession);
                    long afterProcessRequest = System.currentTimeMillis();
                    addMeasurement("process-request", afterProcessRequest - start, afterProcessRequest);

                    if (backEndRequest != null) {
                        Message backEndReply = requestBackEndReply(backEndRequest, backEndSession);
                        long afterBackEndReply = System.currentTimeMillis();
                        addMeasurement("back-end", afterBackEndReply - afterProcessRequest, afterBackEndReply);

                        if (frontEndReplyDestination == null) {
                            logger.warn(this, "Unable to send reply to front-end - reply destination is null");
                        } else {
                            frontEndReply = processReply(backEndReply, request, frontEndSession, backEndSession);
                            long afterProcessReply = System.currentTimeMillis();
                            addMeasurement("process-reply", afterProcessReply - afterBackEndReply, afterProcessReply);
                        }
                    }
                } catch (Exception e) {
                    frontEndReply = handleException(e, frontEndSession);
                } finally {
                    if (backEndSession != frontEndSession) {
                        releaseBackEndSession(backEndSession);
                    }
                }

                if (frontEndReply != null) {
                    sendFrontEndReply(request, frontEndReply, frontEndReplyDestination, frontEndSession);
                }
            } finally {
                if (session == null) {
                    releaseSession(frontEndSession);
                }
            }
        } catch (Exception e) {
            logger.error(e, e.toString());
        }
    }

    /**
     * Sends the request to the back-end destination and waits for the reply.
     *
     * @param backEndRequest
     *            Request to send; its JMSReplyTo is set by this method.
     * @param backEndSession
     *            Session used for the producer, the consumer and, if needed,
     *            the temporary reply queue.
     * @return Back-end reply, or null if none was received.
     * @throws JMSException
     *             If a JMS operation fails.
     */
    private Message requestBackEndReply(Message backEndRequest, Session backEndSession) throws JMSException {
        Destination replyTo = backEndReplyDestination;
        TemporaryQueue temporaryReplyQueue = null;
        if (replyTo == null) {
            temporaryReplyQueue = backEndSession.createTemporaryQueue();
            replyTo = temporaryReplyQueue;
        }

        try {
            backEndRequest.setJMSReplyTo(replyTo);

            String replySelector;
            MessageProducer backEndProducer = backEndSession.createProducer(backEndDestination);
            try {
                if (backEndRequestTimeToLive != 0) {
                    backEndProducer.setTimeToLive(backEndRequestTimeToLive);
                }
                backEndProducer.send(backEndRequest);
                // Computed after send(): the default selector is built from the
                // request's JMSMessageID, which is read here once the message is sent.
                replySelector = messageSelector(backEndRequest);
            } finally {
                backEndProducer.close();
            }

            MessageConsumer consumer;
            if (temporaryReplyQueue != null) {
                // A private queue carries only this request's reply - no selector needed.
                logger.debug(this, "Waiting on " + replyTo);
                consumer = backEndSession.createConsumer(replyTo);
            } else {
                logger.debug(this, "Waiting for " + replySelector + " on " + replyTo);
                consumer = backEndSession.createConsumer(replyTo, replySelector);
            }

            try {
                return backEndTimeout == 0 ? consumer.receive() : consumer.receive(backEndTimeout);
            } finally {
                consumer.close();
            }
        } finally {
            // Also runs when send or consumer creation fails, so the queue is not leaked.
            if (temporaryReplyQueue != null) {
                temporaryReplyQueue.delete();
            }
        }
    }

    /**
     * Sends the reply to the front-end, setting the correlation ID from the
     * request if the reply does not have one.
     *
     * @param request
     *            Original front-end request.
     * @param frontEndReply
     *            Reply to send.
     * @param destination
     *            Front-end reply destination; if null the reply is dropped with
     *            a warning.
     * @param frontEndSession
     *            Front-end JMS session.
     * @throws JMSException
     *             If a JMS operation fails.
     */
    private void sendFrontEndReply(Message request, Message frontEndReply, Destination destination, Session frontEndSession) throws JMSException {
        if (destination == null) {
            logger.warn(this, "Unable to send reply to front-end - reply destination is null");
            return;
        }

        if (frontEndReply.getJMSCorrelationID() == null) {
            String requestCorrelationId = request.getJMSCorrelationID();
            frontEndReply.setJMSCorrelationID(requestCorrelationId == null ? request.getJMSMessageID() : requestCorrelationId);
        }

        MessageProducer replyProducer = frontEndSession.createProducer(destination);
        try {
            if (replyTimeToLive != 0) {
                replyProducer.setTimeToLive(replyTimeToLive);
            }
            replyProducer.send(frontEndReply);
        } finally {
            replyProducer.close();
        }
    }

    /**
     * Processes front-end request message and builds back-end request message.
     *
     * @param frontEndRequest
     *            Request from front-end
     * @param frontEndSession
     *            Front-end JMS session
     * @param backEndSession
     *            Back-end JMS session. Equals to the front-end session if back
     *            end connection is not set and the front-end connection is used
     *            to connect to both front and back end.
     * @return Message to be sent to the back-end request queue. Null if no message shall be sent (e.g. in the case of exception).
     */
    protected abstract Message processRequest(Message frontEndRequest, Session frontEndSession, Session backEndSession);

    /**
     * Processes back-end reply and forms reply to front-end. This method is
     * responsible for setting correlation ID; if it leaves the correlation ID
     * null, the request's correlation ID (or message ID) is used.
     *
     * @param backEndReply
     *            Reply message from back-end. Null if request timed out.
     * @param frontEndRequest
     *            Original request from front end.
     * @param frontEndSession
     *            Front-end JMS session.
     * @param backEndSession
     *            Back-end JMS session. Equals to the front-end session if back
     *            end connection is not set and the front-end connection is used
     *            to connect to both front and back end.
     * @return Message to be sent to the front-end reply queue. Null if no message shall be sent (e.g. in the case of exception).
     */
    protected abstract Message processReply(Message backEndReply, Message frontEndRequest, Session frontEndSession, Session backEndSession);

    /**
     * Method to report exceptions to front-end (excluding JMS exceptions in front-end JMS objects). This implementation returns null.
     *
     * @param exception Exception to report
     * @param frontEndSession Front-end JMS session
     * @return Message to send to front-end to reply queue. Null if no message shall be sent.
     */
    protected Message handleException(Exception exception, Session frontEndSession) {
        return null;
    }

    /**
     * Extracts message selector for reply back-end message from the back-end
     * request message. This implementation returns
     * <code>"JMSCorrelationID='" +backEndRequest.getJMSMessageID()+"'"</code>
     *
     * @param backEndRequest
     *            Back-end request which has already been sent.
     * @return Message selector to retrieve back-end reply.
     * @throws JMSException
     *             If the message ID cannot be read.
     */
    protected String messageSelector(Message backEndRequest) throws JMSException {
        return "JMSCorrelationID='" + backEndRequest.getJMSMessageID() + "'";
    }

    /**
     * Closes the back-end connection if it was created by {@link #start()},
     * i.e. it is neither the front-end connection nor a connection supplied
     * through {@link #setBackEndConnection(Connection)}.
     * <p>
     * NOTE(review): this method does not call <code>super.stop()</code>
     * although {@link #start()} calls <code>super.start()</code> - confirm
     * whether the superclass has to be stopped here as well.
     *
     * @throws ConfigurationException
     *             If the back-end connection cannot be closed.
     */
    public void stop() throws ConfigurationException {
        try {
            logger.info(this, "Stopping ...");

            if (!backEndConnectionExplicit && backEndConnection != null && backEndConnection != connection) {
                backEndConnection.close();
            }

            logger.info(this, "Stopped");
        } catch (Exception e) {
            if (e instanceof ConfigurationException) {
                throw (ConfigurationException) e;
            }

            throw new ConfigurationException("Could not stop message processor", e);
        }
    }

    /**
     * This implementation simply creates a new session. Subclasses can override
     * this method to implement session pooling.
     *
     * @return Back-end session created with the configured transacted flag and
     *         acknowledge mode.
     * @throws JMSException
     *             If the session cannot be created.
     */
    protected Session borrowBackEndSession() throws JMSException {
        return backEndConnection.createSession(isBackEndTransacted, backEndAcknowledgeMode);
    }

    /**
     * This implementation simply closes the session. Subclasses can override
     * this method to implement session pooling.
     *
     * @param session
     *            Session obtained from {@link #borrowBackEndSession()}.
     * @throws JMSException
     *             If the session cannot be closed.
     */
    protected void releaseBackEndSession(Session session) throws JMSException {
        session.close();
    }

    /**
     * Provides access to back-end destination, reply destinations, initial
     * context, connection and connection factory in addition to objects
     * accessible through superclass getChild() method. Bridges to back-end
     * initial context: for names in a form
     * <code>backEndInitialContext/&lt;name&gt;</code> the name is looked up in
     * the back-end initial context.
     *
     * @param name
     *            Child name.
     * @return Matching object, or whatever the superclass returns for names
     *         not handled here.
     */
    protected Object getChild(String name) {
        if ("replyDestination".equals(name)) {
            return replyDestination;
        }

        if ("backEndReplyDestination".equals(name)) {
            return backEndReplyDestination;
        }

        if ("backEndDestination".equals(name)) {
            return backEndDestination;
        }

        if ("backEndInitialContext".equals(name)) {
            return backEndInitialContext;
        }

        if (name != null && name.startsWith(BACK_END_INITIAL_CONTEXT_PREFIX)) {
            String jndiName = name.substring(BACK_END_INITIAL_CONTEXT_PREFIX.length());
            try {
                return backEndInitialContext.lookup(jndiName);
            } catch (NamingException e) {
                throw new RuntimeConfigurationException("Lookup in back-end inital context failed for JNDI name '" + jndiName + "':" + e, e);
            }
        }

        if ("backEndConnection".equals(name)) {
            return backEndConnection;
        }

        if ("backEndConnectionFactory".equals(name)) {
            return backEndConnectionFactory;
        }

        return super.getChild(name);
    }

}