Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.spark.ExceptionFailure;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.AccumulableInfo;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListener;
Expand All @@ -64,6 +65,7 @@
import org.apache.spark.sql.streaming.StateOperatorProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
import org.apache.spark.util.AccumulatorV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
Expand Down Expand Up @@ -127,8 +129,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();

// There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
// an active SQL query)
// so capping the size of the collection storing them
// an active SQL query) so capping the size of the collection storing them
// TODO (CY): Is this potentially the reason why some Spark Plans aren't showing up consistently?
// If we know we don't need the accumulator values, can we drop all associated data and just map
// stage ID -> accumulator ID? Put this behind some FF
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);

Expand Down Expand Up @@ -229,6 +233,12 @@ public void setupOpenLineage(DDTraceId traceId) {
/**
 * Returns the parent stage IDs of the given stage. An implementation must be provided per Scala
 * version (presumably because the underlying Scala collection API differs between versions —
 * TODO confirm against the per-version subclasses).
 */
protected abstract int[] getStageParentIds(StageInfo info);

/**
 * Returns all external accumulators associated with a given task's metrics. An implementation
 * must be provided per Scala version, since the accumulators are exposed as a Scala {@code Seq}
 * whose API varies across Scala versions.
 */
protected abstract List<AccumulatorV2> getExternalAccumulators(TaskMetrics metrics);

@Override
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
this.applicationStart = applicationStart;
Expand Down Expand Up @@ -672,7 +682,7 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl

SparkPlanInfo sqlPlan = sqlPlans.get(sqlExecutionId);
if (sqlPlan != null) {
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageId);
SparkSQLUtils.addSQLPlanToStageSpan(span, sqlPlan, accumulators, stageMetric, stageId);
}

span.finish(completionTimeMs * 1000);
Expand All @@ -686,7 +696,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

SparkAggregatedTaskMetrics stageMetric = stageMetrics.get(stageSpanKey);
if (stageMetric != null) {
stageMetric.addTaskMetrics(taskEnd);
// Not happy that we have to extract external accumulators here, but needed as we're dealing
// with Seq which varies across Scala versions
stageMetric.addTaskMetrics(taskEnd, getExternalAccumulators(taskEnd.taskMetrics()));
}

