The Java Collections Queue implementations will either grow without limit, or block if it grows beyond a given size, like the LinkedBlockingDeque. However, what if you need a non-blocking queue which drops its oldest elements? The Apache Commons CircularFifoQueue covers that. The snippet below shows typical use, with a queue size of two, and where the first element of three is dropped.

  public void testDrop() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(2);

    queue.add(1);
    queue.add(2);
    queue.add(3);

    assertTrue(2 == queue.poll());
    assertTrue(3 == queue.poll());
    assertTrue(queue.isEmpty());
  }

To install the Apache Commons 4.0 library on Debian / Ubuntu:

apt-get install libcommons-collections4-java libcommons-collections4-java-doc
apt-get source libcommons-collections4-java

The relevant files will be located at:

/usr/share/java/commons-collections4.jar  
/usr/share/maven-repo/org/apache/commons/commons-collections4/4.0/commons-collections4-4.0-javadoc.jar

Often, a queue is populated on one thread, and consumed by another. In this case, the access methods have to be synchronized, as seen in this example. Both offer() and poll() methods are non-blocking, and null is returned if the queue is empty.

  public void testMultithreaded() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(5);

    Thread insertThread = new Thread(() -> {
      int i = 0;
      while (true) {
        synchronized (queue) {
          queue.offer(i++);
        }
      }
    });
    insertThread.start();

    sleep(1);
    for (int i = 0; i < 10; i++) {
      synchronized (queue) {
        System.out.println("" + i + ": " + queue.poll());
      }
    }
  }

Finally, how does this queue work with Streams? In a single-thread context, there shouldn’t be a problem. However, when multithreaded it gets more tricky. The example below fails since the two threads operate on the queue concurrently, and a NoSuchElementException is often thrown. The ConcurrentLinkedQueue is thread-safe, but unbounded. Furthermore, its documentation states that “the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal”. Which means we’re back to square one.

  public void streamFail() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(10);

    Thread insertThread = new Thread(() -> {
      int i = 0;
      while (true) {
        queue.offer(i++);
      }
    });
    insertThread.start();

    sleep(1000);
    System.out.println("size=" + queue.size());
    // throws NoSuchElementException
    queue.stream().forEach(System.out::println);
  }

There are a few work-arounds, mentioned in this discussion. One trick is to use the Stream.generate() method, which will loop indefinilty, and synchronize on the queue within. The problem is, that this will never stop, which might be okey depending on your application. However, you’d have to run this on a spearate thread. Alternativly, use the limit() method or a stream terminating operation (e.g. findFirst()).

  public void streamGenerate() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(10);

    Thread insertThread = new Thread(() -> {
      int i = 0;
      while (true) {
        synchronized (queue) {
          queue.offer(i++);
        }
      }
    });
    insertThread.start();

    Stream.generate(() -> {
      synchronized (queue) {
        return queue.poll();
      }
    })
    // Never stops without the limit
    .limit(20)
    .forEach(System.out::println);
  }

Also worth mentioning, is the Google Guava implementation EvictingQueue. However, it it also not thread-safe.

Here’s the full listing with all test methods.

CircularFifoQueueTest.java
GitHub Raw
/* Copyright rememberjava.com. Licensed under GPL 3. See http://rememberjava.com/license */
package com.rememberjava.apache;

import static org.junit.Assert.assertTrue;

import java.util.stream.Stream;

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.junit.Test;

public class CircularFifoQueueTest {

  @Test
  public void testDrop() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(2);

    queue.add(1);
    queue.add(2);
    queue.add(3);

    assertTrue(2 == queue.poll());
    assertTrue(3 == queue.poll());
    assertTrue(queue.isEmpty());
  }

  @Test
  public void testMultithreaded() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(5);

    Thread insertThread = new Thread(() -> {
      int i = 0;
      while (true) {
        synchronized (queue) {
          queue.offer(i++);
        }
      }
    });
    insertThread.start();

    sleep(1);
    for (int i = 0; i < 10; i++) {
      synchronized (queue) {
        System.out.println("" + i + ": " + queue.poll());
      }
    }
  }

  @Test
  public void streamFail() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(10);

    Thread insertThread = new Thread(() -> {
      int i = 0;
      while (true) {
        queue.offer(i++);
      }
    });
    insertThread.start();

    sleep(1000);
    System.out.println("size=" + queue.size());
    // throws NoSuchElementException
    queue.stream().forEach(System.out::println);
  }

  @Test
  public void streamGenerate() {
    CircularFifoQueue<Integer> queue = new CircularFifoQueue<>(10);

    Thread insertThread = new Thread(() -> {
      int i = 0;
      while (true) {
        synchronized (queue) {
          queue.offer(i++);
        }
      }
    });
    insertThread.start();

    Stream.generate(() -> {
      synchronized (queue) {
        return queue.poll();
      }
    })
    // Never stops without the limit
    .limit(20)
    .forEach(System.out::println);
  }

  private void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}