Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 45 additions & 48 deletions packages/core/src/tracing/spans/spanBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const MAX_SPANS_PER_ENVELOPE = 1000;

const MAX_TRACE_WEIGHT_IN_BYTES = 5_000_000;

interface TraceBucket {
spans: Set<SerializedStreamedSpanWithSegmentSpan>;
size: number;
timeout: ReturnType<typeof setTimeout>;
}

export interface SpanBufferOptions {
/**
* Max spans per trace before auto-flush
Expand All @@ -26,7 +32,8 @@ export interface SpanBufferOptions {
maxSpanLimit?: number;

/**
* Flush interval in ms
* Per-trace flush timeout in ms. A timeout is started when a trace bucket is first created
* and fires flush() for that specific trace when it expires.
* Must be greater than 0.
*
* @default 5_000
Expand All @@ -44,7 +51,7 @@ export interface SpanBufferOptions {

/**
* A buffer for serialized streamed span JSON objects that flushes them to Sentry in Span v2 envelopes.
* Handles interval-based flushing, size thresholds, and graceful shutdown.
* Handles per-trace timeout-based flushing, size thresholds, and graceful shutdown.
* Also handles computation of the Dynamic Sampling Context (DSC) for the trace, if it wasn't yet
* frozen onto the segment span.
*
Expand All @@ -54,19 +61,16 @@ export interface SpanBufferOptions {
* still active and modifyable when child spans are added to the buffer.
*/
export class SpanBuffer {
/* Bucket spans by their trace id */
private _traceMap: Map<string, Set<SerializedStreamedSpanWithSegmentSpan>>;
private _traceWeightMap: Map<string, number>;
/* Bucket spans by their trace id, along with accumulated size and a per-trace flush timeout */
private _traceBuckets: Map<string, TraceBucket>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to take this PR as an opportunity to refactor the data structure of the buffer from multiple maps to one, holding all information as an object in one map


private _flushIntervalId: ReturnType<typeof setInterval> | null;
private _client: Client;
private _maxSpanLimit: number;
private _flushInterval: number;
private _maxTraceWeight: number;

public constructor(client: Client, options?: SpanBufferOptions) {
this._traceMap = new Map();
this._traceWeightMap = new Map();
this._traceBuckets = new Map();
this._client = client;

const { maxSpanLimit, flushInterval, maxTraceWeightInBytes } = options ?? {};
Expand All @@ -79,21 +83,17 @@ export class SpanBuffer {
this._maxTraceWeight =
maxTraceWeightInBytes && maxTraceWeightInBytes > 0 ? maxTraceWeightInBytes : MAX_TRACE_WEIGHT_IN_BYTES;

this._flushIntervalId = null;
this._debounceFlushInterval();

this._client.on('flush', () => {
this.drain();
});

this._client.on('close', () => {
// No need to drain the buffer here as `Client.close()` internally already calls `Client.flush()`
// which already invokes the `flush` hook and thus drains the buffer.
if (this._flushIntervalId) {
clearInterval(this._flushIntervalId);
}
this._traceMap.clear();
this._traceWeightMap.clear();
this._traceBuckets.forEach(bucket => {
clearTimeout(bucket.timeout);
});
this._traceBuckets.clear();
});
}

Expand All @@ -102,57 +102,62 @@ export class SpanBuffer {
*/
public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void {
const traceId = spanJSON.trace_id;
let traceBucket = this._traceMap.get(traceId);
if (traceBucket) {
traceBucket.add(spanJSON);
} else {
traceBucket = new Set([spanJSON]);
this._traceMap.set(traceId, traceBucket);
let bucket = this._traceBuckets.get(traceId);

if (!bucket) {
bucket = {
spans: new Set(),
size: 0,
timeout: safeUnref(
setTimeout(() => {
this.flush(traceId);
}, this._flushInterval),
),
};
this._traceBuckets.set(traceId, bucket);
}

const newWeight = (this._traceWeightMap.get(traceId) ?? 0) + estimateSerializedSpanSizeInBytes(spanJSON);
this._traceWeightMap.set(traceId, newWeight);
bucket.spans.add(spanJSON);
bucket.size += estimateSerializedSpanSizeInBytes(spanJSON);

if (traceBucket.size >= this._maxSpanLimit || newWeight >= this._maxTraceWeight) {
if (bucket.spans.size >= this._maxSpanLimit || bucket.size >= this._maxTraceWeight) {
this.flush(traceId);
this._debounceFlushInterval();
}
}

/**
* Drain and flush all buffered traces.
*/
public drain(): void {
if (!this._traceMap.size) {
if (!this._traceBuckets.size) {
return;
}

DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceMap.size} traces`);
DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceBuckets.size} traces`);

this._traceMap.forEach((_, traceId) => {
this._traceBuckets.forEach((_, traceId) => {
this.flush(traceId);
});
this._debounceFlushInterval();
}

/**
* Flush spans of a specific trace.
* In contrast to {@link SpanBuffer.flush}, this method does not flush all traces, but only the one with the given traceId.
* In contrast to {@link SpanBuffer.drain}, this method does not flush all traces, but only the one with the given traceId.
*/
public flush(traceId: string): void {
const traceBucket = this._traceMap.get(traceId);
if (!traceBucket) {
const bucket = this._traceBuckets.get(traceId);
if (!bucket) {
return;
}

if (!traceBucket.size) {
// we should never get here, given we always add a span when we create a new bucket
if (!bucket.spans.size) {
// we should never get here, given we always add a span when we create a new bucket
// and delete the bucket once we flush out the trace
this._removeTrace(traceId);
return;
}

const spans = Array.from(traceBucket);
const spans = Array.from(bucket.spans);

const segmentSpan = spans[0]?._segmentSpan;
if (!segmentSpan) {
Expand Down Expand Up @@ -181,18 +186,10 @@ export class SpanBuffer {
}

private _removeTrace(traceId: string): void {
this._traceMap.delete(traceId);
this._traceWeightMap.delete(traceId);
}

private _debounceFlushInterval(): void {
if (this._flushIntervalId) {
clearInterval(this._flushIntervalId);
const bucket = this._traceBuckets.get(traceId);
if (bucket) {
clearTimeout(bucket.timeout);
}
this._flushIntervalId = safeUnref(
setInterval(() => {
this.drain();
}, this._flushInterval),
);
this._traceBuckets.delete(traceId);
}
}
4 changes: 2 additions & 2 deletions packages/core/test/lib/tracing/spans/spanBuffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ describe('SpanBuffer', () => {
expect(sentEnvelopes[1]?.[1]?.[0]?.[1]?.items[0]?.trace_id).toBe('trace456');
});

it('drains on interval', () => {
it('flushes trace after per-trace timeout', () => {
const buffer = new SpanBuffer(client, { flushInterval: 1000 });

const segmentSpan1 = new SentrySpan({ name: 'segment', sampled: true });
Expand Down Expand Up @@ -106,7 +106,7 @@ describe('SpanBuffer', () => {

expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);

// since the buffer is now empty, it should not send anything anymore
// the trace bucket was removed after flushing, so no timeout remains and no further sends occur
vi.advanceTimersByTime(1000);
expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1);
});
Expand Down
Loading