001    package biz.hammurapi.jms.adapter.jca;
002    
003    import java.util.ArrayList;
004    import java.util.Collection;
005    import java.util.Iterator;
006    import java.util.Set;
007    import java.util.logging.Level;
008    import java.util.logging.Logger;
009    
010    import javax.jms.Message;
011    import javax.jms.MessageListener;
012    import javax.resource.spi.endpoint.MessageEndpoint;
013    import javax.resource.spi.endpoint.MessageEndpointFactory;
014    import javax.resource.spi.work.Work;
015    import javax.resource.spi.work.WorkManager;
016    
017    import biz.hammurapi.jms.adapter.Processor;
018    import biz.hammurapi.remoting.Invocation;
019    
020    /**
021     * Passes objects to endpoints to be processed in worker threads.
022     * If endpoint is instance of MessageListener and object is instance of message then
023     * message is passed to the listener.
024     * If endpoint is instance of Processor then object is passed to the processor.
025     * Otherwise, if object is instance of Invocation then the invocation is executed
026     * against the endpoint. 
027     * If none of this is true then message is discarded and a warning is logged.
028     * In the case of invocation, if the target method returns value and there are multiple endpoints
029     * listening then there are two options - a) return value from one of registered endpoints is returned (default) 
030     * b) all values are assembled into a collection and returned. This behavior is controlled by "collectAllResults" parameter.
031     * The same is true if endpoints provide processor instances. Nulls are not collected and not sent back. Empty results collection
032     * is not sent back either. 
033     * @author Pavel
034     *
035     */
036    public class ActivationProcessor implements Processor {
037            
038            private static final Logger logger = Logger.getLogger(ActivationProcessor.class.getName());
039            
040            private WorkManager workManager;
041            private Set activations;
042            
043            public void configure(Set activations, WorkManager workManager) {
044                    this.workManager = workManager;
045                    this.activations = activations;
046            }
047            
048            private boolean collectAllResults;
049    
050            private long timeout = 30000;
051            
052            /**
053             * Timeout for collection of results from all work items.
054             * @param timeout
055             */
056            public void setTimeout(long timeout) {
057                    this.timeout = timeout;
058            }
059            
060            public long getTimeout() {
061                    return timeout;
062            }
063            
064            /**
065             * If set to true then value returned from process is a collection of all 
066             * return
067             * @param collectAllReturns
068             */
069            public void setCollectAllResults(boolean collectAllReturns) {
070                    this.collectAllResults = collectAllReturns;
071            }
072            
073            public boolean isCollectAllResults() {
074                    return collectAllResults;
075            }
076            
077            /**
078             * Collects non-null returns from registered endpoints and returns them as collection
079             * Returns null if collection is empty.
080             */
081            public Object process(final Object obj) throws Exception {
082                    Collection activationsCopy;
083                    synchronized (activations) {
084                            activationsCopy = new ArrayList(activations);
085                    }
086                    
087                    final Collection allResults = collectAllResults ? new ArrayList() : null;
088                    final int[] workCounter = {0};
089                    Iterator it = activationsCopy.iterator();
090                    while (it.hasNext()) {
091                            Object key = it.next();
092                            final MessageEndpointFactory factory = ResourceAdapterImpl.factoryFromKey(key);                 
093                            
094                            if (it.hasNext()) {
095                                    synchronized (workCounter) {
096                                            ++workCounter[0];
097                                    }
098                                    
099                                    workManager.scheduleWork(new Work() {
100    
101                                            public void release() {
102                                                    // Not implemented                                                      
103                                            }
104    
105                                            public void run() {
106                                                    try {
107                                                            Object result = processInternal(factory, obj); // By the spec object shall be cloned for each endpoint. we assume the object immutable.
108                                                            if (collectAllResults && result!=null) {
109                                                                    allResults.add(result);
110                                                            }
111                                                    } catch (Exception e) {
112                                                            logger.log(Level.SEVERE, "Exception processing "+obj, e);
113                                                    } finally {
114                                                            synchronized (workCounter) {
115                                                                    if (--workCounter[0]==0) {
116                                                                            workCounter.notifyAll();
117                                                                    }
118                                                            }
119                                                    }
120                                            }
121                                            
122                                    });
123                            } else {
124                                    Object result = processInternal(factory, obj); // By the spec object shall be cloned for each endpoint. we assume the object immutable.
125                                    if (collectAllResults && result!=null) {
126                                            allResults.add(result);
127                                    } else {                                
128                                            return processInternal(factory, obj);
129                                    }
130                            }
131                    }
132                    
133                    synchronized (workCounter) {
134                            while (workCounter[0]>0) {
135                                    workCounter.wait(timeout );
136                            }                       
137                    }
138                    
139                    return allResults==null || allResults.isEmpty() ? null : allResults;
140            }
141            
142            private Object processInternal(MessageEndpointFactory factory, Object obj) throws Exception {
143                    MessageEndpoint endpoint = factory.createEndpoint(null);
144                    try {
145                            if (endpoint instanceof MessageListener && obj instanceof Message) {
146                                    ((MessageListener) endpoint).onMessage((Message) obj);
147                                    return null;
148                            }
149                            
150                            if (obj instanceof Invocation) {
151                                    return ((Invocation) obj).invoke(endpoint);
152                            }
153                            
154                            if (endpoint instanceof Processor) {
155                                    return ((Processor) endpoint).process(obj);
156                            }
157                            
158                            logger.warning("Don't know how to process "+obj.getClass().getName()+" with "+endpoint.getClass().getName());
159                            return null;
160                    } finally {
161                            endpoint.release();
162                    }               
163            }
164    
165    }