From 86e318513a578f0e35b20cc14e9cf62bae1b96d4 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Sun, 8 Feb 2026 12:06:04 -0500 Subject: [PATCH 1/6] Track external accumulators in tracer instead of using SparkInfo values --- .../spark/AbstractDatadogSparkListener.java | 20 +- .../spark/SparkAggregatedTaskMetrics.java | 53 +++- .../instrumentation/spark/SparkSQLUtils.java | 17 +- .../spark/AbstractSpark24SqlTest.groovy | 46 ++- .../spark/AbstractSpark32SqlTest.groovy | 282 +++++++++++++----- .../spark/DatadogSpark212Listener.java | 36 +++ .../spark/DatadogSpark213Listener.java | 36 +++ 7 files changed, 399 insertions(+), 91 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index e490588e503..9e9ae33de07 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -40,6 +40,7 @@ import org.apache.spark.ExceptionFailure; import org.apache.spark.SparkConf; import org.apache.spark.TaskFailedReason; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.AccumulableInfo; import org.apache.spark.scheduler.JobFailed; import org.apache.spark.scheduler.SparkListener; @@ -64,6 +65,7 @@ import org.apache.spark.sql.streaming.StateOperatorProgress; import org.apache.spark.sql.streaming.StreamingQueryListener; import org.apache.spark.sql.streaming.StreamingQueryProgress; +import org.apache.spark.util.AccumulatorV2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; @@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final HashMap liveExecutors = new HashMap<>(); // There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of - // an active SQL query) - // so capping the size of the collection storing them + // an active SQL query) so capping the size of the collection storing them + // TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently? + // If we know we don't need the accumulator values, can we drop all associated data and just map + // stage ID -> accumulator ID? Put this behind some FF private final Map accumulators = new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE); @@ -229,6 +233,12 @@ public void setupOpenLineage(DDTraceId traceId) { /** Parent Ids of a Stage. Provide an implementation based on a specific scala version */ protected abstract int[] getStageParentIds(StageInfo info); + /** + * All External Accumulators associated with a given task. 
Provide an implementation based on a + * specific scala version + */ + protected abstract List getExternalAccumulators(TaskMetrics metrics); + @Override public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) { this.applicationStart = applicationStart; @@ -671,7 +681,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId); if (sqlPlan != null) { - SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId); + SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId); } span.finish(completionTimeMs * 1000); @@ -685,7 +695,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey); if (stageMetric != null) { - stageMetric.addTaskMetrics(taskEnd); + // Not happy that we have to extract external accumulators here, but needed as we're dealing + // with Seq which varies across Scala versions + stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics())); } if (taskEnd.taskMetrics() != null) { diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java index 757f20f75f5..5b0167a1da1 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java @@ -1,13 +1,20 @@ package datadog.trace.instrumentation.spark; +import com.fasterxml.jackson.core.JsonGenerator; import datadog.metrics.api.Histogram; import datadog.trace.api.Config; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.spark.TaskFailedReason; import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; class SparkAggregatedTaskMetrics { private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0; @@ -59,13 +66,17 @@ class SparkAggregatedTaskMetrics { private Histogram shuffleWriteBytesHistogram; private Histogram diskBytesSpilledHistogram; + // Used for Spark SQL Plan metrics ONLY, don't put in regular span for now + private Map externalAccumulableHistograms; + public SparkAggregatedTaskMetrics() {} public SparkAggregatedTaskMetrics(long availableExecutorTime) { this.previousAvailableExecutorTime = availableExecutorTime; } - public void addTaskMetrics(SparkListenerTaskEnd taskEnd) { + public void addTaskMetrics( + SparkListenerTaskEnd taskEnd, List externalAccumulators) { taskCompletedCount += 1; if (taskEnd.taskInfo().attemptNumber() > 0) { @@ -127,6 +138,31 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) { shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten()); diskBytesSpilledHistogram = lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled()); + + // TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that + // only needed for SHS? 
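A note on the capped `accumulators` map above: `RemoveEldestHashMap` itself is not shown in this diff, so the sketch below is only an assumption of how such a size-bounded map is typically built on `LinkedHashMap#removeEldestEntry`; the real class may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch only: a map that evicts its oldest entry once the
// configured cap is exceeded, matching the capping behavior described above.
class RemoveEldestHashMap<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  RemoveEldestHashMap(int maxSize) {
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Invoked by LinkedHashMap after every put; returning true drops the
    // eldest entry (insertion order), keeping at most maxSize entries.
    return size() > maxSize;
  }
}
```

Under this scheme a long-running application with ever-growing accumulator IDs stays memory-bounded, at the cost of occasionally evicting accumulators that still belong to an active SQL query, which is the consistency concern raised in the TODO above.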
+ if (externalAccumulators != null && !externalAccumulators.isEmpty()) { + if (externalAccumulableHistograms == null) { + externalAccumulableHistograms = new HashMap<>(externalAccumulators.size()); + } + + externalAccumulators.forEach( + acc -> { + Histogram hist = externalAccumulableHistograms.get(acc.id()); + if (hist == null) { + hist = + Histogram.newHistogram(HISTOGRAM_RELATIVE_ACCURACY, HISTOGRAM_MAX_NUM_BINS); + } + + try { + // As of spark 3.5, all SQL metrics are Long, safeguard if it changes in new + // versions + hist.accept((Long) acc.value()); + externalAccumulableHistograms.put(acc.id(), hist); + } catch (ClassCastException ignored) { + } + }); + } } } } @@ -276,6 +312,21 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) { return hist; } + // Used to put external accum metrics to JSON for Spark SQL plans + public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException { + if (externalAccumulableHistograms != null) { + Histogram hist = externalAccumulableHistograms.get(info.accumulatorId()); + String name = info.name(); + + if (name != null && hist != null) { + generator.writeStartObject(); + generator.writeStringField(name, histogramToBase64(hist)); + generator.writeStringField("type", info.metricType()); + generator.writeEndObject(); + } + } + } + public static long computeTaskRunTime(TaskMetrics metrics) { return metrics.executorDeserializeTime() + metrics.executorRunTime() diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java index 33718a4b0dc..7eff461a00a 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkSQLUtils.java @@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan( AgentSpan span, SparkPlanInfo plan, Map accumulators, + SparkAggregatedTaskMetrics stageMetric, int stageId) { Set parentStageIds = new HashSet<>(); SparkPlanInfoForStage planForStage = @@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan( span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString()); if (planForStage != null) { - String json = planForStage.toJson(accumulators); + String json = planForStage.toJson(stageMetric); span.setTag("_dd.spark.sql_plan", json); } } @@ -143,7 +144,7 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List chi this.children = children; } - public String toJson(Map accumulators) { + public String toJson(SparkAggregatedTaskMetrics stageMetric) { // Using the jackson JSON lib used by spark // https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0 ObjectMapper mapper = @@ -151,7 +152,7 @@ public String toJson(Map accumulators) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) { - this.toJson(generator, accumulators, mapper); + this.toJson(generator, mapper, stageMetric); } catch (IOException e) { return null; } @@ -160,7 +161,7 @@ public String toJson(Map accumulators) { } private void toJson( - JsonGenerator generator, Map accumulators, ObjectMapper mapper) + JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric) throws IOException { generator.writeStartObject(); 
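// (Reader note, not part of the original diff: at this point toJson has opened
// one JSON object per plan node. The fields written below are the "node" name,
// the parsed plan fields, a "metrics" array whose entries are produced by
// externalAccumToJson above, and a recursive "children" array for the
// sub-plans that belong to the same stage.)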
generator.writeStringField("node", plan.nodeName()); @@ -199,11 +200,7 @@ private void toJson( generator.writeFieldName("metrics"); generator.writeStartArray(); for (SQLMetricInfo metric : metrics) { - long accumulatorId = metric.accumulatorId(); - AccumulatorWithStage acc = accumulators.get(accumulatorId); - if (acc != null) { - acc.toJson(generator, metric); - } + stageMetric.externalAccumToJson(generator, metric); } generator.writeEndArray(); } @@ -213,7 +210,7 @@ private void toJson( generator.writeFieldName("children"); generator.writeStartArray(); for (SparkPlanInfoForStage child : children) { - child.toJson(generator, accumulators, mapper); + child.toJson(generator, mapper, stageMetric); } generator.writeEndArray(); } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy index a34941d1be0..569e8c440e3 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy @@ -159,12 +159,12 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { // Each metric is a dict { "metric_name": "metric_value", "type": "metric_type" } expectedMetric.each { key, expectedValue -> - assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric" + assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric. \n\tactual: $actual.metrics, \n\texpected: $expected.metrics" // Some metric values are duration that will varies between runs // In the case, setting the expected value to "any" skips the assertion def actualValue = actualMetric[key] - assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue" + assert expectedValue == "any" || actualValue == expectedValue: prefix + "value of metric key \"$key\" does not match \"$expectedValue\", got $actualValue. 
\n\tactual: $actual.metrics, \n\texpected: $expected.metrics" } } } @@ -296,9 +296,17 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "number of output rows": "any", "type": "sum" }, + { + "avg hash probe (min, med, max)": "any", + "type": "average" + }, { "peak memory total (min, med, max)": "any", "type": "size" + }, + { + "spill size total (min, med, max)": "any", + "type": "size" } ], "children": [ @@ -317,7 +325,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 3, + "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", "type": "sum" } ] @@ -367,12 +375,16 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "type": "average" }, { - "number of output rows": 2, + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoAIQAAAAAAAPA/", "type": "sum" }, { "peak memory total (min, med, max)": "any", "type": "size" + }, + { + "spill size total (min, med, max)": "any", + "type": "size" } ], "children": [ @@ -572,6 +584,18 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { { "number of output rows": "any", "type": "sum" + }, + { + "spill size total (min, med, max)": "any", + "type": "size" + }, + { + "avg hash probe (min, med, max)": "any", + "type": "average" + }, + { + "peak memory total (min, med, max)": "any", + "type": "size" } ], "children": [ @@ -628,6 +652,10 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "_dd.unparsed" : "any" }, "metrics": [ + { + "spill size total (min, med, max)": "any", + "type": "size" + }, { "peak memory total (min, med, max)": "any", "type": "size" @@ -676,9 +704,17 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "_dd.unparsed" : "any" }, "metrics": [ + { + "spill size total (min, med, max)": "any", + "type": "size" + }, { "peak memory total (min, med, max)": "any", "type": "size" + }, + { + "sort time total (min, med, max)": "any", + "type": "timing" } ], "children": [ @@ -731,7 +767,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 1, + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", "type": "sum" } ], diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index a80eb6ab1cf..bcd49144239 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -56,7 +56,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 3, + "shuffle records written": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", "type": "sum" }, { @@ -93,16 +93,28 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 3, + 
"number of sort fallback tasks": "any", "type": "sum" }, { - "peak memory": "any", - "type": "size" + "avg hash probe bucket list iters": "any", + "type": "average" }, { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size" + }, + { + "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" } ], "children": [ @@ -116,7 +128,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 3, + "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", "type": "sum" } ] @@ -156,21 +168,29 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "resultExpressions" : [ "string_col#0", "avg(double_col#1)#4 AS avg(double_col)#5" ] }, "metrics": [ + { + "number of sort fallback tasks": "any", + "type": "sum" + }, { "avg hash probe bucket list iters": "any", "type": "average" }, { - "number of output rows": 2, - "type": "sum" + "time in aggregation build": "any", + "type": "timing" }, { - "peak memory": "any", + "spill size": "any", "type": "size" }, { - "time in aggregation build": "any", - "type": "timing" + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" } ], "children": [ @@ -214,7 +234,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YIhoA", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -222,28 +246,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": 3, - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, { - "shuffle records written": 3, + "local blocks read": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "remote blocks read": "any", + "type": "sum" + }, + { + "shuffle records written": "any", + "type": "sum" + }, + { + "number of partitions": "any", + "type": "sum" } ] } @@ -283,7 +319,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 2, + "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", "type": "sum" }, { @@ -319,10 +355,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "resultExpressions" : [ "string_col#0", "avg(double_col#1)#4 AS avg(double_col)#5" ] }, "metrics": [ + { + "number of sort fallback tasks": "any", + "type": "sum" + }, { "avg hash probe bucket list iters": "any", "type": "average" }, + { + "time in aggregation build": "any", + "type": "timing" + }, + { + "spill size": "any", + "type": "size" + }, { "number of output rows": "any", "type": "sum" @@ -330,10 +378,6 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { { "peak memory": 
"any", "type": "size" - }, - { - "time in aggregation build": "any", - "type": "timing" } ], "children": [ @@ -377,7 +421,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -385,28 +433,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } @@ -509,7 +569,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -517,28 +581,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": 2, - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, { - "shuffle records written": 2, + "local blocks read": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "remote blocks read": "any", + "type": "sum" + }, + { + "shuffle records written": "CgkJCCGEEEII8T8SABoAIQAAAAAAAPA/", + "type": "sum" + }, + { + "number of partitions": "any", + "type": "sum" } ] } @@ -774,7 +850,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "size" }, { - "shuffle records written": 1, + "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", "type": "sum" }, { @@ -811,12 +887,28 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "number of output rows": 1, + "number of sort fallback tasks": "any", "type": "sum" }, + { + "avg hash probe bucket list iters": "any", + "type": "average" + }, { "time in aggregation build": "any", "type": "timing" + }, + { + "spill size": "any", + "type": "size" + }, + { + "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", + "type": "sum" + }, + { + "peak memory": "any", + "type": "size" } ], "children": [ @@ -928,7 +1020,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -936,28 +1032,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": 
"any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } @@ -1054,7 +1162,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -1062,28 +1174,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } @@ -1138,13 +1262,13 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "resultExpressions" : [ "count(1)#42L AS count#43L" ] }, "metrics": [ - { - "number of output rows": "any", - "type": "sum" - }, { "time in aggregation build": "any", "type": "timing" + }, + { + "number of output rows": "any", + "type": "sum" } ], "children": [ @@ -1173,7 +1297,11 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { }, "metrics": [ { - "data size": "any", + "records read": "any", + "type": "sum" + }, + { + "remote bytes read to disk": "any", "type": "size" }, { @@ -1181,28 +1309,40 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "type": "timing" }, { - "local blocks read": "any", - "type": "sum" + "shuffle bytes written": "any", + "type": "size" + }, + { + "data size": "any", + "type": "size" }, { "local bytes read": "any", "type": "size" }, { - "records read": "any", - "type": "sum" + "shuffle write time": "any", + "type": "nsTiming" }, { - "shuffle bytes written": "any", + "remote bytes read": "any", "type": "size" }, + { + "local blocks read": "any", + "type": "sum" + }, + { + "remote blocks read": "any", + "type": "sum" + }, { "shuffle records written": "any", "type": "sum" }, { - "shuffle write time": "any", - "type": "nsTiming" + "number of partitions": "any", + "type": "sum" } ] } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index fdae211077e..8c6374fdaaa 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -1,14 +1,20 @@ package datadog.trace.instrumentation.spark; +import 
datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.spark.SparkConf; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; import org.apache.spark.sql.execution.SparkPlanInfo; import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; +import scala.Function1; import scala.collection.JavaConverters; +import scala.collection.mutable.ArrayBuffer; /** * DatadogSparkListener compiled for Scala 2.12 @@ -17,6 +23,13 @@ * compiled with the specific scala version */ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle externalAccums = + methodLoader.method(TaskMetrics.class, "externalAccums"); + private static final MethodHandle withExternalAccums = + methodLoader.method(TaskMetrics.class, "withExternalAccums", new Class[] {Function1.class}); + public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); } @@ -62,4 +75,27 @@ protected int[] getStageParentIds(StageInfo info) { return parentIds; } + + @Override + protected List getExternalAccumulators(TaskMetrics metrics) { + if (metrics == null) { + return null; + } + + Function1 lambda = + (Function1, List>) + accumulators -> JavaConverters.seqAsJavaList(accumulators); + List res = methodLoader.invoke(withExternalAccums, metrics, lambda); + if (res != null) { + return res; + } + + // withExternalAccums didn't work, try the legacy method + ArrayBuffer accumulators = methodLoader.invoke(externalAccums, metrics); + if (accumulators != null && !accumulators.isEmpty()) { + return JavaConverters.seqAsJavaList(accumulators); + } + + return null; + } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 115cdcbb9b0..498503f73a6 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -1,13 +1,19 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.spark.SparkConf; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; import org.apache.spark.sql.execution.SparkPlanInfo; import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.util.AccumulatorV2; +import scala.Function1; +import scala.collection.mutable.ArrayBuffer; import scala.jdk.javaapi.CollectionConverters; /** @@ -17,6 +23,13 @@ * compiled with the specific scala version */ public class DatadogSpark213Listener extends AbstractDatadogSparkListener { + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle 
externalAccums = + methodLoader.method(TaskMetrics.class, "externalAccums"); + private static final MethodHandle withExternalAccums = + methodLoader.method(TaskMetrics.class, "withExternalAccums", new Class[] {Function1.class}); + public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); } @@ -62,4 +75,27 @@ protected int[] getStageParentIds(StageInfo info) { return parentIds; } + + @Override + protected List getExternalAccumulators(TaskMetrics metrics) { + if (metrics == null) { + return null; + } + + Function1 lambda = + (Function1, List>) + accumulators -> CollectionConverters.asJava(accumulators); + List res = methodLoader.invoke(withExternalAccums, metrics, lambda); + if (res != null) { + return res; + } + + // withExternalAccums didn't work, try the legacy method + ArrayBuffer accumulators = methodLoader.invoke(externalAccums, metrics); + if (accumulators != null && !accumulators.isEmpty()) { + return CollectionConverters.asJava(accumulators); + } + + return null; + } } From 3128df8a0765d05769e82a56cc153e9a9cb0565a Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Tue, 24 Feb 2026 15:03:37 -0500 Subject: [PATCH 2/6] Create and implement getSum --- .../src/main/java/datadog/metrics/api/Histogram.java | 2 ++ .../src/main/java/datadog/metrics/api/NoOpHistogram.java | 5 +++++ .../java/datadog/metrics/impl/DDSketchHistogram.java | 9 +++++++++ 3 files changed, 16 insertions(+) diff --git a/products/metrics/metrics-api/src/main/java/datadog/metrics/api/Histogram.java b/products/metrics/metrics-api/src/main/java/datadog/metrics/api/Histogram.java index 1cbdcf0d5ac..fc43e1ab802 100644 --- a/products/metrics/metrics-api/src/main/java/datadog/metrics/api/Histogram.java +++ b/products/metrics/metrics-api/src/main/java/datadog/metrics/api/Histogram.java @@ -5,6 +5,8 @@ public interface Histogram { + double getSum(); + double getCount(); boolean isEmpty(); diff --git a/products/metrics/metrics-api/src/main/java/datadog/metrics/api/NoOpHistogram.java b/products/metrics/metrics-api/src/main/java/datadog/metrics/api/NoOpHistogram.java index 6645e3bc402..d7ae7829386 100644 --- a/products/metrics/metrics-api/src/main/java/datadog/metrics/api/NoOpHistogram.java +++ b/products/metrics/metrics-api/src/main/java/datadog/metrics/api/NoOpHistogram.java @@ -12,6 +12,11 @@ public double getCount() { return 0; } + @Override + public double getSum() { + return 0; + } + @Override public boolean isEmpty() { return true; diff --git a/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java b/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java index 645c6900c32..83750304267 100644 --- a/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java +++ b/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java @@ -12,9 +12,11 @@ /** Wrapper around the DDSketch library so that it can be used in an instrumentation */ public class DDSketchHistogram implements Histogram { private final DDSketch sketch; + private double sum; public DDSketchHistogram(DDSketch sketch) { this.sketch = sketch; + this.sum = 0; } @Override @@ -22,6 +24,11 @@ public double getCount() { return sketch.getCount(); } + @Override + public double getSum() { + return sum; + } + @Override public boolean isEmpty() { return sketch.isEmpty(); @@ -30,11 +37,13 @@ public boolean isEmpty() { @Override public void accept(double value) { sketch.accept(value); + sum += value; } 
@Override public void accept(double value, double count) { sketch.accept(value, count); + sum += value * count; } @Override From bba18ebdc4192ecd2efe0039e6d16bcc1edddacb Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Tue, 24 Feb 2026 15:04:41 -0500 Subject: [PATCH 3/6] Send summed SQL plan metric values --- .../spark/SparkAggregatedTaskMetrics.java | 1 + .../spark/AbstractSpark24SqlTest.groovy | 38 ++++- .../spark/AbstractSpark32SqlTest.groovy | 139 ++++++++++++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java index 5b0167a1da1..e3bb75a8ba5 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java @@ -321,6 +321,7 @@ public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) thr if (name != null && hist != null) { generator.writeStartObject(); generator.writeStringField(name, histogramToBase64(hist)); + generator.writeNumberField("sum", hist.getSum()); generator.writeStringField("type", info.metricType()); generator.writeEndObject(); } diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy index 569e8c440e3..18f55459cb5 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark24SqlTest.groovy @@ -157,7 +157,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { assert expectedMetric.size() == actualMetric.size(): prefix + "metric size of $expectedMetric does not match $actualMetric" - // Each metric is a dict { "metric_name": "metric_value", "type": "metric_type" } + // Each metric is a dict { "metric_name": "metric_value", "type": "metric_type", "sum": "metric_sum" } expectedMetric.each { key, expectedValue -> assert actualMetric.containsKey(key): prefix + "metric key \"$key\" not found in $actualMetric. 
\n\tactual: $actual.metrics, \n\texpected: $expected.metrics" @@ -259,6 +259,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "data size total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -270,6 +271,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "duration total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -290,22 +292,27 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "aggregate time total (min, med, max)": "any", + "sum": "any", "type": "timing" }, { "number of output rows": "any", + "sum": "any", "type": "sum" }, { "avg hash probe (min, med, max)": "any", + "sum": "any", "type": "average" }, { "peak memory total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "spill size total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -326,6 +333,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", + "sum": 3, "type": "sum" } ] @@ -348,6 +356,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "duration total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -368,22 +377,27 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "aggregate time total (min, med, max)": "any", + "sum": "any", "type": "timing" }, { "avg hash probe (min, med, max)": "any", + "sum": "any", "type": "average" }, { "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoAIQAAAAAAAPA/", + "sum": 2, "type": "sum" }, { "peak memory total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "spill size total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -473,6 +487,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "data size total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -488,6 +503,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "any", + "sum": "any", "type": "sum" } ] @@ -513,6 +529,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "data size total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -528,6 +545,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "any", + "sum": "any", "type": "sum" } ] @@ -548,6 +566,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "data size total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -559,6 +578,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "duration total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -579,22 +599,27 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "aggregate time total (min, med, max)": "any", + "sum": "any", "type": "timing" }, { "number of output rows": "any", + "sum": "any", "type": "sum" }, { "spill size total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "avg hash probe (min, med, max)": "any", + "sum": "any", "type": "average" }, { 
"peak memory total (min, med, max)": "any", + "sum": "any", "type": "size" } ], @@ -621,6 +646,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "any", + "sum": "any", "type": "sum" } ], @@ -637,6 +663,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "duration total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -654,14 +681,17 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "spill size total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "peak memory total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "sort time total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -689,6 +719,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "duration total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -706,14 +737,17 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "spill size total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "peak memory total (min, med, max)": "any", + "sum": "any", "type": "size" }, { "sort time total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -748,6 +782,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "duration total (min, med, max)": "any", + "sum": "any", "type": "timing" } ], @@ -768,6 +803,7 @@ abstract class AbstractSpark24SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", + "sum": 1, "type": "sum" } ], diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy index bcd49144239..597052e32d2 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSpark32SqlTest.groovy @@ -49,18 +49,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "data size": "any", + "sum": "any", "type": "size" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "shuffle records written": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", + "sum": 3, "type": "sum" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" } ], @@ -72,6 +76,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -94,26 +99,32 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of sort fallback tasks": "any", + "sum": "any", "type": "sum" }, { "avg hash probe bucket list iters": "any", + "sum": "any", "type": "average" }, { "time in aggregation build": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" }, { "number of output rows": 
"CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", + "sum": 3, "type": "sum" }, { "peak memory": "any", + "sum": "any", "type": "size" } ], @@ -129,6 +140,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "CgkJCCGEEEII8T8SZBJgAAAAAAAA8D8AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAPA/GAAaAA==", + "sum": 3, "type": "sum" } ] @@ -148,6 +160,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -170,26 +183,32 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of sort fallback tasks": "any", + "sum": "any", "type": "sum" }, { "avg hash probe bucket list iters": "any", + "sum": "any", "type": "average" }, { "time in aggregation build": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" }, { "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "sum": 2.0, "type": "sum" }, { "peak memory": "any", + "sum": "any", "type": "size" } ], @@ -235,50 +254,62 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "records read": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YIhoA", + "sum": 3.0, "type": "sum" }, { "remote bytes read to disk": "any", + "sum": "any", "type": "size" }, { "fetch wait time": "any", + "sum": "any", "type": "timing" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "data size": "any", + "sum": "any", "type": "size" }, { "local bytes read": "any", + "sum": "any", "type": "size" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" }, { "remote bytes read": "any", + "sum": "any", "type": "size" }, { "local blocks read": "any", + "sum": "any", "type": "sum" }, { "remote blocks read": "any", + "sum": "any", "type": "sum" }, { "shuffle records written": "any", + "sum": "any", "type": "sum" }, { "number of partitions": "any", + "sum": "any", "type": "sum" } ] @@ -312,18 +343,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "data size": "any", + "sum": "any", "type": "size" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "sum": 2.0, "type": "sum" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" } ], @@ -335,6 +370,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -357,26 +393,32 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of sort fallback tasks": "any", + "sum": "any", "type": "sum" }, { "avg hash probe bucket list iters": "any", + "sum": "any", "type": "average" }, { "time in aggregation build": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" }, { "number of output rows": "any", + "sum": "any", "type": "sum" }, { "peak memory": "any", + "sum": "any", "type": "size" } ], @@ -422,50 +464,62 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "records read": "any", + "sum": "any", "type": "sum" }, { "remote bytes read to disk": "any", + "sum": "any", 
"type": "size" }, { "fetch wait time": "any", + "sum": "any", "type": "timing" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "data size": "any", + "sum": "any", "type": "size" }, { "local bytes read": "any", + "sum": "any", "type": "size" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" }, { "remote bytes read": "any", + "sum": "any", "type": "size" }, { "local blocks read": "any", + "sum": "any", "type": "sum" }, { "remote blocks read": "any", + "sum": "any", "type": "sum" }, { "shuffle records written": "any", + "sum": "any", "type": "sum" }, { "number of partitions": "any", + "sum": "any", "type": "sum" } ] @@ -491,6 +545,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -517,14 +572,17 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "peak memory": "any", + "sum": "any", "type": "size" }, { "sort time": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" } ], @@ -570,50 +628,62 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "records read": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YFhoA", + "sum": 2.0, "type": "sum" }, { "remote bytes read to disk": "any", + "sum": "any", "type": "size" }, { "fetch wait time": "any", + "sum": "any", "type": "timing" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "data size": "any", + "sum": "any", "type": "size" }, { "local bytes read": "any", + "sum": "any", "type": "size" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" }, { "remote bytes read": "any", + "sum": "any", "type": "size" }, { "local blocks read": "any", + "sum": "any", "type": "sum" }, { "remote blocks read": "any", + "sum": "any", "type": "sum" }, { "shuffle records written": "CgkJCCGEEEII8T8SABoAIQAAAAAAAPA/", + "sum": 0.0, "type": "sum" }, { "number of partitions": "any", + "sum": "any", "type": "sum" } ] @@ -744,18 +814,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "data size": "any", + "sum": "any", "type": "size" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "shuffle records written": "any", + "sum": "any", "type": "sum" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" } ], @@ -771,6 +845,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "any", + "sum": "any", "type": "sum" } ] @@ -796,18 +871,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "data size": "any", + "sum": "any", "type": "size" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "shuffle records written": "any", + "sum": "any", "type": "sum" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" } ], @@ -823,6 +902,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "any", + "sum": "any", "type": "sum" } ] @@ -843,18 +923,22 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "data size": "any", + "sum": "any", "type": "size" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "shuffle records written": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", + "sum": 1, "type": "sum" }, { "shuffle write time": "any", + "sum": 
"any", "type": "nsTiming" } ], @@ -866,6 +950,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -888,26 +973,32 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of sort fallback tasks": "any", + "sum": "any", "type": "sum" }, { "avg hash probe bucket list iters": "any", + "sum": "any", "type": "average" }, { "time in aggregation build": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" }, { "number of output rows": "CgkJCCGEEEII8T8SDBIIAAAAAAAA8D8YABoA", + "sum": 1, "type": "sum" }, { "peak memory": "any", + "sum": "any", "type": "size" } ], @@ -935,6 +1026,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "number of output rows": "any", + "sum": "any", "type": "sum" } ], @@ -951,6 +1043,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -968,14 +1061,17 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "peak memory": "any", + "sum": "any", "type": "size" }, { "sort time": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" } ], @@ -1021,50 +1117,62 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "records read": "any", + "sum": "any", "type": "sum" }, { "remote bytes read to disk": "any", + "sum": "any", "type": "size" }, { "fetch wait time": "any", + "sum": "any", "type": "timing" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "data size": "any", + "sum": "any", "type": "size" }, { "local bytes read": "any", + "sum": "any", "type": "size" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" }, { "remote bytes read": "any", + "sum": "any", "type": "size" }, { "local blocks read": "any", + "sum": "any", "type": "sum" }, { "remote blocks read": "any", + "sum": "any", "type": "sum" }, { "shuffle records written": "any", + "sum": "any", "type": "sum" }, { "number of partitions": "any", + "sum": "any", "type": "sum" } ] @@ -1093,6 +1201,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -1110,14 +1219,17 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "peak memory": "any", + "sum": "any", "type": "size" }, { "sort time": "any", + "sum": "any", "type": "timing" }, { "spill size": "any", + "sum": "any", "type": "size" } ], @@ -1163,50 +1275,62 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "records read": "any", + "sum": "any", "type": "sum" }, { "remote bytes read to disk": "any", + "sum": "any", "type": "size" }, { "fetch wait time": "any", + "sum": "any", "type": "timing" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "data size": "any", + "sum": "any", "type": "size" }, { "local bytes read": "any", + "sum": "any", "type": "size" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" }, { "remote bytes read": "any", + "sum": "any", "type": "size" }, { "local blocks read": "any", + "sum": "any", "type": "sum" }, { "remote blocks read": "any", + "sum": "any", "type": "sum" }, { "shuffle records written": "any", + "sum": "any", 
"type": "sum" }, { "number of partitions": "any", + "sum": "any", "type": "sum" } ] @@ -1242,6 +1366,7 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "duration": "any", + "sum": "any", "type": "timing" } ], @@ -1264,10 +1389,12 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "time in aggregation build": "any", + "sum": "any", "type": "timing" }, { "number of output rows": "any", + "sum": "any", "type": "sum" } ], @@ -1298,50 +1425,62 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification { "metrics": [ { "records read": "any", + "sum": "any", "type": "sum" }, { "remote bytes read to disk": "any", + "sum": "any", "type": "size" }, { "fetch wait time": "any", + "sum": "any", "type": "timing" }, { "shuffle bytes written": "any", + "sum": "any", "type": "size" }, { "data size": "any", + "sum": "any", "type": "size" }, { "local bytes read": "any", + "sum": "any", "type": "size" }, { "shuffle write time": "any", + "sum": "any", "type": "nsTiming" }, { "remote bytes read": "any", + "sum": "any", "type": "size" }, { "local blocks read": "any", + "sum": "any", "type": "sum" }, { "remote blocks read": "any", + "sum": "any", "type": "sum" }, { "shuffle records written": "any", + "sum": "any", "type": "sum" }, { "number of partitions": "any", + "sum": "any", "type": "sum" } ] From 7e4b7dec9942cb07b1292c507811e363c6ef3f69 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Wed, 4 Mar 2026 10:32:19 -0500 Subject: [PATCH 4/6] Limit external accumulators to 5,000 per stage --- .../instrumentation/spark/SparkAggregatedTaskMetrics.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java index e3bb75a8ba5..f9ef35ef377 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java @@ -7,7 +7,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Base64; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.spark.TaskFailedReason; @@ -19,6 +18,7 @@ class SparkAggregatedTaskMetrics { private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0; private static final int HISTOGRAM_MAX_NUM_BINS = 512; + private static final int MAX_ACCUMULATOR_SIZE = 5000; private final boolean isSparkTaskHistogramEnabled = Config.get().isSparkTaskHistogramEnabled(); private long executorDeserializeTime = 0L; @@ -143,7 +143,7 @@ public void addTaskMetrics( // only needed for SHS? 
if (externalAccumulators != null && !externalAccumulators.isEmpty()) { if (externalAccumulableHistograms == null) { - externalAccumulableHistograms = new HashMap<>(externalAccumulators.size()); + externalAccumulableHistograms = new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE); } externalAccumulators.forEach( From f313fa1fc866926586643f329c7651cd220d9af9 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Tue, 10 Mar 2026 10:52:55 -0400 Subject: [PATCH 5/6] Use compensated sum to limit rounding errors --- .../metrics/impl/DDSketchHistogram.java | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java b/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java index 83750304267..efe21d2c1bf 100644 --- a/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java +++ b/products/metrics/metrics-lib/src/main/java/datadog/metrics/impl/DDSketchHistogram.java @@ -13,10 +13,16 @@ public class DDSketchHistogram implements Histogram { private final DDSketch sketch; private double sum; + // We use a compensated sum to avoid accumulating rounding errors. + // See https://en.wikipedia.org/wiki/Kahan_summation_algorithm. + private double sumCompensation; // Low order bits of sum + private double simpleSum; // Used to compute right sum for non-finite inputs public DDSketchHistogram(DDSketch sketch) { this.sketch = sketch; this.sum = 0; + this.simpleSum = 0; + this.sumCompensation = 0; } @Override @@ -26,7 +32,15 @@ public double getCount() { @Override public double getSum() { - return sum; + // Better error bounds to add both terms as the final sum + final double tmp = sum + sumCompensation; + if (Double.isNaN(tmp) && Double.isInfinite(simpleSum)) { + // If the compensated sum is spuriously NaN from accumulating one or more same-signed infinite + // values, return the correctly-signed infinity stored in simpleSum. 
+ return simpleSum; + } else { + return tmp; + } } @Override @@ -37,13 +51,13 @@ public boolean isEmpty() { @Override public void accept(double value) { sketch.accept(value); - sum += value; + updateSum(value); } @Override public void accept(double value, double count) { sketch.accept(value, count); - sum += value * count; + updateSum(value * count); } @Override @@ -112,4 +126,16 @@ public void clear() { public ByteBuffer serialize() { return sketch.serialize(); } + + private void updateSum(double value) { + simpleSum += value; + sumWithCompensation(value); + } + + private void sumWithCompensation(double value) { + final double tmp = value - sumCompensation; + final double velvel = sum + tmp; // Little wolf of rounding error + sumCompensation = (velvel - sum) - tmp; + sum = velvel; + } } From b203263f9756f38139f1eca2aeda0e41859a15c4 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Tue, 10 Mar 2026 11:01:40 -0400 Subject: [PATCH 6/6] Cast to Number type instead of Long --- .../trace/instrumentation/spark/SparkAggregatedTaskMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java index f9ef35ef377..30721371050 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkAggregatedTaskMetrics.java @@ -157,7 +157,7 @@ public void addTaskMetrics( try { // As of spark 3.5, all SQL metrics are Long, safeguard if it changes in new // versions - hist.accept((Long) acc.value()); + hist.accept(((Number) acc.value()).doubleValue()); externalAccumulableHistograms.put(acc.id(), hist); } catch (ClassCastException ignored) { }
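A closing note on patch 5: `sumWithCompensation` implements Kahan compensated summation, and the comments and non-finite `simpleSum` guard mirror `java.util.DoubleSummaryStatistics`. A self-contained demonstration of the drift the compensation prevents:

```java
// Naive vs. compensated (Kahan) summation of a value that is not exactly
// representable in binary floating point.
public class KahanDemo {
  public static void main(String[] args) {
    double naive = 0.0;
    double sum = 0.0, compensation = 0.0;

    for (int i = 0; i < 10_000_000; i++) {
      double v = 0.1;
      naive += v;                        // rounding error accumulates freely
      double tmp = v - compensation;     // re-inject previously lost low-order bits
      double next = sum + tmp;
      compensation = (next - sum) - tmp; // capture what this addition rounded away
      sum = next;
    }

    System.out.println(naive);              // drifts measurably away from 1000000.0
    System.out.println(sum + compensation); // recovers 1000000.0 almost exactly
  }
}
```

Patch 6's switch from `(Long)` to `((Number) acc.value()).doubleValue()` is a related robustness fix: it widens any boxed numeric accumulator value (Long today, possibly Integer or Double in a future Spark version) instead of throwing `ClassCastException` for anything that is not exactly a Long.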