Six Ways to Handle Concurrency in the JVM

In this article, we provide a quick overview of different ways to handle concurrency in the JVM, with basic solutions to the producer-consumer problem.

What is Concurrency?

Concurrency is the execution of multiple tasks to achieve a goal; the tasks may or may not run simultaneously (that is the difference from parallelism). It involves several challenges, such as race conditions (multiple tasks access shared data and try to change it at the same time), memory consistency (multiple tasks have inconsistent views of what should be the same data), and deadlocks (multiple tasks block each other, each waiting to acquire a resource held by another).
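
As a quick illustration of a race condition, consider two threads incrementing a shared counter without synchronization (a minimal sketch, separate from the producer-consumer example used in the rest of the article):

public class RaceDemo {
    static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        // counter++ is a read-modify-write sequence, so concurrent
        // increments can interleave and lose updates.
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) counter++;
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Usually prints less than 200000 because of the race.
        System.out.println(counter);
    }
}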

The alternatives around the JVM keep growing, and nowadays several languages and frameworks take advantage of the support the JVM offers to provide different ways to tackle these problems.

A common problem that involves concurrency is producer-consumer: two separate kinds of processes share a common queue of data. The producer generates data and adds it to the queue, and the consumer takes data from it. There can be several producers and consumers.

Using this problem as a base, we are going to do a quick overview of several existing approaches to solving it.

1. Java Threads

Java's basic concurrency building block is the Thread. Creating a Thread requires defining a Runnable that contains the logic of the task to be executed.

For the processes' shared queue, Java provides the BlockingQueue interface, which is thread-safe (multiple threads can add and remove elements without concurrency issues). BlockingQueue provides two main methods: put() and take().

Its put() method blocks the calling thread if the queue is full. Similarly, if the queue is empty, its take() method blocks the calling thread.

Using this approach, our producer implementation looks like the following: each producer executes indefinitely, adding random values to the queue (using the put method) and logging them to the console.

We will have 10 producers; the start() call after each new Thread declaration starts the producer's execution:

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Random random = new Random();

IntStream.range(1, 11).forEach(id -> {
    Runnable producer = () -> {
        while (true) {
            try {
                Thread.sleep(getRandomTime(random));

                var value = RandomStringUtils.randomAlphanumeric(10);
                queue.put(value);
                System.out.println("producer " + id + " producing " + value);
            } catch (InterruptedException e) {
                System.out.println("producer " + id + " error producing value " + e.getMessage());
            }
        }
    };

    var task = new Thread(producer);
    task.start();
});

In a similar fashion, our 10 consumers will be the following:

IntStream.range(1, 11).forEach(id -> {
    Runnable consumer = () -> {
        while (true) {
            try {
                Thread.sleep(getRandomTime(random));

                var value = queue.take();
                System.out.println("consumer " + id + " consuming value " + value);
            } catch (InterruptedException e) {
                System.out.println("consumer " + id + " error consuming value " + e.getMessage());
            }
        }
    };

    var task = new Thread(consumer);
    task.start();
});

Thread creation is an expensive process (each Thread spawns an OS thread), and creating too many threads hurts performance due to context switching (the system stops processing one thread to start processing another, which requires saving the stopped thread's state so it can resume later).

To improve this situation there are thread pools: groups of pre-allocated threads that are reused. Thread pools can be created using the ExecutorService; in our scenario a fixed thread pool is useful because we have a fixed number of long-running tasks.

ExecutorService executorService = Executors.newFixedThreadPool(20);

After including the thread pool our producer will look a little bit different:

Runnable producer = () -> {
    while (true) {
        try {
            Thread.sleep(getRandomTime(random));

            var value = RandomStringUtils.randomAlphanumeric(10);
            queue.put(value);
            System.out.println("producer " + id + " producing " + value);
        } catch (InterruptedException e) {
            System.out.println("producer " + id + " error producing value " + e.getMessage());
        }
    }
};

executorService.execute(producer);

2. Scala Futures

Scala provides its own approach to handling concurrent tasks: Futures, which are placeholder objects for a value that may not yet exist. The advantage of Futures over raw threads is that they can be composed (through combinators like flatMap, foreach, and filter) in a non-blocking way.

Futures require an ExecutionContext, which can be backed by a traditional Java thread pool. For example, we could declare an execution context in the following way:

implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))

Creating a Future is an eager operation: it is immediately submitted to an execution context to be executed. Each Future creation, and each call to map, flatMap, etc., sends a new task to the execution context to be scheduled.
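
A minimal sketch of this eager behavior and non-blocking composition (assuming the implicit executionContext declared above is in scope):

import scala.concurrent.Future

// The body starts running on the execution context as soon as the
// Future is created.
val fetched: Future[Int] = Future {
  42
}

// map and foreach schedule follow-up tasks without blocking.
val doubled: Future[Int] = fetched.map(_ * 2)
doubled.foreach(result => println(s"result: $result"))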

Using BlockingQueue as the process queue, our producer will look like the following:

def producer(id: Long, queue: BlockingQueue[String]): Future[Unit] = Future {
  produce(id, queue)
}

@tailrec
def produce(id: Long, queue: BlockingQueue[String]): Unit = {
  Thread.sleep(getRandomTime)

  val value = RandomStringUtils.randomAlphanumeric(10)
  queue.put(value)
  println("producer " + id + " producing " + value)

  produce(id, queue)
}

Our producer is a Future that executes the producing logic in a recursive fashion (instead of the infinite loop of the previous approach). Our consumer follows the same structure; a sketch of it is shown below.
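
As a rough sketch (mirroring the producer above and assuming the same getRandomTime helper), the consumer could look like this:

def consumer(id: Long, queue: BlockingQueue[String]): Future[Unit] = Future {
  consume(id, queue)
}

@tailrec
def consume(id: Long, queue: BlockingQueue[String]): Unit = {
  Thread.sleep(getRandomTime)

  // take() blocks the pool thread until a value is available
  val value = queue.take()
  println("consumer " + id + " consuming value " + value)

  consume(id, queue)
}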

3. Cats-Effect Fibers

cats-effect is a fully-fledged runtime platform for executing parallel and concurrent computations.

One important cats-effect data type is IO. It encodes side effects as pure values, is capable of expressing both synchronous and asynchronous computations, and describes a chain of computations to be executed.

As opposed to Futures, creating an IO does not submit any task to a thread pool: manipulating IOs means we are manipulating values, while manipulating Futures means we are manipulating already running or enqueued computations.
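
A minimal sketch of this difference: nothing runs until the IO is explicitly executed (unsafeRunSync and the imported global runtime are the usual entry points at the edge of a program):

import cats.effect.IO
import cats.effect.unsafe.implicits.global

// Just a value: nothing has been scheduled or printed yet.
val program: IO[Unit] = IO.println("hello from IO")

// Only an explicit run at "the end of the world" executes it.
program.unsafeRunSync()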

IOs are executed on top of fibers, the fundamental concurrency primitive in cats-effect. Fibers are lightweight threads (about 150 bytes per fiber), which makes it possible to create tens of millions of them if needed.

One advantage of fibers over Futures is that they can be canceled; by default, fibers are cancelable at all points during their execution.

cats-effect has three independent thread pools to evaluate programs:

  • A work-stealing pool for computations: when a thread is idle, it will try to steal tasks from other threads in a random fashion.
  • A single-threaded schedule dispatcher: dispatches sleeps with high precision.
  • An unbounded blocking pool: defaulting to zero Threads and allocating as-needed for blocking operations.

cats-effect includes the concept of semantic blocking: a fiber that is blocked doesn't block its underlying thread; instead, the fiber is descheduled, giving another fiber the chance to run on one of the pool's available threads.
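
A small sketch of the idea (printThread is a hypothetical helper; the thread names printed before and after the sleep may differ, since the fiber can resume on another pool thread):

import cats.effect.IO
import scala.concurrent.duration._

def printThread(label: String): IO[Unit] =
  IO(println(s"$label: ${Thread.currentThread().getName}"))

// IO.sleep semantically blocks the fiber, not a thread.
val demo: IO[Unit] =
  printThread("before") *> IO.sleep(1.second) *> printThread("after")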

cats-effect also provides a purely functional concurrent implementation of a queue. This queue blocks the taking fiber when the queue is empty, and it can be created with different policies that determine the behavior of the offering fiber when the queue has reached capacity (constructor sketches follow the list):

  • bounded: offer fiber blocks when the queue is full.
  • synchronous: offer and take fibers block until another fiber invokes the opposite action.
  • unbounded: offer fiber never blocks.
  • dropping: offer fiber never blocks but new elements are discarded if the queue is full.
  • circularBuffer: offer fiber never blocks but the oldest elements are discarded in favor of new elements when the queue is full.
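
As a sketch, these policies map to constructors on cats.effect.std.Queue (the 100-element capacity is an arbitrary choice for illustration):

import cats.effect.IO
import cats.effect.std.Queue

val boundedQueue: IO[Queue[IO, String]] = Queue.bounded[IO, String](100)
val syncQueue: IO[Queue[IO, String]] = Queue.synchronous[IO, String]
val unboundedQueue: IO[Queue[IO, String]] = Queue.unbounded[IO, String]
val droppingQueue: IO[Queue[IO, String]] = Queue.dropping[IO, String](100)
val circularQueue: IO[Queue[IO, String]] = Queue.circularBuffer[IO, String](100)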

In cats-effect our producer will look like the following. The producer is invoked recursively and adds elements to the queue through its offer method (the consumer, sketched after the producer, has a very similar structure, obtaining elements through the queue's take method). Both producer and consumer have IO as their return type:

def producer(id: Int, queue: Queue[IO, String]): IO[Unit] = {
  for {
    _ <- IO.sleep(getRandomTime)

    value = RandomStringUtils.randomAlphanumeric(10)
    _ <- queue.offer(value)
    _ <- IO.println(s"producer $id producing $value")

    _ <- producer(id, queue)
  } yield ()
}
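
A rough sketch of the matching consumer (mirroring the producer above):

def consumer(id: Int, queue: Queue[IO, String]): IO[Unit] = {
  for {
    _ <- IO.sleep(getRandomTime)

    // take semantically blocks this fiber while the queue is empty
    value <- queue.take
    _ <- IO.println(s"consumer $id consuming value $value")

    _ <- consumer(id, queue)
  } yield ()
}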

For our scenario we will have an unbounded queue and define 10 producers and 10 consumers. The parSequence method takes the declared producer and consumer IOs and starts a fiber to execute each one of them concurrently:

for {
  queue <- Queue.unbounded[IO, String]
  producers = List.range(1, 11).map(producer(_, queue))
  consumers = List.range(1, 11).map(consumer(_, queue))
  res <- (producers ++ consumers)
    .parSequence
    .as(ExitCode.Success)
} yield res

4. Kotlin Coroutines

Kotlin's concurrency approach is coroutines: suspendable computations (functions can suspend their execution at some point and resume later on) that execute independently of other blocks of code.

Coroutines can be suspended using the delay function. In a similar fashion to cats-effect, this is semantic blocking: it frees up the current thread to do something else, and the rest of the coroutine is later executed, possibly on some other thread.

The Kotlin coroutines runtime relies on continuations: a continuation is a data structure that stores all the local state of a coroutine at the point where it suspended (a coroutine can suspend several times), so that execution can be resumed on the next thread scheduled to continue it.

To run several suspend functions in sequence, a coroutine scope is used; this starts a context for coroutines. Inside a coroutine scope, all the contained suspend functions execute in sequence unless they are wrapped in a launch block: launch starts a new coroutine that executes concurrently with the rest. A minimal sketch of that difference follows.
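
In this sketch, taskA and taskB are hypothetical suspend functions used only for illustration:

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun taskA() { delay(100); println("A done") }
suspend fun taskB() { delay(100); println("B done") }

suspend fun demo() = coroutineScope {
    taskA()            // runs to completion first
    taskB()            // runs only after taskA
    launch { taskA() } // starts a new coroutine...
    launch { taskB() } // ...these two run concurrently
}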

As the shared data structure, Kotlin offers channels. A channel is conceptually similar to a queue: one or more coroutines can write to or read from a channel, and it has a suspending send function and a suspending receive function.

There are four types of channels:

  • Rendezvous Channel: The channel has no buffer.
  • Buffered Channel: Has a predefined buffer; it can hold values even if no receiver is reading them at the moment, but a coroutine must wait (suspend) before writing more values if the buffer is full.
  • Unlimited Channel: Has a buffer of unlimited capacity.
  • Conflated Channel: The most recently written value overrides the previously written value.

Using coroutines, our producer will look like the following: it indefinitely sends values to the channel, and the random delay allows the semantic blocking of the producer coroutines. The consumer (sketched after the producer) has a very similar structure, obtaining data from the channel using the receive method:

suspend fun producer(id: Int, queue: SendChannel<String>) {
    while (true) {
        delay(getRandomTime())

        val value = RandomStringUtils.randomAlphanumeric(10)
        println("producer $id producing $value")
        queue.send(value)
    }
}
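
A rough sketch of the matching consumer:

suspend fun consumer(id: Int, queue: ReceiveChannel<String>) {
    while (true) {
        delay(getRandomTime())

        // receive suspends this coroutine while the channel is empty
        val value = queue.receive()
        println("consumer $id consuming value $value")
    }
}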

For our scenario we will have an unlimited channel and define 10 producers and 10 consumers; declaring each of these coroutines inside a launch block executes them concurrently:

coroutineScope {
    val queue = Channel<String>(UNLIMITED)

    repeat(10) { i ->
        launch { producer(i + 1, queue) }
    }

    repeat(10) { i ->
        launch { consumer(i + 1, queue) }
    }
}

5. Akka Actors

Actors are objects that encapsulate state and behavior; they communicate exclusively by exchanging messages that are placed into the recipient's mailbox. Conceptually, each Akka actor has its own lightweight thread, which is completely shielded from the rest of the system.

Instead of calling methods, actors send messages to each other. Sending a message does not transfer the thread of execution from the sender to the destination. An actor can send a message and continue without blocking.

Messages go into actor mailboxes. The behavior of the actor describes how the actor responds to messages (like sending more messages and/or changing state). An execution environment orchestrates a pool of threads to drive all these actions completely transparently.

An important difference between passing messages and calling methods is that messages have no return value. Instead, the receiving actor delivers the results in a reply message.

For our problem, instead of using a shared collection, we can take advantage of the actor model and use an actor instance as our shared queue: because an actor's received messages are enqueued and processed one at a time, we can support multiple producers and consumers without extra synchronization.

Our producer actor has the following structure: it supports one message type, Produce, and holds a reference to the queue actor. When a Produce message is received, the Producer actor generates new data and sends it to the queue actor using an Add message:

object Producer {
  case class Produce()

  def props(queue: ActorRef) = Props(new Producer(queue))
}

class Producer(queue: ActorRef) extends Actor {
  import Producer._
  import QueueActor._

  override def receive: Receive = {
    case Produce() =>
      val value = RandomStringUtils.randomAlphanumeric(10)
      println(s"producer ${self.path.name} producing $value")
      queue ! Add(value)
  }
}

The consumer actor also has a reference to the queue actor and supports two kinds of messages: Consume and Obtained. When a Consume message is received, this actor requests a value from the queue actor through a Retrieve message. When an Obtained message is received, this actor checks whether the message actually contains a value and prints it to the console:

object Consumer {
  case class Consume()
  case class Obtained(valueOpt: Option[String])

  def props(queue: ActorRef) = Props(new Consumer(queue))
}

class Consumer(queue: ActorRef) extends Actor {
  import Consumer._
  import QueueActor._

  override def receive: Receive = {
    case Consume() =>
      queue ! Retrieve

    case Obtained(valueOpt) =>
      valueOpt match {
        case Some(value) => println(s"consumer ${self.path.name} consuming value $value")
        case None => println(s"consumer ${self.path.name} no value obtained")
      }
  }
}

Our QueueActor supports three kinds of messages: Init, Add, and Retrieve. When it receives an Init message, it changes its behavior to handle the other two messages and holds an initially empty queue collection used to store the produced values internally.

In the updated behavior, when an Add message is received the actor becomes a new behavior holding the updated queue with the newly added value (this strategy keeps the actor's state without using mutable collections or variables).

When a Retrieve message is received (sent by Consumer actors) the actor tries to obtain a value from the inner queue and sends it as a reply to the sender actor in the form of an Obtained message.

object QueueActor {
  // Init and Retrieve carry no data and are sent as plain messages
  // (queue ! Init, queue ! Retrieve), so case objects fit them best.
  case object Init
  case class Add(value: String)
  case object Retrieve
}

class QueueActor() extends Actor {
  import QueueActor._
  import Consumer._

  override def receive: Receive = {
    case Init => context.become(withInnerQueue(Queue.empty[String]))
  }

  def withInnerQueue(inner: Queue[String]): Receive = {
    case Add(value) =>
      context.become(withInnerQueue(inner.enqueue(value)))

    case Retrieve =>
      if (inner.nonEmpty) {
        val (value, remaining) = inner.dequeue
        context.become(withInnerQueue(remaining))

        sender() ! Obtained(Some(value))
      } else {
        sender() ! Obtained(None)
      }
  }
}

Finally, our main logic creates and initializes the QueueActor using an Init message. Then it creates Producer and Consumer actor instances and indefinitely sends them Produce and Consume messages, respectively.

Those messages are distributed among the producers and consumers in a round-robin fashion thanks to the RoundRobinPool router that created them.

val queue = system.actorOf(Props[QueueActor])
queue ! Init

Future {
  val producers = system.actorOf(RoundRobinPool(10).props(Producer.props(queue)))
  LazyList.from(1).foreach { _ =>
    Thread.sleep(getRandomTime)
    producers ! Produce()
  }
}

Future {
  val consumers = system.actorOf(RoundRobinPool(10).props(Consumer.props(queue)))
  LazyList.from(1).foreach { _ =>
    Thread.sleep(getRandomTime)
    consumers ! Consume()
  }
}

6. Java Virtual Threads

Until Project Loom appeared, every thread in the JVM was basically a wrapper around an OS thread. In recent Java versions (previewed in 19 and 20, finalized in 21) we are able to use virtual threads.

Virtual threads are a new type of thread that tries to overcome the resource limitations of platform threads. They are lightweight threads, in a similar fashion to some of the previously reviewed approaches.

One of the main advantages of virtual threads is that they work with the existing API. To use them we only need to apply a couple of changes to our first examples.

In our first example we only need to update the new thread declaration:

var task = Thread.ofVirtual().unstarted(producer);
task.start();

In our second example we only need to update the definition of the executor service. Given the characteristics of virtual threads, it is no longer necessary to maintain a small fixed number of threads; we can create a new virtual thread per task (which greatly simplifies how we reason about this kind of problem using the Java API):

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
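
Tasks are then submitted exactly as before; a minimal sketch reusing the producer Runnable from the thread-pool example:

// Each execute() call now runs its task on a new virtual thread, so
// blocking calls like put(), take(), or sleep() are cheap to make.
executorService.execute(producer);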

Wrapping up

As we have reviewed, the ecosystem around the JVM offers several alternatives for handling concurrency (and there are more that weren't included in this quick overview), including powerful runtimes that hide a lot of the complexity involved, like cats-effect, Akka, and Kotlin's coroutines.

Understanding the advantages and characteristics of these different approaches can help us make better choices for future projects. For instance, some problems may be more challenging to solve using the actor approach while others may be significantly easier. The arrival of virtual threads offers a new set of possibilities, and the future of these concurrency runtimes will be linked to this big upgrade to the JVM.

The complete code of these examples can be found on GitHub.
