Java Concurrency: Queue Processing, Part 2

In Java Concurrency: Queue Processing, Part 1 (the first installment in this series), we explored how to coordinate a pool of worker threads listening on a queue, which was also the monitor object used to signal workers to process the queue. In this installment, we look at the java.util.concurrent package. Specifically, we’ll explore how to coordinate the activities of multiple threads using the SynchronousQueue class and other classes that implement the BlockingQueue interface.

The Hand Off

There are times when you need to process different actions in different threads, but they need to be coordinated using a hand-off pattern. In other words, you can have a child thread that does some work, and then sits and waits for more work from a queue. On the other side, you can have a producer thread that chugs along until it needs to hand off a portion of its work to another thread. With a SynchronousQueue, both sides of the queue will block until their counterpart arrives: a producer blocks until a consumer is ready, and a consumer blocks until a producer is ready.

To be precise, a SynchronousQueue isn’t really a queue at all, since it doesn’t have any capacity. The requirement is that upon insertion into the queue, the producer will block until there is a consumer ready to pull a message off the queue. Similarly, a consumer will block until a producer places a message onto the queue; hence the hand-off pattern.

Fortunately, a SynchronousQueue is very easy to use; all of the complexity is implemented for you within Java’s concurrent collection classes. To illustrate, let’s look at a simple example. First, the worker (consumer) thread:

public class MyWorker extends Thread {
    private final BlockingQueue<String> queue;
    public MyWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }
 
    public void run() {
        try {
            while ( true ) {
                String s = queue.take();
                doWork(s);
            }
        }
        catch ( InterruptedException ie ) {
            // just terminate
        }
    }
    
}

The doWork() method has been omitted, but all it does in this example is output a simple message. With the SynchronousQueue class, consumers call the blocking take() method, which returns only when a producer places something into the queue. Because the queue has no capacity, peek() always returns null, and remove() throws a NoSuchElementException unless a producer happens to be waiting to hand off an item at that very moment.
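To make the behavior of the non-blocking methods concrete, here is a minimal sketch that probes an idle SynchronousQueue (no producer or consumer is waiting). The class name SyncQueueProbe and the probe() helper are made up for this illustration; only the queue methods themselves come from java.util.concurrent.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper class, for illustration only
class SyncQueueProbe {

    /** Describes what the non-blocking methods do when no other thread is waiting. */
    static String probe() {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        StringBuilder sb = new StringBuilder();

        sb.append("peek=").append(queue.peek());        // a SynchronousQueue never holds anything to peek at
        sb.append(" poll=").append(queue.poll());       // null: no producer is handing off right now
        sb.append(" offer=").append(queue.offer("x"));  // false: no consumer is waiting to receive

        try {
            queue.remove();
            sb.append(" remove=ok");
        } catch (NoSuchElementException e) {
            sb.append(" remove=NoSuchElementException");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: peek=null poll=null offer=false remove=NoSuchElementException
        System.out.println(probe());
    }
}
```

This is why the worker above relies on take(): it is the call that actually waits for a producer to show up.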

The producer code is straightforward as well:

public class SyncQueueExample {
    // …   
    public SyncQueueExample() {
        try {
            int workItem = 0;
            // Create a synchronous queue
            BlockingQueue<String> queue = new SynchronousQueue<String>();
            // Create the child worker thread
            MyWorker worker = new MyWorker(queue);
            worker.start();
            // Start sending to the queue
            while ( true ) {
                System.out.println("\nPlacing work on queue");
                String work = "Work Item:" + (++workItem);
                queue.put(work);
            }
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
    }
}

When run, the output between the queue producer and consumer will be “coordinated,” in effect, and alternate as such:

Placing work on queue
Thread 'Thread-1' processing work: Work Item:1
Placing work on queue
Thread 'Thread-1' processing work: Work Item:2
Placing work on queue
Thread 'Thread-1' processing work: Work Item:3
Placing work on queue
Thread 'Thread-1' processing work: Work Item:4

To continue on the worker thread pool theme from the previous post, you can create multiple queue consumers, each of which will block on the call to the SynchronousQueue’s take() method until it’s chosen to process a message:

// Create child worker thread pool
int POOL_SIZE = 5;
for ( int i = 0; i < POOL_SIZE; i++ ) {
    MyWorker worker = new MyWorker(queue);
    worker.start();
}

This pattern is useful if processing each queued work request were to involve some lengthy operation, such as file IO. Since there can be many consumers waiting, the work queue producer would be free to continuously generate work even while an individual consumer took considerable time processing a single item. In this case, the producer thread would only be blocked on insertion when all consumer threads were busy processing. Once a worker thread completed its processing and once again called take(), the producer’s call to put() would unblock and the entire process would continue.
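As a rough, self-contained sketch of this pool arrangement, the following hands a fixed number of items to several workers and then shuts them down by interrupting them, which is the same way the MyWorker loop above terminates. The HandOffPool class, its run() method, and the use of a CountDownLatch to know when all work is finished are assumptions added for the example, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only
class HandOffPool {

    /** Hands itemCount items to poolSize workers and returns how many were processed. */
    static int run(int poolSize, int itemCount) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(itemCount);

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        queue.take();                 // blocks until a producer hands off an item
                        processed.incrementAndGet();  // stand-in for the real work
                        done.countDown();
                    }
                } catch (InterruptedException ie) {
                    // interrupted: just terminate
                }
            });
            worker.start();
            workers.add(worker);
        }

        try {
            for (int i = 1; i <= itemCount; i++) {
                queue.put("Work Item:" + i);          // blocks only while every worker is busy
            }
            done.await();                             // wait until every item has been processed
            for (Thread w : workers) w.interrupt();   // ask the idle workers to stop
            for (Thread w : workers) w.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed " + run(5, 20) + " items");
    }
}
```

Interrupting the workers is only one way to stop them; a sentinel "poison pill" message placed on the queue is a common alternative.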

Hand Off, with Buffering

A similar scenario can be achieved with the ArrayBlockingQueue. In this implementation of the BlockingQueue interface, a producer can place work on a queue until a certain number of outstanding queued items exist, at which point it will block. Until that number is reached, however, the producer will be free to continue producing work. As with a SynchronousQueue, calls to take() by a consumer will block if there are no items on the queue, but will return if and when queued items exist.

As for the consumer worker threads, the MyWorker class remains exactly the same as in the previous example. Since both SynchronousQueue and ArrayBlockingQueue are implementations of the BlockingQueue interface, no code change is required — this is polymorphism in action.

Regarding the producer, the code from the previous example is mostly unchanged, except the single line where a SynchronousQueue was created. Instead, we need to create an ArrayBlockingQueue, where a number is provided in the constructor:

// Create a bounded blocking queue
int MAX_Q_SIZE = 5;
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(MAX_Q_SIZE);

This number indicates the maximum size of the queue, which is the number of items that can be placed on the queue before the producer blocks on calls to put(). Once consumers remove items and the queue size drops below this maximum, the producer’s call to put() will return. Depending on OS thread scheduling as well as whether you specify the “fairness” of the queue’s consumer processing, you will see varied output in terms of messages being placed on the queue and when they are processed:
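The capacity limit and the fairness option can be sketched without any threads by using the non-blocking offer() and poll() methods in place of put() and take(). In this sketch the BoundedQueueProbe class and its probe() helper are made up for illustration; the two-argument ArrayBlockingQueue constructor is the real one, and its boolean argument requests FIFO ordering among threads that are blocked on the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper class, for illustration only
class BoundedQueueProbe {

    static String probe() {
        int maxSize = 2;
        // 'true' asks for fair (FIFO) access for blocked producers and consumers
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(maxSize, true);

        StringBuilder sb = new StringBuilder();
        sb.append(queue.offer("a")).append(' ');   // true
        sb.append(queue.offer("b")).append(' ');   // true, the queue is now full
        sb.append(queue.offer("c")).append(' ');   // false: this is where put() would block
        sb.append(queue.poll()).append(' ');       // a, which frees one slot
        sb.append(queue.offer("c")).append(' ');   // true
        sb.append(queue.remainingCapacity());      // 0
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: true true false a true 0
        System.out.println(probe());
    }
}
```

Fairness generally costs some throughput, so it is usually left off unless starvation of a particular thread is a real concern.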

...
Placing work on queue
Placing work on queue
Placing work on queue
Thread 'Thread-1' processing work: Work Item:6
Thread 'Thread-2' processing work: Work Item:7
Thread 'Thread-3' processing work: Work Item:8
Thread 'Thread-4' processing work: Work Item:9
Thread 'Thread-5' processing work: Work Item:10
Placing work on queue
Placing work on queue
...

Priority, Delays, and Other Details

There are multiple queue classes in the java.util.concurrent package that implement the BlockingQueue interface (we’ve examined the first two in detail here):

  • SynchronousQueue: A hand-off pattern. Producer(s) block until there is a consumer available; consumer(s) block until there is an enqueued message.
  • ArrayBlockingQueue: Similar to the SynchronousQueue, except the queue can contain a pre-set number of items before the queue producer(s) block on queue insertion.
  • DelayQueue: Elements placed in the queue are not available for removal until their delay time has expired. Items that are furthest past their expiration time are available for removal first. Calls to put() do not block since this queue is unbounded, although calls to take() will block until an expired message is available.
  • LinkedBlockingQueue: Similar to an ArrayBlockingQueue, but where queue growth is allowed. However, the queue constructor does accept an optional parameter to constrain the growth to a maximum number of items.
  • LinkedTransferQueue: Similar to SynchronousQueue, except the queue is unbounded in size and calls to put() never block. Calls to take(), however, will block until a queued item is available for removal. A producer that does want to wait for a consumer can call transfer() instead of put().
  • PriorityBlockingQueue: Similar to SynchronousQueue, except the queue is unbounded in size and calls to put() never block. Calls to take(), however, will block until a queued item is available for removal, and those items are subject to priority-based ordering. Enqueued objects must be comparable, meaning they must implement the Comparable interface and its required method, unless a Comparator is supplied to the queue’s constructor.
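As one illustration from the list above, here is a minimal sketch of priority-based ordering with a PriorityBlockingQueue. The Job class, its fields, and the rule that lower numbers mean higher priority are assumptions invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, for illustration only
class PriorityProbe {

    /** Made-up work item: lower 'priority' values are removed first. */
    static final class Job implements Comparable<Job> {
        final int priority;
        final String name;

        Job(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }

        @Override
        public int compareTo(Job other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Enqueues three jobs out of order and returns the order they come back in. */
    static List<String> drainInOrder() {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>();
        queue.put(new Job(3, "low"));      // put() never blocks: the queue is unbounded
        queue.put(new Job(1, "high"));
        queue.put(new Job(2, "medium"));

        List<String> order = new ArrayList<>();
        Job job;
        while ((job = queue.poll()) != null) {   // poll() is the non-blocking cousin of take()
            order.add(job.name);
        }
        return order;
    }

    public static void main(String[] args) {
        // prints: [high, medium, low]
        System.out.println(drainInOrder());
    }
}
```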

In part 3 of this series on Java concurrency, we’ll continue with the java.util.concurrent package and explore Executors and the ExecutorService interface.

Java Concurrency: Queue Processing, Part 1

The Queue as Monitor

Every Java class extends the Object class by default. There are several built-in methods on this class, of which we’ll focus on wait(), notify(), and notifyAll(). These methods operate on the object’s monitor, which is used to synchronize operations across a group of threads. There are some rules to follow, but it’s really quite simple: The waiting thread needs to own the object’s monitor, which means it must call wait() within the code that synchronizes on the monitor. There are three ways of doing this; refer to the Java API for more details. Here’s one example:

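synchronized (obj) {
    // Keep waiting while the thread still has nothing to do;
    // re-checking in a loop also guards against spurious wakeups
    while ( condition ) {
        obj.wait();
    }
    // perform some action
}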

Conversely, the thread that signals the waiting thread(s) will call notify() or notifyAll() on the same monitor object. It, too, needs to own the monitor to do this, as in this example:

synchronized (obj) {
    obj.notify();
}

Calling notify() wakes up a single thread waiting on the monitor, while notifyAll() wakes up all of the threads waiting on the monitor. In general, calling notify() is more efficient as it results in fewer thread context switches, but there are times where notifyAll() may be a better choice. For instance, when work is produced at a high rate, it may be more efficient to wake as many threads as possible each time work is placed on the queue. This is mainly an implementation decision that you need to make.

Multithreaded Queue Example

Let’s examine a common scenario in which tasks are executed by worker threads, and where the work is coordinated via a queue to ensure once-and-only-once execution of each task. In this case, we’ll use the queue (itself a Java Object) as the monitor as well. Here’s the overall design:

  • Create a queue to act as a monitor and a way to transfer work
  • Create a pool of worker threads (consumers) that each wait on the monitor (the queue)
  • Create one or more producers that place items on the queue
  • Notify the monitor when an item is placed on the queue, which in turn wakes up a worker to pull the next item off the queue

Let’s start by looking at the worker’s class, MyWorker, which itself extends Thread:

public class MyWorker extends Thread {
    private static int instance = 0;
    // The queue holds Runnable work items (not Strings), matching run() below
    private final Queue<Runnable> queue;
    
    public MyWorker(Queue<Runnable> queue) {
        this.queue = queue;
        setName("MyWorker:" + (instance++));
    }
    
    @Override
    public void run() {
        while ( true ) {
            try {
                Runnable work = null;
                synchronized ( queue ) {
                    while ( queue.isEmpty() )
                        queue.wait();
                    
                    // Get the next work item off of the queue
                    work = queue.remove();
                }
                // Process the work item
                work.run();
            }
            catch ( InterruptedException ie ) {
                break; // Terminate
            }
        }
    }
    private void doWork(Runnable work) { ... }
}

The code that creates a pool of MyWorker objects, which is essentially a Thread pool, looks like this:

for ( int i = 0; i < THREAD_POOL_SIZE; i++ ) {
    // Create and start the worker thread, which will
    // immediately wait on the monitor (the queue in
    // this case) to be signaled to begin processing
    MyWorker worker = new MyWorker(queue);
    worker.start();
}

The code to place work on the queue and signal a worker thread looks like this:

synchronized ( queue ) {
    // Add work to the queue
    Runnable work = getMoreWork();
    queue.add(work);
    // Notify the monitor object that all the threads
    // are waiting on. This will awaken just one to
    // begin processing work from the queue
    queue.notify();
}
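Putting the worker, the pool, and the producer together, here is a compact, self-contained sketch of the whole design. It is not the original program: the MonitorQueueDemo class, the use of a LinkedList as the queue, and the CountDownLatch used to detect completion are all assumptions made so the example can run on its own.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, for illustration only
class MonitorQueueDemo {

    /** Runs taskCount tasks on poolSize workers and returns how many tasks executed. */
    static int run(int poolSize, int taskCount) {
        final Queue<Runnable> queue = new LinkedList<>();   // the queue doubles as the monitor
        final AtomicInteger executed = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(taskCount);

        Thread[] workers = new Thread[poolSize];
        for (int i = 0; i < poolSize; i++) {
            workers[i] = new Thread(() -> {
                while (true) {
                    Runnable work;
                    synchronized (queue) {
                        try {
                            while (queue.isEmpty()) {
                                queue.wait();               // releases the monitor while waiting
                            }
                        } catch (InterruptedException ie) {
                            return;                         // terminate
                        }
                        work = queue.remove();
                    }
                    work.run();                             // process outside the lock
                }
            });
            workers[i].start();
        }

        for (int i = 0; i < taskCount; i++) {
            synchronized (queue) {
                queue.add(() -> {
                    executed.incrementAndGet();
                    done.countDown();
                });
                queue.notify();                             // wake one waiting worker, if any
            }
        }

        try {
            done.await();                                   // every task has run
            for (Thread w : workers) w.interrupt();
            for (Thread w : workers) w.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("Executed " + run(4, 100) + " tasks");
    }
}
```

Because each worker re-checks queue.isEmpty() before waiting, a notify() that arrives while all workers are busy is not a problem: the next worker to finish simply finds the item already on the queue.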

This code illustrates how straightforward basic Java thread synchronization is, although it is low-level. In Part 2 of this series, we’ll look at the java.util.concurrent package and how it contains built-in classes to make synchronized queue and task processing even simpler.

Mapping Java 5 enums with Hibernate

Hibernate is used to map Java objects to a relational database. Ideally, you would like your Java object model to be as object-oriented as possible. It is also nice to be able to use features like Java 5 enums while coding the object model. In this tutorial, I’ll share with you how I ended up mapping a simple object model using Java 5 enums into Hibernate.

I’ve found the best way to jumpstart yourself into using Hibernate is to go through the official tutorial on the Hibernate site.

A Simple Example

I have a Beer object that corresponds directly to a BEER table in a database. The Beer object can either be an “Ale” or a “Lager”, and there may be more types in the future. So the database schema looks like this:

Simple Beer schema
+--------------------+
| BEER               |
+--------------------+           +----------------+
| NUMBER  | ID       |           | BEER_TYPE      |
| VARCHAR | BRAND    |           +----------------+
| NUMBER  | TYPE     | FK------- | NUMBER  | ID   |
| NUMBER  | VOLUME   |           | VARCHAR | NAME |
+--------------------+           +----------------+

It is pretty straightforward to create a Java class to represent this data. Ideally, I would like to use a Java 5 enum to keep track of the beer type, so I use it within my data object to specify the type:

Beer Data Object
public class Beer {
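    private int id;          // maps to BEER.ID, which the <id> element of the mapping refers to
    private BeerType type;
    private String brand;
    private double volume;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }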

    public BeerType getType() {
        return type;
    }

    public void setType(BeerType type) {
        this.type = type;
    }

    public String getBrand() {
        return brand;
    }

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public double getVolume() {
        return volume;
    }

    public void setVolume(double volume) {
        this.volume = volume;
    }
}

The BeerType enum simply specifies the type of beer for each Beer object. This enum is going to map directly to the BEER_TYPE table, so I’ve given each beer type its own id through the private enum constructor. There is an accessor method for Hibernate to retrieve the id when it needs to do the mapping.

BeerType enum
public enum BeerType {
    UNKNOWN(0), ALE(1), LAGER(2);

    private int id;

    private BeerType(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }
}

One of the first things I found when I started searching the internet for ways to map Java 5 enums to a database using Hibernate was this article on the Hibernate page. It shows an implementation of the Hibernate UserType interface that allows an enum to be used within a Hibernate key-mapping. So if I create a custom type definition in the key-mapping that uses the GenericEnumUserType class as its class, and pass in the fully qualified name of the BeerType enum as a parameter, the GenericEnumUserType implementation can do the work of breaking out the enum details.

Hibernate key-mapping for Beer
<hibernate-mapping>
    <typedef name="beerType">
        <param name="enumClassName">net.dangertree.BeerType</param>
        <param name="identifierMethod">getId</param>
    </typedef>

    <class name="net.dangertree.Beer" table="BEER">
        <id name="id" type="int" column="ID">
            <generator/>
        </id>
        <property name="type" type="beerType" column="TYPE"/>
        <property name="brand" type="string" column="BRAND"/>
        <property name="volume" type="double" column="VOLUME"/>
    </class>
</hibernate-mapping>

Here is the full code for the GenericEnumUserType class that I used to help map my enums into Hibernate. Reflection is used to find an “identifier” method in the enum that will get the id for each value. I have specified in the type-def in my key mapping that I’m going to use the getId() method of my enum to get an identifying attribute. By default, the class will look for the inherent name method of enum, but I’m using ids as my identifier.

The two method objects are grabbed and saved, then called in the null-safe setter and getter deeper within the Hibernate workings.

This implementation of UserType is not exactly the same as any of the listings in the Hibernate article I mentioned earlier. I had to make several minor changes to get it working with this code.

Generic Enum UserType implementation
import org.hibernate.HibernateException;
import org.hibernate.type.NullableType;
import org.hibernate.type.TypeFactory;
import org.hibernate.usertype.ParameterizedType;
import org.hibernate.usertype.UserType;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

public class GenericEnumUserType implements UserType, ParameterizedType {
    private static final String DEFAULT_IDENTIFIER_METHOD_NAME = "name";
    private static final String DEFAULT_VALUE_OF_METHOD_NAME = "valueOf";

    private Class enumClass;
    private Class identifierType;
    private Method identifierMethod;
    private Method valueOfMethod;
    private NullableType type;
    private int[] sqlTypes;

    public void setParameterValues(Properties parameters) {
        String enumClassName = parameters.getProperty("enumClassName");
        try {
            enumClass = Class.forName(enumClassName).asSubclass(Enum.class);
        } catch (ClassNotFoundException cfne) {
            throw new HibernateException("Enum class not found", cfne);
        }

        String identifierMethodName = parameters.getProperty("identifierMethod",
                DEFAULT_IDENTIFIER_METHOD_NAME);

        try {
            identifierMethod = enumClass.getMethod(identifierMethodName,
                    new Class[0]);
            identifierType = identifierMethod.getReturnType();
        } catch (Exception e) {
            throw new HibernateException("Failed to obtain identifier method",
                    e);
        }

        type = (NullableType) TypeFactory.basic(identifierType.getName());

        if (type == null)
            throw new HibernateException("Unsupported identifier type "
                    + identifierType.getName());

        sqlTypes = new int[] { type.sqlType() };

        String valueOfMethodName = parameters.getProperty("valueOfMethod",
                DEFAULT_VALUE_OF_METHOD_NAME);

        try {
            valueOfMethod = enumClass.getMethod(valueOfMethodName,
                    new Class[] { identifierType });
        } catch (Exception e) {
            throw new HibernateException("Failed to obtain valueOf method", e);
        }
    }

    public Class returnedClass() {
        return enumClass;
    }

    public Object nullSafeGet(ResultSet rs, String[] names, Object owner)
            throws HibernateException, SQLException {
        Object identifier = type.get(rs, names[0]);
        if (rs.wasNull()) {
            return null;
        }

        try {
            return valueOfMethod.invoke(enumClass, new Object[] { identifier });
        } catch (Exception e) {
            throw new HibernateException("Exception while invoking "
                    + "valueOf method '" + valueOfMethod.getName() + "' of "
                    + "enumeration class '" + enumClass + "'", e);
        }
    }

    public void nullSafeSet(PreparedStatement st, Object value, int index)
            throws HibernateException, SQLException {
        try {
            if (value == null) {
                st.setNull(index, type.sqlType());
            } else {
                Object identifier = identifierMethod.invoke(value,
                        new Object[0]);
                type.set(st, identifier, index);
            }
        } catch (Exception e) {
            throw new HibernateException("Exception while invoking "
                    + "identifierMethod '" + identifierMethod.getName() + "' of "
                    + "enumeration class '" + enumClass + "'", e);
        }
    }

    public int[] sqlTypes() {
        return sqlTypes;
    }

    public Object assemble(Serializable cached, Object owner)
            throws HibernateException {
        return cached;
    }

    public Object deepCopy(Object value) throws HibernateException {
        return value;
    }

    public Serializable disassemble(Object value) throws HibernateException {
        return (Serializable) value;
    }

    public boolean equals(Object x, Object y) throws HibernateException {
        return x == y;
    }

    public int hashCode(Object x) throws HibernateException {
        return x.hashCode();
    }

    public boolean isMutable() {
        return false;
    }

    public Object replace(Object original, Object target, Object owner)
            throws HibernateException {
        return original;
    }
}

Ok, there is only one problem now. The name of the value-of method defaults to valueOf (see DEFAULT_VALUE_OF_METHOD_NAME), and I have not provided a parameter to override that in the type-def. So in order for my enum to match this, I need to add a valueOf method to my enum. And when the nullSafeGet method invokes valueOfMethod, it passes the identifier as the argument, so our valueOf method needs to take an int id as a parameter.

Modified BeerType enum
public enum BeerType {
    UNKNOWN(0), ALE(1), LAGER(2);

    private int id;

    private BeerType(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public static BeerType valueOf(int id) {
        switch (id) {
            case 1: return ALE;
            case 2: return LAGER;
            default: return UNKNOWN;
        }
    }
}

Now I have my data object, the enum it is using, my key-mapping, and an implementation of UserType that provides a way to access the enum through a type-def element in the key-mapping.
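To see the reflection round trip in isolation, without Hibernate or a database, here is a small sketch of what the UserType does with the two Method objects: it calls the identifier method to turn an enum value into an id, and the static valueOf method to turn an id back into an enum value. The EnumReflectionDemo class and its roundTrip() helper are made up for this illustration, and the enum is re-declared as a nested type only so that the example is self-contained.

```java
import java.lang.reflect.Method;

// Hypothetical demo class, for illustration only
class EnumReflectionDemo {

    /** Local copy of the BeerType enum so the example compiles on its own. */
    enum BeerType {
        UNKNOWN(0), ALE(1), LAGER(2);

        private final int id;

        BeerType(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }

        public static BeerType valueOf(int id) {
            for (BeerType t : values()) {
                if (t.id == id) return t;
            }
            return UNKNOWN;
        }
    }

    /** Converts an enum value to its id and back again using reflection only. */
    static String roundTrip(BeerType value) {
        try {
            Class<?> enumClass = BeerType.class;
            // Look up the identifier method by name, as the UserType does with "identifierMethod"
            Method identifierMethod = enumClass.getMethod("getId");
            // Look up valueOf(<identifier type>); here that resolves to valueOf(int)
            Method valueOfMethod =
                    enumClass.getMethod("valueOf", identifierMethod.getReturnType());

            Object id = identifierMethod.invoke(value);         // what would be written to the column
            Object restored = valueOfMethod.invoke(null, id);   // what would be read back
            return id + "->" + restored;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints: 2->LAGER
        System.out.println(roundTrip(BeerType.LAGER));
    }
}
```

Note that the lookup for valueOf only succeeds because the parameter type matches the return type of the identifier method, which is why the enum needs a valueOf(int) overload here.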

XML Transformation

JDOM Way:

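import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamSource;
import org.jdom.Document;
import org.jdom.input.SAXBuilder;
import org.jdom.output.XMLOutputter;
import org.jdom.transform.JDOMResult;
import org.jdom.transform.JDOMSource;

TransformerFactory tFactory = TransformerFactory.newInstance();
SAXBuilder builder = new SAXBuilder();

Document myDocument = builder.build(new File("path to xml File"));

// Wrap the JDOM document as the transformation source,
// and collect the transformation result as a JDOM document
JDOMSource source = new JDOMSource(myDocument);
JDOMResult result = new JDOMResult();

File xsltFile = new File("C:/dev/kd/xml/data.xsl");
StreamSource xsltSource = new StreamSource(new FileInputStream(xsltFile));

FileOutputStream out = new FileOutputStream(new File("path to output"));

// Get an XSLT transformer
Transformer transformer = tFactory.newTransformer(xsltSource);

// Do the transform
transformer.transform(source, result);

// Write the resulting JDOM document to the output file
Document outDoc = result.getDocument();
XMLOutputter outp = new XMLOutputter();
outp.output(outDoc, out);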

JDK javax.xml package:

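import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Result;
import javax.xml.transform.Source;
import javax.xml.transform.Templates;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

TransformerFactory tFactory = TransformerFactory.newInstance();

File xsltFile = new File("C:/dev/kd/xml/data.xsl");
File xmlFile = new File("Path to xml source");

// Compile the stylesheet once; Templates can be reused for many transformers
Templates template = tFactory.newTemplates(new StreamSource(
        new FileInputStream(xsltFile)));
Transformer transformer = template.newTransformer();

// Do the transform, writing the result to a file
Source source = new StreamSource(new BufferedInputStream(new FileInputStream(xmlFile)));
Result result = new StreamResult(new FileOutputStream(new File("path to output")));
Result printResult = new StreamResult(System.out);

transformer.transform(source, result);

// Transform again with a fresh source, this time printing indented output to the console
source = new StreamSource(new FileInputStream(xmlFile));
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.transform(source, printResult);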
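For trying the JDK approach without any files on disk, here is a minimal sketch that runs a transformation entirely in memory using StringReader and StringWriter. The InMemoryTransform class, the sample XML, and the tiny stylesheet are all made up for this illustration; only the javax.xml.transform calls are the standard JDK API.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Hypothetical demo class, for illustration only
class InMemoryTransform {

    // Made-up sample input document
    static final String XML = "<greeting>Hello</greeting>";

    // Made-up stylesheet that outputs the text content of <greeting>
    static final String XSLT =
            "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
          + "<xsl:output method=\"text\"/>"
          + "<xsl:template match=\"/\"><xsl:value-of select=\"greeting\"/></xsl:template>"
          + "</xsl:stylesheet>";

    /** Applies the given stylesheet to the given XML and returns the result as a String. */
    static String transform(String xml, String xslt) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(xslt)));
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)),
                    new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints: Hello
        System.out.println(transform(XML, XSLT));
    }
}
```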

Java Enum

Java enums can get surprisingly tricky, as in this example where each constant supplies its own implementation of an abstract method:

package org.kd.examples;

public class EnumEx
{

    private enum Column {
        A("a", 1)
        {
            public void execute(String value)
            {
                setExpression("This is " + A.title + " with " + value);
            }
        },

        B("b", 2)
        {
            public void execute(String value)
            {
                setExpression("This is " + B.title + " with " + value);
            }
        },

        C("c", 3)
        {
            public void execute(String value)
            {
                setExpression("This is " + C.title + " with " + value);
            }
        };

        public abstract void execute(String value);

        private String title;
        private int index;
        private String expression;

        public void setExpression(String value)
        {
            expression = value;
        }

        Column(String str, int i)
        {
            title = str;
            index = i;
        }

        public String getExpression() { return expression; }
    }

    public static Column getColumn(int i)
    {
        for (Column c : Column.values())
        {
            if (c.index == i)
                return c;
        }
        return null;
    }

    public static void main(String argv[])
    {
        StringBuilder desc = new StringBuilder();

        for (int i = 1; i < 4; i++)
        {
            Column c = getColumn(i);
            c.execute(String.valueOf(i + 4));
        }

        for (Column c : Column.values())
            System.out.println(c.getExpression());
    }

}