

Spark Application Performance Monitoring Using Uber JVM Profiler, InfluxDB and Grafana


Key Takeaways

  • Running Spark applications continuously and reliably is a challenging task, and a good performance monitoring system is needed.
  • The performance monitoring system is designed with three objectives: collect server and application metrics, store metrics in a time-series database, and provide a dashboard for data visualization.
  • Uber JVM Profiler is used to monitor the Spark application. The other technologies used are InfluxDB for storing time-series data and Grafana for data visualization.
  • The performance monitoring system helps DevOps teams monitor the system efficiently to meet the compliance requirements and service level agreements of an application.

 

Many industries are using Apache Spark to build big data processing applications. Spark provides implicit data parallelism and fault tolerance for this type of application, which could be a stream processing, batch processing, SQL dataset processing or machine learning application. Spark’s fast, in-memory data processing engine runs these applications in a cluster and transforms and processes large amounts of data in data pipelines. Running these applications continuously and reliably is a challenging task, and a good performance monitoring system is needed. As Spark is widely adopted across industries, performance monitoring, metric analysis and tuning of Spark applications are getting more attention. Uber has recently open sourced its JVM Profiler for Spark. In this article we will discuss how we can extend Uber JVM Profiler and use it with InfluxDB and Grafana for monitoring and reporting the performance metrics of a Spark application.

Spark Application Performance Monitoring System

A performance monitoring system is needed for optimal utilisation of available resources and early detection of possible issues. It should provide comprehensive status reports of running systems and should send alerts on component failure. When we talk of large-scale distributed systems running in a Spark cluster along with different components of the Hadoop ecosystem, the need for a fine-grained performance monitoring system becomes paramount. A Spark application performs distributed processing of data and works on shared resources, which makes the job of the DevOps team very complex. The DevOps team has to manage the available resources effectively and monitor the different components of the system closely to avoid any downtime. The full-stack visibility provided by an effective performance monitoring system helps the DevOps team understand the system behaviour and react quickly to production issues. This ensures the reliability, scalability and performance of the Spark application.

The ideal performance monitoring system for such a complex system must have the following features:

  • The monitoring system should provide granular visibility into each component inside the cluster. We should be able to get detailed metrics about CPU, memory, storage, disk I/O for local files and HDFS, stack traces, etc. These metrics help to quickly troubleshoot faulty instances.
  • The monitoring system should provide code-level metrics (e.g. execution time and the arguments used by different methods) for applications running on Spark. This will help to identify slow-running methods, disk hot-spotting, etc.
  • The monitoring system should collect and store metrics every second and allow us to analyse them by navigating across different time periods. We should be able to do second and sub-second level dissection of this data. We should be able to control the data retention period and easily access and analyse past data when needed. This helps in analysing the current trend and predicting the future trend.
  • The monitoring system should provide an effective way of extracting meaningful information from the large volume of metrics being collected continuously. This includes querying data using SQL or an API, filtering data, aggregating values and applying custom analysis. This helps in easy transformation and faster analysis of data.
  • The monitoring system should provide easy access to the information deduced from the analysis of metric data. There should be an option to display data in a dashboard in different forms like charts, graphs, tables, etc. There should be an option to categorise the data by host, time or job, and users should be able to drill down further to correlate the different data points. There should be a provision for configuring alerts and notifications for user-defined thresholds on metrics data. This helps DevOps teams and other stakeholders of an organisation get the desired information quickly when it is needed.

In this article, we’ll develop a performance monitoring system using open source tools and technologies. The Spark application performance monitoring system is designed with three objectives:

  1. Collect the performance metrics of the system (driver and executors) and of the application code
  2. Store these metrics in durable storage for time-series analysis (batch and live)
  3. Generate reports from metrics data in the form of graphs and charts

Apache Spark provides a web UI and a REST API for metrics. Spark also provides a variety of sinks, including Console, JMX, Servlet, Graphite, etc. There are a few other open source performance monitoring tools available, such as dr-elephant, sparklint and Prometheus. The metrics provided by these tools are mostly server-level metrics, and only a few of them also provide information about running applications.

Uber JVM Profiler collects both server-level and application code metrics. It can collect all metrics (CPU, memory, buffer pools, etc.) from the driver, the executors or any JVM. It can instrument existing code without modifying it, so it can collect metrics about methods, arguments and execution time. For storing metrics for time-series analysis, we will use InfluxDB, which is a powerful time-series database. We will extend Uber JVM Profiler and add a new reporter for InfluxDB so that metrics data can be stored using its HTTP API. For the dashboard of graphs and charts we will use Grafana, which will query InfluxDB for metrics data.

Below are the details of the tools and technologies used for the Spark application performance monitoring system.

Uber JVM Profiler   

Uber JVM Profiler is a distributed profiler which collects performance and resource utilisation metrics from different nodes across the cluster. It runs as a Java agent along with the application, collects the different metrics and publishes them to a designated reporter for further analysis and reporting. Uber JVM Profiler was developed for profiling Spark applications, but it can be used for profiling any JVM-based application. There are three main components of Uber JVM Profiler:

  1. Profiler: Uber JVM Profiler comes with the following built-in profilers:
  • CpuAndMemory Profiler – This profiler collects Buffer Pool (direct & mapped), Garbage Collection (count & time), Heap Memory (committed & total used), Non-Heap Memory (committed & total used), CPU (load & time), Memory Pool details (EdenSpace, SurvivorSpace, TenuredGen, CodeCache, CompressedClassSpace, Metaspace), vmHWM and vmRSS metrics.
  • IO Profiler – This profiler collects CPU stats (idle, nice, system, user, iowait) and disk I/O read/write bytes.
  • Stacktrace Profiler – This profiler collects Thread Name, Thread State and Stack trace metrics.
  • ProcessInfo Profiler – This profiler collects Agent Version, JVM Class Path, JVM Input Arguments and xmxBytes metrics.
  • MethodDuration Profiler – This profiler collects the execution time of methods, along with the Class Name, Method Name and Process Name.
  • MethodArgument Profiler – This profiler collects metrics for method arguments, along with the Class Name, Method Name and Process Name.
  2. Transformer: This is a class file transformer that instruments Java method bytecode. It is used to instrument the methods configured for the MethodDuration and MethodArgument profilers.
  3. Reporter: The following reporters are available:
  • KafkaOutputReporter – Sends performance metrics to a Kafka topic.
  • FileOutputReporter – Writes metrics to a file.
  • ConsoleOutputReporter – Sends metrics to the console.
  • RedisOutputReporter – Stores metrics in a Redis database.

For more details about the JVM Profiler, please refer to Uber’s blog post.

InfluxDB and Grafana

InfluxDB: InfluxDB is an open source time-series database designed for efficiently storing and querying large amounts of time-stamped data. This data could be IoT sensor data, real-time analytics data, application metrics data or DevOps monitoring data. It provides automatic data lifecycle management by expiring and deleting old data. It provides write and query capabilities using an SQL-like query language, an HTTP API and client libraries. See the InfluxDB documentation for details.

Grafana: Grafana is an open source metric dashboard and graph editor. Grafana also supports alerts and notifications for metrics. It supports many data sources, such as Graphite, InfluxDB, OpenTSDB, Prometheus, Elasticsearch and CloudWatch. Many dashboards and plugins (both open source and commercial) are available on Grafana’s website. For more details about Grafana, visit its website.

System Architecture

A Spark application runs on a cluster that may consist of anywhere from a few nodes to thousands of nodes. To collect metrics from this distributed system and send them to other systems for further analysis, we need a loosely coupled and fault-tolerant architecture. Publishing the metrics to a Kafka topic is one of the best solutions, and Uber JVM Profiler comes with a “KafkaOutputReporter” that can be used for this purpose. Another approach for storing and reading the metrics is to use InfluxDB, which provides an HTTP API that can be used to query and write to the database. This API supports both basic and JWT token authentication, and URLs can be configured for HTTPS access. The “InfluxDBOutputReporter” that we discuss in this article invokes the write HTTP endpoint to store the metrics collected by the different profilers. Grafana provides a rich data source plugin for InfluxDB; it uses the query HTTP endpoint to fetch metrics data from InfluxDB and displays the data on the dashboard in the form of rich graphs and tables. These graphs and tables are auto-refreshed at fixed intervals, and the refresh interval can be configured in Grafana.
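To make the write path concrete, here is a minimal sketch of a write to InfluxDB’s HTTP API, assuming InfluxDB 1.x listening on 127.0.0.1:8086 with the “metrics” database. It uses only JDK classes (HttpURLConnection) rather than the influxdb-java library, so it is not the InfluxDBOutputReporter code; the class name, the sample point and its timestamp are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Sketch only: a raw HTTP write to the InfluxDB 1.x /write endpoint.
// Hypothetical class; the article's reporter uses influxdb-java instead.
class InfluxWriteSketch {

    // URL-encodes a query-string value; UTF-8 is guaranteed to exist on every JVM.
    private static String enc(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // Builds http://host:port/write?db=<db>&precision=ms (timestamps in epoch milliseconds).
    static String writeUrl(String host, int port, String database) {
        return "http://" + host + ":" + port + "/write?db=" + enc(database) + "&precision=ms";
    }

    // POSTs newline-separated line-protocol points and returns the HTTP status code;
    // InfluxDB 1.x answers 204 No Content when the write succeeds.
    static int write(String url, String lineProtocolBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "text/plain; charset=utf-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(lineProtocolBody.getBytes(StandardCharsets.UTF_8));
        }
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String url = writeUrl("127.0.0.1", 8086, "metrics");
        // Hypothetical sample point: measurement, one tag, one integer field, ms timestamp.
        String point = "CpuAndMemory,role=driver heapMemoryTotalUsed=123456i 1536000000000";
        System.out.println(url + " -> HTTP " + write(url, point));
    }
}
```

Running main against a live server should print the URL followed by the status code; with no server running, write throws an IOException.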

The architecture diagram of the performance monitoring system for a Spark application using Uber JVM Profiler, InfluxDB and Grafana is illustrated in Figure 1 below.

Figure 1. Performance Monitoring System Architecture Diagram

Technologies and Tools

The following table lists the tools and technologies used for the performance monitoring system.

Tool/Technology | Version | Download URL
JDK | 1.8 | http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Maven | 3.5.4 | https://maven.apache.org/download.cgi
InfluxDB | 1.6.2 | https://portal.influxdata.com/downloads
Grafana | 5.2.4 | https://grafana.com/grafana/download
Spark | 2.3.1 | http://spark.apache.org/downloads.html

Please refer to the documentation for installing and configuring these tools.

Design and Implementation

The following sections provide design and implementation details for a performance monitoring system for a Spark application. Uber JVM Profiler collects metrics from the driver and executors, and these metrics carry detailed information such as role (driver/executor), processUuid and host. These details are useful for identifying the different processes and analysing their metrics. In InfluxDB we can use these details to query the performance metrics of different executors. We can store processUuid as an InfluxDB tag; because InfluxDB indexes tags (but not fields), queries that filter on processUuid will perform well. First, we will create a “metrics” database in InfluxDB, then we will add an “InfluxDBOutputReporter” to the JVM Profiler code base, and finally we will configure the Grafana dashboard and graphs.
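As a sketch of this tag/field split, the helper below formats a single point in InfluxDB line protocol, with identifying details such as role, processUuid and host as tags and the metric values as fields. It assumes InfluxDB 1.x line protocol escaping rules and millisecond timestamps (written with precision=ms); the class and method names are hypothetical, and the actual reporter delegates this formatting to influxdb-java.

```java
import java.util.Map;

// Hypothetical sketch: formats one InfluxDB line-protocol point.
// Tags (indexed) carry identifying details; fields (not indexed) carry metric values.
class LineProtocolSketch {

    // Tag keys, tag values and field keys must escape commas, equals signs and spaces.
    static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Measurement names only need commas and spaces escaped.
    static String escapeMeasurement(String s) {
        return s.replace(",", "\\,").replace(" ", "\\ ");
    }

    // Integers get an "i" suffix, other numbers and booleans are written as-is,
    // anything else becomes a double-quoted string field.
    static String fieldValue(Object v) {
        if (v instanceof Long || v instanceof Integer) {
            return v + "i";
        }
        if (v instanceof Number || v instanceof Boolean) {
            return v.toString();
        }
        String s = String.valueOf(v).replace("\\", "\\\\").replace("\"", "\\\"");
        return "\"" + s + "\"";
    }

    // measurement,tag1=v1,tag2=v2 field1=x,field2=y timestamp
    // (line protocol requires at least one field per point).
    static String toLine(String measurement, Map<String, String> tags,
                         Map<String, Object> fields, long epochMillis) {
        StringBuilder sb = new StringBuilder(escapeMeasurement(measurement));
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            sb.append(',').append(escapeKey(tag.getKey()))
              .append('=').append(escapeKey(tag.getValue()));
        }
        char separator = ' ';
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            sb.append(separator).append(escapeKey(field.getKey()))
              .append('=').append(fieldValue(field.getValue()));
            separator = ',';
        }
        return sb.append(' ').append(epochMillis).toString();
    }
}
```

For example, tags {role=driver, processUuid=abc-123} and a field {heapMemoryTotalUsed=1024L} produce a line starting with CpuAndMemory,role=driver,processUuid=abc-123 heapMemoryTotalUsed=1024i.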

Create Metrics Database in InfluxDB

  • Start the InfluxDB server, which by default runs on port 8086. For example, on an Ubuntu system, open a terminal and execute the “influxd” command.

/usr/bin$ sudo influxd

  • Once the server is up and running, start the “influx” utility in another terminal, and then execute the command to create the “metrics” database.

/usr/bin$ sudo influx

CREATE DATABASE metrics
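If the influx shell is not available, the same statement can be sent through InfluxDB’s /query HTTP endpoint. A minimal sketch, assuming InfluxDB 1.x at http://localhost:8086; the class name is hypothetical. InfluxDB 1.x documents POST (not GET) for statements such as CREATE DATABASE, so the statement is sent as a form-encoded “q” parameter.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: runs "CREATE DATABASE metrics" through the /query HTTP endpoint
// instead of the influx shell.
class CreateDatabaseSketch {

    // Form-encodes an InfluxQL statement as the "q" parameter of /query.
    static String formBody(String influxQl) {
        try {
            return "q=" + URLEncoder.encode(influxQl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // POSTs the statement and returns the HTTP status code. Note that InfluxDB may still
    // report statement-level errors inside the JSON response body.
    static int postQuery(String baseUrl, String influxQl) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(baseUrl + "/query").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(formBody(influxQl).getBytes(StandardCharsets.UTF_8));
        }
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("HTTP " + postQuery("http://localhost:8086", "CREATE DATABASE metrics"));
    }
}
```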

Implement InfluxDBOutputReporter

We will implement “InfluxDBOutputReporter” in the JVM Profiler code base. Refer to the InfluxDBOutputReporter.java file in the “influxdb_reporter” branch at GitHub for the implementation details discussed in this section.

  • We will use the influxdb-java library to interact with the InfluxDB database server. It uses the HTTP API to read from and write to the InfluxDB database. Update the pom.xml file and add the Maven dependency for the influxdb-java library.
  • Define host, port and database properties for the InfluxDB database server in the InfluxDBOutputReporter class. Default values for these properties are “127.0.0.1”, “8086” and “metrics”, respectively. Create a connection to InfluxDB using the API provided by the influxdb-java library.
  • The InfluxDBOutputReporter class needs to implement the com.uber.profiling.Reporter interface. We need to override the public void report(String profilerName, Map<String, Object> metrics) method and the public void close() method. The “profilerName” will be the measurement name in the “metrics” database.
  • As InfluxDB uses Line Protocol to store key=value pairs, we need to process the metrics data in the Map<String, Object> sent by the profiler and flatten it so that it can be stored as a single Point in one series. We can use the value of the “name” property of a nested metric, if it is available, to form the field name. If the “name” property is not available, we can use a counter value instead. InfluxDB supports regular expressions for field names in queries, so querying these fields won’t be an issue.
  • Create Point and BatchPoints objects using the influxdb-java library APIs and write the BatchPoints to the InfluxDB database.
  • To read the database connection properties from the yaml file, we have to use the com.uber.profiling.YamlConfigProvider and com.uber.profiling.Argument classes. Add a reference to the InfluxDBOutputReporter class in the Argument class and invoke its setProperties method to pass the properties coming from the yaml file. A sample yaml file for InfluxDB is available at resources/influxdb.yaml at GitHub.
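The flattening step described in the list above can be sketched as follows. To stay self-contained, it defines a hypothetical MetricsReporter interface with the same two methods the article lists for com.uber.profiling.Reporter, and it buffers the flattened fields instead of writing Points with influxdb-java. The “-” separator, the class names and the sample metric keys in the usage are assumptions, not taken from the GitHub code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for com.uber.profiling.Reporter so the sketch compiles on its own.
interface MetricsReporter {
    void report(String profilerName, Map<String, Object> metrics);
    void close();
}

// Sketch: nested maps and lists are flattened into field names, using an element's
// "name" value when present and a counter otherwise.
class FlatteningReporterSketch implements MetricsReporter {

    // Flattened field maps per measurement (profilerName). A real reporter would build
    // Points/BatchPoints from these and write them to InfluxDB instead of buffering.
    private final Map<String, List<Map<String, Object>>> pending = new LinkedHashMap<>();

    @Override
    public void report(String profilerName, Map<String, Object> metrics) {
        Map<String, Object> fields = new LinkedHashMap<>();
        flatten("", metrics, fields);
        pending.computeIfAbsent(profilerName, k -> new ArrayList<>()).add(fields);
    }

    @Override
    public void close() {
        pending.clear(); // a real reporter would flush and close its InfluxDB connection here
    }

    Map<String, List<Map<String, Object>>> pendingFields() {
        return Collections.unmodifiableMap(pending);
    }

    @SuppressWarnings("unchecked")
    static void flatten(String prefix, Map<String, Object> in, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : in.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "-" + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                flatten(key, (Map<String, Object>) value, out);
            } else if (value instanceof List) {
                int counter = 0;
                for (Object item : (List<Object>) value) {
                    if (item instanceof Map) {
                        // Copy the element so the caller's map is not modified.
                        Map<String, Object> element = new LinkedHashMap<>((Map<String, Object>) item);
                        Object name = element.remove("name");
                        String elementKey = key + "-" + (name != null ? name : counter);
                        flatten(elementKey, element, out);
                    } else if (item != null) {
                        out.put(key + "-" + counter, item);
                    }
                    counter++;
                }
            } else if (value != null) {
                out.put(key, value);
            }
        }
    }
}
```

With a sample metrics map {heapMemoryTotalUsed=100, memoryPools=[{name=Metaspace, used=10}, {used=20}]}, the flattened fields are heapMemoryTotalUsed, memoryPools-Metaspace-used and memoryPools-1-used, which an InfluxQL regular expression such as /memoryPools-.*-used/ can select together.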

Add Data Source and Dashboard in Grafana

This section briefly describes the steps needed to add a graph of metric data from InfluxDB in Grafana. Please check the Grafana documentation for in-depth details:

  • Start the Grafana server. On Ubuntu we can execute the below command. By default, it runs on port number 3000.

sudo service grafana-server start

  • Open http://localhost:3000/ in a browser and create a data source for InfluxDB. Set Name to “InfluxDBDataSource” and Type to “InfluxDB”; the default URL for InfluxDB is “http://localhost:8086”, and the Database name will be “metrics”.
  • Create a new dashboard, add a Graph panel, click Edit and add a query for the graph. Below is a sample query for the InfluxDB database.
select "heapMemoryCommitted" as Committed, "heapMemoryTotalUsed" as Used from "metrics"."autogen"."CpuAndMemory" where "role" = 'driver' AND time > now() - 5m
  • Grafana provides the option of defining template variables, which can be passed in a query. This is useful for displaying data from multiple executors on the dashboard. For example, we can create variables for “executorProcessUuid” and “timeInterval” and use them in a query as below.
select "heapMemoryCommitted" as Committed, "heapMemoryTotalUsed" as Used from "metrics"."autogen"."CpuAndMemory" where "processUuid" =~ /^$executorProcessUuid$/ AND time > now() - $timeInterval
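Grafana sends queries like these to InfluxDB’s query HTTP endpoint. The sketch below issues the same driver heap query outside Grafana, which can help check that data is arriving, assuming InfluxDB 1.x at http://localhost:8086; the class name is hypothetical and the JSON response is returned unparsed. Template variables such as $executorProcessUuid are expanded by Grafana before a query is sent, so this sketch uses a literal query.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Scanner;

// Hypothetical sketch: runs a read-only InfluxQL query through GET /query.
class QueryEndpointSketch {

    // The driver heap query from the Grafana example, with plain ASCII quotes.
    static final String DRIVER_HEAP_QUERY =
        "select \"heapMemoryCommitted\" as Committed, \"heapMemoryTotalUsed\" as Used "
        + "from \"metrics\".\"autogen\".\"CpuAndMemory\" "
        + "where \"role\" = 'driver' AND time > now() - 5m";

    private static String enc(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // GET /query?db=<db>&epoch=ms&q=<InfluxQL>; epoch=ms returns timestamps as epoch milliseconds.
    static String queryUrl(String baseUrl, String database, String influxQl) {
        return baseUrl + "/query?db=" + enc(database) + "&epoch=ms&q=" + enc(influxQl);
    }

    // Returns the raw JSON response body.
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(get(queryUrl("http://localhost:8086", "metrics", DRIVER_HEAP_QUERY)));
    }
}
```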

A sample Spark-Metrics-Dashboard JSON file has been provided at GitHub. This file can be imported into the Grafana server: open the URL http://localhost:3000/dashboard/import in a browser and click on “Upload .json File”.

Build and Deploy

This section describes the steps to build and deploy a performance monitoring system. Application code can be cloned from the “influxdb_reporter” branch at GitHub.

  • Use the below command to build the JVM profiler with “InfluxDBOutputReporter”.

mvn clean package

  • Copy the jvm-profiler-0.0.9.jar file created by the Maven build to a directory (e.g. /opt/profiler). We can keep the influxdb.yaml file in this directory as well.
  • We will profile the JavaSqlNetworkWordCount example application, which is shipped with Apache Spark. Its source code is available at /spark-2.3.1-bin-hadoop2.7/examples/src/main/java/org/apache/spark/examples/streaming.
  • To run the JavaSqlNetworkWordCount application, we need to run a Netcat server using the command below.

nc -lk 9999

  • Go to /spark-2.3.1-bin-hadoop2.7/sbin and start the Master using the below command.

./start-master.sh

  • We can get the master URL from the log file. Pass this URL to the following command to start the Worker.

./start-slave.sh -c 2 spark://192.168.1.6:7077

  • Go to /spark-2.3.1-bin-hadoop2.7/bin and execute the command below. This command will execute the JavaSqlNetworkWordCount application and start the JVM profilers for the driver and executor. Check the Uber JVM Profiler GitHub readme page for details about the parameters.
spark-submit --master spark://192.168.1.6:7077 --conf "spark.driver.extraJavaOptions=-javaagent:/opt/profiler/jvm-profiler-0.0.9.jar=reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,metricInterval=5000,sampleInterval=5000,ioProfiling=true" --conf "spark.executor.extraJavaOptions=-javaagent:/opt/profiler/jvm-profiler-0.0.9.jar=reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,tag=influxdb,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/opt/profiler/influxdb.yaml,metricInterval=5000,sampleInterval=5000,ioProfiling=true" --class org.apache.spark.examples.streaming.JavaSqlNetworkWordCount ../examples/jars/spark-examples_2.11-2.3.1.jar localhost 9999
  • Optionally, we can also pass the yaml file to the driver’s profiler by adding the “configProvider” and “configFile” parameters to spark.driver.extraJavaOptions, as below.
spark-submit --master spark://192.168.1.6:7077 --conf "spark.driver.extraJavaOptions=-javaagent:/opt/profiler/jvm-profiler-0.0.9.jar=reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/opt/profiler/influxdb.yaml,metricInterval=5000,sampleInterval=5000,ioProfiling=true" --conf "spark.executor.extraJavaOptions=-javaagent:/opt/profiler/jvm-profiler-0.0.9.jar=reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,tag=influxdb,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/opt/profiler/influxdb.yaml,metricInterval=5000,sampleInterval=5000,ioProfiling=true" --class org.apache.spark.examples.streaming.JavaSqlNetworkWordCount ../examples/jars/spark-examples_2.11-2.3.1.jar localhost 9999
  • Go to the “influx” terminal and execute the commands below.

use metrics

show measurements

  • We will see the names of the measurements created in the “metrics” database, as below.

CpuAndMemory

IO

ProcessInfo

Stacktrace

We can check the data populated in these measurements. For example, we can use the command below to fetch a single record from the CpuAndMemory measurement.

select * from CpuAndMemory limit 1

  • Open the Grafana dashboard for Spark. The graphs should be populated with metrics data. Below is a sample dashboard.

Figure 2. Sample Grafana dashboard for spark metrics
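The -javaagent values in the spark-submit commands above follow one pattern: the agent jar path, an equals sign, then comma-separated key=value profiler arguments. The small helper below, a hypothetical convenience rather than part of Uber JVM Profiler, assembles such a string from an ordered map, which can make the long commands easier to maintain.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: builds "-javaagent:<jar>=key1=value1,key2=value2,..."
class AgentOptionsSketch {

    static String javaAgentOption(String jarPath, Map<String, String> args) {
        StringJoiner joined = new StringJoiner(",");
        for (Map.Entry<String, String> e : args.entrySet()) {
            joined.add(e.getKey() + "=" + e.getValue());
        }
        return "-javaagent:" + jarPath + "=" + joined;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the arguments in insertion order.
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("reporter", "com.uber.profiling.reporters.InfluxDBOutputReporter");
        opts.put("configProvider", "com.uber.profiling.YamlConfigProvider");
        opts.put("configFile", "/opt/profiler/influxdb.yaml");
        opts.put("metricInterval", "5000");
        opts.put("sampleInterval", "5000");
        opts.put("ioProfiling", "true");
        String agent = javaAgentOption("/opt/profiler/jvm-profiler-0.0.9.jar", opts);
        System.out.println("--conf \"spark.driver.extraJavaOptions=" + agent + "\"");
    }
}
```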

Pros and Cons

The performance monitoring system we discussed in this article collects different system and application metrics using Uber JVM Profiler and stores them in the InfluxDB time-series database. The time-series database provides data retention policies, continuous queries, flexible time aggregations and efficient handling of millions of records for both live and batch processing. This time-series data helps us analyse how the system changed in the past and how it is behaving in the present, and predict how it may change in the future. We can identify patterns of failures by correlating these metrics over time. We created a dashboard using Grafana which provides easy access to, and navigation across, different types of metrics. The DevOps team can use these graphs and charts to correlate the different metrics, understand the system behaviour and identify hotspots in the data. This helps in maintaining compliance and achieving the service level agreements of an application. Overall, this performance monitoring system helps in achieving continuous monitoring of the system.

The limitations of this performance monitoring system are in line with the limitations of agent-based profilers. An agent-based profiler may consume a certain amount of computing resources. Sometimes it may require troubleshooting and patching, which might be difficult for a large, geographically distributed system. You might need to complete paperwork before installing agents on a production system. We also have to consider the security, scalability and availability of the performance monitoring system itself, i.e. monitor the monitoring system. Most of these challenges can be overcome by proper design and tuning of the application and system. If you are not allowed to install profiler agents on a production system, you can consider an agent-less system, but it has its own limitations, such as less granular metrics and increased network overhead.

Summary

For complex Spark applications, identifying, debugging and resolving production issues are not easy tasks, and we need an effective performance monitoring system to assist us. Uber JVM Profiler is a great addition to the open source community. It can be extended by adding a new reporter for publishing metrics. The performance metrics of Spark applications collected by the different profilers can be stored in InfluxDB. The “InfluxDBOutputReporter” that we discussed in this article uses the HTTP API to write the metrics of Spark drivers and executors to InfluxDB. Grafana provides a plugin for InfluxDB and uses the HTTP API to query the metrics. We can create a dashboard with charts, tables and graphs for these metrics, which can be auto-refreshed at fixed intervals. Code for “InfluxDBOutputReporter” is available here, and the Spark-InfluxDB-Grafana.json file is available here.


About The Author

Amit Baghel is a software architect with over 17 years of experience in the design and development of enterprise applications and products around the Java ecosystem. His current focus is on IoT, Cloud Computing, Big Data Solutions, Micro Services, DevOps and Continuous Integration and Delivery. Baghel can be reached via e-mail.
