Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 81 additions & 90 deletions src/Aspire.Dashboard/Api/TelemetryApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading.Channels;
using Aspire.Dashboard.Model;
using Aspire.Dashboard.Model.Assistant;
using Aspire.Dashboard.Model.Otlp;
Expand Down Expand Up @@ -45,22 +46,18 @@ internal sealed class TelemetryApiService(
var spanFilters = new List<TelemetryFilter>();
var searchTextFragments = ParseAndApplySearchFilters(search, spanFilters, AddSpanFiltersFromQualifiers, key => ResolveSpanFieldKey(key) is not null);

// Get spans for all resource keys
var allSpans = new List<OtlpSpan>();
foreach (var resourceKey in resourceKeys)
// Get spans for all resource keys (empty list means no filter / all resources)
var result = telemetryRepository.GetSpans(new GetSpansRequest
{
var result = telemetryRepository.GetSpans(new GetSpansRequest
{
ResourceKey = resourceKey,
StartIndex = 0,
Count = int.MaxValue,
Filters = spanFilters,
TraceId = traceId,
HasError = hasError,
TextFragments = searchTextFragments
});
allSpans.AddRange(result.PagedResult.Items);
}
ResourceKeys = resourceKeys,
StartIndex = 0,
Count = int.MaxValue,
Filters = spanFilters,
TraceId = traceId,
HasError = hasError,
TextFragments = searchTextFragments
});
var allSpans = result.PagedResult.Items;

var totalCount = allSpans.Count;

Expand Down Expand Up @@ -102,20 +99,16 @@ internal sealed class TelemetryApiService(
var traceFilters = new List<TelemetryFilter>();
var searchTextFragments = ParseAndApplySearchFilters(search, traceFilters, AddSpanFiltersFromQualifiers, key => ResolveSpanFieldKey(key) is not null);

// Get traces for all resource keys
var allTraces = new List<OtlpTrace>();
foreach (var resourceKey in resourceKeys)
// Get traces for all resource keys (empty list means no filter / all resources)
var result = telemetryRepository.GetTraces(new GetTracesRequest
{
var result = telemetryRepository.GetTraces(new GetTracesRequest
{
ResourceKey = resourceKey,
StartIndex = 0,
Count = int.MaxValue,
Filters = traceFilters,
TextFragments = searchTextFragments
});
allTraces.AddRange(result.PagedResult.Items);
}
ResourceKeys = resourceKeys,
StartIndex = 0,
Count = int.MaxValue,
Filters = traceFilters,
TextFragments = searchTextFragments
});
var allTraces = result.PagedResult.Items;

var traces = allTraces;

Expand Down Expand Up @@ -220,22 +213,17 @@ internal sealed class TelemetryApiService(

var searchTextFragments = ParseAndApplySearchFilters(search, filters, AddLogFiltersFromQualifiers, key => ResolveLogFieldKey(key) is not null);

// Get logs for all resource keys
var allLogs = new List<OtlpLogEntry>();
foreach (var resourceKey in resourceKeys)
// Get logs for all resource keys (empty list means no filter / all resources)
var result = telemetryRepository.GetLogs(new GetLogsContext
{
var result = telemetryRepository.GetLogs(new GetLogsContext
{
ResourceKey = resourceKey,
StartIndex = 0,
Count = int.MaxValue,
Filters = filters,
TextFragments = searchTextFragments
});
allLogs.AddRange(result.Items);
}
ResourceKeys = resourceKeys,
StartIndex = 0,
Count = int.MaxValue,
Filters = filters,
TextFragments = searchTextFragments
});

var logs = allLogs;
var logs = result.Items;

var totalCount = logs.Count;

