Understanding Synchronized In Java
The Problem
A lot of us misunderstand synchronized in Java. The fact that it can be used in two ways (as a block, or as a keyword that marks a method) also makes it confusing.
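Here is a small sketch that makes the two forms concrete. TwoForms is a hypothetical toy class, not part of the producer-consumer code. A synchronized instance method locks on this, so it has the same locking effect as wrapping the method body in synchronized (this). Both methods below lock the same object, so two threads calling them concurrently on one shared instance never lose an increment.

```java
// Both forms of synchronized, side by side.
// Assumption: a toy counter class used only for illustration.
public class TwoForms {
    private int count = 0;

    // Form 1: synchronized as a method modifier; the monitor is implicitly `this`.
    public synchronized void incrementWithMethod() {
        count++;
    }

    // Form 2: a synchronized block; the monitor is named explicitly (here also `this`).
    public void incrementWithBlock() {
        synchronized (this) {
            count++;
        }
    }

    public synchronized int getCount() {
        return count;
    }

    // Runs both forms concurrently on one shared instance and returns the final count.
    public static int countWithBothForms(int perThread) {
        TwoForms shared = new TwoForms();
        Thread a = new Thread(() -> {
            for (int i = 0; i < perThread; i++) shared.incrementWithMethod();
        });
        Thread b = new Thread(() -> {
            for (int i = 0; i < perThread; i++) shared.incrementWithBlock();
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shared.getCount();
    }

    public static void main(String[] args) {
        // Both forms lock the same monitor, so no increments are lost.
        System.out.println(countWithBothForms(100_000)); // prints 200000
    }
}
```

If the two methods locked different objects, nothing would stop the increments from interleaving, and the count could come out lower.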
A Dumb Unbounded Producer
Let’s say you want to create a simple producer with no bounds: just a producer which keeps writing integers onto a shared queue.
class Producer {
Queue<Integer> sharedMessages;
Producer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
}
As shown above, the constructor receives a sharedMessages object. We purposely use a vanilla Queue (no fancy concurrent data structure).
We’d need this to run on its own thread, so let’s implement the Runnable interface and define our run() method:
class Producer implements Runnable {
Queue<Integer> sharedMessages;
Producer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
public void run(){
int i = 0;
while(true){
produce(i++);
}
}
We’d probably also need a produce() method which adds these messages to sharedMessages:
public void produce(Integer i) {
this.sharedMessages.add(i);
}
A Dumb Unbounded Consumer
With a similar idea, we create an unbounded consumer:
class Consumer implements Runnable{
Queue<Integer> sharedMessages;
Consumer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
public void consume() {
System.out.println(sharedMessages.remove());
}
public void run(){
while(sharedMessages.size() > 0){
consume();
}
}
}
Next, let’s create a shared object for the messages, a single producer thread and multiple consumer threads:
// Initialize the shared object
Queue<Integer> sharedMessages = new LinkedList<>();
// Initialize a producer
new Thread(new Producer(sharedMessages)).start();
// Initialize many consumers running in parallel
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
You’d most likely see some errors thrown (if you don’t, wait a bit, or better still, start more consumer threads):
47110
47111
47112
47086
null
47083
Exception in thread "Thread-4" Exception in thread "Thread-3" Exception in thread "Thread-2" Exception in thread "Thread-1" java.util.NoSuchElementException
at java.util.LinkedList.removeFirst(LinkedList.java:270)
A Smarter Producer-Consumer
Smarter Consumer
The first instinct upon seeing the error message is to synchronize the consume() method.
public synchronized void consume() {
System.out.println(sharedMessages.remove());
}
That should do it, right? After all, adding synchronized means only one thread at a time can run the method on a given object. However, you’d still see the same error.
The culprit is the sharedMessages.size() > 0 check within the run() method. Here is what happens: Thread A sees a size of 1 and calls consume(). At the same time, Thread B also sees a size of 1. Thread A removes the element, and Thread B, a nanosecond late, calls remove() too. Sadly, there are no elements left, hence the NoSuchElementException. Checking the size and then removing are two separate steps, and nothing stops another thread from slipping in between them.
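The sketch below forces the interleaving just described, to show that the unguarded check is the problem and not remove() itself. The CheckThenActDemo class and the CyclicBarrier that lines the two threads up are my own illustration. The remove() is deliberately done inside a lock on the shared queue, yet the size check outside that lock still lets both threads through.

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckThenActDemo {
    // Returns how many of the two consumers hit NoSuchElementException.
    public static int countFailures() {
        Queue<Integer> queue = new LinkedList<>();
        queue.add(42); // exactly one message
        // Forces the unlucky interleaving: both threads pass the size check before either removes.
        CyclicBarrier bothChecked = new CyclicBarrier(2);
        AtomicInteger failures = new AtomicInteger();

        Runnable consumer = () -> {
            try {
                if (queue.size() > 0) {        // check: both threads see a size of 1
                    bothChecked.await();
                    synchronized (queue) {     // act: the remove itself is locked...
                        queue.remove();        // ...but the check above was not
                    }
                }
            } catch (NoSuchElementException e) {
                failures.incrementAndGet();
            } catch (Exception e) {            // InterruptedException / BrokenBarrierException from await()
                throw new RuntimeException(e);
            }
        };

        Thread a = new Thread(consumer);
        Thread b = new Thread(consumer);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures = " + countFailures()); // prints "failures = 1"
    }
}
```

The fix has to put the check and the remove under the same lock, so that no other thread can run between them.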
The next natural thing to try is adding synchronized to the run() method. But that sounds weird: it’s not only unconventional, it’s self-contradictory. You want concurrency, with multiple threads running their run() methods at the same time, and marking run() synchronized is meant to prevent exactly that. Instead, you’d adopt a more conventional approach and do something like this:
public void run(){
synchronized (this){
while(sharedMessages.size() > 0){
consume();
}
}
}
To our dismay, we still see the dreaded error. The reason is yet another intricacy around the use of synchronized, one that I think a lot of concurrency tutorials largely miss. There are two things to look at here.
- Un-synchronized method called from a synchronized block: we’re calling consume() from a synchronized block, and consume() itself isn’t synchronized. On its own, that is actually fine. A lock is held by the thread, so consume() runs with the lock held whenever it’s called from inside the block. It only becomes a problem if consume() can also be called without the lock, so synchronizing it on the same lock is cheap insurance (Java’s intrinsic locks are reentrant, so a thread may re-acquire a lock it already holds).
- What are you synchronizing on? This is the real problem: we’re synchronizing on this, the instance of the class. When using synchronized, you need an object to act as the monitor lock, and only threads using the same monitor lock are synchronized with each other.
The same lock object has to be shared across the threads for them to synchronize correctly, so we want our synchronized block to be over the shared resource. Synchronizing on this only ensures that one thread at a time holds the lock of that particular instance, which helps only if the very same instance is handed to all of those threads. Recall that we are passing a separate Consumer object to each thread, effectively rendering the synchronized (this) block useless. (The same goes for the earlier synchronized consume(): a synchronized instance method locks on this too.)
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
. . .
To sum up our discussion, we need to synchronize on the sharedMessages object:
synchronized (sharedMessages){
while(sharedMessages.size() > 0){
consume();
}
}
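Here is a quick way to convince yourself that the choice of monitor is what matters. This is a sketch under my own assumptions: the WhichMonitor class, the Worker class and the CyclicBarrier trick are illustrations, not part of the original code. Each thread gets its own Worker, just as each thread gets its own Consumer. The code checks whether both threads can be inside their synchronized blocks at the same moment. The 500 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class WhichMonitor {
    // Returns true if two threads were inside their synchronized blocks at the same time.
    public static boolean bothInsideAtOnce(boolean lockOnShared) {
        Object shared = new Object();                    // stands in for sharedMessages
        CyclicBarrier bothInside = new CyclicBarrier(2);
        AtomicBoolean met = new AtomicBoolean(false);

        // Each thread gets its own Worker, just like each thread gets its own Consumer.
        class Worker implements Runnable {
            @Override
            public void run() {
                Object monitor = lockOnShared ? shared : this;
                synchronized (monitor) {
                    try {
                        // Only trips if the other thread is inside its block too.
                        bothInside.await(500, TimeUnit.MILLISECONDS);
                        met.set(true);
                    } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                        // Timed out, or found the barrier already broken: the threads never overlapped.
                    }
                }
            }
        }

        Thread a = new Thread(new Worker());
        Thread b = new Thread(new Worker());
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return met.get();
    }

    public static void main(String[] args) {
        System.out.println("synchronized (this):   both inside = " + bothInsideAtOnce(false));
        System.out.println("synchronized (shared): both inside = " + bothInsideAtOnce(true));
    }
}
```

When each Worker locks on itself, both threads get in together, so synchronized (this) protected nothing. When both lock on the shared object, the second thread stays blocked until the first one leaves.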
You shouldn’t see the dreaded error anymore! We finally have a smarter producer-consumer:
class Consumer implements Runnable{
Queue<Integer> sharedMessages;
Consumer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
public void consume() {
synchronized (sharedMessages){
System.out.println(sharedMessages.remove() + " consumed by " + Thread.currentThread().getName());
}
}
@Override
public void run(){
synchronized (sharedMessages){
while(sharedMessages.size() > 0){
consume();
}
}
}
}
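The Consumer above takes the sharedMessages lock in run() and then again in consume(). That does not deadlock, because Java’s intrinsic locks are reentrant: a thread may re-acquire a lock it already holds. The sketch below uses the real Thread.holdsLock() method to show this. ReentrancyDemo and its methods are hypothetical names. The sketch also shows that a plain, unsynchronized method called from inside a synchronized block runs with the lock held.

```java
public class ReentrancyDemo {
    static final Object lock = new Object(); // stands in for sharedMessages

    // A plain, unsynchronized helper: reports whether the calling thread holds the lock.
    public static boolean helperSeesLock() {
        return Thread.holdsLock(lock);
    }

    // Calls the plain helper from inside a synchronized block.
    public static boolean helperCalledInsideBlock() {
        synchronized (lock) {
            return helperSeesLock();
        }
    }

    // Mirrors Consumer: run() takes the lock, then consume() takes the same lock again.
    public static boolean nestedAcquire() {
        synchronized (lock) {         // like synchronized (sharedMessages) in run()
            synchronized (lock) {     // like synchronized (sharedMessages) in consume(); no deadlock
                return Thread.holdsLock(lock);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(helperSeesLock());          // false: called without the lock
        System.out.println(helperCalledInsideBlock()); // true: the lock stays with the thread
        System.out.println(nestedAcquire());           // true: re-acquiring a held lock is allowed
    }
}
```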
We follow a similar strategy for Producer:
class Producer implements Runnable {
Queue<Integer> sharedMessages;
Integer i = 0;
Producer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
public void produce(Integer i) {
synchronized (sharedMessages){
this.sharedMessages.add(i);
}
}
public void run(){
synchronized (sharedMessages) {
while (i < 10000) {
produce(i++);
}
}
}
}
Testing the Smarter Consumer
Let’s also check whether our consumer is actually using multiple threads. We can do this by making a small change to run():
public void run(){
synchronized (sharedMessages){
while(sharedMessages.size() > 0){
consume();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": I was interrupted from my sleep!");
}
}
}
}
Running this code would result in something like this:
0 consumed by Thread-1
1 consumed by Thread-1
2 consumed by Thread-1
3 consumed by Thread-1
...
9996 consumed by Thread-1
9997 consumed by Thread-1
9998 consumed by Thread-1
9999 consumed by Thread-1
Even with Thread.sleep(), one thread acquires the lock and doesn’t release it. While the current thread is sleeping, it would make sense for other threads to be able to consume messages, right?
Seeing is Believing
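It’s important to note here that asking a thread to sleep doesn’t change anything about the monitor on which the lock is held, sharedMessages: a sleeping thread keeps every lock it holds. Calling sharedMessages.wait(), on the other hand, releases that lock until the thread is notified or the timeout expires, giving other consumers a chance to acquire it. A small change using wait() makes a huge difference: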
@Override
public void run(){
synchronized (sharedMessages){
while(sharedMessages.size() > 0){
consume();
try {
sharedMessages.wait(5);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": I was interrupted from my sleep!");
}
sharedMessages.notifyAll();
}
}
}
0 consumed by Thread-1
1 consumed by Thread-2
2 consumed by Thread-3
3 consumed by Thread-4
4 consumed by Thread-1
5 consumed by Thread-2
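The sketch below isolates the difference between sleep() and wait(). SleepVsWait and otherThreadGotIn() are hypothetical names, and the 200 ms and 2000 ms pauses are arbitrary. One thread takes a lock and pauses inside its synchronized block, using either Thread.sleep() or lock.wait(). A second thread then tries to enter a block on the same lock and records whether it got in while the first thread was still paused.

```java
import java.util.concurrent.CountDownLatch;

public class SleepVsWait {
    // Returns true if a second thread entered synchronized (lock) while the first thread
    // was still pausing inside its own synchronized (lock) block.
    public static boolean otherThreadGotIn(boolean useWait) {
        Object lock = new Object();
        CountDownLatch holderHasLock = new CountDownLatch(1);
        boolean[] holderDone = {false};       // guarded by lock
        boolean[] gotInDuringPause = {false}; // read by the caller after join()

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holderHasLock.countDown();
                try {
                    if (useWait) {
                        lock.wait(2000);      // releases the monitor while waiting
                    } else {
                        Thread.sleep(200);    // keeps the monitor while sleeping
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                holderDone[0] = true;
            }
        });

        Thread other = new Thread(() -> {
            try {
                holderHasLock.await();        // start only once the holder owns the lock
            } catch (InterruptedException e) {
                return;
            }
            synchronized (lock) {
                gotInDuringPause[0] = !holderDone[0];
                lock.notifyAll();             // wake the holder early if it is waiting
            }
        });

        holder.start();
        other.start();
        try {
            holder.join();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return gotInDuringPause[0];
    }

    public static void main(String[] args) {
        System.out.println("sleep: other thread got in = " + otherThreadGotIn(false));
        System.out.println("wait:  other thread got in = " + otherThreadGotIn(true));
    }
}
```

With sleep(), the second thread is blocked until the holder leaves its block, so otherThreadGotIn(false) returns false. With wait(), the lock is released during the pause, so otherThreadGotIn(true) returns true. A spurious wake-up of the holder could in principle change the second result. That is why real code always calls wait() in a loop that re-checks its condition.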
Final Words
It’s important to understand how threads work behind the scenes, and understanding how the synchronized keyword works is fundamental to that. So is understanding wait() and notify(). Here is the complete code:
import java.util.LinkedList;
import java.util.Queue;
class Producer implements Runnable {
Queue<Integer> sharedMessages;
Integer i = 0;
Producer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
public void produce(Integer i) {
synchronized (sharedMessages){
System.out.println("Producing message " + i);
this.sharedMessages.add(i);
try {
sharedMessages.wait(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void run(){
synchronized (sharedMessages) {
while (i < 100) {
produce(i++);
}
}
}
}
class Consumer implements Runnable{
Queue<Integer> sharedMessages;
Consumer(Queue<Integer> sharedMessages) {
this.sharedMessages = sharedMessages;
}
public void consume() {
synchronized (sharedMessages) {
if (sharedMessages.size() > 0) {
System.out.println(sharedMessages.remove() + " consumed by " + Thread.currentThread().getName());
}
}
}
@Override
public void run(){
synchronized (sharedMessages){
while(true){ // consumers poll forever; stop the program manually
try {
consume();
sharedMessages.wait(1);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ": I was interrupted from my sleep!");
}
}
}
}
}
public class Main {
public static void main(String[] args) {
Queue<Integer> sharedMessages = new LinkedList<>();
new Thread(new Producer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
new Thread(new Consumer(sharedMessages)).start();
}
}
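The listing above polls. Its consumers call wait(1) in an endless loop, and the program never exits on its own. For comparison, here is a sketch of the more conventional guarded-wait pattern, as described in the Guarded Blocks lesson of Oracle’s Java Tutorials. Consumers call wait() in a loop until the queue is non-empty, and the producer calls notifyAll() after each add(). The class GuardedQueueDemo, the done flag and the finish() method are hypothetical additions that let the consumers stop. They are not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class GuardedQueueDemo {
    private final Queue<Integer> queue = new LinkedList<>();
    private boolean done = false; // guarded by the queue's monitor

    public void produce(int value) {
        synchronized (queue) {
            queue.add(value);
            queue.notifyAll(); // wake any consumer blocked in wait()
        }
    }

    // Hypothetical shutdown signal so that consumers can exit instead of looping forever.
    public void finish() {
        synchronized (queue) {
            done = true;
            queue.notifyAll();
        }
    }

    // Blocks until a message is available; returns null once finished and drained.
    public Integer consume() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty() && !done) { // re-check after every wake-up
                queue.wait();                  // releases the lock while waiting
            }
            return queue.poll();               // null only when done and empty
        }
    }

    // One producer (the calling thread) and several consumers; returns all consumed values, sorted.
    public static List<Integer> runDemo(int messages, int consumers) {
        GuardedQueueDemo demo = new GuardedQueueDemo();
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                try {
                    Integer value;
                    while ((value = demo.consume()) != null) {
                        consumed.add(value);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (int i = 0; i < messages; i++) {
            demo.produce(i);
        }
        demo.finish();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<Integer> sorted = new ArrayList<>(consumed);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<Integer> result = runDemo(1000, 6);
        System.out.println(result.size() + " messages consumed"); // prints "1000 messages consumed"
    }
}
```

Each consumer re-checks queue.isEmpty() after every wake-up, so spurious wake-ups and lost races are harmless. Every message is taken exactly once, and all threads finish once the producer calls finish().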
I highly suggest taking a look at the Intrinsic Locks and Synchronization lesson in Oracle’s Java Tutorials.