package biz.hammurapi.jms.adapter.jca;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;

import biz.hammurapi.jms.adapter.Processor;
import biz.hammurapi.remoting.Invocation;

/**
 * Passes objects to endpoints to be processed in worker threads.
 * <p>
 * For every registered activation an endpoint is created and the object is
 * dispatched to it using the first rule that matches:
 * <ol>
 * <li>If the endpoint is a {@link MessageListener} and the object is a
 *     {@link Message}, the message is passed to the listener (no result).</li>
 * <li>If the object is an {@link Invocation}, the invocation is executed
 *     against the endpoint and its return value is the result.</li>
 * <li>If the endpoint is a {@link Processor}, the object is passed to the
 *     processor and its return value is the result.</li>
 * <li>Otherwise the object is discarded and a warning is logged.</li>
 * </ol>
 * When several endpoints are registered, all but the last one are handed to the
 * {@link WorkManager}; the last one is processed in the calling thread.
 * There are two ways of dealing with results, controlled by the
 * "collectAllResults" parameter:
 * <ul>
 * <li>a) (default) the value produced by the endpoint processed in the calling
 *     thread is returned;</li>
 * <li>b) all values are assembled into a collection and returned.</li>
 * </ul>
 * Nulls are not collected and not sent back. An empty results collection is not
 * sent back either.
 *
 * @author Pavel
 */
public class ActivationProcessor implements Processor {

    private static final Logger logger = Logger.getLogger(ActivationProcessor.class.getName());

    private WorkManager workManager;
    private Set activations;

    /**
     * Supplies the collaborators this processor works with.
     *
     * @param activations set of activation keys; it is used as a lock and copied
     *                    on every {@link #process(Object)} call
     * @param workManager work manager used to schedule processing for all
     *                    endpoints but the last one
     */
    public void configure(Set activations, WorkManager workManager) {
        this.workManager = workManager;
        this.activations = activations;
    }

    private boolean collectAllResults;

    private long timeout = 30000;

    /**
     * Timeout for collection of results from all work items.
     *
     * @param timeout maximum time in milliseconds to wait for scheduled work
     *                items to finish; a value less than or equal to zero means
     *                "wait indefinitely"
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * @return timeout in milliseconds for collection of results from all work items
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * If set to true then the value returned from {@link #process(Object)} is a
     * collection of all non-null values returned by the endpoints.
     *
     * @param collectAllReturns true to collect results from all endpoints
     */
    public void setCollectAllResults(boolean collectAllReturns) {
        this.collectAllResults = collectAllReturns;
    }

    /**
     * @return true if results from all endpoints are collected
     */
    public boolean isCollectAllResults() {
        return collectAllResults;
    }

