001 package biz.hammurapi.jms.adapter.jca; 002 003 import java.net.URL; 004 import java.util.ArrayList; 005 import java.util.HashMap; 006 import java.util.HashSet; 007 import java.util.List; 008 import java.util.Map; 009 import java.util.Set; 010 import java.util.logging.Level; 011 import java.util.logging.Logger; 012 013 import javax.resource.ResourceException; 014 import javax.resource.spi.ActivationSpec; 015 import javax.resource.spi.BootstrapContext; 016 import javax.resource.spi.ResourceAdapter; 017 import javax.resource.spi.ResourceAdapterInternalException; 018 import javax.resource.spi.endpoint.MessageEndpointFactory; 019 import javax.resource.spi.work.Work; 020 import javax.resource.spi.work.WorkEvent; 021 import javax.resource.spi.work.WorkException; 022 import javax.resource.spi.work.WorkListener; 023 import javax.resource.spi.work.WorkManager; 024 import javax.transaction.xa.XAResource; 025 026 import org.apache.xmlbeans.XmlObject; 027 028 import biz.hammurapi.config.ConfigurationException; 029 import biz.hammurapi.jms.adapter.JmsAdapter; 030 import biz.hammurapi.jms.adapter.definition.JmsAdapterDocument; 031 import biz.hammurapi.util.Worker; 032 033 public class ResourceAdapterImpl implements ResourceAdapter { 034 035 private Logger logger = Logger.getLogger(ResourceAdapterImpl.class.getName()); 036 037 public SharedAdapter getSharedAdapter() { 038 // TODO Add reference counting when auto-reload of configuration is implemented. 039 return new SharedAdapter() { 040 041 public void close() { 042 // TODO Decrement counter and stop adapter if it is scheduled to stop. 043 044 } 045 046 public Object get(String name) { 047 return adapter.get(name); 048 } 049 050 }; 051 } 052 053 private Map activationMap = new HashMap(); 054 private WorkManager workManager; 055 056 private static Object activationKey(MessageEndpointFactory factory, ActivationSpec spec) { 057 ArrayList ret = new ArrayList(); 058 ret.add(factory); 059 ret.add(spec); 060 return ret; 061 } 062 063 static MessageEndpointFactory factoryFromKey(Object key) { 064 return (MessageEndpointFactory) ((List) key).get(0); 065 } 066 067 public void endpointActivation(MessageEndpointFactory factory, ActivationSpec spec) throws ResourceException { 068 if (!(spec instanceof ActivationSpecImpl)) { 069 throw new ResourceException("Invalid activation specification: "+spec); 070 } 071 072 if (adapter==null) { 073 throw new ResourceException("Adapter is not properly initialized"); 074 } 075 076 String listenerName = ((ActivationSpecImpl) spec).getListenerName(); 077 synchronized (activationMap) { 078 Set endpointSet = (Set) activationMap.get(listenerName); 079 if (endpointSet==null) { 080 Object processor = adapter.get(listenerName); 081 if (processor == null) { 082 throw new ResourceException("Listener not found: "+listenerName); 083 } 084 085 if (!(processor instanceof ActivationProcessor)) { 086 throw new ResourceException("Unexpected listener processor type: "+processor.getClass()); 087 } 088 089 endpointSet = new HashSet(); 090 activationMap.put(listenerName, endpointSet); 091 ((ActivationProcessor) processor).configure(endpointSet, workManager); 092 } 093 094 synchronized (endpointSet) { 095 endpointSet.add(activationKey(factory, spec)); 096 } 097 } 098 099 } 100 101 /** 102 * Removes endpoint entry from activation maps. 103 */ 104 public void endpointDeactivation(MessageEndpointFactory factory, ActivationSpec spec) { 105 if (spec instanceof ActivationSpecImpl) { 106 synchronized (activationMap) { 107 Set endpointSet = (Set) activationMap.get(((ActivationSpecImpl) spec).getListenerName()); 108 if (endpointSet!=null) { 109 synchronized (endpointSet) { 110 endpointSet.remove(activationKey(factory, spec)); 111 } 112 } 113 } 114 } 115 } 116 117 public XAResource[] getXAResources(ActivationSpec[] speca) throws ResourceException { 118 // Not implemented - transactions are not supported 119 return null; 120 } 121 122 private String configUrl; 123 private JmsAdapter adapter; 124 125 /** 126 * URL of configuration. If it starts with <code>resource:</code> then it is read from classloader resource. 127 * @param configUrl 128 */ 129 public void setConfigUrl(String configUrl) { 130 this.configUrl = configUrl; 131 } 132 133 // TODO - reload of configuration if changed - reload interval. 134 135 private static final String RESOURCE_PREFIX = "resource:"; 136 137 public void start(BootstrapContext context) throws ResourceAdapterInternalException { 138 if (configUrl==null) { 139 throw new ResourceAdapterInternalException("Configuration location is not set"); 140 } 141 142 try { 143 URL confURL = configUrl.startsWith(RESOURCE_PREFIX) ? getClass().getClassLoader().getResource(configUrl.substring(RESOURCE_PREFIX.length())) : new URL(configUrl); 144 XmlObject document = XmlObject.Factory.parse(confURL); 145 if (!(document instanceof JmsAdapterDocument)) { 146 throw new ResourceAdapterInternalException("Configuration document is not of expected type"); 147 } 148 149 biz.hammurapi.jms.adapter.definition.JmsAdapter definition = ((JmsAdapterDocument) document).getJmsAdapter(); 150 adapter = new JmsAdapter(definition); 151 adapter.start(); 152 153 workManager = context.getWorkManager(); 154 155 if (workManager!=null) { 156 final WorkListener workListener = new WorkListener() { 157 158 public void workAccepted(WorkEvent workEvent) { 159 logger.fine("Work accepted: "+workEvent.getWork()); 160 } 161 162 public void workCompleted(WorkEvent workEvent) { 163 logger.fine("Work completed: "+workEvent.getWork()); 164 } 165 166 public void workRejected(WorkEvent workEvent) { 167 logger.log(Level.SEVERE, "Work rejected: "+workEvent.getWork(), workEvent.getException()); 168 } 169 170 public void workStarted(WorkEvent workEvent) { 171 logger.fine("Work started: "+workEvent.getWork()); 172 } 173 174 }; 175 176 Worker raWorker = new Worker() { 177 178 public boolean post(final Runnable job) { 179 180 Work work = new Work() { 181 182 public void run() { 183 job.run(); 184 } 185 186 public void release() { 187 // Not supported 188 } 189 190 public String toString() { 191 return "Work for "+job; 192 } 193 }; 194 195 try { 196 workManager.scheduleWork(work, WorkManager.INDEFINITE, null, workListener); 197 } catch (WorkException e) { 198 logger.log(Level.SEVERE, "Failed to schedule work", e); 199 return false; 200 } 201 return true; 202 } 203 204 }; 205 206 adapter.setDefaultWorker(raWorker); 207 } 208 } catch (Exception e) { 209 throw new ResourceAdapterInternalException("Failed to start adapter: "+e, e); 210 } 211 } 212 213 public void stop() { 214 if (adapter!=null) { 215 try { 216 adapter.stop(); 217 } catch (ConfigurationException e) { 218 logger.log(Level.SEVERE, "Failed to properly stop adapter: "+e, e); 219 } 220 } 221 222 } 223 224 }