Expand Down Expand Up @@ -266,18 +254,9 @@ public async IAsyncEnumerable<string> FollowSpansAsync(
string? search,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// Resolve resource keys
var resources = telemetryRepository.GetResources();
var resourceKeys = ResolveResourceKeys(resources, resourceNames);

// For streaming, if resources were specified but can't be resolved, filter everything out
var hasResourceFilter = resourceNames is { Length: > 0 };
var invalidResourceFilter = hasResourceFilter && resourceKeys is null;

if (invalidResourceFilter)
{
yield break;
}
// Resolve resource keys, waiting for the resource to appear if it doesn't exist yet.
// Throws OperationCanceledException if the client disconnects before the resource appears.
var resourceKeys = await WaitForResourceKeysAsync(resourceNames, cancellationToken).ConfigureAwait(false);

// Convert structured search qualifiers into TelemetryFilter objects for per-span filtering
List<TelemetryFilter> spanFilters = [];
Expand All @@ -286,7 +265,7 @@ public async IAsyncEnumerable<string> FollowSpansAsync(
// Build the watch request with all filters pushed into the repository
var watchRequest = new WatchSpansRequest
{
ResourceKey = resourceKeys is { Count: 1 } ? resourceKeys[0] : null,
ResourceKeys = resourceKeys,
Filters = spanFilters,
TraceId = traceId,
HasError = hasError,
Expand All @@ -296,14 +275,6 @@ public async IAsyncEnumerable<string> FollowSpansAsync(
// Watch spans with filtering done inside the repository
await foreach (var span in telemetryRepository.WatchSpansAsync(watchRequest, cancellationToken).ConfigureAwait(false))
{
// Multi-resource filtering: repository only supports single ResourceKey,
// so for multi-resource queries we filter here.
if (resourceKeys is { Count: > 1 } &&
!resourceKeys.Any(k => k?.EqualsCompositeName(span.Source.ResourceKey.GetCompositeName()) == true))
{
continue;
}

// Use compact JSON for NDJSON streaming (no indentation)
yield return TelemetryExportService.ConvertSpanToJson(span, _outgoingPeerResolvers, logs: null, indent: false);
}
Expand All @@ -320,18 +291,9 @@ public async IAsyncEnumerable<string> FollowLogsAsync(
string? search,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
// Resolve resource keys
var resources = telemetryRepository.GetResources();
var resourceKeys = ResolveResourceKeys(resources, resourceNames);

// For streaming, if resources were specified but can't be resolved, filter everything out
var hasResourceFilter = resourceNames is { Length: > 0 };
var invalidResourceFilter = hasResourceFilter && resourceKeys is null;

if (invalidResourceFilter)
{
yield break;
}
// Resolve resource keys, waiting for the resource to appear if it doesn't exist yet.
// Throws OperationCanceledException if the client disconnects before the resource appears.
var resourceKeys = await WaitForResourceKeysAsync(resourceNames, cancellationToken).ConfigureAwait(false);

// Build filters
var filters = new List<TelemetryFilter>();
Expand Down Expand Up @@ -365,22 +327,14 @@ public async IAsyncEnumerable<string> FollowLogsAsync(
// Build the watch request with all filters pushed into the repository
var watchRequest = new WatchLogsRequest
{
ResourceKey = resourceKeys is { Count: 1 } ? resourceKeys[0] : null,
ResourceKeys = resourceKeys,
Filters = filters,
TextFragments = searchTextFragments
};

// Watch logs with filtering done inside the repository
await foreach (var log in telemetryRepository.WatchLogsAsync(watchRequest, cancellationToken).ConfigureAwait(false))
{
// Multi-resource filtering: repository only supports single ResourceKey,
// so for multi-resource queries we filter here.
if (resourceKeys is { Count: > 1 } &&
!resourceKeys.Any(k => k?.EqualsCompositeName(log.ResourceView.ResourceKey.GetCompositeName()) == true))
{
continue;
}

var otlpData = TelemetryExportService.ConvertLogsToOtlpJson([log]);
yield return JsonSerializer.Serialize(otlpData, OtlpJsonSerializerContext.DefaultOptions);
}
Expand Down Expand Up @@ -562,27 +516,64 @@ private static void AddSpanFiltersFromQualifiers(SearchFilter parsedSearch, List
_ => FilterCondition.Contains
};

/// <summary>
/// Resolves resource names to ResourceKeys, waiting for the resources to appear if they
/// don't exist yet. This enables streaming subscriptions started before telemetry arrives
/// to pick up data once the resource is first seen.
/// Throws OperationCanceledException if cancellation is triggered before the resources appear.
/// </summary>
private async Task<List<ResourceKey>> WaitForResourceKeysAsync(string[]? resourceNames, CancellationToken cancellationToken)
{
if (resourceNames is null || resourceNames.Length == 0)
{
// No filter - return immediately without allocating a channel or subscription.
return [];
}

// Subscribe before the first check so no notification can be missed between
// GetResources() and the subscription registration.
var signal = Channel.CreateBounded<bool>(new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropOldest });
using var subscription = telemetryRepository.OnNewResources(() =>
{
signal.Writer.TryWrite(true);
return Task.CompletedTask;
});

while (true)
{
var resources = telemetryRepository.GetResources();
if (ResolveResourceKeys(resources, resourceNames) is { } result)
{
return result;
}

await signal.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
/// Resolves resource names to ResourceKeys.
/// Returns null if any specified resource is not found.
/// If no resources are specified, returns a list with a single null key (no filter).
/// Returns an empty list when no resource filter is specified (meaning all resources).
/// </summary>
private static List<ResourceKey?>? ResolveResourceKeys(IReadOnlyList<OtlpResource> resources, string[]? resourceNames)
private static List<ResourceKey>? ResolveResourceKeys(IReadOnlyList<OtlpResource> resources, string[]? resourceNames)
{
if (resourceNames is null || resourceNames.Length == 0)
{
// No filter - return a list with null to indicate "all resources"
return [null];
return [];
}

var keys = new List<ResourceKey?>();
var keys = new List<ResourceKey>();
foreach (var resourceName in resourceNames)
{
if (!AIHelpers.TryResolveResourceForTelemetry(resources, resourceName, out _, out var resourceKey))
{
return null;
}
keys.Add(resourceKey);
if (resourceKey is { } key)
{
keys.Add(key);
}
}
return keys;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public async Task<string> GetStructuredLogsAsync(
// If support is added for ordering logs by timestamp then improve this.
var logs = TelemetryRepository.GetLogs(new GetLogsContext
{
ResourceKey = resourceKey,
ResourceKeys = resourceKey is { } rk ? [rk] : [],
StartIndex = 0,
Count = int.MaxValue,
Filters = []
Expand Down Expand Up @@ -170,7 +170,7 @@ public async Task<string> GetTracesAsync(

var traces = TelemetryRepository.GetTraces(new GetTracesRequest
{
ResourceKey = resourceKey,
ResourceKeys = resourceKey is { } rk ? [rk] : [],
StartIndex = 0,
Count = int.MaxValue,
Filters = []
Expand Down Expand Up @@ -207,7 +207,7 @@ public async Task<string> GetTraceStructuredLogsAsync(

var logs = TelemetryRepository.GetLogs(new GetLogsContext
{
ResourceKey = null,
ResourceKeys = [],
Count = int.MaxValue,
StartIndex = 0,
Filters = [traceIdFilter]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ private static List<OtlpLogEntry> GetSpanLogEntries(TelemetryRepository telemetr
{
var logsContext = new GetLogsContext
{
ResourceKey = null,
ResourceKeys = [],
Count = int.MaxValue,
StartIndex = 0,
Filters = [
Expand Down
4 changes: 2 additions & 2 deletions src/Aspire.Dashboard/Model/StructuredLogsViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public PagedResult<OtlpLogEntry> GetLogs()

logs = _telemetryRepository.GetLogs(new GetLogsContext
{
ResourceKey = ResourceKey,
ResourceKeys = ResourceKey is { } key ? [key] : [],
StartIndex = StartIndex,
Count = Count,
Filters = filters
Expand Down Expand Up @@ -154,7 +154,7 @@ public PagedResult<OtlpLogEntry> GetErrorLogs(int count)

var errorLogs = _telemetryRepository.GetLogs(new GetLogsContext
{
ResourceKey = ResourceKey,
ResourceKeys = ResourceKey is { } key ? [key] : [],
StartIndex = 0,
Count = count,
Filters = filters
Expand Down
4 changes: 2 additions & 2 deletions src/Aspire.Dashboard/Model/TracesViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public PagedResult<OtlpTrace> GetTraces()

var result = _telemetryRepository.GetTraces(new GetTracesRequest
{
ResourceKey = ResourceKey,
ResourceKeys = ResourceKey is { } key ? [key] : [],
StartIndex = StartIndex,
Count = Count,
Filters = filters,
Expand Down Expand Up @@ -116,7 +116,7 @@ public PagedResult<OtlpTrace> GetErrorTraces(int count)

var errorTraces = _telemetryRepository.GetTraces(new GetTracesRequest
{
ResourceKey = ResourceKey,
ResourceKeys = ResourceKey is { } key ? [key] : [],
StartIndex = 0,
Count = count,
Filters = filters,
Expand Down
4 changes: 2 additions & 2 deletions src/Aspire.Dashboard/Otlp/Storage/GetLogsContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ namespace Aspire.Dashboard.Otlp.Storage;

public sealed class GetLogsContext
{
public required ResourceKey? ResourceKey { get; init; }
public required IReadOnlyList<ResourceKey> ResourceKeys { get; init; }
public required int StartIndex { get; init; }
public required int Count { get; init; }
public required List<TelemetryFilter> Filters { get; init; }
public string[]? TextFragments { get; init; }

public static GetLogsContext ForResourceKey(ResourceKey resourceKey) => new()
{
ResourceKey = resourceKey,
ResourceKeys = [resourceKey],
StartIndex = 0,
Count = int.MaxValue,
Filters = []
Expand Down
2 changes: 1 addition & 1 deletion src/Aspire.Dashboard/Otlp/Storage/GetSpansRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Aspire.Dashboard.Otlp.Storage;

public sealed class GetSpansRequest
{
public required ResourceKey? ResourceKey { get; init; }
public required IReadOnlyList<ResourceKey> ResourceKeys { get; init; }
public required int StartIndex { get; init; }
public required int Count { get; init; }
public required List<TelemetryFilter> Filters { get; init; }
Expand Down
4 changes: 2 additions & 2 deletions src/Aspire.Dashboard/Otlp/Storage/GetTracesRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Aspire.Dashboard.Otlp.Storage;

public sealed class GetTracesRequest
{
public required ResourceKey? ResourceKey { get; init; }
public required IReadOnlyList<ResourceKey> ResourceKeys { get; init; }
public required int StartIndex { get; init; }
public required int Count { get; init; }
public required List<TelemetryFilter> Filters { get; init; }
Expand All @@ -16,7 +16,7 @@ public sealed class GetTracesRequest

public static GetTracesRequest ForResourceKey(ResourceKey resourceKey) => new()
{
ResourceKey = resourceKey,
ResourceKeys = [resourceKey],
StartIndex = 0,
Count = int.MaxValue,
Filters = []
Expand Down
Loading
Loading