001    /*
002    @license.text@
003     */
004    package biz.hammurapi.jms;
005    
006    import java.io.IOException;
007    import java.io.Serializable;
008    
009    import javax.jms.JMSException;
010    import javax.jms.Message;
011    import javax.jms.ObjectMessage;
012    import javax.jms.Queue;
013    import javax.jms.QueueConnection;
014    import javax.jms.QueueConnectionFactory;
015    import javax.jms.QueueSender;
016    import javax.jms.QueueSession;
017    import javax.jms.Session;
018    import javax.jms.TextMessage;
019    import javax.xml.parsers.FactoryConfigurationError;
020    import javax.xml.parsers.ParserConfigurationException;
021    import javax.xml.transform.TransformerException;
022    
023    import org.w3c.dom.Element;
024    import org.w3c.dom.Node;
025    
026    import biz.hammurapi.config.ComponentBase;
027    import biz.hammurapi.config.ConfigurationException;
028    import biz.hammurapi.config.Context;
029    import biz.hammurapi.config.DomConfigurable;
030    import biz.hammurapi.util.Worker;
031    import biz.hammurapi.xml.dom.DOMUtils;
032    import biz.hammurapi.xml.dom.DomSerializable;
033    
034    
035    /**
036     * Posts serializable and dom-serializable jobs to
037     * JMS queue.
038     * @author Pavel Vlasov
039     * @revision $Revision$
040     */
041    public class JmsWorker extends ComponentBase implements Worker, DomConfigurable {
042    
043            private String queueName;
044            private String connectionFactoryName;
045            private String user;
046            private String password;
047            
048            private Queue queue;
049            private QueueConnection connection;
050            
051            /**
052             * Jobs can implement this interface to set
053             * JMS message headers 
054             * @author Pavel Vlasov
055             * @revision $Revision$
056             */
057            public interface Selectable {
058                    void setMessageHeaders(Message message);
059            }
060    
061            public JmsWorker() {
062                    super();
063            }
064    
065            public boolean post(Runnable job) {
066                    try {
067                    QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
068                    try {
069                            if (job instanceof JmsSerializable) {
070                                    sendMessage(session, ((JmsSerializable) job).toMessage(session));
071                                    return true;
072                            } else if (job instanceof DomSerializable) {
073                                    TextMessage message = session.createTextMessage();
074                                    if (job instanceof Selectable) {
075                                            ((Selectable) job).setMessageHeaders(message);
076                                    }
077                                    message.setStringProperty("type", job.getClass().getName());
078                                    message.setText(DOMUtils.toXmlString(job, "job"));
079                                    
080                                    sendMessage(session, message);
081                                    return true;
082                            } else if (job instanceof Serializable) {
083                                    ObjectMessage message = session.createObjectMessage();
084                                    if (job instanceof Selectable) {
085                                            ((Selectable) job).setMessageHeaders(message);
086                                    }
087                                    message.setStringProperty("type", job.getClass().getName());
088                                    message.setObject((Serializable) job);
089                            
090                                    sendMessage(session, message);
091                                    return true;
092                            } 
093                            return false;
094                            } finally {
095                            session.close();
096                    }
097                    } catch (JMSException e) {
098                            throw new RuntimeException("Could not post job to queue: "+e, e);
099            } catch (IOException e) {
100                            throw new RuntimeException("Could not serialize job to XML: "+e, e);
101                    } catch (TransformerException e) {
102                            throw new RuntimeException("Could not serialize job to XML: "+e, e);
103                    } catch (ParserConfigurationException e) {
104                            throw new RuntimeException("Could not serialize job to XML: "+e, e);
105                    } catch (FactoryConfigurationError e) {
106                            throw new RuntimeException("Could not serialize job to XML: "+e, e);
107                    }
108            }
109    
110            /**
111             * @param session
112             * @param message
113             * @throws JMSException
114             */
115            private void sendMessage(QueueSession session, Message message) throws JMSException {
116                    QueueSender sender = session.createSender(queue);
117                    try {
118                            sender.send(message);
119                    } finally {
120                            sender.close();
121                    }
122            }
123    
124            public void start() throws ConfigurationException {
125                    try {
126                            QueueConnectionFactory connectionFactory = (QueueConnectionFactory) get(connectionFactoryName);
127                            queue = (Queue) get(queueName);
128    
129                            if (user == null) {
130                                    connection = connectionFactory.createQueueConnection();
131                            } else {
132                                    connection = connectionFactory.createQueueConnection(user, password);
133                            }
134                    } catch (JMSException e) {
135                            throw new ConfigurationException(e);
136                    }
137            }
138    
139            public void stop() throws ConfigurationException {
140                    if (connection!=null) {
141                            try {
142                                    connection.close();
143                            } catch (JMSException e) {
144                                    throw new ConfigurationException(e);
145                            }
146                    }
147            }
148    
149            public void configure(Node configNode, Context context) throws ConfigurationException {
150                    Element ce = (Element) configNode;
151                    queueName=ce.getAttribute("queue");
152                    connectionFactoryName=ce.getAttribute("connection-factory");
153                    if (ce.hasAttribute("user")) {
154                            user=ce.getAttribute("user");
155                            password=ce.getAttribute("password");
156                    }
157            }
158    }