001 package biz.hammurapi.jms; 002 003 import javax.jms.Connection; 004 import javax.jms.ConnectionFactory; 005 import javax.jms.Destination; 006 import javax.jms.ExceptionListener; 007 import javax.jms.JMSException; 008 import javax.jms.Message; 009 import javax.jms.MessageListener; 010 import javax.jms.Session; 011 import javax.naming.Context; 012 import javax.naming.NamingException; 013 014 import biz.hammurapi.config.Component; 015 import biz.hammurapi.config.ComponentBase; 016 import biz.hammurapi.config.ConfigurationException; 017 import biz.hammurapi.config.RestartCommand; 018 import biz.hammurapi.config.Restartable; 019 import biz.hammurapi.config.RuntimeConfigurationException; 020 import biz.hammurapi.logging.ConsoleLogger; 021 import biz.hammurapi.logging.Logger; 022 import biz.hammurapi.metrics.MeasurementCollector; 023 import biz.hammurapi.metrics.MeasurementConsumer; 024 import biz.hammurapi.util.Worker; 025 026 /** 027 * Base class for JMS message processors. 028 * @author Pavel Vlasov 029 */ 030 public abstract class MessageProcessor extends ComponentBase implements Restartable { 031 032 private static final String INITIAL_CONTEXT_PREFIX = "initialContext/"; 033 protected Logger logger = new ConsoleLogger(ConsoleLogger.INFO); 034 035 /** 036 * Sets logger. 037 * @param logger 038 */ 039 public void setLogger(Logger logger) { 040 this.logger = logger; 041 } 042 043 protected Context initialContext; 044 private String connectionFactoryName; 045 private String destinationName; 046 private int listeners = Runtime.getRuntime().availableProcessors(); 047 048 /** 049 * Number of message listening threads. Defaults to number of processes available to JVM. 050 * @param listeners 051 */ 052 public void setListeners(int listeners) { 053 this.listeners = listeners; 054 } 055 056 public void setInitialContext(Context initialContext) { 057 this.initialContext = initialContext; 058 } 059 060 /** 061 * Request destination (queue or topic) name 062 * @param destinationName 063 */ 064 public void setDestination(String destinationName) { 065 this.destinationName = destinationName; 066 } 067 068 /** 069 * JMS connection name 070 * @param connectionName 071 */ 072 public void setConnectionFactory(String connectionFactoryName) { 073 this.connectionFactoryName = connectionFactoryName; 074 } 075 076 /** 077 * Worker to process requests. It is optional. If worker is not set or cannot process requests, 078 * requests are processed in the message listener thread. 079 * @param worker 080 */ 081 public void setWorker(Worker worker) { 082 this.worker = worker; 083 } 084 085 protected Connection connection; 086 private Destination destination; 087 protected boolean isTransacted; 088 protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 089 090 /** 091 * Acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK. 092 * @param acknowledgeModeName 093 */ 094 public void setAcknowledgeMode(String acknowledgeModeName) { 095 try { 096 this.acknowledgeMode = Session.class.getField(acknowledgeModeName+"_ACKNOWLEDGE").getInt(null); 097 } catch (Exception e) { 098 throw new IllegalArgumentException("Invalid acknowledge mode '"+acknowledgeModeName+"': "+e); 099 } 100 } 101 102 /** 103 * Explicitly sets connection. This method is useful when several components share one connection. 104 * @param connection 105 */ 106 public void setConnection(Connection connection) { 107 this.connection = connection; 108 } 109 110 /** 111 * Transactional attribute of JMS Session. 112 * @param isTransacted 113 */ 114 public void setTransacted(boolean isTransacted) { 115 this.isTransacted = isTransacted; 116 } 117 118 protected Worker worker; 119 120 private String user; 121 private String pwd; 122 protected ConnectionFactory connectionFactory; 123 124 /** 125 * JMS Connection user name. Optional. 126 * @param user 127 */ 128 public void setUser(String user) { 129 this.user = user; 130 } 131 132 /** 133 * JMS Connection password. Optional. 134 * @param pwd 135 */ 136 public void setPassword(String pwd) { 137 this.pwd = pwd; 138 } 139 140 /** 141 * Message listener class. 142 * @author Pavel Vlasov 143 */ 144 private class ProcessingMessageListener implements MessageListener { 145 146 private Session session; 147 148 public ProcessingMessageListener(Session session) { 149 this.session = session; 150 } 151 152 public synchronized void onMessage(final Message request) { 153 logger.debug(this, "Message received"); 154 addMeasurement("receive", 1, System.currentTimeMillis()); 155 156 if (worker==null) { 157 _processMessage(request, session); 158 } else { 159 Runnable job = new Runnable() { 160 161 public void run() { 162 _processMessage(request, null); 163 } 164 165 }; 166 167 // Try to post job to worker. process in the current thread if worker doesn't accept it. 168 if (!worker.post(job)) { 169 _processMessage(request, session); 170 } 171 } 172 } 173 174 }; 175 176 public void start() throws ConfigurationException { 177 try { 178 logger.info(this, "Starting ..."); 179 180 connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName); 181 if (user==null) { 182 connection = connectionFactory.createConnection(); 183 } else { 184 connection = connectionFactory.createConnection(user, pwd); 185 } 186 connection.start(); 187 188 final ExceptionListener pel = connection.getExceptionListener(); 189 connection.setExceptionListener(new ExceptionListener() { 190 191 public void onException(JMSException e) { 192 if (pel!=null) { 193 pel.onException(e); 194 } 195 196 logger.error(MessageProcessor.this, "JMS connection exception: " + e.toString()); 197 198 if (restartCommand!=null) { 199 try { 200 try { 201 connection.setExceptionListener(pel); // To avoid double firing if there is another exception in stop(). 202 stop(); 203 } catch (Exception ex) { 204 ex.printStackTrace(); 205 logger.error(MessageProcessor.this, "Cannot stop the processor: "+ex); 206 } 207 } finally { 208 new Thread(restartCommand, "Connection restart thread "+restartCommand.getAttempt()).start(); 209 } 210 } 211 212 } 213 214 }); 215 216 destination = (Destination) initialContext.lookup(destinationName); 217 218 if (worker instanceof Component) { 219 ((Component) worker).start(); 220 } 221 222 if (worker instanceof MeasurementCollector) { 223 ((MeasurementCollector) worker).setMeasurementConsumer( 224 new MeasurementConsumer() { 225 226 public void addMeasurement(String name, double value, long time) { 227 MessageProcessor.this.addMeasurement("worker."+name, value, time); 228 } 229 230 }); 231 } 232 233 for (int i=0; i<listeners; ++i) { 234 final Session session = borrowSession(); 235 session.createConsumer(destination, messageSelector, noLocal).setMessageListener(new ProcessingMessageListener(session)); 236 } 237 238 logger.info(this, "Started"); 239 } catch (Exception e) { 240 if (e instanceof ConfigurationException) { 241 throw (ConfigurationException) e; 242 } 243 244 throw new ConfigurationException("Could not start message processor: "+e, e); 245 } 246 } 247 248 /** 249 * Processes request message 250 * @param request Request message 251 * @param session Session if message is processed in the message listener thread (worker is null or cannot process jobs), null otherwise. 252 */ 253 protected abstract void processMessage(Message request, Session session); 254 255 protected void _processMessage(Message request, Session session) { 256 long start = System.currentTimeMillis(); 257 try { 258 processMessage(request, session); 259 } finally { 260 long now = System.currentTimeMillis(); 261 addMeasurement("process", now - start, now); 262 } 263 } 264 265 /** 266 * Stops worker (thread pool), if any, and connection. 267 */ 268 public void stop() throws ConfigurationException { 269 try { 270 logger.info(this, "Stopping ..."); 271 272 // Stop worker 273 if (worker instanceof Component) { 274 ((Component) worker).stop(); 275 } 276 277 // No need to close individual sessions. 278 if (connection!=null) { 279 connection.close(); 280 } 281 282 logger.info(this, "Stopped"); 283 } catch (Exception e) { 284 if (e instanceof ConfigurationException) { 285 throw (ConfigurationException) e; 286 } 287 288 throw new ConfigurationException("Could not stop message processor"); 289 } 290 } 291 292 /** 293 * This implementation simply creates a new session. 294 * Subclasses can override this method to implement session pooling. 295 * @return 296 * @throws JMSException 297 */ 298 protected Session borrowSession() throws JMSException { 299 return connection.createSession(isTransacted, acknowledgeMode); 300 } 301 302 /** 303 * This implementation simply closes the session. 304 * Subclasses can override this method to implement session pooling. 305 * @throws JMSException 306 */ 307 protected void releaseSession(Session session) throws JMSException { 308 session.close(); 309 } 310 311 /** 312 * Provides access to destination, initial context, connection, and worker. 313 * Bridges to initial context. For names in a form initialContext/<name> the name is looked up in initial context. 314 */ 315 protected Object getChild(String name) { 316 if ("destination".equals(name)) { 317 return destination; 318 } 319 320 if ("initialContext".equals(name)) { 321 return initialContext; 322 } 323 324 if (name!=null && name.startsWith(INITIAL_CONTEXT_PREFIX)) { 325 String jndiName = name.substring(INITIAL_CONTEXT_PREFIX.length()); 326 try { 327 return initialContext.lookup(jndiName); 328 } catch (NamingException e) { 329 throw new RuntimeConfigurationException("Lookup in inital context failed for JNDI name '"+jndiName+"':"+e, e); 330 } 331 } 332 333 if ("connection".equals(name)) { 334 return connection; 335 } 336 337 if ("connectionFactory".equals(name)) { 338 return connectionFactory; 339 } 340 341 if ("worker".equals(name)) { 342 return worker; 343 } 344 345 return super.getChild(name); 346 } 347 348 private String messageSelector; 349 350 private boolean noLocal; 351 352 /** 353 * Message selector 354 * @param messageSelector 355 */ 356 public void setMessageSelector(String messageSelector) { 357 this.messageSelector = messageSelector; 358 } 359 360 /** 361 * If true and destination is topic then messages produced by this connection are not consumed by the message processor. 362 * @param noLocal 363 */ 364 public void setNoLocal(boolean noLocal) { 365 this.noLocal = noLocal; 366 } 367 368 protected RestartCommand restartCommand; 369 370 public void setRestartCommand(RestartCommand command) { 371 this.restartCommand = command; 372 } 373 }