001    package biz.hammurapi.jms.adapter;
002    
003    import java.util.ArrayList;
004    import java.util.Collection;
005    import java.util.Iterator;
006    import java.util.Timer;
007    import java.util.TimerTask;
008    import java.util.logging.Level;
009    import java.util.logging.Logger;
010    
011    import javax.jms.Connection;
012    import javax.jms.ConnectionFactory;
013    import javax.jms.ExceptionListener;
014    import javax.jms.JMSException;
015    import javax.jms.Session;
016    
017    import org.apache.commons.pool.ObjectPool;
018    import org.apache.commons.pool.PoolableObjectFactory;
019    import org.apache.commons.pool.impl.StackObjectPool;
020    
021    import biz.hammurapi.config.ConfigurationException;
022    import biz.hammurapi.config.GenericContainer;
023    import biz.hammurapi.jms.JMSExceptionEx;
024    
025    /**
026     * Base class for Jndi and Factory connections. 
027     * @author Pavel
028     */
029    public abstract class JmsConnection extends GenericContainer {
030            
031            private static final Logger logger = Logger.getLogger(JmsConnection.class.getName());
032            
033            private static final int ACKNOWLEDGE_MODE = Session.AUTO_ACKNOWLEDGE;
034    
035            private static final long DEFAULT_RECOVERY_DELAY = 10000; // 10 seconds default recovery delay
036    
037            protected biz.hammurapi.jms.adapter.definition.Connection definition;
038    
039            public JmsConnection(JmsAdapter jmsAdapter, biz.hammurapi.jms.adapter.definition.Connection definition) throws ConfigurationException {
040                    this.definition = definition;
041                    
042                    // Services
043                    biz.hammurapi.jms.adapter.definition.JmsService[] serviceArray = definition.getServiceArray();
044                    GenericContainer services=new GenericContainer();
045                    addComponent("services", services);
046                    for (int i=0; i<serviceArray.length; ++i) {
047                            JmsService service = new JmsService(serviceArray[i]);
048                            services.addComponent(serviceArray[i].getName(), service);
049                            for (int j=0; j<serviceArray[i].getAliasArray().length; ++j) {
050                                    jmsAdapter.set(serviceArray[i].getAliasArray()[j], service);
051                            }
052                    }
053                    
054                    // Proxy services
055                    biz.hammurapi.jms.adapter.definition.ProxyService[] proxyServiceArray = definition.getProxyServiceArray();
056                    for (int i=0; i<proxyServiceArray.length; ++i) {
057                            ProxyService service = new ProxyService(proxyServiceArray[i]);
058                            services.addComponent(proxyServiceArray[i].getName(), service);
059                            for (int j=0; j<proxyServiceArray[i].getAliasArray().length; ++j) {
060                                    jmsAdapter.set(proxyServiceArray[i].getAliasArray()[j], service);
061                            }
062                    }
063                    
064                    // Listeners
065                    biz.hammurapi.jms.adapter.definition.Listener[] listenerArray = definition.getListenerArray();
066                    ListenerContainer listeners=new ListenerContainer();
067                    addComponent("listeners", listeners);
068                    for (int i=0; i<listenerArray.length; ++i) {
069                            Listener listener = new Listener(listenerArray[i]);
070                            listeners.addComponent(listenerArray[i].getName(), listener);
071                            for (int j=0; j<listenerArray[i].getAliasArray().length; ++j) {
072                                    jmsAdapter.set(listenerArray[i].getAliasArray()[j], listener);
073                            }
074                    }
075                    
076            }
077            
078            private class ListenerContainer extends GenericContainer {
079                    
080                    void enroll(Session session) throws JMSException {
081                            Iterator it = getComponents().iterator();
082                            while (it.hasNext()) {
083                                    ((Listener) it.next()).enroll(session);
084                            }
085                    }
086            }
087            
088            /**
089             * Subclasses shall implement this method to either look up connection factory in JNDI or create using
090             * vendor-specific API's.
091             * @return
092             * @throws JMSException
093             */
094            protected abstract ConnectionFactory getConnectionFactory() throws ConfigurationException;
095            
096            
097            private class ThreadSessionEntry {
098                    int counter;
099                    Session session;
100            }
101            
102            private ThreadLocal sessionTL = new ThreadLocal() {
103                    protected Object initialValue() {
104                            return new ThreadSessionEntry();
105                    }
106            };
107            
108            private int connectionIndex;
109            
110            private static final long CONNECTION_WAIT = 30000; // 30 seconds to wait for connection availability.
111            
112            /**
113             * @return Valid connection is a round-robin fashion
114             */
115            private Connection getConnection() throws JMSException {
116                    for (int k=0; k<2; ++k) {
117                            synchronized (connections) {
118                                    for (int i=0; i<connections.length; ++i) {
119                                            connectionIndex = (connectionIndex+1) % connections.length;
120                                            if (connections[connectionIndex]!=null) {
121                                                    return connections[connectionIndex];
122                                            }
123                                    }
124                                    
125                                    try {
126                                            connections.wait(CONNECTION_WAIT);
127                                    } catch (InterruptedException e) {
128                                            throw new JMSExceptionEx("Waiting for connection was interrupted: "+e, e);
129                                    }
130                            }
131                    }
132                    
133                    throw new JMSException("No connection is available at this time.");
134            }
135            
136            public Session borrowSession() throws JMSException {
137                    ThreadSessionEntry tse = (ThreadSessionEntry) sessionTL.get();
138                    
139                    if (definition.getReuseThreadSession() && tse.counter>0) {
140                            ++tse.counter;
141                            return tse.session;
142                    }
143                    
144                    if (sessionPool == null) {
145                            /*
146                             *  TODO Use keyed pool to allow services to specify transactionality and acknowledge mode
147                             *  Do not reuse thread session if transactional.
148                             *  More complex ThreadSessionEntry - key sessions by acknowledge mode.
149                             */
150                            return getConnection().createSession(false, ACKNOWLEDGE_MODE);
151                    } else {
152                            try {
153                                    return (Session) sessionPool.borrowObject();
154                            } catch (Exception e) {
155                                    throw new JMSExceptionEx("Failed to get session from pool: "+e, e);
156                            }                       
157                    }
158            }
159            
160            private ObjectPool sessionPool;
161            
162            public void returnSession(Session session) throws JMSException {
163                    ThreadSessionEntry tse = (ThreadSessionEntry) sessionTL.get();
164                    
165                    if (definition.getReuseThreadSession() && --tse.counter>0) {
166                            return;
167                    }
168                    
169                    tse.counter=0;
170                    tse.session=null;
171                    
172                    if (sessionPool == null) {
173                            session.close();
174                    } else {
175                            try {
176                                    sessionPool.returnObject(session);
177                            } catch (Exception e) {
178                                    throw new JMSExceptionEx("Failed to return session to pool: "+e, e);
179                            }                       
180                    }
181            }
182            
183            public void invalidateSession(Session session) throws JMSException {
184                    ThreadSessionEntry tse = (ThreadSessionEntry) sessionTL.get();
185                    tse.counter=0;
186                    tse.session=null;
187                    
188                    if (sessionPool == null) {
189                            session.close();
190                    } else {
191                            try {
192                                    sessionPool.invalidateObject(session);
193                            } catch (Exception e) {
194                                    throw new JMSExceptionEx("Failed to invalidate session in pool: "+e, e);
195                            }                       
196                    }
197                    
198            }
199            
200            private Connection[] connections;
201            
202            private Collection connectionTasks = new ArrayList();
203            
204            /**
205             * Starts connection, enrolls listener sessions
206             * @author Pavel
207             *
208             */
209            private class ConnectionTask extends TimerTask {
210                    
211                    private int index;
212                    private boolean cancelOnSuccess;
213    
214                    ConnectionTask(int index, boolean cancelOnSuccess) {
215                            this.index = index;
216                            this.cancelOnSuccess = cancelOnSuccess;
217                            synchronized (connectionTasks) {
218                                    connectionTasks.add(this);
219                            }
220                    }
221                    
222                    public boolean cancel() {
223                            synchronized (connectionTasks) {
224                                    connectionTasks.remove(this);
225                            }
226                            return super.cancel();
227                    }
228    
229                    public void run() {
230                            try {
231                                    ConnectionFactory factory = getConnectionFactory();                             
232                                    Connection connection;
233                                    if (JmsAdapter.isBlank(definition.getUser())) {
234                                            connection = factory.createConnection();
235                                    } else {
236                                            connection = factory.createConnection(definition.getUser(), definition.getPassword());
237                                    }
238                                    
239                                    connection.setExceptionListener(
240                                            new ExceptionListener() {
241    
242                                                    public void onException(JMSException e) {
243                                                            logger.log(Level.SEVERE, "Connection failure: "+e+", re-creating connection.", e);
244                                                            synchronized (connections) {
245                                                                    connections[index]=null;
246                                                            }
247                                                            Timer timer = ((JmsAdapter) getOwner(JmsAdapter.class)).getTimer();
248                                                            timer.schedule(new ConnectionTask(index, true), 0, definition.getRecoveryDelay()==0 ? DEFAULT_RECOVERY_DELAY : definition.getRecoveryDelay());
249                                                    }
250                                                    
251                                            });
252                                    
253                                    int listeningSessions = Math.max(1, definition.getListeningSessions());
254                                    for (int i=0; i<listeningSessions; ++i) {
255                                            ((ListenerContainer) get("listeners")).enroll(connection.createSession(false, ACKNOWLEDGE_MODE));
256                                    }
257                                    
258                                    connection.start();
259                                    
260                                    logger.fine("Connection started");
261                                    
262                                    synchronized (connections) {
263                                            if (connections[index]!=null) {
264                                                    try {
265                                                            connections[index].close();
266                                                    } catch (JMSException e) {
267                                                            logger.log(Level.WARNING, "Could not close connection: "+e, e);                                                 
268                                                    }
269                                            }
270                                            connections[index] = connection;
271                                            connections.notifyAll(); // Notify pending threads about availability of a connection.                                  
272                                    }
273                                    
274                                    if (cancelOnSuccess) {
275                                            cancel();
276                                    } else {
277                                            index = index+1 % connections.length;
278                                    }
279                            } catch (Exception e) {
280                                    logger.log(Level.SEVERE, "Could not create connection: "+e, e);
281                            }                       
282                    }
283                    
284            }
285            
286            public void start() throws ConfigurationException {
287                    // Start connections and schedule refresh tasks.
288                    connections = new Connection[Math.max(1, definition.getConnectionPool())];
289                    Timer timer = ((JmsAdapter) getOwner(JmsAdapter.class)).getTimer();
290                    for (int i=0; i<connections.length; ++i) {
291                            timer.schedule(new ConnectionTask(i, true), 0, definition.getRecoveryDelay()==0 ? DEFAULT_RECOVERY_DELAY : definition.getRecoveryDelay());
292                    }
293                    
294                    // Regular reconnection
295                    if (definition.getRefreshInterval()>0) {
296                            timer.schedule(new ConnectionTask(0, false), definition.getRefreshInterval(), definition.getRefreshInterval());
297                    }
298                    
299                    // Create session pool
300                    if (definition.getSessionPool()!=null) {
301                            PoolableObjectFactory sessionFactory = new PoolableObjectFactory() {
302    
303                                    public void activateObject(Object obj) throws Exception {
304                                            // Nothing to do                                        
305                                    }
306    
307                                    public void destroyObject(Object obj) throws Exception {                                        
308                                            ((Session) obj).close();
309                                    }
310    
311                                    public Object makeObject() throws Exception {
312                                            return getConnection().createSession(false, ACKNOWLEDGE_MODE);
313                                    }
314    
315                                    public void passivateObject(Object obj) throws Exception {
316                                            // Nothing to do                                        
317                                    }
318    
319                                    public boolean validateObject(Object obj) {
320                                            // Nothing to do
321                                            return true;
322                                    }
323                                    
324                            };
325                            
326                            sessionPool = new StackObjectPool(
327                                            sessionFactory, 
328                                            definition.getSessionPool().getMaxIdle(), 
329                                            definition.getSessionPool().getInitial());
330                    }
331                    
332                    super.start();
333            }
334            
335            public void stop() throws ConfigurationException {
336                    super.stop();
337                    
338                    if (sessionPool!=null) {
339                            try {
340                                    sessionPool.close();
341                            } catch (Exception e) {
342                                    throw new ConfigurationException("Could not stop session pool: "+e, e);
343                            }
344                    }
345                    
346                    Iterator it;
347                    synchronized (connectionTasks) {
348                            it = new ArrayList(connectionTasks).iterator();
349                    }
350                    
351                    while (it.hasNext()) {
352                            ((ConnectionTask) it.next()).cancel();
353                    }
354                    
355                    synchronized (connections) {
356                            for (int i=0; i<connections.length; ++i) {
357                                    if (connections[i]!=null) {
358                                            try {
359                                                    connections[i].close();
360                                            } catch (JMSException e) {
361                                                    logger.log(Level.INFO, "Connection closing failed: "+e, e);
362                                            }
363                                    }
364                            }
365                    }
366            }
367    
368    }