Thread Ops for Java

Jakob Jenkov
Last update: 2019-12-13

Thread Ops for Java is an open source Java toolkit providing concurrency solutions for separate-state, same-threaded systems. By concurrency solutions I mean tools for running multiple long-running jobs and short-lived tasks within the same thread (samethreading), as well as tools for asynchronous communication between different threads.

While Thread Ops for Java can be used for any kind of system where its tools make sense, Thread Ops is specifically designed to provide the concurrency solutions needed by advanced distributed systems. By advanced distributed systems I mean systems that use asynchronous execution and communication, non-blocking IO, and more than just the traditional client-server model of interaction. With Thread Ops I believe it becomes easier to implement applications that act as both clients and servers, like relay servers, routers, reverse servers, P2P networks and subscribe-notify type communication systems.

Please note that Thread Ops is still under development, but is expected to reach a feature-complete state during 2020.

GitHub Repository

You can find the code for Thread Ops for Java in its GitHub repository, here:

https://github.com/nanosai/thread-ops-java

Design Goals and Challenges

The design goal of Thread Ops is to enable you to use a singlethreaded / samethreaded execution model. This model makes it easier to reason about concurrency, since there is no state (data / objects) shared among the threads executing in the system.

Additionally, Thread Ops is designed especially for systems that use asynchronous communication. Structuring application logic is harder when a logic flow can be broken by sending a message asynchronously, and not blocking the flow to wait for the response. The thread must go on. Sooner or later the thread will have to check if a response has arrived for the async request sent earlier, so the logic flow can continue. This is not trivial to implement.
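One way to picture the "check later" step is a table of pending requests that the thread polls on each pass through its loop. The sketch below is not Thread Ops API: PendingRequests, Response, send(), deliver() and poll() are hypothetical names chosen for illustration, and the actual sending of the request over the network is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch, not part of Thread Ops.
class PendingRequests {

    // A response tagged with the id of the request it answers.
    static final class Response {
        final long requestId;
        final String payload;

        Response(long requestId, String payload) {
            this.requestId = requestId;
            this.payload = payload;
        }
    }

    // Thread-safe inbox, so another thread (e.g. an IO thread) may deliver responses.
    private final Queue<Response> inbox = new ConcurrentLinkedQueue<>();
    // Continuations are only touched by the loop thread, so a plain HashMap is enough.
    private final Map<Long, Consumer<String>> continuations = new HashMap<>();
    private long nextId = 0;

    // Registers how the logic flow continues; returns the id to send with the request.
    long send(Consumer<String> onResponse) {
        long id = nextId++;
        continuations.put(id, onResponse);
        return id;
    }

    // Called by whoever receives the response from the network.
    void deliver(Response response) {
        inbox.add(response);
    }

    // Called once per loop pass. Never blocks; returns the number of flows continued.
    int poll() {
        int continued = 0;
        Response response;
        while ((response = inbox.poll()) != null) {
            Consumer<String> continuation = continuations.remove(response.requestId);
            if (continuation != null) {
                continuation.accept(response.payload);
                continued++;
            }
        }
        return continued;
    }
}
```

The thread never waits for a response in this sketch. It calls poll() whenever it gets the chance, and the stored continuation picks the logic flow up where the request left off.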

Furthermore, Thread Ops is designed for systems that use non-blocking IO too. The challenge with non-blocking IO is that you can read and write partial messages. You might receive only part of an inbound message, and thus have to come back later to read the rest of the message. You might only be able to write part of an outbound message, and thus have to come back later to write the rest of the message. This is not trivial to implement either.
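To make the partial read problem concrete, here is a minimal sketch of a reader that assembles messages from chunks of arbitrary size. It assumes a framing of a 4-byte big-endian length followed by the payload, which is an assumption made for this example and not something Thread Ops prescribes. PartialMessageReader and feed() are hypothetical names, and the sketch does no validation of the length field.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Thread Ops. Assumed framing:
// [4-byte big-endian length][payload of that many bytes].
class PartialMessageReader {
    private final ByteBuffer header = ByteBuffer.allocate(4);
    private ByteBuffer body = null; // allocated once the length header is complete

    // Feed whatever bytes arrived; returns the messages completed by this chunk.
    List<byte[]> feed(ByteBuffer chunk) {
        List<byte[]> complete = new ArrayList<>();
        while (chunk.hasRemaining()) {
            if (body == null) {
                // Still collecting the length header, possibly across several chunks.
                while (header.hasRemaining() && chunk.hasRemaining()) {
                    header.put(chunk.get());
                }
                if (header.hasRemaining()) {
                    break; // header incomplete: come back with the next chunk
                }
                header.flip();
                body = ByteBuffer.allocate(header.getInt());
                header.clear();
            }
            // Copy as much of the payload as this chunk contains.
            while (body.hasRemaining() && chunk.hasRemaining()) {
                body.put(chunk.get());
            }
            if (!body.hasRemaining()) {
                complete.add(body.array());
                body = null; // ready for the next message
            }
        }
        return complete;
    }
}
```

The reader keeps its state between calls, so the calling thread can hand it whatever bytes a non-blocking read produced and move on to other work when no message is complete yet.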

Finally, an application might have multiple ports to read partial messages from, and to write partial messages to. For instance, an application may act as both a server (one port) and a client (another port). Such an application will have to keep reading inbound data and sending outbound data on both its server connections (inbound connections from clients) and its client connections (outbound connections to remote services). When messages are received, the application needs to continue the corresponding logic flow for each message. This is not trivial to implement either!

Execution Model

The Thread Ops solution to the challenges of implementing asynchronous communication over non-blocking IO within a single thread is an execution model where a single thread executes one or more repeated tasks in a loop. Each repeated task gets a chance to do its work, and then returns control of the execution back to the thread.

This execution model means you don't have to implement one big application loop that executes every task the application has to perform. Instead you can split the work into separate repeated tasks, each of which is invoked repeatedly. This is often both easier to implement, and easier to test, as a repeated task can be tested in isolation (outside of a loop) simply by invoking it.
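As a small illustration of testing in isolation, the sketch below drives a task by hand, without any thread or loop. The IRepeatedTask interface is redeclared from the listing later in this article so the example compiles on its own. UppercaseTask and UppercaseTaskCheck are hypothetical names made up for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Redeclared from the article's IRepeatedTask listing so the sketch compiles on its own.
interface IRepeatedTask {
    void exec();
}

// Hypothetical task: handles at most one queued message per invocation, then returns.
class UppercaseTask implements IRepeatedTask {
    final Queue<String> inbox = new ArrayDeque<>();
    final List<String> handled = new ArrayList<>();

    @Override
    public void exec() {
        String message = inbox.poll();
        if (message != null) {
            handled.add(message.toUpperCase());
        }
    }
}

class UppercaseTaskCheck {
    // Drives the task by hand: no thread and no loop are involved.
    static void run() {
        UppercaseTask task = new UppercaseTask();

        task.exec(); // nothing queued, so this must be a no-op
        if (!task.handled.isEmpty()) {
            throw new AssertionError("expected no work");
        }

        task.inbox.add("ping");
        task.inbox.add("pong");

        task.exec(); // handles exactly one message
        if (!task.handled.equals(Arrays.asList("PING"))) {
            throw new AssertionError("expected only PING");
        }

        task.exec(); // handles the second message
        if (!task.handled.equals(Arrays.asList("PING", "PONG"))) {
            throw new AssertionError("expected PING and PONG");
        }
    }
}
```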

This execution model also enables you to divide the checking of the various asynchronous message ports among multiple repeated tasks. This can make the handling of async responses, and the partial reading and writing of messages, less complex to implement.

The Thread Ops execution model is illustrated here:

[Figure: The Thread Ops Execution Model]

The above diagram shows how a single thread executes multiple repeated tasks in a loop. Each repeated task gets the opportunity to do some work, if there is any work to do. For instance, a repeated task might check for inbound messages from clients, or check for responses from remote services.

Keep in mind that the above illustration is just an arbitrary illustration of a thread loop, repeated tasks, application logic and async ports. Your application may look different. In fact, each application will probably have a different number of repeated tasks, application classes, calls to async ports etc. The precise design is up to you - and to what makes sense for your concrete application.

Thread Loops

A thread loop is a thread that is executing the same code over and over again in a loop. A thread loop is implemented by combining a loop and a repeated task. The repeated task is executed over and over again inside the loop. Thread Ops for Java comes with two implementations of a thread loop:

  • ThreadLoop
  • ThreadLoopPausable

Both of these thread loop implementations will be explained in the following sections.

Repeated Tasks

A repeated task is a task that is repeated over and over again inside a thread loop. A repeated task is a finite task that does its job and then returns control to the executing thread.

You should avoid blocking the thread from inside a repeated task. You block the thread if you call blocking IO operations, object.wait(), Thread.sleep(), or similar methods that put the thread into an idle / waiting / blocked state.

Having said that, there might still be a few situations in which it can make sense to block the thread from inside a repeated task. For instance, if one thread is reading tasks from some async port (e.g. a Java NIO server), and passing them on to another thread for execution, you could risk getting into a situation where the other thread cannot keep up with the level of incoming tasks. One possible solution to that problem could be to simply block the thread that receives the incoming tasks until the other, task-executing thread has capacity again. This would be a simple back pressure management strategy.
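A bounded queue from java.util.concurrent is one way to get this kind of blocking back pressure. The sketch below is an illustration only and does not use Thread Ops: BackPressureSketch and run() are hypothetical names, the integers stand in for real tasks, and the 1 millisecond sleep merely simulates a worker that is slower than the receiver.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch, not part of Thread Ops.
class BackPressureSketch {

    // Hands taskCount tasks to a slower worker through a queue holding at most
    // capacity tasks. Returns how many tasks the worker processed.
    static int run(int taskCount, int capacity) {
        BlockingQueue<Integer> handoff = new ArrayBlockingQueue<>(capacity);
        int[] processed = {0};

        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < taskCount; i++) {
                    handoff.take();   // wait for the next task
                    Thread.sleep(1);  // simulate a worker slower than the receiver
                    processed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            for (int i = 0; i < taskCount; i++) {
                // put() blocks the receiving thread while the queue is full,
                // which is the back pressure described above.
                handoff.put(i);
            }
            worker.join(); // also makes the worker's final count visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed[0];
    }
}
```

Because put() waits for free space, the receiving thread can never get more than capacity tasks ahead of the worker.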

The key here is to apply your critical, analytic sense, and not just be dogmatic about not blocking the thread. Most of the time you should not block the thread, but sometimes it can make sense in a specific situation.

Thread Ops currently supports two types of repeated tasks:

  • IRepeatedTask
  • IRepeatedTaskPausable

The first repeated task type is represented by the IRepeatedTask interface which looks like this:

public interface IRepeatedTask {

    public void exec();

}

An IRepeatedTask type of repeated task will have its exec() method called over and over again in a loop, by a ThreadLoop. There is no pause between executions.

The second repeated task type is a Thread Ops specific interface named IRepeatedTaskPausable which looks like this:

public interface IRepeatedTaskPausable {

    public long exec();

}

A Thread Ops ThreadLoopPausable will call the exec() method over and over again in a loop. The long returned from exec() is the number of nanoseconds the IRepeatedTaskPausable thinks it makes sense to pause before exec() is called again. This pause return value is really the only difference between an IRepeatedTask and an IRepeatedTaskPausable implementation.

ThreadLoop

ThreadLoop is a thread loop that can execute a single IRepeatedTask over and over again in a loop. You can create a ThreadLoop similarly to how you create a normal Java thread. You call start() to start the thread loop, and you call stop() to stop the loop. Here is an example of starting and stopping a ThreadLoop:

ThreadLoop threadLoop = new ThreadLoop(() -> {
    System.out.println("Repeated Task");
});
threadLoop.start();

...

threadLoop.stop();
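To show what a thread loop boils down to, here is a minimal sketch of a loop driven by a volatile flag. It is not the Thread Ops implementation. SketchThreadLoop is a hypothetical class written for this example, and IRepeatedTask is redeclared from the listing above so the sketch compiles on its own.

```java
// Redeclared from the article's IRepeatedTask listing so the sketch compiles on its own.
interface IRepeatedTask {
    void exec();
}

// Hypothetical sketch of a thread loop, not the Thread Ops ThreadLoop class.
class SketchThreadLoop {
    private final IRepeatedTask task;
    private volatile boolean running = false; // volatile so the loop thread sees stop()
    private Thread thread;

    SketchThreadLoop(IRepeatedTask task) {
        this.task = task;
    }

    void start() {
        running = true;
        thread = new Thread(() -> {
            while (running) {
                task.exec(); // no pause between executions
            }
        });
        thread.start();
    }

    // Signals the loop to exit and waits until the current exec() call has returned.
    void stop() {
        running = false;
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Note that a loop like this keeps a CPU core busy even when the task has nothing to do, which is the motivation for the pausable variant described next.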

ThreadLoopPausable

ThreadLoopPausable is a thread loop that can execute a single IRepeatedTaskPausable over and over again in a loop. You can create the ThreadLoopPausable similarly to how you create a normal Java thread. You call start() to start the thread loop, and you call stop() to stop the loop.

The IRepeatedTaskPausable has a single method named exec(). This method returns a long which is the number of nanoseconds the ThreadLoopPausable is to pause (Thread.sleep()) before calling IRepeatedTaskPausable.exec() the next time. Here is an example of starting and stopping a ThreadLoopPausable:

ThreadLoopPausable threadLoop = new ThreadLoopPausable(() -> {
    System.out.println("Repeated Task");
    return 500;  //pause 500 nanoseconds before next execution
});
threadLoop.start();

...

threadLoop.stop();
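The pause can be sketched in the same spirit. The class below is a hypothetical illustration, not the Thread Ops ThreadLoopPausable. It sleeps via TimeUnit.NANOSECONDS.sleep(), and its stop() interrupts the thread so that a long pause is cut short, which is a design choice of this sketch and not a documented Thread Ops behavior. Keep in mind that the actual length of a sleep depends on the precision of the system timers and scheduler, so a very short pause such as 500 nanoseconds should be read as a hint.

```java
import java.util.concurrent.TimeUnit;

// Redeclared from the article's IRepeatedTaskPausable listing so the sketch compiles on its own.
interface IRepeatedTaskPausable {
    long exec();
}

// Hypothetical sketch of a pausable thread loop, not the Thread Ops class.
class SketchThreadLoopPausable {
    private final IRepeatedTaskPausable task;
    private volatile boolean running = false;
    private Thread thread;

    SketchThreadLoopPausable(IRepeatedTaskPausable task) {
        this.task = task;
    }

    void start() {
        running = true;
        thread = new Thread(() -> {
            while (running) {
                long pauseNanos = task.exec();
                if (pauseNanos > 0) {
                    try {
                        TimeUnit.NANOSECONDS.sleep(pauseNanos);
                    } catch (InterruptedException e) {
                        return; // stop() interrupted the pause: leave the loop
                    }
                }
            }
        });
        thread.start();
    }

    // Signals the loop to exit, wakes it if it is pausing, and waits for it to finish.
    void stop() {
        running = false;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```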

RepeatedTaskExecutor

The RepeatedTaskExecutor is an IRepeatedTask implementation which can itself execute other IRepeatedTask implementations in a round robin fashion.

After each round of execution of all IRepeatedTask instances, the RepeatedTaskExecutor returns from its own exec() method, and returns control of execution to the component that called it (typically a ThreadLoop).

Here is an example of how to create and execute a RepeatedTaskExecutor inside a ThreadLoop:

RepeatedTaskExecutor executor = new RepeatedTaskExecutor(
         () -> { System.out.println("First");  }
        ,() -> { System.out.println("Second"); }
    );

ThreadLoop threadLoop = new ThreadLoop(executor);
threadLoop.start();

Thread.sleep(2000);     // sleep 2 seconds before stopping the ThreadLoop completely.

threadLoop.stop();
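The round robin idea itself fits in a few lines. The following is a hypothetical sketch, not the Thread Ops RepeatedTaskExecutor. SketchRepeatedTaskExecutor is a made-up name, and IRepeatedTask is redeclared from the earlier listing so the sketch compiles on its own.

```java
// Redeclared from the article's IRepeatedTask listing so the sketch compiles on its own.
interface IRepeatedTask {
    void exec();
}

// Hypothetical sketch: a repeated task that runs other repeated tasks in order.
class SketchRepeatedTaskExecutor implements IRepeatedTask {
    private final IRepeatedTask[] tasks;

    SketchRepeatedTaskExecutor(IRepeatedTask... tasks) {
        this.tasks = tasks;
    }

    // One call executes one round: every task once, in the order given.
    @Override
    public void exec() {
        for (IRepeatedTask task : tasks) {
            task.exec();
        }
    }
}
```

Since the executor is itself a repeated task, it can be tested without a thread by calling exec() directly, and each call performs exactly one round.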

RepeatedTaskExecutorPausable

The RepeatedTaskExecutorPausable is an IRepeatedTaskPausable implementation which can itself execute other IRepeatedTaskPausable implementations in a round robin fashion.

Each IRepeatedTaskPausable can pause itself for a number of nanoseconds by returning that number from exec(). The RepeatedTaskExecutorPausable keeps track of when it is time to execute each IRepeatedTaskPausable next.

After each round, in which it executes every IRepeatedTaskPausable that was due, the RepeatedTaskExecutorPausable calculates the earliest time at which any task should execute next, and returns the number of nanoseconds that the RepeatedTaskExecutorPausable as a whole should be paused before the ThreadLoopPausable executes it again.

Here is an example of how to create and execute a RepeatedTaskExecutorPausable inside a ThreadLoopPausable:

RepeatedTaskExecutorPausable executor = new RepeatedTaskExecutorPausable(
         () -> { System.out.println("First");  return 1_000_000_000; }  // pause   1 second
        ,() -> { System.out.println("Second"); return   400_000_000; }  // pause 400 millis
        );

ThreadLoopPausable threadLoop = new ThreadLoopPausable(executor);
threadLoop.start();

Thread.sleep(5000);     // sleep 5 seconds before stopping the ThreadLoopPausable completely.

threadLoop.stop();
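The bookkeeping described above can be sketched as follows. This is a hypothetical illustration, not the Thread Ops RepeatedTaskExecutorPausable. SketchExecutorPausable is a made-up name, and the exec(long now) overload taking the clock value as a parameter is an assumption added here so the timing logic can be checked without real pauses.

```java
import java.util.Arrays;

// Redeclared from the article's IRepeatedTaskPausable listing so the sketch compiles on its own.
interface IRepeatedTaskPausable {
    long exec();
}

// Hypothetical sketch: runs each task when it is due and reports the shortest remaining wait.
class SketchExecutorPausable implements IRepeatedTaskPausable {
    private final IRepeatedTaskPausable[] tasks;
    private final long[] nextRunAt; // per task: clock value (nanoseconds) of its next run
    private boolean initialized = false;

    SketchExecutorPausable(IRepeatedTaskPausable... tasks) {
        this.tasks = tasks;
        this.nextRunAt = new long[tasks.length];
    }

    @Override
    public long exec() {
        return exec(System.nanoTime());
    }

    // Same as exec(), but with the current time passed in (hypothetical helper for testing).
    long exec(long now) {
        if (!initialized) {
            Arrays.fill(nextRunAt, now); // every task is due on the first round
            initialized = true;
        }
        long shortestWait = Long.MAX_VALUE;
        for (int i = 0; i < tasks.length; i++) {
            if (nextRunAt[i] - now <= 0) { // due; the subtraction is safe for nanoTime values
                nextRunAt[i] = now + Math.max(0L, tasks[i].exec());
            }
            shortestWait = Math.min(shortestWait, nextRunAt[i] - now);
        }
        return tasks.length == 0 ? 0 : shortestWait;
    }
}
```

With two tasks that pause 1000 and 400 nanoseconds, calling exec(0) runs both and returns 400, and calling exec(400) runs only the second task and again returns 400, because the first task still has 600 nanoseconds left to wait.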
