package biz.hammurapi.jms.adapter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;

import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.StackObjectPool;

import biz.hammurapi.config.ConfigurationException;
import biz.hammurapi.config.GenericContainer;
import biz.hammurapi.jms.JMSExceptionEx;

/**
 * Base class for Jndi and Factory connections.
 *
 * Holds an array of JMS connections that are (re-)created by timer tasks, hands out
 * sessions either directly from a connection or through an optional session pool, and
 * registers the services, proxy services and listeners described by the connection
 * definition as components of this container.
 *
 * @author Pavel
 */
public abstract class JmsConnection extends GenericContainer {

    private static final Logger logger = Logger.getLogger(JmsConnection.class.getName());

    private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;

    private static final long DEFAULT_RECOVERY_DELAY = 10000; // 10 seconds default recovery delay

    private static final long CONNECTION_WAIT = 30000; // 30 seconds to wait for connection availability.

    /** Number of times {@link #getConnection()} waits for {@link #CONNECTION_WAIT} before giving up. */
    private static final int CONNECTION_WAIT_ATTEMPTS = 2;

    protected biz.hammurapi.jms.adapter.definition.Connection definition;

    /** Connection slots; a <code>null</code> element means that slot has no usable connection. Guarded by itself. */
    private Connection[] connections;

    /** Index of the slot most recently handed out by {@link #getConnection()}. Guarded by <code>connections</code>. */
    private int connectionIndex;

    /** Live (not yet cancelled) connection tasks. Guarded by itself. */
    private Collection connectionTasks = new ArrayList();

    /** Optional session pool; <code>null</code> when the definition has no session pool element. */
    private ObjectPool sessionPool;

    /** Per-thread bookkeeping used when the definition asks for thread session reuse. */
    private ThreadLocal sessionTL = new ThreadLocal() {
        protected Object initialValue() {
            return new ThreadSessionEntry();
        }
    };

    /**
     * Creates service, proxy service and listener components from the definition and
     * registers each of them with the adapter under every one of its aliases.
     *
     * @param jmsAdapter adapter in which component aliases are registered.
     * @param definition connection definition.
     * @throws ConfigurationException declared for component construction failures.
     */
    public JmsConnection(JmsAdapter jmsAdapter, biz.hammurapi.jms.adapter.definition.Connection definition) throws ConfigurationException {
        this.definition = definition;

        // Services
        biz.hammurapi.jms.adapter.definition.JmsService[] serviceArray = definition.getServiceArray();
        GenericContainer services = new GenericContainer();
        addComponent("services", services);
        for (int i = 0; i < serviceArray.length; ++i) {
            JmsService service = new JmsService(serviceArray[i]);
            services.addComponent(serviceArray[i].getName(), service);
            for (int j = 0; j < serviceArray[i].getAliasArray().length; ++j) {
                jmsAdapter.set(serviceArray[i].getAliasArray()[j], service);
            }
        }

        // Proxy services (they share the "services" container with regular services)
        biz.hammurapi.jms.adapter.definition.ProxyService[] proxyServiceArray = definition.getProxyServiceArray();
        for (int i = 0; i < proxyServiceArray.length; ++i) {
            ProxyService service = new ProxyService(proxyServiceArray[i]);
            services.addComponent(proxyServiceArray[i].getName(), service);
            for (int j = 0; j < proxyServiceArray[i].getAliasArray().length; ++j) {
                jmsAdapter.set(proxyServiceArray[i].getAliasArray()[j], service);
            }
        }

        // Listeners
        biz.hammurapi.jms.adapter.definition.Listener[] listenerArray = definition.getListenerArray();
        ListenerContainer listeners = new ListenerContainer();
        addComponent("listeners", listeners);
        for (int i = 0; i < listenerArray.length; ++i) {
            Listener listener = new Listener(listenerArray[i]);
            listeners.addComponent(listenerArray[i].getName(), listener);
            for (int j = 0; j < listenerArray[i].getAliasArray().length; ++j) {
                jmsAdapter.set(listenerArray[i].getAliasArray()[j], listener);
            }
        }
    }

    /**
     * Container of {@link Listener} components which can enroll all of them in a session.
     */
    private static class ListenerContainer extends GenericContainer {