if (taskEnd.taskMetrics() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package datadog.trace.instrumentation.spark;

import com.fasterxml.jackson.core.JsonGenerator;
import datadog.metrics.api.Histogram;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import org.apache.spark.util.AccumulatorV2;

class SparkAggregatedTaskMetrics {
private static final double HISTOGRAM_RELATIVE_ACCURACY = 1 / 32.0;
private static final int HISTOGRAM_MAX_NUM_BINS = 512;
private static final int MAX_ACCUMULATOR_SIZE = 5000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you benchmarked to see how much memory overhead this could add if there happened to be 5000 accumulators? Do we know what the expected number of accumulators is typically?

Just trying to get a sense of whether this is a reasonable limit - i.e. does it allow for the most common number of accumulators while safeguarding memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(note this is not a blocker to the PR - just interested in where the 5000 limit came from)

Copy link
Contributor Author

@charlesmyu charlesmyu Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, it was good to think through this properly!

Honestly, this was a bit of a conservative guesstimate based on the previous limit of 50k accumulators, to provide at least a limit of some sort to prevent any runaway Spark apps from blowing through all the memory.

Evaluating a bit more critically, we could probably increase this since we discard the SparkAggregatedTaskMetrics object once a stage is completed, as opposed to keeping it in memory for the entire duration of the Spark job. Since Spark should really only be working on a single stage at a time, a limit of 5,000 should be fairly conservative from a memory usage standpoint. It is also in our favor that the previous value was based on an object that should be larger than the AccumulatorV2 we're now using.

Some quick napkin math to make sure this is reasonable from a usage standpoint as well - each stage is composed of multiple operations (speaking anecdotally, the most I've seen is 10 operations per stage, so an upper bound of 50 operations should be fairly generous), and we see the most metrics per operation in Spark jobs run by Databricks, which has maybe ~50 metrics per operation in the worst case. This would give 2,500 accumulators we have to track, which leaves us a decent amount of headroom under the 5,000 limit.

A lot of anecdotal numbers unfortunately, but hopefully this is already an improvement on the previous 50k limit.

private final boolean isSparkTaskHistogramEnabled = Config.get().isSparkTaskHistogramEnabled();

private long executorDeserializeTime = 0L;
Expand Down Expand Up @@ -59,13 +66,17 @@ class SparkAggregatedTaskMetrics {
private Histogram shuffleWriteBytesHistogram;
private Histogram diskBytesSpilledHistogram;

// Used for Spark SQL Plan metrics ONLY, don't put in regular span for now
private Map<Long, Histogram> externalAccumulableHistograms;

/** Creates an empty per-stage metrics aggregation. */
public SparkAggregatedTaskMetrics() {}

/**
 * Creates an empty per-stage metrics aggregation, recording the given value as the previously
 * observed available executor time.
 */
public SparkAggregatedTaskMetrics(long availableExecutorTime) {
this.previousAvailableExecutorTime = availableExecutorTime;
}

public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
public void addTaskMetrics(
SparkListenerTaskEnd taskEnd, List<AccumulatorV2> externalAccumulators) {
taskCompletedCount += 1;

if (taskEnd.taskInfo().attemptNumber() > 0) {
Expand Down Expand Up @@ -127,6 +138,31 @@ public void addTaskMetrics(SparkListenerTaskEnd taskEnd) {
shuffleWriteBytesHistogram, taskMetrics.shuffleWriteMetrics().bytesWritten());
diskBytesSpilledHistogram =
lazyHistogramAccept(diskBytesSpilledHistogram, taskMetrics.diskBytesSpilled());

// TODO (CY): Should we also look at TaskInfo accumulable update values as a backup? Is that
// only needed for SHS?
if (externalAccumulators != null && !externalAccumulators.isEmpty()) {
if (externalAccumulableHistograms == null) {
externalAccumulableHistograms = new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
}

externalAccumulators.forEach(
acc -> {
Histogram hist = externalAccumulableHistograms.get(acc.id());
if (hist == null) {
hist =
Histogram.newHistogram(HISTOGRAM_RELATIVE_ACCURACY, HISTOGRAM_MAX_NUM_BINS);
}

try {
// As of spark 3.5, all SQL metrics are Long, safeguard if it changes in new
// versions
hist.accept(((Number) acc.value()).doubleValue());
externalAccumulableHistograms.put(acc.id(), hist);
} catch (ClassCastException ignored) {
}
});
}
}
}
}
Expand Down Expand Up @@ -276,6 +312,22 @@ private Histogram lazyHistogramAccept(Histogram hist, double value) {
return hist;
}

/**
 * Writes the aggregated histogram for the given SQL metric's accumulator as a JSON object
 * (base64-encoded histogram keyed by the metric name, plus its sum and metric type). Used only
 * for Spark SQL plan metrics; emits nothing if no histogram was recorded for this accumulator.
 */
public void externalAccumToJson(JsonGenerator generator, SQLMetricInfo info) throws IOException {
  if (externalAccumulableHistograms == null) {
    return;
  }

  String metricName = info.name();
  Histogram histogram = externalAccumulableHistograms.get(info.accumulatorId());
  if (metricName == null || histogram == null) {
    return;
  }

  generator.writeStartObject();
  generator.writeStringField(metricName, histogramToBase64(histogram));
  generator.writeNumberField("sum", histogram.getSum());
  generator.writeStringField("type", info.metricType());
  generator.writeEndObject();
}

public static long computeTaskRunTime(TaskMetrics metrics) {
return metrics.executorDeserializeTime()
+ metrics.executorRunTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public static void addSQLPlanToStageSpan(
AgentSpan span,
SparkPlanInfo plan,
Map<Long, AccumulatorWithStage> accumulators,
SparkAggregatedTaskMetrics stageMetric,
int stageId) {
Set<Integer> parentStageIds = new HashSet<>();
SparkPlanInfoForStage planForStage =
Expand All @@ -32,7 +33,7 @@ public static void addSQLPlanToStageSpan(
span.setTag("_dd.spark.sql_parent_stage_ids", parentStageIds.toString());

if (planForStage != null) {
String json = planForStage.toJson(accumulators);
String json = planForStage.toJson(stageMetric);
span.setTag("_dd.spark.sql_plan", json);
}
}
Expand Down Expand Up @@ -143,15 +144,15 @@ public SparkPlanInfoForStage(SparkPlanInfo plan, List<SparkPlanInfoForStage> chi
this.children = children;
}

public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
public String toJson(SparkAggregatedTaskMetrics stageMetric) {
// Using the jackson JSON lib used by spark
// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.5.0
ObjectMapper mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (JsonGenerator generator = mapper.getFactory().createGenerator(baos)) {
this.toJson(generator, accumulators, mapper);
this.toJson(generator, mapper, stageMetric);
} catch (IOException e) {
return null;
}
Expand All @@ -160,7 +161,7 @@ public String toJson(Map<Long, AccumulatorWithStage> accumulators) {
}

private void toJson(
JsonGenerator generator, Map<Long, AccumulatorWithStage> accumulators, ObjectMapper mapper)
JsonGenerator generator, ObjectMapper mapper, SparkAggregatedTaskMetrics stageMetric)
throws IOException {
generator.writeStartObject();
generator.writeStringField("node", plan.nodeName());
Expand Down Expand Up @@ -199,11 +200,7 @@ private void toJson(
generator.writeFieldName("metrics");
generator.writeStartArray();
for (SQLMetricInfo metric : metrics) {
long accumulatorId = metric.accumulatorId();
AccumulatorWithStage acc = accumulators.get(accumulatorId);
if (acc != null) {
acc.toJson(generator, metric);
}
stageMetric.externalAccumToJson(generator, metric);
}
generator.writeEndArray();
}
Expand All @@ -213,7 +210,7 @@ private void toJson(
generator.writeFieldName("children");
generator.writeStartArray();
for (SparkPlanInfoForStage child : children) {
child.toJson(generator, accumulators, mapper);
child.toJson(generator, mapper, stageMetric);
}
generator.writeEndArray();
}
Expand Down
Loading