001    package biz.hammurapi.jms;
002    
003    import javax.jms.Connection;
004    import javax.jms.ConnectionFactory;
005    import javax.jms.Destination;
006    import javax.jms.ExceptionListener;
007    import javax.jms.JMSException;
008    import javax.jms.Message;
009    import javax.jms.MessageListener;
010    import javax.jms.Session;
011    import javax.naming.Context;
012    import javax.naming.NamingException;
013    
014    import biz.hammurapi.config.Component;
015    import biz.hammurapi.config.ComponentBase;
016    import biz.hammurapi.config.ConfigurationException;
017    import biz.hammurapi.config.RestartCommand;
018    import biz.hammurapi.config.Restartable;
019    import biz.hammurapi.config.RuntimeConfigurationException;
020    import biz.hammurapi.logging.ConsoleLogger;
021    import biz.hammurapi.logging.Logger;
022    import biz.hammurapi.metrics.MeasurementCollector;
023    import biz.hammurapi.metrics.MeasurementConsumer;
024    import biz.hammurapi.util.Worker;
025    
026    /**
027     * Base class for JMS message processors.
028     * @author Pavel Vlasov
029     */
030    public abstract class MessageProcessor extends ComponentBase implements Restartable {
031            
032            private static final String INITIAL_CONTEXT_PREFIX = "initialContext/";
033            protected Logger logger = new ConsoleLogger(ConsoleLogger.INFO);
034            
035            /**
036             * Sets logger.
037             * @param logger
038             */
039            public void setLogger(Logger logger) {
040                    this.logger = logger;
041            }
042                    
043            protected Context initialContext;
044            private String connectionFactoryName;
045            private String destinationName;
046            private int listeners = Runtime.getRuntime().availableProcessors();
047            
048            /**
049             * Number of message listening threads. Defaults to number of processes available to JVM.
050             * @param listeners
051             */
052            public void setListeners(int listeners) {
053                    this.listeners = listeners;
054            }
055            
056            public void setInitialContext(Context initialContext) {
057                    this.initialContext = initialContext;
058            }
059            
060            /**
061             * Request destination (queue or topic) name
062             * @param destinationName
063             */
064            public void setDestination(String destinationName) {
065                    this.destinationName = destinationName;
066            }
067            
068            /**
069             * JMS connection name
070             * @param connectionName
071             */
072            public void setConnectionFactory(String connectionFactoryName) {
073                    this.connectionFactoryName = connectionFactoryName;
074            }
075            
076            /**
077             * Worker to process requests. It is optional. If worker is not set or cannot process requests,
078             * requests are processed in the message listener thread.
079             * @param worker
080             */
081            public void setWorker(Worker worker) {
082                    this.worker = worker;
083            }
084            
085            protected Connection connection;
086            private Destination destination;
087            protected boolean isTransacted;
088            protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
089    
090            /**
091             * Acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
092             * @param acknowledgeModeName
093             */
094            public void setAcknowledgeMode(String acknowledgeModeName) {
095                    try {
096                            this.acknowledgeMode = Session.class.getField(acknowledgeModeName+"_ACKNOWLEDGE").getInt(null);
097                    } catch (Exception e) {
098                            throw new IllegalArgumentException("Invalid acknowledge mode '"+acknowledgeModeName+"': "+e);
099                    }
100            }
101            
102            /**
103             * Explicitly sets connection. This method is useful when several components share one connection.
104             * @param connection
105             */
106            public void setConnection(Connection connection) {
107                    this.connection = connection;
108            }
109            
110            /**
111             * Transactional attribute of JMS Session.
112             * @param isTransacted
113             */
114            public void setTransacted(boolean isTransacted) {
115                    this.isTransacted = isTransacted;
116            }
117            
118            protected Worker worker;        
119            
120            private String user;
121            private String pwd;
122            protected ConnectionFactory connectionFactory;
123            
124            /** 
125             * JMS Connection user name. Optional.
126             * @param user
127             */
128            public void setUser(String user) {
129                    this.user = user;
130            }
131            
132            /**
133             * JMS Connection password. Optional.
134             * @param pwd
135             */
136            public void setPassword(String pwd) {
137                    this.pwd = pwd;
138            }
139    
140            /**
141             * Message listener class.
142             * @author Pavel Vlasov
143             */
144            private class ProcessingMessageListener implements MessageListener {
145                    
146                    private Session session;
147    
148                    public ProcessingMessageListener(Session session) {
149                            this.session = session;
150                    }               
151                    
152                    public synchronized void onMessage(final Message request) {
153                            logger.debug(this, "Message received");
154                            addMeasurement("receive", 1, System.currentTimeMillis());
155                            
156                            if (worker==null) {
157                                    _processMessage(request, session);
158                            } else {
159                                    Runnable job = new Runnable() {
160    
161                                            public void run() {
162                                                    _processMessage(request, null);
163                                            }
164                                            
165                                    };
166                                    
167                                    // Try to post job to worker. process in the current thread if worker doesn't accept it.
168                                    if (!worker.post(job)) {
169                                            _processMessage(request, session);
170                                    }
171                            }                                               
172                    }
173                                    
174            };
175    
176            public void start() throws ConfigurationException {
177                    try {                   
178                            logger.info(this, "Starting ...");
179                            
180                            connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName);
181                            if (user==null) {
182                                    connection = connectionFactory.createConnection();
183                            } else {
184                                    connection = connectionFactory.createConnection(user, pwd);
185                            }
186                            connection.start();
187                            
188                            final ExceptionListener pel = connection.getExceptionListener();
189                            connection.setExceptionListener(new ExceptionListener() {
190    
191                                    public void onException(JMSException e) {
192                                            if (pel!=null) {
193                                                    pel.onException(e);
194                                            }
195                                                                                    
196                                            logger.error(MessageProcessor.this, "JMS connection exception: " + e.toString());
197                                            
198                                            if (restartCommand!=null) {
199                                                    try {
200                                                            try {
201                                                                    connection.setExceptionListener(pel); // To avoid double firing if there is another exception in stop().
202                                                                    stop();
203                                                            } catch (Exception ex) {
204                                                                    ex.printStackTrace();
205                                                                    logger.error(MessageProcessor.this, "Cannot stop the processor: "+ex);
206                                                            }
207                                                    } finally {
208                                                            new Thread(restartCommand, "Connection restart thread "+restartCommand.getAttempt()).start();
209                                                    }
210                                            }
211                                            
212                                    }
213                                    
214                            });
215                            
216                            destination = (Destination) initialContext.lookup(destinationName);
217                            
218                            if (worker instanceof Component) {
219                                    ((Component) worker).start();
220                            }
221                            
222                            if (worker instanceof MeasurementCollector) {
223                                    ((MeasurementCollector) worker).setMeasurementConsumer(
224                                                    new MeasurementConsumer() {
225    
226                                                            public void addMeasurement(String name, double value, long time) {
227                                                                    MessageProcessor.this.addMeasurement("worker."+name, value, time);                                                              
228                                                            }
229                                                            
230                                                    });
231                            }                       
232                            
233                            for (int i=0; i<listeners; ++i) {
234                                    final Session session = borrowSession();
235                                    session.createConsumer(destination, messageSelector, noLocal).setMessageListener(new ProcessingMessageListener(session));
236                            }
237    
238                            logger.info(this, "Started");
239                    } catch (Exception e) {
240                            if (e instanceof ConfigurationException) {
241                                    throw (ConfigurationException) e;
242                            }
243                            
244                            throw new ConfigurationException("Could not start message processor: "+e, e);
245                    }               
246            }
247            
248            /**
249             * Processes request message
250             * @param request Request message
251             * @param session Session if message is processed in the message listener thread (worker is null or cannot process jobs), null otherwise.
252             */
253            protected abstract void processMessage(Message request, Session session);
254            
255            protected void _processMessage(Message request, Session session) {
256                    long start = System.currentTimeMillis();
257                    try {
258                            processMessage(request, session);
259                    } finally {
260                            long now = System.currentTimeMillis();
261                            addMeasurement("process", now - start, now);
262                    }
263            }
264            
265            /**
266             * Stops worker (thread pool), if any, and connection.
267             */
268            public void stop() throws ConfigurationException {
269                    try {                   
270                            logger.info(this, "Stopping ...");
271                            
272                            // Stop worker
273                            if (worker instanceof Component) {
274                                    ((Component) worker).stop();
275                            }
276                                                    
277                            // No need to close individual sessions.
278                            if (connection!=null) {
279                                    connection.close();
280                            }               
281                            
282                            logger.info(this, "Stopped");
283                    } catch (Exception e) {
284                            if (e instanceof ConfigurationException) {
285                                    throw (ConfigurationException) e;
286                            }
287                            
288                            throw new ConfigurationException("Could not stop message processor");
289                    }
290            }
291            
292            /**
293             * This implementation simply creates a new session.
294             * Subclasses can override this method to implement session pooling.
295             * @return
296             * @throws JMSException 
297             */
298            protected Session borrowSession() throws JMSException {
299                    return connection.createSession(isTransacted, acknowledgeMode);
300            }
301    
302            /**
303             * This implementation simply closes the session.
304             * Subclasses can override this method to implement session pooling.
305             * @throws JMSException 
306             */
307            protected void releaseSession(Session session) throws JMSException {
308                    session.close();
309            }
310    
311            /**
312             * Provides access to destination, initial context, connection, and worker.
313             * Bridges to initial context. For names in a form initialContext/&lt;name&gt; the name is looked up in initial context.
314             */
315            protected Object getChild(String name) {
316                    if ("destination".equals(name)) {
317                            return destination;
318                    }
319                    
320                    if ("initialContext".equals(name)) {
321                            return initialContext;
322                    }
323                    
324                    if (name!=null && name.startsWith(INITIAL_CONTEXT_PREFIX)) {
325                            String jndiName = name.substring(INITIAL_CONTEXT_PREFIX.length());
326                            try {
327                                    return initialContext.lookup(jndiName);
328                            } catch (NamingException e) {
329                                    throw new RuntimeConfigurationException("Lookup in inital context failed for JNDI name '"+jndiName+"':"+e, e);
330                            }
331                    }
332                    
333                    if ("connection".equals(name)) {
334                            return connection;
335                    }
336                    
337                    if ("connectionFactory".equals(name)) {
338                            return connectionFactory;
339                    }
340                    
341                    if ("worker".equals(name)) {
342                            return worker;
343                    }               
344                    
345                    return super.getChild(name);
346            }
347            
348            private String messageSelector;
349            
350            private boolean noLocal;
351            
352            /**
353             * Message selector
354             * @param messageSelector
355             */
356            public void setMessageSelector(String messageSelector) {
357                    this.messageSelector = messageSelector;
358            }
359            
360            /**
361             * If true and destination is topic then messages produced by this connection are not consumed by the message processor.
362             * @param noLocal
363             */
364            public void setNoLocal(boolean noLocal) {
365                    this.noLocal = noLocal;
366            }
367            
368            protected RestartCommand restartCommand;
369                    
370            public void setRestartCommand(RestartCommand command) {
371                    this.restartCommand = command;          
372            }
373    }