Java Concurrency: Queue Processing, Part 1

The Queue as Monitor

Every Java class extends the Object class by default. There are several built-in methods on this class, of which we’ll focus on wait()notify(), and notifyAll(). These methods operate on the object’s monitor, which is used to synchronize operations across a group of threads. There are some rules to follow, but it’s really quite simple: The waiting thread needs to own the object’s monitor, which means it must call wait() within the code that synchronizes on the monitor. There are three ways of doing this, reference the Java API for more details. Here’s one example:

1
2
3
4
5
6
synchronized (obj) {
     while ( condition ) {
         obj.wait();
         // perform some action
     }
}

Conversely, the thread that signals the waiting thread(s) will call notify()or notifyAll() on the same monitor object. It, too, needs to own the monitor to do this, as in this example:

1
2
3
synchronized (obj) {
    obj.notify();
}

Calling notify() wakes up a single thread waiting on the monitor, while notifyAll() wakes up all of the threads waiting on the monitor. In general, calling notify()is more efficient as it results in fewer thread context switches, but there are times where notifyAll() may be a better choice. For instance, when work is produced at a high rate, it may be more efficient to wake as many threads as possible each time work is placed on the queue. This is mainly an implementation decision that you need to make.

Multithreaded Queue Example

Let’s examine a common scenario in which tasks are executed by worker threads, and where the work is coordinated via a queue to ensure once-and-only-once execution of each task. In this case, we’ll use the queue (itself a Java Object) as the monitor as well. Here’s the overall design:

  • Create a queue to act as a monitor and a way to transfer work
  • Create a pool of worker threads (consumers) that each wait on the monitor (the queue)
  • Create one or more producers that place items on the queue
  • Notify the monitor when an item is placed on the queue, which in turn wakes up a worker to pull the next item off the queue

Let’s start by looking at the worker’s class, MyWorker, which itself extends Thread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MyWorker extends Thread {
    private static int instance = 0;
    private final Queue<String> queue;
    
    public MyWorker(Queue queue) {
        this.queue = queue;
        setName("MyWorker:" + (instance++));
    }
    
    @Override
    public void run() {
        while ( true ) {
            try {
                Runnable work = null;
                synchronized ( queue ) {
                    while ( queue.isEmpty() )
                        queue.wait();
                    
                    // Get the next work item off of the queue
                    work = queue.remove();
                }
                // Process the work item
                work.run();
            }
            catch ( InterruptedException ie ) {
                break// Terminate
            }
        }
    }
    private void doWork(Runnable work) { ... }
}

The code that creates a pool of MyWorker objects, which is essentially a Thread pool, looks like this:

1
2
3
4
5
6
7
for ( int i = 0; i < THREAD_POOL_SIZE; i++ ) {
    // Create and start the worker thread, which will
    // immediately wait on the monitor (the queue in
    // this case) to be signaled to begin processing
    MyWorker worker = new MyWorker(queue);
    worker.start();
}

The code to place work on the queue and signal a worker thread looks like this:

1
2
3
4
5
6
7
8
9
10
synchronized ( queue ) {
    // Add work to the queue
    Runnable work = getMoreWork();
    queue.add(work);
    // Notify the monitor object that all the threads
    // are waiting on. This will awaken just one to
    // begin processing work from the queue
    queue.notify();
}

This code illustrates how straightforward basic Java thread synchronization is, although it is low-level. In Part 2 of this series, we’ll look at the java.util.concurrent package and how it contains built-in classes to make synchronized queue and task processing even simpler.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s