        /**
         * Enrolls every contained listener in the given session.
         */
        void enroll(Session session) throws JMSException {
            Iterator it = getComponents().iterator();
            while (it.hasNext()) {
                ((Listener) it.next()).enroll(session);
            }
        }
    }

    /**
     * Subclasses shall implement this method to either look up connection factory in JNDI or create using
     * vendor-specific API's.
     * @return Connection factory used to create the connections of this component.
     * @throws ConfigurationException if the factory cannot be obtained.
     */
    protected abstract ConnectionFactory getConnectionFactory() throws ConfigurationException;

    /**
     * Per-thread record of the session currently lent to the thread and of how many
     * nested borrows are outstanding for it.
     */
    private static class ThreadSessionEntry {
        int counter;
        Session session;
    }

    /**
     * @return Recovery delay from the definition, or {@link #DEFAULT_RECOVERY_DELAY} when the
     * definition does not provide a positive value (a timer period must be positive).
     */
    private long recoveryDelay() {
        long configured = definition.getRecoveryDelay();
        return configured <= 0 ? DEFAULT_RECOVERY_DELAY : configured;
    }

    /**
     * Picks the next available connection, waiting for one to appear if none is available.
     *
     * @return Valid connection in a round-robin fashion
     * @throws JMSException if the wait is interrupted or no connection became available
     * after {@link #CONNECTION_WAIT_ATTEMPTS} waits of {@link #CONNECTION_WAIT} milliseconds.
     */
    private Connection getConnection() throws JMSException {
        synchronized (connections) {
            for (int waits = 0; ; ++waits) {
                for (int i = 0; i < connections.length; ++i) {
                    connectionIndex = (connectionIndex + 1) % connections.length;
                    if (connections[connectionIndex] != null) {
                        return connections[connectionIndex];
                    }
                }

                // The slots are re-scanned after the last wait as well, so a connection
                // which arrived during that wait is not missed.
                if (waits >= CONNECTION_WAIT_ATTEMPTS) {
                    break;
                }

                try {
                    connections.wait(CONNECTION_WAIT);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so that callers up the stack can observe it.
                    Thread.currentThread().interrupt();
                    throw new JMSExceptionEx("Waiting for connection was interrupted: " + e, e);
                }
            }
        }

        throw new JMSException("No connection is available at this time.");
    }

    /**
     * Obtains a session. When the definition enables thread session reuse and the calling
     * thread already holds a borrowed session, that session is returned and its borrow
     * counter is incremented. Otherwise a session is taken from the session pool, or
     * created on a connection when there is no pool.
     *
     * Every call shall be matched by {@link #returnSession(Session)} or
     * {@link #invalidateSession(Session)}.
     *
     * @return Non-transacted session with {@link Session#AUTO_ACKNOWLEDGE} acknowledge mode
     * when created here; pooled sessions are created the same way by the pool factory.
     * @throws JMSException if a session cannot be created or borrowed.
     */
    public Session borrowSession() throws JMSException {
        ThreadSessionEntry tse = (ThreadSessionEntry) sessionTL.get();
        boolean reuseThreadSession = definition.getReuseThreadSession();

        if (reuseThreadSession && tse.counter > 0) {
            ++tse.counter;
            return tse.session;
        }

        Session session;
        if (sessionPool == null) {
            /*
             * TODO Use keyed pool to allow services to specify transactionality and acknowledge mode
             * Do not reuse thread session if transactional.
             * More complex ThreadSessionEntry - key sessions by acknowledge mode.
             */
            session = getConnection().createSession(false, ACKNOWLEDGE_MODE);
        } else {
            try {
                session = (Session) sessionPool.borrowObject();
            } catch (Exception e) {
                throw new JMSExceptionEx("Failed to get session from pool: " + e, e);
            }
        }

        if (reuseThreadSession) {
            // Remember the session so that nested borrows on this thread share it.
            // Without this the counter never becomes positive and reuse never happens.
            tse.session = session;
            tse.counter = 1;
        }

        return session;
    }

