/* * InputStreamMeter.java * * Created on October 21, 2005, 5:35 PM * * To change this template, choose Tools | Options and locate the template under * the Source Creation and Management node. Right-click the template and choose * Open. You can then make changes to the template in the Source Editor. */ package org.das2.client; import org.das2.datum.Datum; import org.das2.datum.Units; import org.das2.system.DasLogger; import java.io.IOException; import java.io.InputStream; import java.util.logging.Level; /** * * @author Jeremy */ public class InputStreamMeter { long totalBytesRead; long millisElapsed; double speedLimit; long meterCount; // only clock while there are > 0 inputStreams out there long startTime; /** Creates a new instance of InputStreamMeter */ public InputStreamMeter() { this.speedLimit= 0; this.totalBytesRead= 0; this.millisElapsed= 0; this.startTime= -1; } /** MeteredInputStream is used to monitor the read. It is required to notify * the InputStreamMeter of the number of bytesRead, and allow the InputStreamMeter * to govern download speed by inserting blocking sleeps. * * @author jbf */ private static class MeteredInputStream extends InputStream { InputStream in; InputStreamMeter meter; /** Creates a new instance of MeteredInputStream */ private MeteredInputStream( InputStream in, InputStreamMeter meter ) { this.meter= meter; this.in= in; } @Override public int read( byte[] b, int off, int len ) throws IOException { try { int bytesRead= in.read(b,off,len); meter.addBytes(bytesRead,this); meter.governSpeed(this); return bytesRead; } catch ( IOException e ) { meter.exception(this); throw e; } } public int read() throws IOException { try { int byteRead= in.read(); meter.addBytes(1,this); meter.governSpeed(this); return byteRead; } catch ( IOException e ) { meter.exception(this); throw e; } } @Override public void close() throws IOException { meter.closing(this); in.close(); } } public InputStream meterInputStream( InputStream in ) { meterCount++; if ( meterCount==1 ) { startTime= System.currentTimeMillis(); totalBytesRead= 0; millisElapsed= 0; } return new MeteredInputStream( in, this ); } /* limit total speed, possibly balancing load btw streams */ private void governSpeed( MeteredInputStream mis ) { if ( speedLimit>0 ) { if ( calcTransmitSpeed() > speedLimit ) { long targetMillis= (long) ( ( totalBytesRead ) / ( speedLimit / 1000. ) ); long waitMs= Math.min( 1000, targetMillis - calcMillisElapsed() ); DasLogger.getLogger(DasLogger.DATA_TRANSFER_LOG).log(Level.FINE, "limiting speed by waiting {0} ms", waitMs); try { Thread.sleep(waitMs); } catch ( InterruptedException ex ) { } } } } /* add these bytes to the meter, on behalf of mis. */ private void addBytes( long bytes, MeteredInputStream mis ) { totalBytesRead+= bytes; } private void closing( MeteredInputStream mis ) { meterCount--; if ( meterCount==0 ) { this.millisElapsed+= System.currentTimeMillis() - startTime; this.startTime=-1; } } private void exception( MeteredInputStream mis ) { meterCount--; if ( meterCount==0 ) { this.millisElapsed+= System.currentTimeMillis() - startTime; this.startTime=-1; } } private long calcMillisElapsed() { long millis= this.millisElapsed; if ( startTime!=-1 ) { millis+= System.currentTimeMillis() - startTime; } return millis; } private double calcTransmitSpeed() { long millis= calcMillisElapsed(); if ( millis==0 ) { return Units.bytesPerSecond.getFillDouble(); } else { return 1000. * totalBytesRead / millis; } } public Datum getTransmitSpeed() { return Units.bytesPerSecond.createDatum( calcTransmitSpeed(), 10 ); } public long getBytesTransmitted() { return totalBytesRead; } public Datum getSpeedLimit() { return Units.bytesPerSecond.createDatum( this.speedLimit ); } public void setSpeedLimit( Datum speedLimit) { this.speedLimit = speedLimit.doubleValue( Units.bytesPerSecond ); } }