AsynchronousBufferedInputStreamWrapper

Some time ago i was looking for a nice and quiet fast implementation for a buffered inputstream. I found some pieces and finally i came up with this one.

Just create a new AsynchronousBufferedInputStreamWrapper by using your unbuffered InputStream and the buffersize of your choice (eg. 512*256)

Invoke startBuffering() to start the buffering.

getBufferLevel() will return the current level of the buffer in percent of maximum.

This implementation is untested but it might be useful for someone. Please note also that you might have to change my logger implementation.

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.schueth.snplayer.utils.SimpleLogger;

public class AsynchronousBufferedInputStreamWrapper extends InputStream {

	Thread thread;
	BufferingThread buffering;
	Logger logger = SimpleLogger.getLogger();

	public AsynchronousBufferedInputStreamWrapper(InputStream inputStream,
			int bufferSizeInBytes) {

		buffering = new BufferingThread(this, new BufferedInputStream(
				inputStream, bufferSizeInBytes), bufferSizeInBytes);
	}

	public float getBufferLevel() {
		return buffering.getBufferLevel();
	}

	public void startBuffering() {
		logger.debug("Buffering started");
		thread = new Thread(buffering);
		thread.start();
	}

	@Override
	public int read() throws IOException {

		if (this.thread == null) {
			logger.info("MANUAL OVERRIDE!");
			startBuffering();
		}
		waitForCurrentByteBuffer();
		if (reachedEndOfStream()) {
			return -1;
		}

		byte b = buffering.currentByteBuffer[buffering.currentBufferPosition];
		buffering.currentBufferPosition++;
		buffering.overallBytesConsumed++;
		return b & 0xFF;
	}

	private boolean reachedEndOfStream() {
		return buffering.overallBytesConsumed == buffering.overallBytesBuffered;
	}

	private void waitForCurrentByteBuffer() {
		if (buffering.currentByteBuffer == null
				|| buffering.currentBufferPosition > buffering.currentByteBuffer.length - 1) {
			buffering.currentByteBuffer = null;
			while (buffering.currentByteBuffer == null && !reachedEndOfStream()) {
				// System.out.println("Getting Queue element");
				buffering.currentByteBuffer = buffering.dataQueue.poll();
				buffering.currentBufferPosition = 0;
				try {
					TimeUnit.MILLISECONDS.sleep(5L);
				} catch (InterruptedException e) {
					logger.error(e);
				}
			}
		}
	}

	@Override
	public void close() throws IOException {
		super.close();
		this.thread.stop();
	}
}

class BufferingThread implements Runnable {

	AsynchronousBufferedInputStreamWrapper reference;
	Logger logger = SimpleLogger.getLogger();

	public static final int DEFAULT_BUFFER_SEGMENTSIZE = 128 * 1024;
	volatile boolean eof = false;

	volatile BufferedInputStream wrappedInputStream;
	volatile ConcurrentLinkedQueue<byte[]> dataQueue;
	int maxDataQueueSize;
	volatile int overallBytesConsumed = 0;
	int currentBufferPosition;

	volatile int totalBufferSizeInBytes;
	volatile int bufferSizeInBytes;
	volatile int overallBytesBuffered;
	int lastCurrentBytesBuffered;
	int currentBytesBuffered;
	byte[] currentByteBuffer;

	public BufferingThread(AsynchronousBufferedInputStreamWrapper reference,
			BufferedInputStream wrappedInputStream, int totalBufferSizeInBytes) {

		this.reference = reference;
		this.wrappedInputStream = wrappedInputStream;
		this.dataQueue = new ConcurrentLinkedQueue<byte[]>();

		this.totalBufferSizeInBytes = totalBufferSizeInBytes;
		this.maxDataQueueSize = totalBufferSizeInBytes
				/ DEFAULT_BUFFER_SEGMENTSIZE;
		this.bufferSizeInBytes = DEFAULT_BUFFER_SEGMENTSIZE;

		logger.info("totalBufferSizeInBytes: " + totalBufferSizeInBytes);
		logger.info("maxDataQueueSize: " + maxDataQueueSize);
		logger.info("bufferSizeInBytes: " + bufferSizeInBytes);

	}

	public float getBufferLevel() {
		logger.debug("overallBytesBuffered: " + overallBytesBuffered);
		logger.debug("overallBytesConsumed: " + overallBytesConsumed);
		logger.debug("dataQueue.size(): " + dataQueue.size());
		logger.debug("totalBufferSizeInBytes: " + totalBufferSizeInBytes);
		if (overallBytesBuffered == 0 || (overallBytesBuffered==overallBytesConsumed)) {
			return 0;
		}

		return (((float)overallBytesConsumed / (float)overallBytesBuffered)-1 )*-1;

	}

	public void run() {

		currentBytesBuffered = overallBytesBuffered - overallBytesConsumed;

		while (!eof) {

			try {

				// check if buffer is not full
				if (dataQueue.size() < maxDataQueueSize) {

					// System.out.println("Buffering...");
					ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

					// check if buffer is not full
					while (currentBytesBuffered < bufferSizeInBytes) {
						byte[] buffer = new byte[bufferSizeInBytes];
						int bytesRead = wrappedInputStream.read(buffer);

						if (bytesRead == -1) {
							// EOF
							logger.error("EOF");
							wrappedInputStream.close();
							eof = true;
							break;
						}

						// writing bytes to os
						byteArrayOutputStream.write(buffer, 0, bytesRead);
						overallBytesBuffered += bytesRead;

						currentBytesBuffered = overallBytesBuffered
								- overallBytesConsumed;

					}

					if (!eof) {
						// Adding byte array to queue
						dataQueue.add(byteArrayOutputStream.toByteArray());
						currentBytesBuffered = 0;
					}

				} else {
					TimeUnit.MILLISECONDS.sleep(4L);
				}
			} catch (Exception e) {
				logger.error(e);
			}
		}

		logger.info("Finished Buffering");
	}
}