Threads Java

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class ByteFIFOTest extends Object {
  private ByteFIFO fifo;
  private byte[] srcData;
  public ByteFIFOTest() throws IOException {
    fifo = new ByteFIFO(20);
    makeSrcData();
    System.out.println("srcData.length=" + srcData.length);
    Runnable srcRunnable = new Runnable() {
        public void run() {
          src();
        }
      };
    Thread srcThread = new Thread(srcRunnable);
    srcThread.start();
    Runnable dstRunnable = new Runnable() {
        public void run() {
          dst();
        }
      };
    Thread dstThread = new Thread(dstRunnable);
    dstThread.start();
  }
  private void makeSrcData() throws IOException {
    String[] list = {
        "The first string is right here",
        "The second string is a bit longer and also right here",
        "The third string",
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
        "0123456789",
        "The last string in the list"
      };
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(list);
    oos.flush();
    oos.close();
    
    srcData = baos.toByteArray();
  }
  private void src() {
    try {
      boolean justAddOne = true;
      int count = 0;
      while ( count < srcData.length ) {
        if ( !justAddOne ) {
          int writeSize = (int) ( 40.0 * Math.random() );
          writeSize = Math.min(writeSize, srcData.length - count);
          byte[] buf = new byte[writeSize];
          System.arraycopy(srcData, count, buf, 0, writeSize);
          fifo.add(buf);
          count += writeSize;
          System.out.println("just added " + writeSize + " bytes");
        } else {
          fifo.add(srcData[count]);
          count++;
          System.out.println("just added exactly 1 byte");
        }
        justAddOne = !justAddOne;
      }
    } catch ( InterruptedException x ) {
      x.printStackTrace();
    }
  }
  private void dst() {
    try {
      boolean justAddOne = true;
      int count = 0;
      byte[] dstData = new byte[srcData.length];
      while ( count < dstData.length ) {
        if ( !justAddOne ) {
          byte[] buf = fifo.removeAll();
          if ( buf.length > 0 ) {
            System.arraycopy(buf, 0, dstData, count, buf.length);
            count += buf.length;
          }
          System.out.println(
            "just removed " + buf.length + " bytes");
        } else {
          byte b = fifo.remove();
          dstData[count] = b;
          count++;
          System.out.println(
            "just removed exactly 1 byte");
        }
        justAddOne = !justAddOne;
      }
      System.out.println("received all data, count=" + count);
      ObjectInputStream ois = new ObjectInputStream(
          new ByteArrayInputStream(dstData));
      String[] line = (String[]) ois.readObject();
      for ( int i = 0; i < line.length; i++ ) {
        System.out.println("line[" + i + "]=" + line[i]);
      }
    } catch ( ClassNotFoundException x1 ) {
      x1.printStackTrace();
    } catch ( IOException iox ) {
      iox.printStackTrace();
    } catch ( InterruptedException x ) {
      x.printStackTrace();
    }
  }
  public static void main(String[] args) {
    try {
      new ByteFIFOTest();
    } catch ( IOException iox ) {
      iox.printStackTrace();
    }
  }
}
class ByteFIFO extends Object {
  private byte[] queue;
  private int capacity;
  private int size;
  private int head;
  private int tail;
  public ByteFIFO(int cap) {
    capacity = ( cap > 0 ) ? cap : 1; // at least 1
    queue = new byte[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }
  public int getCapacity() {
    return capacity;
  }
  public synchronized int getSize() {
    return size;
  }
  public synchronized boolean isEmpty() {
    return ( size == 0 );
  }
  public synchronized boolean isFull() {
    return ( size == capacity );
  }
  public synchronized void add(byte b) 
      throws InterruptedException {
    waitWhileFull();
    queue[head] = b;
    head = ( head + 1 ) % capacity;
    size++;
    notifyAll(); // let any waiting threads know about change
  }
  public synchronized void add(byte[] list) 
      throws InterruptedException {
    // For efficiency, the bytes are copied in blocks
    // instead of one at a time. As space becomes available,
    // more bytes are copied until all of them have been
    // added.
    int ptr = 0;
    while ( ptr < list.length ) {
      // If full, the lock will be released to allow 
      // another thread to come in and remove bytes.
      waitWhileFull();
      int space = capacity - size;
      int distToEnd = capacity - head;
      int blockLen = Math.min(space, distToEnd);
      int bytesRemaining = list.length - ptr;
      int copyLen = Math.min(blockLen, bytesRemaining);
      System.arraycopy(list, ptr, queue, head, copyLen);
      head = ( head + copyLen ) % capacity;
      size += copyLen;
      ptr += copyLen;
      // Keep the lock, but let any waiting threads 
      // know that something has changed.
      notifyAll();
    }
  }
  public synchronized byte remove() 
      throws InterruptedException {
    waitWhileEmpty();
    
    byte b = queue[tail];
    tail = ( tail + 1 ) % capacity;
    size--;
    notifyAll(); // let any waiting threads know about change
    return b;
  }
  public synchronized byte[] removeAll() {
    // For efficiency, the bytes are copied in blocks
    // instead of one at a time. 
    if ( isEmpty() ) {
      // Nothing to remove, return a zero-length
      // array and do not bother with notification
      // since nothing was removed.
      return new byte[0]; 
    }
    // based on the current size
    byte[] list = new byte[size]; 
    // copy in the block from tail to the end
    int distToEnd = capacity - tail;
    int copyLen = Math.min(size, distToEnd);
    System.arraycopy(queue, tail, list, 0, copyLen);
    // If data wraps around, copy the remaining data
    // from the front of the array.
    if ( size > copyLen ) {
      System.arraycopy(
          queue, 0, list, copyLen, size - copyLen);
    }
    tail = ( tail + size ) % capacity;
    size = 0; // everything has been removed
    // Signal any and all waiting threads that 
    // something has changed.
    notifyAll(); 
    return list; 
  }
  public synchronized byte[] removeAtLeastOne() 
      throws InterruptedException {
    waitWhileEmpty(); // wait for a least one to be in FIFO
    return removeAll();
  }
  public synchronized boolean waitUntilEmpty(long msTimeout) 
      throws InterruptedException {
    if ( msTimeout == 0L ) {
      waitUntilEmpty();  // use other method
      return true;
    }
    // wait only for the specified amount of time
    long endTime = System.currentTimeMillis() + msTimeout;
    long msRemaining = msTimeout;
    while ( !isEmpty() && ( msRemaining > 0L ) ) {
      wait(msRemaining);
      msRemaining = endTime - System.currentTimeMillis();
    }
    // May have timed out, or may have met condition, 
    // calc return value.
    return isEmpty();
  }
  public synchronized void waitUntilEmpty() 
      throws InterruptedException {
    while ( !isEmpty() ) {
      wait();
    }
  }
  public synchronized void waitWhileEmpty() 
      throws InterruptedException {
    while ( isEmpty() ) {
      wait();
    }
  }
  public synchronized void waitUntilFull() 
      throws InterruptedException {
    while ( !isFull() ) {
      wait();
    }
  }
  public synchronized void waitWhileFull() 
      throws InterruptedException {
    while ( isFull() ) {
      wait();
    }
  }
}