001    package biz.hammurapi.jms.adapter.jca;
002    
003    import java.net.URL;
004    import java.util.ArrayList;
005    import java.util.HashMap;
006    import java.util.HashSet;
007    import java.util.List;
008    import java.util.Map;
009    import java.util.Set;
010    import java.util.logging.Level;
011    import java.util.logging.Logger;
012    
013    import javax.resource.ResourceException;
014    import javax.resource.spi.ActivationSpec;
015    import javax.resource.spi.BootstrapContext;
016    import javax.resource.spi.ResourceAdapter;
017    import javax.resource.spi.ResourceAdapterInternalException;
018    import javax.resource.spi.endpoint.MessageEndpointFactory;
019    import javax.resource.spi.work.Work;
020    import javax.resource.spi.work.WorkEvent;
021    import javax.resource.spi.work.WorkException;
022    import javax.resource.spi.work.WorkListener;
023    import javax.resource.spi.work.WorkManager;
024    import javax.transaction.xa.XAResource;
025    
026    import org.apache.xmlbeans.XmlObject;
027    
028    import biz.hammurapi.config.ConfigurationException;
029    import biz.hammurapi.jms.adapter.JmsAdapter;
030    import biz.hammurapi.jms.adapter.definition.JmsAdapterDocument;
031    import biz.hammurapi.util.Worker;
032    
033    public class ResourceAdapterImpl implements ResourceAdapter {
034            
035            private Logger logger = Logger.getLogger(ResourceAdapterImpl.class.getName());
036    
037            public SharedAdapter getSharedAdapter() {
038                    // TODO Add reference counting when auto-reload of configuration is implemented.
039                    return new SharedAdapter() {
040    
041                            public void close() {
042                                    // TODO Decrement counter and stop adapter if it is scheduled to stop.
043                                    
044                            }
045    
046                            public Object get(String name) {                                
047                                    return adapter.get(name);
048                            }
049                            
050                    };
051            }
052            
053            private Map activationMap = new HashMap();
054            private WorkManager workManager;
055            
056            private static Object activationKey(MessageEndpointFactory factory, ActivationSpec spec) {
057                    ArrayList ret = new ArrayList();
058                    ret.add(factory);
059                    ret.add(spec);
060                    return ret;
061            }
062            
063            static MessageEndpointFactory factoryFromKey(Object key) {
064                    return (MessageEndpointFactory) ((List) key).get(0);
065            }
066    
067            public void endpointActivation(MessageEndpointFactory factory, ActivationSpec spec) throws ResourceException {
068                    if (!(spec instanceof ActivationSpecImpl)) {
069                            throw new ResourceException("Invalid activation specification: "+spec);
070                    }               
071                    
072                    if (adapter==null) {
073                            throw new ResourceException("Adapter is not properly initialized");
074                    }
075                    
076                    String listenerName = ((ActivationSpecImpl) spec).getListenerName();
077                    synchronized (activationMap) {
078                            Set endpointSet = (Set) activationMap.get(listenerName);
079                            if (endpointSet==null) {                                
080                                    Object processor = adapter.get(listenerName);
081                                    if (processor == null) {
082                                            throw new ResourceException("Listener not found: "+listenerName);
083                                    }
084                                    
085                                    if (!(processor instanceof ActivationProcessor)) {
086                                            throw new ResourceException("Unexpected listener processor type: "+processor.getClass());
087                                    }
088                                    
089                                    endpointSet = new HashSet();                            
090                                    activationMap.put(listenerName, endpointSet);
091                                    ((ActivationProcessor) processor).configure(endpointSet, workManager);
092                            }
093                            
094                            synchronized (endpointSet) {
095                                    endpointSet.add(activationKey(factory, spec));
096                            }
097                    }
098                                    
099            }
100    
101            /**
102             * Removes endpoint entry from activation maps.
103             */
104            public void endpointDeactivation(MessageEndpointFactory factory, ActivationSpec spec) {
105                    if (spec instanceof ActivationSpecImpl) {
106                            synchronized (activationMap) {
107                                    Set endpointSet = (Set) activationMap.get(((ActivationSpecImpl) spec).getListenerName());
108                                    if (endpointSet!=null) {
109                                            synchronized (endpointSet) {
110                                                    endpointSet.remove(activationKey(factory, spec));
111                                            }                                       
112                                    }
113                            }
114                    }
115            }
116    
117            public XAResource[] getXAResources(ActivationSpec[] speca) throws ResourceException {
118                    // Not implemented - transactions are not supported
119                    return null;
120            }
121            
122            private String configUrl;
123            private JmsAdapter adapter;
124            
125            /**
126             * URL of configuration. If it starts with <code>resource:</code> then it is read from classloader resource.
127             * @param configUrl
128             */
129            public void setConfigUrl(String configUrl) {
130                    this.configUrl = configUrl;
131            }
132            
133            // TODO - reload of configuration if changed - reload interval.
134            
135            private static final String RESOURCE_PREFIX = "resource:";
136    
137            public void start(BootstrapContext context) throws ResourceAdapterInternalException {
138            if (configUrl==null) {
139                    throw new ResourceAdapterInternalException("Configuration location is not set");
140            }
141            
142            try {
143                    URL confURL = configUrl.startsWith(RESOURCE_PREFIX) ? getClass().getClassLoader().getResource(configUrl.substring(RESOURCE_PREFIX.length())) : new URL(configUrl);
144                            XmlObject document = XmlObject.Factory.parse(confURL);
145                    if (!(document instanceof JmsAdapterDocument)) {
146                            throw new ResourceAdapterInternalException("Configuration document is not of expected type");
147                    }
148                    
149                    biz.hammurapi.jms.adapter.definition.JmsAdapter definition = ((JmsAdapterDocument) document).getJmsAdapter();
150                    adapter = new JmsAdapter(definition);
151                    adapter.start();
152                        
153                            workManager = context.getWorkManager();
154                            
155                            if (workManager!=null) {
156                                    final WorkListener workListener = new WorkListener() {
157                                            
158                                            public void workAccepted(WorkEvent workEvent) {
159                                                    logger.fine("Work accepted: "+workEvent.getWork());
160                                            }
161            
162                                            public void workCompleted(WorkEvent workEvent) {
163                                                    logger.fine("Work completed: "+workEvent.getWork());
164                                            }
165            
166                                            public void workRejected(WorkEvent workEvent) {
167                                                    logger.log(Level.SEVERE, "Work rejected: "+workEvent.getWork(), workEvent.getException());
168                                            }
169            
170                                            public void workStarted(WorkEvent workEvent) {
171                                                    logger.fine("Work started: "+workEvent.getWork());
172                                            }
173                                            
174                                    };
175                                    
176                                    Worker raWorker = new Worker() {
177            
178                                            public boolean post(final Runnable job) {
179                                                    
180                                                    Work work = new Work() {
181                                                            
182                                                            public void run() {
183                                                                    job.run();
184                                                            }
185            
186                                                            public void release() {
187                                                                    // Not supported                                                        
188                                                            }
189                                                            
190                                                            public String toString() {
191                                                                    return "Work for "+job;
192                                                            }
193                                                    };
194                                                                                            
195                                                    try {
196                                                            workManager.scheduleWork(work, WorkManager.INDEFINITE, null, workListener);
197                                                    } catch (WorkException e) {
198                                                            logger.log(Level.SEVERE, "Failed to schedule work", e);
199                                                            return false;
200                                                    }
201                                                    return true;
202                                            }
203                                            
204                                    };
205                                    
206                                    adapter.setDefaultWorker(raWorker);
207                            }
208            } catch (Exception e) {
209                    throw new ResourceAdapterInternalException("Failed to start adapter: "+e, e);
210            }
211            }
212    
213            public void stop() {
214                    if (adapter!=null) {
215                            try {
216                                    adapter.stop();
217                            } catch (ConfigurationException e) {
218                                    logger.log(Level.SEVERE, "Failed to properly stop adapter: "+e, e);
219                            }
220                    }
221                    
222            }
223            
224    }