001 /* 002 @license.text@ 003 */ 004 005 package biz.hammurapi.util; 006 007 import java.io.IOException; 008 import java.io.InputStream; 009 import java.io.OutputStream; 010 import java.util.ArrayList; 011 import java.util.List; 012 013 /** 014 * 015 * Copies all data from an input stream to an output stream. 016 * @author Pavel Vlasov 017 * @version $Revision: 1.3 $ 018 */ 019 public class StreamPumper implements Runnable { 020 021 private final static int SIZE = 1024; 022 private InputStream is; 023 private OutputStream os; 024 025 private ExceptionSink sink; 026 027 private boolean closeStreams=false; 028 029 /** 030 * Create a new stream pumper. 031 * 032 * @param is input stream to read data from 033 * @param os output stream to write data to. 034 */ 035 public StreamPumper(InputStream is, OutputStream os, ExceptionSink sink, boolean closeStreams) { 036 this.is = is; 037 this.os = os; 038 this.closeStreams=closeStreams; 039 this.sink=sink; 040 } 041 042 private List listeners=new ArrayList(); 043 044 public void addListener(StreamPumpListener listener, int tickSize) { 045 synchronized (listeners) { 046 listeners.add(new StreamPumpListenerEntry(listener, tickSize)); 047 } 048 } 049 050 public void removeListener(StreamPumpListener listener) { 051 synchronized (listeners) { 052 listeners.remove(listener); 053 } 054 } 055 056 /** 057 * Copies data from the input stream to the output stream. 058 * Creates a copy of listeners collection before pumping. 059 * addListener() and removeListener() have no effect once pumping has started. 060 * Terminates as soon as the input stream is closed or an error occurs. 061 */ 062 public void run() { 063 StreamPumpListenerEntry[] listenersArray; 064 synchronized (listeners) { 065 listenersArray= (StreamPumpListenerEntry[]) listeners.toArray(new StreamPumpListenerEntry[listeners.size()]); 066 } 067 068 for (int i=0; i<listenersArray.length; i++) { 069 listenersArray[i].listener.pumpStarted(this); 070 } 071 072 long counter=0; 073 try { 074 final byte[] buf = new byte[SIZE]; 075 int length; 076 while ((length = is.read(buf)) != -1) { 077 os.write(buf, 0, length); 078 counter+=length; 079 for (int i=0; i<listenersArray.length; i++) { 080 listenersArray[i].counter+=length; 081 if (listenersArray[i].counter>=listenersArray[i].tickSize) { 082 listenersArray[i].counter=0; 083 listenersArray[i].listener.tick(this, counter); 084 } 085 } 086 } 087 } catch(IOException e) { 088 handleException(e, listenersArray); 089 } finally { 090 for (int i=0; i<listenersArray.length; i++) { 091 listenersArray[i].listener.pumpFinished(this); 092 } 093 if (closeStreams) { 094 try { 095 is.close(); 096 } catch (IOException ie) { 097 handleException(ie, listenersArray); 098 } 099 100 try { 101 os.close(); 102 } catch (IOException ie) { 103 handleException(ie, listenersArray); 104 } 105 } 106 } 107 } 108 109 /** 110 * @param e 111 * @param listenersArray 112 */ 113 private void handleException(Exception e, StreamPumpListenerEntry[] listenersArray) { 114 for (int i=0; i<listenersArray.length; i++) { 115 listenersArray[i].listener.pumpingError(this, e); 116 } 117 118 if (sink==null) { 119 e.printStackTrace(); 120 } else { 121 sink.consume(this, e); 122 } 123 } 124 }