The Problem

A lot of us misunderstand synchronized in java. The way this can be used (as a block or as keyword to tag a function) - also makes it confusing.

A Dumb Unbounded Producer

Lets say you were to create a simple producer, no bounds - just a simple producer which keeps on writing integers onto a shared queue.

class Producer {

    Queue<Integer> sharedMessages;
    Producer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

}

As shown above, we initialize the constructor with a sharedMessages object. We purposedly use a vanilla Queue (no fancy concurrent data-structure)

We’d need this as a runnable, so lets use the Runnable interface, and define our run() method

class Producer implements Runnable {

    Queue<Integer> sharedMessages;
    Producer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

    public void run(){
        int i = 0;
        while(true){
            produce(i++);
        }
    }

We’d probably also need a produce() method which adds these messages to the sharedMessages :

    public void produce(Integer i) {
        this.sharedMessages.add(i);
    }

A Dumb Unbounded Consumer

With a similar idea, we create an unbounded consumer:

class Consumer implements Runnable{

    Queue<Integer> sharedMessages;
    Consumer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

    public void consume() {
        System.out.println(sharedMessages.remove());
    }

    public void run(){
        while(sharedMessages.size() > 0){
            consume();
        }
    }
}

Next, lets create a shared object for the messages, a single producer thread and multiple consumer threads.

// Initialize the shared object
Queue<Integer> sharedMessages = new LinkedList<>();

// Initialize a producer
new Thread(new Producer(sharedMessages)).start();

// Initialize many consumers running parallely
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();

You’d definitely see some errors thrown (If you don’t see any errors, wait - or even better - start more consumer threads) :

47110
47111
47112
47086
null
47083
Exception in thread "Thread-4" Exception in thread "Thread-3" Exception in thread "Thread-2" Exception in thread "Thread-1" java.util.NoSuchElementException
	at java.util.LinkedList.removeFirst(LinkedList.java:270)

A Smarter Producer-Consumer

Smarter Consumer

The first instinct upon seeing the error message is to synchronize the consume() method.

public synchronized void consume() {
    System.out.println(sharedMessages.remove());
}

That should do it, right? After all, adding synchronize means only one instance can mess around with the method. However, you’d still see the same error happening.

The reason being the sharedMessages.size() > 0 within the run() method - what’s happening is, Thread A calls consume() after seeing a size of 1, and at the same time Thread B sees a size of 1, and while Thread A was doing a remove, Thread B was a nanosecond late, and called a remove() too. Too sad, there were no elements, hence the error.

The next natural thing to do is to try adding a synchronize on the run() method. But that sounds weird - well it’s not only unconventional, it’s actually self-contradictory. You want concurrency, you want multiple threads calling their run() method concurrently - and marking run() synchronized would prevent it from doing it. Instead, you’d adopt a more conventional approach, and do something like this:

public void run(){
    synchronized (this){
        while(sharedMessages.size() > 0){
            consume();
        }
    }
}

To our dismay, we still see the dreaded error. The reason is yet another intricacy around the use of synchronized -

I think a lot of tutorials on concurrency miss this point largely. There are two problems here.

  1. Un-synchronized method called from a synchronised block : We’re trying to call consume() from a synchronized block. Our consume() isn’t synchronized. It’s not either/or. If you’re synchronising, you have to explcitly synchronize any methods you’re calling from within the synchronised block.

  2. What are you synchronizing on? : The problem here is that we’re trying to synchronize on this - the instance of the class. When using synchronized, you need an object as a monitor lock. Only threads using the same monitor lock will be synchronized.

It is important that same object is shared across different threads in order for them to synchronize correctly.

Thus, we want to ensure that our synchronize is over the shared resource. If you’re passing this - that means you want to ensure that only one thread has access to this class instance at a time. But that also means that same class object should be passed to those threads.

Recall here that we are passing separate consumer objects, effectivelly rendering the synchronize(this) block useless.

new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
. . .

To sum our discussion, we need to synchronize here on sharedMessages object.

synchronized (sharedMessages){
    while(sharedMessages.size() > 0){
        consume();
    }
}

You shouldn’t see the dreaded error anymore! We finally have a smarter producer-consumer:

class Consumer implements Runnable{