    /**
     * Returns a borrowed session. With thread session reuse enabled the session is released
     * only when the outermost borrow of the calling thread is returned. Releasing means
     * returning the session to the pool, or closing it when there is no pool.
     *
     * @param session session obtained from {@link #borrowSession()}.
     * @throws JMSException if the session cannot be closed or returned to the pool.
     */
    public void returnSession(Session session) throws JMSException {
        ThreadSessionEntry tse = (ThreadSessionEntry) sessionTL.get();

        if (definition.getReuseThreadSession() && --tse.counter > 0) {
            return;
        }

        tse.counter = 0;
        tse.session = null;

        if (sessionPool == null) {
            session.close();
        } else {
            try {
                sessionPool.returnObject(session);
            } catch (Exception e) {
                throw new JMSExceptionEx("Failed to return session to pool: " + e, e);
            }
        }
    }

    /**
     * Discards a borrowed session: clears the calling thread's session entry and
     * invalidates the session in the pool, or closes it when there is no pool.
     *
     * NOTE(review): with thread session reuse enabled, outer holders of a nested borrow
     * presumably must not call {@link #returnSession(Session)} for a session which was
     * invalidated here - confirm against callers.
     *
     * @param session session obtained from {@link #borrowSession()}.
     * @throws JMSException if the session cannot be closed or invalidated.
     */
    public void invalidateSession(Session session) throws JMSException {
        ThreadSessionEntry tse = (ThreadSessionEntry) sessionTL.get();
        tse.counter = 0;
        tse.session = null;

        if (sessionPool == null) {
            session.close();
        } else {
            try {
                sessionPool.invalidateObject(session);
            } catch (Exception e) {
                throw new JMSExceptionEx("Failed to invalidate session in pool: " + e, e);
            }
        }
    }

    /**
     * Starts connection, enrolls listener sessions
     * @author Pavel
     *
     */
    private class ConnectionTask extends TimerTask {

        /** Slot to (re)connect on the next run. */
        private int index;

        /**
         * <code>true</code> for start/recovery tasks which stop after the first success,
         * <code>false</code> for the refresh task which moves on to the next slot.
         */
        private boolean cancelOnSuccess;

        ConnectionTask(int index, boolean cancelOnSuccess) {
            this.index = index;
            this.cancelOnSuccess = cancelOnSuccess;
            synchronized (connectionTasks) {
                connectionTasks.add(this);
            }
        }

        /**
         * Removes this task from the collection of live tasks and cancels it.
         */
        public boolean cancel() {
            synchronized (connectionTasks) {
                connectionTasks.remove(this);
            }
            return super.cancel();
        }

