001    /*
002    @license.text@ 
003     */
004    
005    package biz.hammurapi.util;
006    
007    import java.io.IOException;
008    import java.io.InputStream;
009    import java.io.OutputStream;
010    import java.util.ArrayList;
011    import java.util.List;
012    
013    /**
014     * 
015     * Copies all data from an input stream to an output stream.
016     * @author Pavel Vlasov
017     * @version $Revision: 1.3 $
018     */
019    public class StreamPumper implements Runnable {
020    
021        private final static int SIZE = 1024;
022        private InputStream is;
023        private OutputStream os;
024        
025            private ExceptionSink sink;
026        
027        private boolean closeStreams=false;
028        
029        /**
030         * Create a new stream pumper.
031         *
032         * @param is input stream to read data from
033         * @param os output stream to write data to.
034         */
035        public StreamPumper(InputStream is, OutputStream os, ExceptionSink sink, boolean closeStreams) {
036            this.is = is;
037            this.os = os;
038            this.closeStreams=closeStreams;
039            this.sink=sink;
040        }
041    
042        private List listeners=new ArrayList();
043            
044        public void addListener(StreamPumpListener listener, int tickSize) {
045            synchronized (listeners) {
046                listeners.add(new StreamPumpListenerEntry(listener, tickSize));
047            }
048        }
049        
050        public void removeListener(StreamPumpListener listener) {
051            synchronized (listeners) {
052                listeners.remove(listener);
053            }
054        }
055    
056        /**
057         * Copies data from the input stream to the output stream.
058         * Creates a copy of listeners collection before pumping. 
059         * addListener() and removeListener() have no effect once pumping has started. 
060         * Terminates as soon as the input stream is closed or an error occurs.
061         */
062        public void run() {
063            StreamPumpListenerEntry[] listenersArray;
064            synchronized (listeners) {
065                listenersArray= (StreamPumpListenerEntry[]) listeners.toArray(new StreamPumpListenerEntry[listeners.size()]);
066            }
067            
068            for (int i=0; i<listenersArray.length; i++) {
069                    listenersArray[i].listener.pumpStarted(this);
070            }
071            
072            long counter=0;
073            try {
074                final byte[] buf = new byte[SIZE];
075                int length;
076                while ((length = is.read(buf)) != -1) {
077                    os.write(buf, 0, length);
078                    counter+=length;
079                    for (int i=0; i<listenersArray.length; i++) {
080                        listenersArray[i].counter+=length;
081                        if (listenersArray[i].counter>=listenersArray[i].tickSize) {
082                            listenersArray[i].counter=0;
083                            listenersArray[i].listener.tick(this, counter);
084                        }
085                    }                
086                }
087            } catch(IOException e) {
088                handleException(e, listenersArray);
089            } finally {
090                for (int i=0; i<listenersArray.length; i++) {
091                    listenersArray[i].listener.pumpFinished(this);
092                }
093                if (closeStreams) {
094                    try {
095                        is.close();
096                    } catch (IOException ie) {
097                            handleException(ie, listenersArray);
098                    }
099                    
100                    try {
101                        os.close();
102                    } catch (IOException ie) {
103                            handleException(ie, listenersArray);
104                    }
105                }
106            }            
107        }
108    
109            /**
110             * @param e
111             * @param listenersArray
112             */
113            private void handleException(Exception e, StreamPumpListenerEntry[] listenersArray) {
114            for (int i=0; i<listenersArray.length; i++) {
115                    listenersArray[i].listener.pumpingError(this, e);
116            }
117            
118                    if (sink==null) {
119                            e.printStackTrace();
120                    } else {
121                            sink.consume(this, e);
122                    }
123            }
124    }