    Queue<Integer> sharedMessages;
    Consumer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

    public void consume() {
        synchronized (sharedMessages){
            System.out.println(sharedMessages.remove());
        }
    }

    @Override
    public void run(){
        synchronized (sharedMessages){
            while(sharedMessages.size() > 0){
                consume();
            }
        }
    }
}

We follow a smilar strategy for Producer:

class Producer implements Runnable {

    Queue<Integer> sharedMessages;
    Integer i = 0;
    Producer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

    public void produce(Integer i) {
        synchronized (sharedMessages){
            this.sharedMessages.add(i);
        }
    }

    public void run(){
        synchronized (sharedMessages) {
            while (i < 10000) {
                produce(i++);
            }
        }
    }
}

Testing the Smarter Consumer

Lets also check whether our consumer is actually using multiple threads. We can do this by making a small change -

public void run(){
    synchronized (sharedMessages){
        while(sharedMessages.size() > 0){
            consume();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + ": I was interrupted from my sleep!");
            }
        }
    }
}

Running this code would result in something like this:

0 consumed by Thread-1
1 consumed by Thread-1
2 consumed by Thread-1
3 consumed by Thread-1
...
9996 consumed by Thread-1
9997 consumed by Thread-1
9998 consumed by Thread-1
9999 consumed by Thread-1

Even with a thread.sleep, one thread acquires the lock, and doesn’t release it. While the current thread is sleeping, it’d make sense for other threads to be able to consume messages, right?

Seeing is Believing

It’s important here to note here asking thread to sleep, you don’t affect the change in the monitor on which the lock is held on - sharedMessages. A small change and the use of wait() makes a huge difference:

@Override
public void run(){
    synchronized (sharedMessages){
        while(sharedMessages.size() > 0){
            consume();
            try {
                sharedMessages.wait(5);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + ": I was interrupted from my sleep!");
            }
            sharedMessages.notifyAll();
        }
    }
}
0 consumed by Thread-1
1 consumed by Thread-2
2 consumed by Thread-3
3 consumed by Thread-4
4 consumed by Thread-1
5 consumed by Thread-2

Final Words

It’s important to understand how threads work behind the scenes, and how the keyword synchronised works is fundamental towards it. So is understanding wait() and notify().


import java.util.LinkedList;
import java.util.Queue;

class Producer implements Runnable {

    Queue<Integer> sharedMessages;
    Integer i = 0;
    Producer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

    public void produce(Integer i) {
        synchronized (sharedMessages){
            System.out.println("Producing message " + i);
            this.sharedMessages.add(i);
            try {
                sharedMessages.wait(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void run(){
        synchronized (sharedMessages) {
            while (i < 100) {
                produce(i++);
            }
        }
    }
}

class Consumer implements Runnable{

    Queue<Integer> sharedMessages;
    Consumer(Queue<Integer> sharedMessages) {
        this.sharedMessages = sharedMessages;
    }

    public void consume() {
        synchronized (sharedMessages) {
            if (sharedMessages.size() > 0) {
                System.out.println(sharedMessages.remove() + " consumed by " + Thread.currentThread().getName().toString());
            }
        }
    }

    @Override
    public void run(){
        synchronized (sharedMessages){
            while(true){
                try {
                    consume();
                    sharedMessages.wait(1);
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + ": I was interrupted from my sleep!");
                }
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Queue<Integer> sharedMessages = new LinkedList<>();

        new Thread(new Producer(sharedMessages)).start();

        new Thread(new Consumer(sharedMessages)).start();
        new Thread(new Consumer(sharedMessages)).start();
        new Thread(new Consumer(sharedMessages)).start();
        new Thread(new Consumer(sharedMessages)).start();
        new Thread(new Consumer(sharedMessages)).start();
        new Thread(new Consumer(sharedMessages)).start();
    }

}

I highly suggest you to take a look at the Intrinsic Locks and Synchronization


Feeling generous ? Help me write more blogs like this :)