Multi-core computers are now the norm. Today's software technologists must use concurrency and/or parallelism in order to make software scalable. No matter which language feature or library is used, multiple threads running on one or more cores are inevitable. For JVM-based languages, threads are provided by threadpools.
Large-scale multithreaded programs need to be matched with the right threadpool configuration in order to run at peak performance. Threadpool selection and parameter tuning can be greatly affected by the hardware, principally the number of CPUs. Test setups might deliberately simulate suboptimal threadpools in order to characterize failure modes and race conditions. Threadpool selection and tuning should be considered a test and deployment activity, not just a programming activity. This tuning will show how sensitive your program is to threadpool selection and configuration; some programs are remarkably insensitive, while others respond dramatically to changes in threadpools.
Multithreaded programs that run on Java virtual machines can use standard java.util.concurrent.Executor threadpools or custom threadpools. This is true of Java and Scala programs and libraries such as Akka, which can be used by programs written in either language. Akka provides a flexible Executor selection and tuning mechanism, as described in my book Composable Futures with Akka 2.0. This is important because futures, dataflow, actors and Scala's parallel collections are all backed by threadpools.
It is unrealistic to expect that the optimal type of threadpool and its configuration can be selected just by reading documentation and following generic guidelines without doing any testing. "In theory, there is no difference between theory and practice. But, in practice, there is." Generally attributed to Jan L. A. van de Snepscheut, though the attribution is disputed.
Properly written programs can perform poorly if the threadpool does not match the hardware. Threadpool benchmarking should be performed when installing a program on a new computer with a different number of processors than you are accustomed to. You might find that with some configurations, thread starvation causes deadlock or severely reduced throughput when there aren't enough threads, causing requests to queue up. On the other hand, allocating more threads than required might starve other tasks of computing resources, adversely affecting them.
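The classic starvation failure is a task that blocks while waiting on another task submitted to the same undersized pool. The following sketch is a hypothetical illustration of that deadlock; it is not part of any benchmark, and the names in it are arbitrary.

import java.util.concurrent.{Callable, Executors}

// Hypothetical illustration of thread starvation: the outer task occupies the
// pool's only thread while waiting for an inner task that can never be scheduled.
object StarvationDemo extends App {
  val pool = Executors.newFixedThreadPool(1)

  val outer = pool.submit(new Callable[Int] {
    def call(): Int = {
      val inner = pool.submit(new Callable[Int] {
        def call(): Int = 42
      })
      inner.get() // blocks forever: no free thread remains to run the inner task
    }
  })

  println(outer.get()) // never prints; the program deadlocks
}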
Concurrency differs from parallelization; concurrent tasks are stateful, often with complex interactions, while parallelizable tasks are stateless, do not interact, and are composable. You can launch any number of parallel tasks without affecting the results provided by other tasks, and incomplete parallel tasks can be restarted without affecting results. It is usually harder to benchmark a concurrent system than a parallelized system because interactions between threads are often complex and nuanced. Many, if not most, real-world systems use a mix of parallelizable and concurrent tasks. The interplay of the various threads as they contend for resources means that proper selection and tuning of threadpools is necessarily a compromise.
Setting up a test load is fraught with issues, some of which include CPU loads introduced by other programs, allocating memory properly, allocating CPUs, identifying the critical code to benchmark, and minimizing the effects of the overhead of the benchmark framework. This article and the accompanying benchmark program attempt to help you benchmark your programs.
Results are Statistical in Nature
A standardized benchmark framework can provide helpful guidance for selecting and tuning threadpools. Used in this context, the term benchmark means gathering statistical results about the relative merits of various threadpool configurations. Although we are not usually interested at a system level in optimizing the absolute minimum execution times for just one task, it is instructive to compare the relative performance of threadpools in order to perform a given task.
There is always a certain margin of uncertainty in test results, so test accuracy should be computed based on the standard deviation of results from multiple runs. The standard deviation will be affected by the type of computational load. For example, parallelizable CPU-intensive tasks have a small value of standard deviation, while web spider tasks will have a greater value of standard deviation due to varying network latencies of each IP address fetched.
Wikipedia has a good explanation of standard deviation. The diagram below shows a Wikimedia Commons image of the Gaussian distribution, where one standard deviation is denoted by sigma (σ); each colored band has a width of one standard deviation. The mean, denoted by mu (μ), is the center of the distribution of observed values. If the standard deviation is small compared to the mean, then the results do not vary much between measurements.
Large standard deviation; values are not tightly clustered around the mean; results vary a lot each time a measurement is taken
Small standard deviation; values are tightly clustered around the mean; results do not vary much each time a measurement is taken.
Because the behavior of multitasking OSes is also statistical in nature, some variance should be expected in the length of time that a long computation requires in any given multitasking environment. Computational loads should be performed many times, and the results analyzed statistically in order to determine the mean value of the results and the standard deviation. Once a computational load has been characterized, its behavior and metrics can be statistically predicted for various circumstances.
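As a concrete sketch of the statistics involved (not code from ExecutorBenchmark), the arithmetic mean and population standard deviation of a set of repeated timings can be computed like this:

// Minimal sketch: arithmetic mean and population standard deviation of repeated timings.
// It mirrors the statistics the benchmark reports, but it is not the benchmark's own code.
def meanAndStdDev(timingsMs: Seq[Long]): (Double, Double) = {
  val mean = timingsMs.sum.toDouble / timingsMs.size
  val variance = timingsMs.map(t => (t - mean) * (t - mean)).sum / timingsMs.size
  (mean, math.sqrt(variance))
}

For example, meanAndStdDev(Seq(8100L, 8250L, 8300L)) yields roughly (8216.7, 85.0): a mean of about 8.2 seconds with a standard deviation of about 85 milliseconds.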
The image below shows the same Wikimedia Commons image of the Gaussian distribution, superimposed on a stacked bar from a bar chart that shows how long a test took after being run many times. The red bar shows the mean time to perform a computation, less one standard deviation: 7.887 seconds; this is the fastest time that the test normally takes to complete. The stacked blue bar is two standard deviations long, or 670 milliseconds; one standard deviation for this set of results is therefore 335 milliseconds. This means that the mean time per test is 7.887 + 0.335 = 8.222 seconds, and that the longest the test normally takes to complete in this computing environment is 7.887 + 0.670 = 8.557 seconds.
Now that we know how to interpret this type of bar chart, let's see how we can generate them by benchmarking computational loads running on various types of threadpools.
Running the Benchmark
Command-line settings for the JVM that runs the standard load test provided with ExecutorBenchmark are defined in build.sbt as follows.
-Xbatch -server -Xmx1G -Xms1G -XX:PermSize=64m -XX:MaxPermSize=64m
These settings were chosen so the test loads would have ample memory.
ExecutorBenchmark runs the load once to warm up the JVM and then runs it many more times in order to obtain a statistically significant sample. Three garbage collections are performed before each load is benchmarked. As you can see from the results in the bar chart below, the differences between two runs of the various configurations were about 10% for the test load. Once the JVM is warmed up, the size of the standard deviation indicates the degree of consistency of the test results.
Two types of containers for load instances are tested by the framework: Akka Futures and Scala parallel collections. You can readily extend the framework to support other containers for computing your test loads.
ExecutorBenchmark Code Walkthrough
ExecutorBenchmark is a free and open-source test framework for parallelizable tasks. It is written in Scala, and can be used with Java and Scala loads. Any number of threadpools can be used with a load of your choosing, and results are graphed. You can test various configurations of threadpools with ExecutorBenchmark; this is particularly helpful when poor documentation leaves you wondering just how important a threadpool configuration parameter really is. You can run the benchmark with various instances of a threadpool, with each instance set to different values, and thereby quickly find out what parameters really matter. The major components of the program follow a model-view-controller pattern:
- ExecutorBenchmark.scala
- Contains the entry point and user-defined threadpools, and assigns loads to threadpools
- DefaultLoads.scala
- Defines sample loads
- Model.scala
- Data model
- Gui.scala
- Java Swing user interface that displays stacked bar charts using JFreeChart.
- Benchmark.scala
- Controller that runs benchmarks
- PersistableApp.scala
- Defines the PersistableApp trait, useful for user interfaces that save and restore state between runs.
Entry Point
Executor instances are simple to set up. Below is a Scala code snippet showing how instances of some standard Java threadpools are configured. Two instances of a fixed thread pool are configured: one instance has the same number of threads as there are processors in the machine that executes the benchmark, and the other instance has only one thread.
val nProcessors = Runtime.getRuntime.availableProcessors
val esFTPn = Executors.newFixedThreadPool(nProcessors)
val esFTP1 = Executors.newFixedThreadPool(1)
val esCTP  = Executors.newCachedThreadPool()
val esSTE  = Executors.newSingleThreadExecutor()
One instance of a cached threadpool is also configured; this constructor creates new threads as they are needed and reuses idle threads. IO-bound loads would likely benefit from a greater number of threads than a fixed pool sized to the CPU count provides. The single thread executor is effectively the same model as that used by node.js.
The Fork/Join threadpool introduced in Java 7 has since been improved. The improved version is provided with Akka, and will likely ship with Java 8. The benchmark creates an instance of the improved version; this means the test can run on Java 6. If you wanted to use the Java 7 version instead, simply change the import statement to reference java.util.concurrent.ForkJoinPool:
import akka.jsr166y.ForkJoinPool

val esFJP = new ForkJoinPool()
ForkJoinPool will strive to maintain a constant amount of parallelism; this means that if a thread blocks, ForkJoinPool will dynamically create more threads in order to maintain the desired number of busy threads. By default, ForkJoinPool sets the degree of parallelism equal to the number of processors available, which is appropriate for CPU-bound tasks. IO-bound tasks will need a greater amount of parallelism; the parallelism factor is the ratio of total threads to busy threads. When tuning ForkJoinPool, the main task is to determine the proper value of the parallelism factor.
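For example, a hypothetical sketch of sizing a ForkJoinPool for an IO-bound load follows; the parallelism factor of 4 is an assumption chosen for illustration (it presumes each thread spends roughly three-quarters of its time blocked), not a value taken from the benchmark.

import akka.jsr166y.ForkJoinPool

// Illustrative only: choose a parallelism factor greater than 1 for IO-bound work.
val nProcessors = Runtime.getRuntime.availableProcessors
val parallelismFactor = 4 // assumed: threads blocked ~75% of the time
val esFJPio = new ForkJoinPool(nProcessors * parallelismFactor)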
Akka's threadpools can be configured from a String embedded in source code or read from a configuration file. Specifying the threadpool in a configuration file means that the threadpool for the accompanying program can be tuned during testing and deployment, instead of being hard-coded by the original programmer. The string below configures a ForkJoinPool with an initial number of threads equal to the parallelism-factor (3.0) times the number of processors available. The call to ActorSystem() actually invokes ActorSystem.apply(), which creates an instance of ActorSystem and the configured threadpools.
val configString1: String = """akka {
  logConfigOnStart=off
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = %d
    parallelism-factor = 3.0
    parallelism-max = %d
  }
}""".format(nProcessors, nProcessors)

val system1 = ActorSystem("default1", ConfigFactory.parseString(configString1))
The above threadpools can be used for Akka futures, actors and data flow; however Scala parallel collections have their own threadpool, which is always an instance of the ForkJoinPool provided with the JVM. It has only one configurable parameter, the number of threads to start the computation with:
ForkJoinTasks.defaultForkJoinPool.setParallelism(int)
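As a minimal sketch that follows the API quoted above (assuming a Scala 2.9-era parallel collections library), the shared pool can be configured once and any subsequent .par operation will use it:

import scala.collection.parallel.ForkJoinTasks

// Configure the shared pool used by all parallel collections (API as quoted above);
// subsequent .par operations are computed with that degree of parallelism.
ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
val total = (1 to 1000000).par.map(i => i.toLong * i).sum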
The entry point, ExecutorBenchmark, defines the tests as shown in the following code block; labels are short so they fit on the Y axis. Most tests are defined as functions; these functions are stored in data structures and passed as arguments. Functions used this way are often termed functors, in a sense similar to a C or C++ function pointer. Each test specification consists of a key-value pair: the key is a functor or number for the test, and the value is the name of the test as it will be displayed on the bar chart's Y axis.
Model.ecNameMap = LinkedHashMap(
  1           -> "PC 1",                      // parallel collection with 1 thread
  nProcessors -> "PC %d".format(nProcessors), // parallel collection with nProcessors threads
  system1     -> "Akka FJ",
  system2     -> "Akka TP 3",
  system3     -> "Akka TP 1",                 // ActorSystem thread-pool-executor & parallelism-factor=1
  esFJP       -> "Akka FJP",                  // ActorSystem ForkJoinPool
  esFTP1      -> "FT 1",                      // FixedThreadPool w/ nProcessors=1
  esFTPn      -> "FT %d".format(nProcessors), // FixedThreadPool w/ nProcessors
  esCTP       -> "CT",                        // CachedThreadPool
  esSTE       -> "ST"                         // SingleThreadExecutor
)
Predefined Benchmark Loads
ExecutorBenchmark provides two predefined loads in DefaultLoads.scala. DefaultLoads.cpuIntensive is a CPU-intensive test that computes an approximation of Pi by summing a large number of series terms, while DefaultLoads.ioBound simulates a web spider. You should not use these loads for anything except to demonstrate how ExecutorBenchmark works. Instead, you should simulate discrete actions, or combinations of actions, of the parallelized task that you want to benchmark. Each action should be performed thousands of times. A load is a collection of actions, possibly performed in sequence, or possibly performed in a statistically consistent manner.
An action is merely a zero-argument method, and can be a static method or defined on an object instance. Static methods are preferred because they introduce less overhead. You can write action methods in Java or Scala. The default loads defined in DefaultLoads.scala are single action methods, as follows:
/** Simulate a CPU-bound task (compute Pi to a million places) */
def cpuIntensive(): Any = calculatePiFor(0, intensity)

private def calculatePiFor(start: Int, nrOfElements: Int): Double = {
  var acc = 0.0
  for (i <- start until (start + nrOfElements))
    acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
  acc
}

/** Simulate an IO-bound task (web spider) */
def ioBound(): Any = simulateSpider(30, 60, fetchCount) // maxDelay argument assumed; value chosen for illustration

private def simulateSpider(minDelay: Int, maxDelay: Int, nrOfFetches: Int) {
  for (i <- 0 until nrOfFetches) {
    // simulate from minDelay to maxDelay ms latency
    Thread.sleep(random.nextInt(maxDelay - minDelay) + minDelay)
    calculatePiFor(0, 50) // simulate a tiny amount of computation
  }
}
The load is invoked from the Benchmark companion object in Benchmark.scala, and is supplied to the Benchmark constructor as a functor, like this:
def apply(load: () => Any = DefaultLoad.cpuIntensive, showResult: Boolean=false) = new Benchmark(load, showResult)
The apply method is implicitly used at the top of ExecutorBenchmark.scala, which is the entry point:
Benchmark(DefaultLoads.cpuIntensive).showGui
You can change the load used to a static method called foo() in Bar.java, as follows:
Benchmark(Bar.foo).showGui
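A load written in Scala just needs a zero-argument method defined on an object. The sketch below uses hypothetical names (MyLoads and databaseRoundTrip) that are not part of ExecutorBenchmark:

// Hypothetical custom load; MyLoads and databaseRoundTrip are illustrative names only.
object MyLoads {
  def databaseRoundTrip(): Any = {
    Thread.sleep(5) // stand-in for the latency of a real query
    42
  }
}

Benchmark(MyLoads.databaseRoundTrip).showGui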
The Model
The model is quite simple, and is stored in Model.scala. A scala.collection.mutable.LinkedHashMap associates keys (which are mostly Executors) with their name as displayed on the Y axis of the generated bar chart. Because the Scala parallel collection Executor is not externally accessible, the configuration parameter for that Executor is stored instead of storing the Executor. This quirk explains why ecNameMap defines the Executor key as type Any.
var ecNameMap = new LinkedHashMap[Any, String]
The entry point, ExecutorBenchmark, defines the tests by setting ecNameMap, as we shall see later.
A case class called TestResult holds the result of running one benchmark test; again, tests are defined as type Any:
case class TestResult(test: Any, testName: String, millis: Long, result: Any)
Another case class called MeanResult holds the result of statistically analysing a series of tests:
case class MeanResult(test: Any, testName: String, millisMean: Long, millisStdDev: Long)
Results are accumulated in two scala.collection.mutable.LinkedHashMaps called testResultMapWarmup, which stores the results of the test runs used to warm up the JVM, and testResultMapHot, which stores actual results:
val testResultMapWarmup = new LinkedHashMap[Any, TestResult]
val testResultMapHot = new LinkedHashMap[Any, TestResult]
At the start of each run the model is reset:
def reset {
  ecNameMap.clear()
  testResultMapHot.clear()
  testResultMapWarmup.clear()
}
Each time a test is run, the results are saved by calling Model.addTest():
def addTest(test: Any, testName: String, timedResult: TimedResult[Seq[Any]], isWarmup: Boolean): TestResult = {
  val testResult = new TestResult(test, testName, timedResult.millis, timedResult.results)
  if (isWarmup)
    testResultMapWarmup += test -> testResult
  else
    testResultMapHot += test -> testResult
  testResult
}
The Controller
Unlike a Java class declaration, a Scala class statement also defines the primary constructor. The controller, Benchmark.scala, defines a class called Benchmark with the following signature.
class Benchmark (val load: () => Any, val showResult: Boolean)
The above says that the Benchmark constructor accepts two parameters; the first parameter is a no-args function that could return anything, and the second parameter is a Boolean that determines if the result should be displayed on the console or not.
Inside the class, a private immutable property is defined, which contains a reference to the graphical user interface instance:
private val gui = new Gui(this)
Also, a private implicit variable is defined that contains the Akka ExecutionContext for tests that exercise an Akka Executor. This is declared as a mutable var because it will assume new values for every test of an Akka ExecutionContext.
private implicit var dispatcher: ExecutionContext = null
Before every test is run, the controller needs to be reset:
def reset {
  ExecutorBenchmark.reset
  Model.reset
}
This is the master logic for each test:
def run() {
  reset
  ecNameMap.keys.foreach { ec: Any =>
    val ecName: String = ecNameMap.get(ec.asInstanceOf[AnyRef]).get
    ec match {
      case system: ActorSystem =>
        dispatcher = system.dispatcher
        runAkkaFutureLoads(ec, ecName)
        system.shutdown()

      case parallelism: Int =>
        runParallelLoads(parallelism, ecName)

      case jucExecutor: Executor => // j.u.c.Executor assumed
        dispatcher = ExecutionContext.fromExecutor(jucExecutor)
        runAkkaFutureLoads(ec, ecName)
        ec.asInstanceOf[ExecutorService].shutdown()

      case unknown =>
        println("Unknown test type: " + unknown)
        return
    }
    gui.removeCategorySpaces
  }
  gui.resize
}
Recall that the ecNameMap keys are actually the tests to be run, while the values are the names of the tests. The foreach line reads: "for each key in ecNameMap, name the key ec." The body of the foreach closure continues onto the next line, which reads: "get the key's value (which is the displayed name of the test) and store it in ecName." The match statement then handles three kinds of test: an Akka ActorSystem, an Int that indicates a Scala parallel collection should be tested, or otherwise a java.util.concurrent.Executor.
The test loads are run using Akka futures as containers when either an Akka ActorSystem or a j.u.c.Executor is provided; the ActorSystem or j.u.c.Executor is then shut down after each test. Here is the Akka ActorSystem case:
dispatcher = system.dispatcher
runAkkaFutureLoads(ec, ecName)
system.shutdown()
Here is the j.u.c.Executor case:
dispatcher = ExecutionContext.fromExecutor(jucExecutor)
runAkkaFutureLoads(ec, ecName)
jucExecutor.asInstanceOf[ExecutorService].shutdown()
Otherwise, a Scala parallel collection is used as a test container, like this:
runParallelLoads(parallelism, ecName)
Because the Scala parallel collection's ForkJoinPool is internal, it cannot be explicitly shut down. It shuts itself down automatically once each test is complete.
The benchmark runs loads at least twice; once to warm up the Java hotspot JIT using the desired Executor, and again at least once to use the warmed up hotspot. If a standard deviation is desired then the load should be invoked at least 10 times, perhaps 100 times. Each test run is created and added to the model by calling Model.addTest(); the new test is returned and saved as newTest1.
def runAkkaFutureLoads(executor: Any, executorName: String) {
  val newTest1 = Model.addTest(executor, executorName, runAkkaFutureLoad, true)
  if (Benchmark.showWarmUpTimes) {
    val test1StdDev = 0 // we only warm up once
    gui.addValue(MeanResult(newTest1.test, newTest1.testName, newTest1.millis, test1StdDev), true)
  }
  val results: Seq[TestResult] = for (
    i <- 0 until Benchmark.numRuns;
    val result = Model.addTest(executor, executorName, runAkkaFutureLoad, false)
  ) yield TestResult(newTest1.test, executorName, result.millis, result)
  val resultsMillis = results.map(_.millis)
  val millisMean = arithmeticMean(resultsMillis: _*).asInstanceOf[Long]
  val stdDev = popStdDev(resultsMillis: _*).asInstanceOf[Long]
  gui.addValue(MeanResult(runAkkaFutureLoad, executorName, stdDev * 2L, millisMean - stdDev), false)
}
If warm-up times are to be shown, the test is saved in a new MeanResult case class. Warmups are only done once, and the mean of one value is of course the value itself. The actual tests are run many times in the for-comprehension, and the results are stored as a Seq[TestResult], which is rather like a Java Iterable<TestResult>. The arithmetic mean and standard deviation are computed as a map/reduce. First, the millis property is extracted from each of the results and stored in resultsMillis:
val resultsMillis: Seq[Long] = results.map(_.millis)
The arithmetic mean is computed by expanding resultsMillis into a varargs argument list and passing it to arithmeticMean(); the resulting Double is converted to a Long and stored as millisMean:
val millisMean = arithmeticMean(resultsMillis: _*).asInstanceOf[Long]
The standard deviation is computed in a similar manner:
val stdDev = popStdDev(resultsMillis: _*).asInstanceOf[Long]
Standard deviation straddles the mean, so the last line causes a bar to be drawn the length of the mean less one standard deviation, and a stacked bar of two standard deviations to show the range of normally expected values. A new MeanResult case class is added to the model and displayed by the following:
gui.addValue(MeanResult(runAkkaFutureLoad, executorName, stdDev * 2L, millisMean - stdDev), false)
Now that we have seen how the sequencing logic works for the loads, we are nearly ready to see how an individual load is actually benchmarked. First we should know how the loads are timed. Timing results are stored in a case class called TimedResult, and the timing is done by a function called time(). The time() function returns a TimedResult, which is just a tuple with two values in it:
case class TimedResult[T](millis: Long, results: T)
The time() function is also simple. It is a curried function with two parameter lists: the first contains the functor to time, block, which can return any value; the second contains a prefix for the formatted message displayed on the console.
def time(block: => Any)(msg: String = "Elapsed time"): TimedResult[Any] = {
  val t0 = System.nanoTime()
  val result: Any = block
  val elapsedMs = (System.nanoTime() - t0) / 1000000
  if (Benchmark.consoleOutput)
    println(msg + ": " + elapsedMs + "ms")
  TimedResult(elapsedMs, result)
}
Now we are ready to examine how time() and TimedResult are used to benchmark an individual load. Note that each timed run is preceded by three garbage collections. The load is repeatedly executed on the container Benchmark.numInterations times. The results are then map-reduced to a Long called elapsedMs by using the Akka Future.sequence() method. A TimedResult[Seq[Any]] is constructed and returned as the value of this method. We don't care about the list of results, which should all be identical, so they are not used in the program.
def runAkkaFutureLoad: TimedResult[Seq[Any]] = {
  System.gc(); System.gc(); System.gc()
  val t0 = System.nanoTime()
  val trFuture = time {
    for (i <- 1 to Benchmark.numInterations) yield Future { load() }
  }("Futures creation time").asInstanceOf[TimedResult[Seq[Future[Any]]]]

  val f2 = Future.sequence(trFuture.results).map { results: Seq[Any] =>
    val elapsedMs: Long = (System.nanoTime() - t0) / 1000000
    if (Benchmark.consoleOutput) {
      println("Total time for Akka future version: " + elapsedMs + "ms")
      if (showResult)
        println("Result in " + trFuture.millis + " using Akka future version: " + results)
    }
    TimedResult(elapsedMs, results)
  }
  val r = Await.result(f2, Duration.Inf)
  r.asInstanceOf[TimedResult[Seq[Any]]]
}
The code for benchmarking Scala parallel collections is very similar to the above code for benchmarking Akka futures.
In Summary
Some experimentation will be necessary before you can know how many iterations of a load will produce statistically significant data. The size of the standard deviation will tell you how tightly clustered your results are, and therefore indicates how much data you need to collect before you can trust the results.
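One hypothetical stopping rule is to keep adding runs until the relative standard deviation (standard deviation divided by mean) falls below a threshold of your choosing; the 5% threshold and minimum of 10 runs below are assumptions for illustration only:

// Illustrative stopping rule, not part of ExecutorBenchmark: accept the results once
// at least minRuns timings are available and the relative standard deviation is small.
def enoughRuns(timingsMs: Seq[Long], minRuns: Int = 10, threshold: Double = 0.05): Boolean = {
  if (timingsMs.size < minRuns) false
  else {
    val mean = timingsMs.sum.toDouble / timingsMs.size
    val variance = timingsMs.map(t => (t - mean) * (t - mean)).sum / timingsMs.size
    math.sqrt(variance) / mean < threshold
  }
}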
Large-scale programs should be designed such that their major tasks can be measured as separate computational loads. Because some tasks might affect other tasks, be sure to measure them separately and in concert.
Testing on only one hardware configuration is not enough when developing a library or a product that must run on a variety of configurations. Considerably more work is involved in characterizing the computational load imposed by a program that must run on customer hardware, as opposed to SaaS, where the hardware environment is more tightly controlled.
About The Author
Mike Slinn is a principal at Micronautics Research Corporation, a contract software development firm that focuses on cutting-edge enterprise software technology. Mike is a hands-on architect who enjoys programming and mentoring. He is often engaged to turn around projects and can provide interim technical leadership for companies in transition. Mike has written three books and over fifty articles on software technology, and he has been recognized as a software expert in US federal court.