Java Concurrency: Queue Processing, Part 2

In Java Concurrency: Queue Processing, Part 1 (the first installment in this series), we explored how to coordinate a pool of worker threads listening on a queue, which also served as the monitor object used to signal the workers to process the queue. In this installment, we look at the java.util.concurrent package. Specifically, we’ll explore how to coordinate the activities of multiple threads using the SynchronousQueue class and other classes that implement the BlockingQueue interface.

The Hand Off

There are times when you need to process different actions in different threads, coordinated using a hand-off pattern. In other words, you can have a child thread that does some work and then sits and waits for more work from a queue. On the other side, you can have a producer thread that chugs along until it needs to hand off a portion of its work to another thread. With a SynchronousQueue, both sides of the queue block: the producer until there is a consumer ready to receive, and the consumer until there is a producer ready to hand something off.

To be precise, a SynchronousQueue isn’t really a queue at all, since it doesn’t have any capacity. The requirement is that upon insertion into the queue, the producer will block until there is a consumer ready to pull a message off the queue. Similarly, a consumer will block until a producer places a message onto the queue; hence the hand-off pattern.

Fortunately, a SynchronousQueue is very easy to use; all of the complexity is implemented for you within Java’s concurrent collection classes. To illustrate, let’s look at a simple example. First, the worker (consumer) thread:

import java.util.concurrent.BlockingQueue;

public class MyWorker extends Thread {
    private final BlockingQueue<String> queue;

    public MyWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while ( true ) {
                // Blocks until the producer hands off a work item
                String s = queue.take();
                doWork(s);
            }
        }
        catch ( InterruptedException ie ) {
            // just terminate
        }
    }

    private void doWork(String s) {
        System.out.println("Thread '" + getName() + "' processing work: " + s);
    }
}

The doWork() method simply prints a message identifying the worker thread and the work item it received. With the SynchronousQueue class, consumers call the blocking take() method. Because a SynchronousQueue never holds any elements, a call to remove() will always throw an exception and a call to peek() will always return null. A call to take() returns only when a producer places something into the queue.
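To make that behavior concrete, here is a minimal sketch (the class name is hypothetical, not part of the example above) showing what the non-blocking accessors do on a SynchronousQueue:

import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueAccessorDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<String>();

        // peek() always returns null: a SynchronousQueue never holds an element
        System.out.println("peek(): " + queue.peek());

        try {
            // remove() always throws, for the same reason
            queue.remove();
        }
        catch ( NoSuchElementException nsee ) {
            System.out.println("remove() threw NoSuchElementException");
        }
    }
}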

The producer code is straightforward as well:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueExample {
    // …
    public SyncQueueExample() {
        try {
            int workItem = 0;
            // Create a synchronous queue
            BlockingQueue<String> queue = new SynchronousQueue<String>();
            // Create the child worker thread
            MyWorker worker = new MyWorker(queue);
            worker.start();
            // Start sending to the queue
            while ( true ) {
                System.out.println("\nPlacing work on queue");
                String work = "Work Item:" + (++workItem);
                // Blocks until a worker is ready to take the item
                queue.put(work);
            }
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new SyncQueueExample();
    }
}

When run, the output from the queue producer and consumer is, in effect, coordinated, alternating like this:

Placing work on queue
Thread 'Thread-1' processing work: Work Item:1
Placing work on queue
Thread 'Thread-1' processing work: Work Item:2
Placing work on queue
Thread 'Thread-1' processing work: Work Item:3
Placing work on queue
Thread 'Thread-1' processing work: Work Item:4

To continue the worker thread pool theme from the previous blog, you can create multiple queue consumers, each of which will block on the call to the SynchronousQueue's take() method until it's chosen to process a message:

// Create child worker thread pool
int POOL_SIZE = 5;
for ( int i = 0; i < POOL_SIZE; i++ ) {
    MyWorker worker = new MyWorker(queue);
    worker.start();
}

This pattern is useful when processing each queued work request involves some lengthy operation, such as file I/O. Since there can be many consumers waiting, the work queue producer is free to continuously generate work even while an individual consumer takes considerable time processing a single item. In this case, the producer thread blocks on insertion only when all consumer threads are busy processing. Once a worker thread completes its processing and once again calls take(), the producer's call to put() unblocks and the entire process continues.
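To see the benefit of the pool, the worker's doWork() method can simulate a lengthy operation. This is a minimal sketch of such a variant, not code from the original example; the sleep simply stands in for slow file I/O or similar work:

// Hypothetical variant of MyWorker.doWork() that simulates slow processing
private void doWork(String s) {
    System.out.println("Thread '" + getName() + "' processing work: " + s);
    try {
        // Stand-in for a lengthy operation such as file I/O
        Thread.sleep(2000);
    }
    catch ( InterruptedException ie ) {
        // Restore the interrupt flag so the run() loop's next take() call
        // can observe it and terminate the worker
        Thread.currentThread().interrupt();
    }
}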

Hand Off, with Buffering

A similar scenario can be achieved with an ArrayBlockingQueue. In this implementation of the BlockingQueue interface, a producer can place work on the queue until a certain number of outstanding queued items exist, at which point it will block. Until that number is reached, however, the producer is free to continue producing work. As with a SynchronousQueue, calls to take() by a consumer will block if there are no items on the queue, and will return as soon as queued items exist.

As for the consumer worker threads, the MyWorker class remains exactly the same as in the previous example. Since both SynchronousQueue and ArrayBlockingQueue implement the BlockingQueue interface, no code change is required; this is polymorphism in action.

Regarding the producer, the code from the previous example is mostly unchanged, except for the single line where the SynchronousQueue was created. Instead, we need to create an ArrayBlockingQueue, where a number is provided in the constructor:

// Create a bounded blocking queue
int MAX_Q_SIZE = 5;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAX_Q_SIZE);

This number indicates the maximum size of the queue, which is the number of items that can be placed on the queue before the producer blocks on calls to put(). Once consumers remove items and the queue size drops below this maximum, the producer’s call to put() will return. Depending on OS thread scheduling as well as whether you specify the “fairness” of the queue’s consumer processing, you will see varied output in terms of messages being placed on the queue and when they are processed:

...
Placing work on queue
Placing work on queue
Placing work on queue
Thread 'Thread-1' processing work: Work Item:6
Thread 'Thread-2' processing work: Work Item:7
Thread 'Thread-3' processing work: Work Item:8
Thread 'Thread-4' processing work: Work Item:9
Thread 'Thread-5' processing work: Work Item:10
Placing work on queue
Placing work on queue
...
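
Returning to the fairness option mentioned above: ArrayBlockingQueue accepts an optional second constructor argument that requests a fair queue. The line below is a small sketch showing how the queue from the earlier snippet could be created in fair mode (MAX_Q_SIZE is the same constant used above):

// Passing true requests a "fair" queue: blocked producer and consumer
// threads are granted access in FIFO (arrival) order
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAX_Q_SIZE, true);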

Priority, Delays, and Other Details

There are multiple queue classes in the java.util.concurrent package that implement the BlockingQueue interface (we’ve examined the first two in detail here):

  • SynchronousQueue: A hand-off pattern. Producer(s) block until there is a consumer available; consumer(s) block until there is an enqueued message.
  • ArrayBlockingQueue: Similar to the SynchronousQueue, except the queue can contain a pre-set number of items before the queue producer(s) block on queue insertion.
  • DelayQueue: Elements placed in the queue are not available for removal until their delay time has expired. Items that are furthest past their expiration time are available for removal first. Calls to put() do not block since this queue is unbounded, although calls to take() will block until an expired message is available.
  • LinkedBlockingQueue: Similar to an ArrayBlockingQueue, but where queue growth is allowed. However, the queue constructor does accept an optional parameter to constrain the growth to a maximum number of items.
  • LinkedTransferQueue: Similar to SynchronousQueue, except the queue is unbounded in size and calls to put() never block. Calls to take(), however, will block until a queued item is available for removal.
  • PriorityBlockingQueue: Similar to SynchronousQueue, except the queue is unbounded in size and calls to put() never block. Calls to take(), however, will block until a queued item is available for removal, and those items are subject to priority-based ordering. Enqueued objects must be comparable, meaning they must implement the Comparable interface and its required method (or a Comparator must be supplied to the queue's constructor); see the sketch following this list.
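
To make the Comparable requirement concrete, here is a minimal sketch (the PriorityQueueExample and WorkItem names and the priority values are hypothetical, not part of the examples above) showing priority-based ordering with a PriorityBlockingQueue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueExample {
    // Hypothetical work item that orders itself by priority (lower value = higher priority)
    static class WorkItem implements Comparable<WorkItem> {
        final int priority;
        final String payload;

        WorkItem(int priority, String payload) {
            this.priority = priority;
            this.payload = payload;
        }

        @Override
        public int compareTo(WorkItem other) {
            return Integer.compare(this.priority, other.priority);
        }

        @Override
        public String toString() {
            return payload + " (priority " + priority + ")";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<WorkItem> queue = new PriorityBlockingQueue<WorkItem>();

        // put() never blocks: the queue is unbounded
        queue.put(new WorkItem(3, "low priority report"));
        queue.put(new WorkItem(1, "urgent request"));
        queue.put(new WorkItem(2, "routine cleanup"));

        // take() returns the highest-priority (lowest-valued) item first
        while ( !queue.isEmpty() ) {
            System.out.println(queue.take());
        }
    }
}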

In Part 3 of this series on Java concurrency, we’ll continue with the java.util.concurrent package and explore the Executors class and the ExecutorService interface.