    /**
     * Dispatches the object to every registered endpoint.
     * <p>
     * If "collectAllResults" is set, waits (up to the configured timeout) for the
     * scheduled work items, collects non-null returns from the endpoints and
     * returns them as a collection, or null if the collection is empty.
     * Otherwise returns the value produced by the endpoint that was processed in
     * the calling thread, without waiting for the scheduled work items; if there
     * are no registered endpoints, null is returned.
     *
     * @param obj object to process; it is shared between endpoints, not cloned
     * @return collection of results, a single result, or null - see above
     * @throws IllegalStateException if {@link #configure(Set, WorkManager)} has not been called
     * @throws Exception if scheduling fails, processing in the calling thread
     *                   fails, or the wait for work items is interrupted
     */
    public Object process(final Object obj) throws Exception {
        if (activations == null) {
            throw new IllegalStateException("ActivationProcessor is not configured: activations set is null");
        }

        Collection activationsCopy;
        synchronized (activations) {
            activationsCopy = new ArrayList(activations);
        }

        // Snapshot the settings so that setters invoked concurrently cannot
        // change the mode in the middle of this call.
        final boolean collect = collectAllResults;
        final long waitMillis = timeout;

        final Collection allResults = collect ? new ArrayList() : null;
        final int[] workCounter = {0};
        Iterator it = activationsCopy.iterator();
        while (it.hasNext()) {
            Object key = it.next();
            final MessageEndpointFactory factory = ResourceAdapterImpl.factoryFromKey(key);

            if (it.hasNext()) {
                synchronized (workCounter) {
                    ++workCounter[0];
                }

                workManager.scheduleWork(new Work() {

                    public void release() {
                        // Not implemented
                    }

                    public void run() {
                        try {
                            // By the spec object shall be cloned for each endpoint. We assume the object immutable.
                            Object result = processInternal(factory, obj);
                            if (collect) {
                                addResult(allResults, result);
                            }
                        } catch (Exception e) {
                            logger.log(Level.SEVERE, "Exception processing "+obj, e);
                        } finally {
                            synchronized (workCounter) {
                                if (--workCounter[0]==0) {
                                    workCounter.notifyAll();
                                }
                            }
                        }
                    }

                });
            } else {
                // Last endpoint is processed in the calling thread - exactly once.
                // By the spec object shall be cloned for each endpoint. We assume the object immutable.
                Object result = processInternal(factory, obj);
                if (!collect) {
                    return result;
                }
                addResult(allResults, result);
            }
        }

        awaitCompletion(workCounter, waitMillis, obj);

        if (allResults == null) {
            return null;
        }

        // Return a snapshot: work items which are still running after a timeout
        // must not modify the collection handed to the caller.
        synchronized (allResults) {
            return allResults.isEmpty() ? null : new ArrayList(allResults);
        }
    }

    /**
     * Adds a non-null result to the shared results collection. The collection is
     * written by several worker threads, hence the synchronization.
     */
    private static void addResult(Collection results, Object result) {
        if (result != null) {
            synchronized (results) {
                results.add(result);
            }
        }
    }

    /**
     * Blocks until the work counter drops to zero or the timeout elapses.
     *
     * @param workCounter single-element array holding the number of outstanding
     *                    work items; also used as the monitor
     * @param waitMillis  maximum time to wait in milliseconds, &lt;= 0 to wait indefinitely
     * @param obj         object being processed, used only for the log message
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void awaitCompletion(int[] workCounter, long waitMillis, Object obj) throws InterruptedException {
        synchronized (workCounter) {
            if (waitMillis <= 0) {
                while (workCounter[0] > 0) {
                    workCounter.wait();
                }
                return;
            }

            // Elapsed time is tracked instead of an absolute deadline so that
            // very large timeouts cannot overflow.
            long start = System.currentTimeMillis();
            while (workCounter[0] > 0) {
                long remaining = waitMillis - (System.currentTimeMillis() - start);
                if (remaining <= 0) {
                    logger.warning("Timed out after "+waitMillis+" ms waiting for "+workCounter[0]+" work item(s) processing "+obj);
                    return;
                }
                workCounter.wait(remaining);
            }
        }
    }

    /**
     * Creates an endpoint from the factory, dispatches the object to it and
     * releases the endpoint.
     *
     * @param factory endpoint factory
     * @param obj     object to dispatch
     * @return value returned by the invocation or processor; null for message
     *         listeners and for objects which cannot be processed
     * @throws Exception if endpoint creation or processing fails
     */
    private Object processInternal(MessageEndpointFactory factory, Object obj) throws Exception {
        MessageEndpoint endpoint = factory.createEndpoint(null);
        try {
            if (endpoint instanceof MessageListener && obj instanceof Message) {
                ((MessageListener) endpoint).onMessage((Message) obj);
                return null;
            }

            if (obj instanceof Invocation) {
                return ((Invocation) obj).invoke(endpoint);
            }

            if (endpoint instanceof Processor) {
                return ((Processor) endpoint).process(obj);
            }

            // obj may be null here - do not let the diagnostics fail with NPE.
            String objType = obj == null ? "null" : obj.getClass().getName();
            logger.warning("Don't know how to process "+objType+" with "+endpoint.getClass().getName());
            return null;
        } finally {
            endpoint.release();
        }
    }

}