001 /* 002 @license.text@ 003 */ 004 package biz.hammurapi.jms; 005 006 import java.io.IOException; 007 import java.io.Serializable; 008 009 import javax.jms.JMSException; 010 import javax.jms.Message; 011 import javax.jms.ObjectMessage; 012 import javax.jms.Queue; 013 import javax.jms.QueueConnection; 014 import javax.jms.QueueConnectionFactory; 015 import javax.jms.QueueSender; 016 import javax.jms.QueueSession; 017 import javax.jms.Session; 018 import javax.jms.TextMessage; 019 import javax.xml.parsers.FactoryConfigurationError; 020 import javax.xml.parsers.ParserConfigurationException; 021 import javax.xml.transform.TransformerException; 022 023 import org.w3c.dom.Element; 024 import org.w3c.dom.Node; 025 026 import biz.hammurapi.config.ComponentBase; 027 import biz.hammurapi.config.ConfigurationException; 028 import biz.hammurapi.config.Context; 029 import biz.hammurapi.config.DomConfigurable; 030 import biz.hammurapi.util.Worker; 031 import biz.hammurapi.xml.dom.DOMUtils; 032 import biz.hammurapi.xml.dom.DomSerializable; 033 034 035 /** 036 * Posts serializable and dom-serializable jobs to 037 * JMS queue. 038 * @author Pavel Vlasov 039 * @revision $Revision$ 040 */ 041 public class JmsWorker extends ComponentBase implements Worker, DomConfigurable { 042 043 private String queueName; 044 private String connectionFactoryName; 045 private String user; 046 private String password; 047 048 private Queue queue; 049 private QueueConnection connection; 050 051 /** 052 * Jobs can implement this interface to set 053 * JMS message headers 054 * @author Pavel Vlasov 055 * @revision $Revision$ 056 */ 057 public interface Selectable { 058 void setMessageHeaders(Message message); 059 } 060 061 public JmsWorker() { 062 super(); 063 } 064 065 public boolean post(Runnable job) { 066 try { 067 QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 068 try { 069 if (job instanceof JmsSerializable) { 070 sendMessage(session, ((JmsSerializable) job).toMessage(session)); 071 return true; 072 } else if (job instanceof DomSerializable) { 073 TextMessage message = session.createTextMessage(); 074 if (job instanceof Selectable) { 075 ((Selectable) job).setMessageHeaders(message); 076 } 077 message.setStringProperty("type", job.getClass().getName()); 078 message.setText(DOMUtils.toXmlString(job, "job")); 079 080 sendMessage(session, message); 081 return true; 082 } else if (job instanceof Serializable) { 083 ObjectMessage message = session.createObjectMessage(); 084 if (job instanceof Selectable) { 085 ((Selectable) job).setMessageHeaders(message); 086 } 087 message.setStringProperty("type", job.getClass().getName()); 088 message.setObject((Serializable) job); 089 090 sendMessage(session, message); 091 return true; 092 } 093 return false; 094 } finally { 095 session.close(); 096 } 097 } catch (JMSException e) { 098 throw new RuntimeException("Could not post job to queue: "+e, e); 099 } catch (IOException e) { 100 throw new RuntimeException("Could not serialize job to XML: "+e, e); 101 } catch (TransformerException e) { 102 throw new RuntimeException("Could not serialize job to XML: "+e, e); 103 } catch (ParserConfigurationException e) { 104 throw new RuntimeException("Could not serialize job to XML: "+e, e); 105 } catch (FactoryConfigurationError e) { 106 throw new RuntimeException("Could not serialize job to XML: "+e, e); 107 } 108 } 109 110 /** 111 * @param session 112 * @param message 113 * @throws JMSException 114 */ 115 private void sendMessage(QueueSession session, Message message) throws JMSException { 116 QueueSender sender = session.createSender(queue); 117 try { 118 sender.send(message); 119 } finally { 120 sender.close(); 121 } 122 } 123 124 public void start() throws ConfigurationException { 125 try { 126 QueueConnectionFactory connectionFactory = (QueueConnectionFactory) get(connectionFactoryName); 127 queue = (Queue) get(queueName); 128 129 if (user == null) { 130 connection = connectionFactory.createQueueConnection(); 131 } else { 132 connection = connectionFactory.createQueueConnection(user, password); 133 } 134 } catch (JMSException e) { 135 throw new ConfigurationException(e); 136 } 137 } 138 139 public void stop() throws ConfigurationException { 140 if (connection!=null) { 141 try { 142 connection.close(); 143 } catch (JMSException e) { 144 throw new ConfigurationException(e); 145 } 146 } 147 } 148 149 public void configure(Node configNode, Context context) throws ConfigurationException { 150 Element ce = (Element) configNode; 151 queueName=ce.getAttribute("queue"); 152 connectionFactoryName=ce.getAttribute("connection-factory"); 153 if (ce.hasAttribute("user")) { 154 user=ce.getAttribute("user"); 155 password=ce.getAttribute("password"); 156 } 157 } 158 }