13

I'm working on a Spark Streaming program which retrieves a Kafka stream, does very basic transformation on the stream and then inserts the data to a DB (voltdb if it's relevant). I'm trying to measure the rate in which I insert rows to the DB. I think metrics can be useful (using JMX). However I can't find how to add custom metrics to Spark. I've looked at Spark's source code and also found this thread however it doesn't work for me. I also enabled the JMX sink in the conf.metrics file. What's not working is I don't see my custom metrics with JConsole.

Could someone explain how to add custom metrics (preferably via JMX) to spark streaming? Or alternatively how to measure my insertion rate to my DB (specifically VoltDB)? I'm using spark with Java 8.

om-nom-nom
  • 62,329
  • 13
  • 183
  • 228
Gideon
  • 2,211
  • 5
  • 29
  • 47

5 Answers5

17

Ok after digging through the source code I found how to add my own custom metrics. It requires 3 things:

  1. Create my own custom source. Sort of like this
  2. Enable the Jmx sink in the spark metrics.properties file. The specific line I used is: *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink which enable JmxSink for all instances
  3. Register my custom source in the SparkEnv metrics system. An example of how to do can be seen here - I actually viewed this link before but missed the registration part which prevented me from actually seeing my custom metrics in the JVisualVM

I'm still struggling with how to actually count the number of insertions into VoltDB because the code runs on the executors but that's a subject for a different topic :)

I hope this will help others

Gideon
  • 2,211
  • 5
  • 29
  • 47
  • Did you figure out on how to count anything from executors? I have similar usecase where I write to HTTP endpoint and I want to count bunch of things from executors, but counters won't budge. – K P Jul 01 '16 at 23:20
  • This was actually quite a long time ago but as far as I remember I sent my metrics from the executors using codahale metrics and they have a Graphite reporter and I just summed everything in graphite – Gideon Jul 04 '16 at 06:56
  • Ah ok, thanks for the response. My use case is a little different, writing my own Source and trying to send the events to internal metrics tool. – K P Jul 05 '16 at 17:12
  • Writing your own metrics is exactly like my answer, the issue is with sending them from the executors? – Gideon Jul 06 '16 at 07:05
  • when register my custom metric, I find that MetricsSystem is specified by `private[spark]`, I can't register my custom metric in the main function to MetricsSystem, how can you solve this? thanks – klion26 Oct 19 '16 at 07:00
  • I solved the `private[spark]` reference by adding a bridge class A, and A under the package org.apache.spark – klion26 Oct 19 '16 at 14:11
  • @klion26 its been a long time since I touched it but if I remember correctly I did the same thing. I wish they would make it a little bit more user friendly – Gideon Oct 20 '16 at 07:44
7

Groupon have a library called spark-metrics that lets you use a simple (Codahale-like) API on your executors and have the results collated back in the driver and automatically registered in Spark's existing metrics registry. These then get automatically exported along with Spark's built-in metrics when you configure a metric sink as per the Spark docs.

Martin McNulty
  • 2,601
  • 3
  • 22
  • 26
  • I'm obviously not stuck on this problem anymore but still, great to know there are some useful libraries for that kind of stuff. Thanks for the tip :) – Gideon Mar 28 '17 at 14:55
3

to insert rows from based on inserts from VoltDB, use accumulators - and then from your driver you can create a listener - maybe something like this to get you started

sparkContext.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    stageCompleted.stageInfo.accumulables.foreach { case (_, acc) => {

here you have access to those rows combined accumulators and then you can send to your sink..

robert towne
  • 191
  • 2
  • 9
  • I eventually went with gathering metrics for each executor and sending it to Graphana and summing all the information there. The listener is a cool idea though :) – Gideon Nov 09 '15 at 08:23
  • @Gideon can you expand on that? You say you ditched the normal metrics and did the work yourself or you got them to work? – Sebastian Piu Nov 24 '15 at 18:33
  • I didn't ditch the normal metrics. I added some of my own custom metrics on the Spark executors. The problem is that for these custom metrics I needed aggregated results (basically summing accumulators from the different executors) so what I did is send the data from each Spark executor to Graphana and aggregate the results there – Gideon Nov 25 '15 at 07:36
3

Below is a working example in Java.
It's tested with StreaminQuery (Unfortunately StreaminQuery does not have ootb metrics like StreamingContext till Spark 2.3.1).

Steps:

Define a custom source in the same package of Source class

package org.apache.spark.metrics.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/**
 * Metrics source for structured streaming query.
 */
public class StreamingQuerySource implements Source {
    private String appName;
    private MetricRegistry metricRegistry = new MetricRegistry();
    private final Progress progress = new Progress();

    public StreamingQuerySource(String appName) {
        this.appName = appName;
        registerGuage("batchId", () -> progress.batchId());
        registerGuage("numInputRows", () -> progress.numInputRows());
        registerGuage("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
        registerGuage("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
    }

    private <T> Gauge<T> registerGuage(String name, Gauge<T> metric) {
        return metricRegistry.register(MetricRegistry.name(name), metric);
    }

    @Override
    public String sourceName() {
        return String.format("%s.streaming", appName);
    }


    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }

    public void updateProgress(StreamingQueryProgress queryProgress) {
        progress.batchId(queryProgress.batchId())
                .numInputRows(queryProgress.numInputRows())
                .inputRowsPerSecond(queryProgress.inputRowsPerSecond())
                .processedRowsPerSecond(queryProgress.processedRowsPerSecond());
    }

    @Data
    @Accessors(fluent = true)
    private static class Progress {
        private long batchId = -1;
        private long numInputRows = 0;
        private double inputRowsPerSecond = 0;
        private double processedRowsPerSecond = 0;
    }
}

Register the source right after SparkContext is created

    querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
    SparkEnv.get().metricsSystem().registerSource(querySource);

Update data in StreamingQueryListener.onProgress(event)

  querySource.updateProgress(event.progress());

Config metrics.properties

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds

# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Sample output in graphite exporter (mapped to prometheus format)

streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1r",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model=model1",qty="processedRowsPerSecond"} 0.81
Leon
  • 3,124
  • 31
  • 36
2

here's an excellent tutorial which covers all the setps you need to setup Spark's MetricsSystem with Graphite. That should do the trick:

http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/

Erik Schmiegelow
  • 2,739
  • 1
  • 18
  • 22
  • Thanks Erik for your response, it's quite useful! but did you happen to add your own metrics in the application code? I'm not talking about stuff that are already monitored by spark but other things like rate of insertion of rows into VoltDB within each partition? (or any other custom made metrics in the code). I'm struggling with implementing custom measurements in my application – Gideon Sep 29 '15 at 16:35