        /**
         * Creates a connection, installs an exception listener which schedules recovery,
         * enrolls listeners in the listening sessions, starts the connection and puts it
         * into its slot, closing the connection which occupied the slot before.
         * Failures are logged; for periodic tasks the next run is the retry.
         */
        public void run() {
            // The slot is captured in a local because the refresh task changes "index"
            // after a successful run, while the exception listener created below must
            // keep referring to the slot this particular connection was installed in.
            final int slot = index;
            Connection connection = null;
            boolean installed = false;
            try {
                ConnectionFactory factory = getConnectionFactory();
                if (JmsAdapter.isBlank(definition.getUser())) {
                    connection = factory.createConnection();
                } else {
                    connection = factory.createConnection(definition.getUser(), definition.getPassword());
                }

                final Connection created = connection;
                connection.setExceptionListener(
                        new ExceptionListener() {

                            public void onException(JMSException e) {
                                logger.log(Level.SEVERE, "Connection failure: " + e + ", re-creating connection.", e);
                                synchronized (connections) {
                                    // Do not wipe out a newer connection which has already replaced the failed one.
                                    if (connections[slot] == created) {
                                        connections[slot] = null;
                                    }
                                }
                                Timer timer = ((JmsAdapter) getOwner(JmsAdapter.class)).getTimer();
                                timer.schedule(new ConnectionTask(slot, true), 0, recoveryDelay());
                            }

                        });

                int listeningSessions = Math.max(1, definition.getListeningSessions());
                for (int i = 0; i < listeningSessions; ++i) {
                    ((ListenerContainer) get("listeners")).enroll(connection.createSession(false, ACKNOWLEDGE_MODE));
                }

                connection.start();

                logger.fine("Connection started");

                synchronized (connections) {
                    if (connections[slot] != null) {
                        try {
                            connections[slot].close();
                        } catch (JMSException e) {
                            logger.log(Level.WARNING, "Could not close connection: " + e, e);
                        }
                    }
                    connections[slot] = connection;
                    installed = true;
                    connections.notifyAll(); // Notify pending threads about availability of a connection.
                }

                if (cancelOnSuccess) {
                    cancel();
                } else {
                    // Parentheses are essential: "index+1 % length" is "index + (1 % length)".
                    index = (slot + 1) % connections.length;
                }
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Could not create connection: " + e, e);
            } finally {
                // A connection which was created but never made it into its slot would
                // otherwise leak on every failed attempt.
                if (connection != null && !installed) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        logger.log(Level.WARNING, "Could not close connection: " + e, e);
                    }
                }
            }
        }

    }

    /**
     * Allocates connection slots, schedules a connection task per slot, schedules the
     * periodic refresh task if a positive refresh interval is configured, creates the
     * session pool if one is configured, and then starts contained components.
     *
     * @throws ConfigurationException propagated from the container start.
     */
    public void start() throws ConfigurationException {
        // Start connections and schedule refresh tasks.
        connections = new Connection[Math.max(1, definition.getConnectionPool())];
        Timer timer = ((JmsAdapter) getOwner(JmsAdapter.class)).getTimer();
        for (int i = 0; i < connections.length; ++i) {
            // Runs immediately and then every recovery delay until it succeeds and cancels itself.
            timer.schedule(new ConnectionTask(i, true), 0, recoveryDelay());
        }

        // Regular reconnection
        if (definition.getRefreshInterval() > 0) {
            timer.schedule(new ConnectionTask(0, false), definition.getRefreshInterval(), definition.getRefreshInterval());
        }

        // Create session pool
        if (definition.getSessionPool() != null) {
            PoolableObjectFactory sessionFactory = new PoolableObjectFactory() {

                public void activateObject(Object obj) throws Exception {
                    // Nothing to do
                }

                public void destroyObject(Object obj) throws Exception {
                    ((Session) obj).close();
                }

                public Object makeObject() throws Exception {
                    return getConnection().createSession(false, ACKNOWLEDGE_MODE);
                }

                public void passivateObject(Object obj) throws Exception {
                    // Nothing to do
                }

                public boolean validateObject(Object obj) {
                    // Nothing to do
                    return true;
                }

            };

            sessionPool = new StackObjectPool(
                    sessionFactory,
                    definition.getSessionPool().getMaxIdle(),
                    definition.getSessionPool().getInitial());
        }

        super.start();
    }

    /**
     * Stops contained components, closes the session pool, cancels all connection tasks
     * and closes all connections. A session pool failure does not prevent the remaining
     * cleanup; it is reported after the cleanup is done.
     *
     * @throws ConfigurationException if the session pool could not be closed, or
     * propagated from the container stop.
     */
    public void stop() throws ConfigurationException {
        super.stop();

        ConfigurationException poolFailure = null;
        if (sessionPool != null) {
            try {
                sessionPool.close();
            } catch (Exception e) {
                // Deferred so that tasks are still cancelled and connections still closed.
                poolFailure = new ConfigurationException("Could not stop session pool: " + e, e);
            }
        }

        // Iterate over a snapshot because cancel() removes the task from connectionTasks.
        Iterator it;
        synchronized (connectionTasks) {
            it = new ArrayList(connectionTasks).iterator();
        }

        while (it.hasNext()) {
            ((ConnectionTask) it.next()).cancel();
        }

        // connections is null if start() has never been invoked.
        if (connections != null) {
            synchronized (connections) {
                for (int i = 0; i < connections.length; ++i) {
                    if (connections[i] != null) {
                        try {
                            connections[i].close();
                        } catch (JMSException e) {
                            logger.log(Level.INFO, "Connection closing failed: " + e, e);
                        }
                    }
                }
            }
        }

        if (poolFailure != null) {
            throw poolFailure;
        }
    }

}