Java Concurrency: Queue Processing, Part 2

In Java Concurrency: Queue Processing, Part 1 (the first installment in this series), we explored how to coordinate a pool of worker threads listening on a queue, which also served as the monitor object used to signal workers to process the queue. In this installment, we look at the java.util.concurrent package. Specifically, we’ll explore how to coordinate the activities of multiple threads using the SynchronousQueue class and other classes that implement the BlockingQueue interface.

The Hand Off

There are times when you need to process different actions in different threads, but coordinate those threads using a hand-off pattern. In other words, you can have a child thread that does some work and then sits and waits for more work from a queue. On the other side, you can have a producer thread that chugs along until it needs to hand off a portion of its work to another thread. With a SynchronousQueue, each side of the queue blocks until its counterpart arrives: the producer blocks until there is a consumer, and the consumer blocks until there is a producer.

To be precise, a SynchronousQueue isn’t really a queue at all, since it doesn’t have any capacity. The requirement is that upon insertion into the queue, the producer will block until there is a consumer ready to pull a message off the queue. Similarly, a consumer will block until a producer places a message onto the queue; hence the hand-off pattern.

Fortunately, a SynchronousQueue is very easy to use; all of the complexity is implemented for you within Java’s concurrent collection classes. To illustrate, let’s look at a simple example. First, the worker (consumer) thread:

import java.util.concurrent.BlockingQueue;

public class MyWorker extends Thread {
    private final BlockingQueue<String> queue;

    public MyWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while ( true ) {
                // Blocks until a producer hands off an item
                String s = queue.take();
                doWork(s);
            }
        }
        catch ( InterruptedException ie ) {
            // just terminate
        }
    }
}

The doWork() method has been omitted; all it does in this example is output a simple message. With the SynchronousQueue class, consumers call the blocking take() method. Calls to remove() will always throw an exception and calls to peek() will always return null, because a SynchronousQueue never actually holds an element. The call to take() will only return when a producer places something into the queue.
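Those non-blocking access methods are easy to verify. The following standalone sketch (a hypothetical class written just for illustration, not part of the original example) exercises peek(), offer(), and remove() against a SynchronousQueue that no consumer is reading from:

import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueBehavior {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new SynchronousQueue<String>();

        // peek() always returns null: the queue never actually holds an element
        System.out.println("peek(): " + queue.peek());

        // offer() returns false when no consumer is currently waiting in take()
        System.out.println("offer(): " + queue.offer("work"));

        // remove() always throws, because there is never an element to remove
        try {
            queue.remove();
        }
        catch ( NoSuchElementException nsee ) {
            System.out.println("remove() threw NoSuchElementException");
        }
    }
}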

The producer code is straightforward as well:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SyncQueueExample {
    // …   
    public SyncQueueExample() {
        try {
            int workItem = 0;
            // Create a synchronous queue
            BlockingQueue<String> queue = new SynchronousQueue<String>();
            // Create the child worker thread
            MyWorker worker = new MyWorker(queue);
            worker.start();
            // Start sending to the queue
            while ( true ) {
                System.out.println("\nPlacing work on queue");
                String work = "Work Item:" + (++workItem);
                // Blocks until the worker calls take()
                queue.put(work);
            }
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
    }
}

When run, the output from the queue producer and consumer is “coordinated,” in effect, alternating like this:

Placing work on queue
Thread 'Thread-1' processing work: Work Item:1
Placing work on queue
Thread 'Thread-1' processing work: Work Item:2
Placing work on queue
Thread 'Thread-1' processing work: Work Item:3
Placing work on queue
Thread 'Thread-1' processing work: Work Item:4

To continue the worker thread pool theme from the previous post, you can create multiple queue consumers, each of which will block on the call to the SynchronousQueue‘s take() method until it’s chosen to process a message:

// Create child worker thread pool
int POOL_SIZE = 5;
for ( int i = 0; i < POOL_SIZE; i++ ) {
    MyWorker worker = new MyWorker(queue);
    worker.start();
}

This pattern is useful when processing each queued work request involves a lengthy operation, such as file I/O. Since there can be many consumers waiting, the producer is free to keep generating work even while an individual consumer takes considerable time processing a single item. The producer thread only blocks on insertion when all consumer threads are busy processing. Once a worker thread completes its processing and calls take() again, the producer’s call to put() unblocks and the whole process continues.
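To see this behavior, you could have each worker’s doWork() method (omitted earlier) simulate a slow operation. The version below is a hypothetical sketch, with a Thread.sleep() standing in for something like file I/O:

// Hypothetical doWork() implementation for MyWorker, simulating lengthy work.
// While several workers sleep here, the producer keeps handing off items to
// the remaining idle workers, and only blocks in put() when all are busy.
private void doWork(String s) {
    System.out.println("Thread '" + Thread.currentThread().getName()
            + "' processing work: " + s);
    try {
        Thread.sleep(2000); // stand-in for a slow operation such as file I/O
    }
    catch ( InterruptedException ie ) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}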

Hand Off, with Buffering

A similar scenario can be achieved with the ArrayBlockingQueue. In this implementation of the BlockingQueue interface, a producer can place work on the queue until a certain number of outstanding queued items exist, at which point it will block. Until that number is reached, however, the producer is free to continue producing work. As with a SynchronousQueue, a consumer’s calls to take() will block if there are no items on the queue, but will return as soon as queued items exist.

As for the consumer worker threads, the MyWorker class remains exactly the same as in the previous example. Since both SynchronousQueue and ArrayBlockingQueue implement the BlockingQueue interface, no code change is required; this is polymorphism in action.

Regarding the producer, the code from the previous example is mostly unchanged, except for the single line where the SynchronousQueue was created. Instead, we need to create an ArrayBlockingQueue, whose constructor takes a capacity:

// Create a bounded, array-backed blocking queue
int MAX_Q_SIZE = 5;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAX_Q_SIZE);

This number is the maximum capacity of the queue: the number of items that can be placed on it before the producer blocks on calls to put(). Once consumers remove items and the queue size drops below this maximum, the producer’s call to put() will return. Depending on OS thread scheduling, as well as whether you specify the “fairness” of the queue’s consumer processing, you will see varied output in terms of when messages are placed on the queue and when they are processed:

...
Placing work on queue
Placing work on queue
Placing work on queue
Thread 'Thread-1' processing work: Work Item:6
Thread 'Thread-2' processing work: Work Item:7
Thread 'Thread-3' processing work: Work Item:8
Thread 'Thread-4' processing work: Work Item:9
Thread 'Thread-5' processing work: Work Item:10
Placing work on queue
Placing work on queue
...
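The “fairness” setting mentioned above is controlled by an optional second constructor argument. As a small variant of the queue creation shown earlier (reusing MAX_Q_SIZE from that snippet):

// Passing true makes the queue "fair": blocked producer and consumer threads
// are granted access in FIFO (arrival) order, at some cost in throughput
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAX_Q_SIZE, true);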

Priority, Delays, and Other Details

There are multiple queue classes in the java.util.concurrent package that implement the BlockingQueue interface (we’ve examined the first two in detail here):

  • SynchronousQueue: A hand-off pattern. Producer(s) block until there is a consumer ready to take; consumer(s) block until there is a producer ready to hand off a message.
  • ArrayBlockingQueue: Similar to the SynchronousQueue, except the queue can contain a pre-set number of items before the queue producer(s) block on queue insertion.
  • DelayQueue: Elements placed in the queue are not available for removal until their delay time has expired. Items that are furthest past their expiration time are available for removal first. Calls to put() do not block since this queue is unbounded, although calls to take() will block until an expired message is available.
  • LinkedBlockingQueue: Similar to an ArrayBlockingQueue, but where queue growth is allowed. However, the queue constructor does accept an optional parameter to constrain the growth to a maximum number of items.
  • LinkedTransferQueue: Similar to SynchronousQueue, except the queue is unbounded in size and calls to put() never block. Calls to take(), however, will block until a queued item is available for removal.
  • PriorityBlockingQueue: Similar to SynchronousQueue, except the queue is unbounded in size and calls to put() never block. Calls to take(), however, will block until a queued item is available for removal, and those items are subject to priority-based ordering. Enqueued objects must be comparable, meaning they must implement the Comparable interface and its required method (a minimal sketch follows this list).
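
As promised in the last bullet, here is a minimal PriorityBlockingQueue sketch. The WorkItem class and its fields are invented for illustration; it orders items by an integer priority via Comparable:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityExample {
    // Hypothetical work item, ordered so that lower numbers are taken first
    static class WorkItem implements Comparable<WorkItem> {
        final int priority;
        final String payload;
        WorkItem(int priority, String payload) {
            this.priority = priority;
            this.payload = payload;
        }
        @Override
        public int compareTo(WorkItem other) {
            return Integer.compare(this.priority, other.priority);
        }
        @Override
        public String toString() {
            return payload + " (priority " + priority + ")";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<WorkItem> queue = new PriorityBlockingQueue<WorkItem>();
        // put() never blocks; the queue is unbounded and grows as needed
        queue.put(new WorkItem(5, "low priority item"));
        queue.put(new WorkItem(1, "high priority item"));
        // take() returns the highest-priority (smallest) item first
        System.out.println(queue.take()); // high priority item (priority 1)
        System.out.println(queue.take()); // low priority item (priority 5)
    }
}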

In part 3 of this series on Java concurrency, we’ll continue with the java.util.concurrent package and explore the Executors class and the ExecutorService interface.


Java Concurrency: Queue Processing, Part 1

The Queue as Monitor

Every Java class extends the Object class by default. This class has several built-in methods; we’ll focus on wait(), notify(), and notifyAll(). These methods operate on the object’s monitor, which is used to synchronize operations across a group of threads. There are some rules to follow, but it’s really quite simple: the waiting thread needs to own the object’s monitor, which means it must call wait() within code that synchronizes on the monitor. There are three ways of doing this; see the Java API for more details. Here’s one example:

synchronized (obj) {
     // Loop to guard against spurious wakeups: re-check the condition
     // each time wait() returns
     while ( condition ) {
         obj.wait();
     }
     // perform some action
}

Conversely, the thread that signals the waiting thread(s) will call notify() or notifyAll() on the same monitor object. It, too, needs to own the monitor to do this, as in this example:

synchronized (obj) {
    obj.notify();
}

Calling notify() wakes up a single thread waiting on the monitor, while notifyAll() wakes up all of the threads waiting on it. In general, calling notify() is more efficient because it results in fewer thread context switches, but there are times when notifyAll() may be a better choice. For instance, when work is produced at a high rate, it may be more efficient to wake as many threads as possible each time work is placed on the queue. This is mainly an implementation decision that you need to make.
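Structurally, signaling every waiter looks the same as signaling one; only the method call differs. Each awakened thread re-acquires the monitor in turn and re-checks its wait condition, which is exactly why wait() should always sit inside a loop:

synchronized (obj) {
    // Wake every thread currently waiting on the monitor. Each one will
    // re-acquire the lock in turn, re-check its condition, and either
    // proceed or go back to waiting
    obj.notifyAll();
}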

Multithreaded Queue Example

Let’s examine a common scenario in which tasks are executed by worker threads, and where the work is coordinated via a queue to ensure once-and-only-once execution of each task. In this case, we’ll use the queue (itself a Java Object) as the monitor as well. Here’s the overall design:

  • Create a queue to act as a monitor and a way to transfer work
  • Create a pool of worker threads (consumers) that each wait on the monitor (the queue)
  • Create one or more producers that place items on the queue
  • Notify the monitor when an item is placed on the queue, which in turn wakes up a worker to pull the next item off the queue

Let’s start by looking at the worker’s class, MyWorker, which itself extends Thread:

import java.util.Queue;

public class MyWorker extends Thread {
    private static int instance = 0;
    private final Queue<Runnable> queue;
    
    public MyWorker(Queue<Runnable> queue) {
        this.queue = queue;
        setName("MyWorker:" + (instance++));
    }
    
    @Override
    public void run() {
        while ( true ) {
            try {
                Runnable work = null;
                synchronized ( queue ) {
                    while ( queue.isEmpty() )
                        queue.wait();
                    
                    // Get the next work item off of the queue
                    work = queue.remove();
                }
                // Process the work item outside the synchronized block
                work.run();
            }
            catch ( InterruptedException ie ) {
                break; // Terminate
            }
        }
    }

    private void doWork(Runnable work) { ... }
}

The code that creates a pool of MyWorker objects, which is essentially a Thread pool, looks like this:

for ( int i = 0; i < THREAD_POOL_SIZE; i++ ) {
    // Create and start the worker thread, which will
    // immediately wait on the monitor (the queue in
    // this case) to be signaled to begin processing
    MyWorker worker = new MyWorker(queue);
    worker.start();
}

The code to place work on the queue and signal a worker thread looks like this:

synchronized ( queue ) {
    // Add work to the queue
    Runnable work = getMoreWork();
    queue.add(work);
    // Notify the monitor object that all the threads
    // are waiting on. This will awaken just one to
    // begin processing work from the queue
    queue.notify();
}

This code illustrates how straightforward basic Java thread synchronization is, although it is low-level. In Part 2 of this series, we’ll look at the java.util.concurrent package and how it contains built-in classes to make synchronized queue and task processing even simpler.