Guest User

ThrottledInputStream.java

a guest
May 26th, 2014
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.56 KB | None | 0 0
  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work
  3.  * for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License");
  4.  * you may not use this file except in compliance with the License. You may obtain a copy of the License at
  5.  *
  6.  * http://www.apache.org/licenses/LICENSE-2.0
  7.  *
  8.  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  9.  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations
  10.  * under the License.
  11.  */
  12.  
  13. package utils;
  14.  
  15. import java.io.IOException;
  16. import java.io.InputStream;
  17.  
  18. /**
  19.  * The ThrottleInputStream provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream
  20.  * instance. The throttling works by examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time
  21.  * interval if the byte-transfer is found exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a given
  22.  * short interval, the average tends towards the specified maximum, overall.)
  23.  */
  24. public class ThrottledInputStream extends InputStream {
  25.  
  26.     private final InputStream rawStream;
  27.     private final long maxBytesPerSec;
  28.     private final long startTime = System.currentTimeMillis();
  29.  
  30.     private long bytesRead = 0;
  31.     private long totalSleepTime = 0;
  32.  
  33.     private static final long SLEEP_DURATION_MS = 50;
  34.  
  35.     public ThrottledInputStream(InputStream rawStream) {
  36.         this(rawStream, Long.MAX_VALUE);
  37.     }
  38.  
  39.     public ThrottledInputStream(InputStream rawStream,
  40.                                 long maxBytesPerSec) {
  41.         assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
  42.         this.rawStream = rawStream;
  43.         this.maxBytesPerSec = maxBytesPerSec;
  44.     }
  45.  
  46.     @Override
  47.     public void close() throws IOException {
  48.         rawStream.close();
  49.     }
  50.  
  51.     /** @inheritDoc */
  52.     @Override
  53.     public int read() throws IOException {
  54.         throttle();
  55.         int data = rawStream.read();
  56.         if (data != -1) {
  57.             bytesRead++;
  58.         }
  59.         return data;
  60.     }
  61.  
  62.     /** @inheritDoc */
  63.     @Override
  64.     public int read(byte[] b) throws IOException {
  65.         throttle();
  66.         int readLen = rawStream.read(b);
  67.         if (readLen != -1) {
  68.             bytesRead += readLen;
  69.         }
  70.         return readLen;
  71.     }
  72.  
  73.     /** @inheritDoc */
  74.     @Override
  75.     public int read(byte[] b, int off, int len) throws IOException {
  76.         throttle();
  77.         int readLen = rawStream.read(b, off, len);
  78.         if (readLen != -1) {
  79.             bytesRead += readLen;
  80.         }
  81.         return readLen;
  82.     }
  83.  
  84.     private void throttle() throws IOException {
  85.         if (getBytesPerSec() > maxBytesPerSec) {
  86.             try {
  87.                 Thread.sleep(SLEEP_DURATION_MS);
  88.                 totalSleepTime += SLEEP_DURATION_MS;
  89.             } catch (InterruptedException e) {
  90.                 throw new IOException("Thread aborted", e);
  91.             }
  92.         }
  93.     }
  94.  
  95.     /**
  96.      * Getter for the number of bytes read from this stream, since creation.
  97.      *
  98.      * @return The number of bytes.
  99.      */
  100.     public long getTotalBytesRead() {
  101.         return bytesRead;
  102.     }
  103.  
  104.     /**
  105.      * Getter for the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
  106.      *
  107.      * @return Read rate, in bytes/sec.
  108.      */
  109.     public long getBytesPerSec() {
  110.         long elapsed = (System.currentTimeMillis() - startTime) / 1000;
  111.         if (elapsed == 0) {
  112.             return bytesRead;
  113.         } else {
  114.             return bytesRead / elapsed;
  115.         }
  116.     }
  117.  
  118.     /**
  119.      * Getter the total time spent in sleep.
  120.      *
  121.      * @return Number of milliseconds spent in sleep.
  122.      */
  123.     public long getTotalSleepTime() {
  124.         return totalSleepTime;
  125.     }
  126.  
  127.     /** @inheritDoc */
  128.     @Override
  129.     public String toString() {
  130.         return "ThrottledInputStream{"
  131.                + "bytesRead="
  132.                + bytesRead
  133.                + ", maxBytesPerSec="
  134.                + maxBytesPerSec
  135.                + ", bytesPerSec="
  136.                + getBytesPerSec()
  137.                + ", totalSleepTime="
  138.                + totalSleepTime
  139.                + '}';
  140.     }
  141. }
Advertisement
Add Comment
Please, Sign In to add comment