events = requestContext.billing().collectedEvents();
+ if (events.isEmpty()) {
+ return;
+ }
+ try {
+ responseContext
+ .getHeaders()
+ .add(BILLING_EVENTS_HEADER, OBJECT_WRITER.writeValueAsString(events));
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to serialize {} billing events to response header", events.size(), e);
+ }
+ }
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBilling.java b/src/main/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBilling.java
index 9ab810bd4a..7e49528055 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBilling.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBilling.java
@@ -10,6 +10,7 @@
import io.stargate.sgv2.jsonapi.config.feature.ApiFeature;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -18,14 +19,21 @@
import org.slf4j.LoggerFactory;
/**
- * {@link Billing} implementation that emits structured JSON log lines on the {@code billing.events}
- * logger for downstream billing pipelines.
+ * {@link Billing} implementation that dispatches each built event to one or both sinks selected at
+ * construction time:
*
- * Construction is driven by {@link
- * io.stargate.sgv2.jsonapi.api.request.RequestContext#billing()}: the request context decides
- * between this implementation and {@link Billing#NO_OP} based on whether {@link
- * ApiFeature#BILLING_EVENTS_LOGGING} is enabled for the request. This class therefore assumes
- * billing is enabled and unconditionally emits — it does not re-check the feature flag.
+ *
+ * - structured JSON log lines on the {@code billing.events} logger for downstream billing
+ * pipelines — when {@code loggingEnabled} ({@link ApiFeature#BILLING_EVENTS_LOGGING}).
+ *
+ * - an in-memory buffer surfaced via {@link #collectedEvents()} and returned on the {@code
+ * Billing-Events} HTTP response header — when {@code responseEnabled} ({@link
+ * ApiFeature#BILLING_EVENTS_RESPONSE}).
+ *
+ *
+ * Construction is driven by {@link Billing#create} (via {@link
+ * io.stargate.sgv2.jsonapi.api.request.RequestContext#billing()}), which only picks this
+ * implementation when at least one of the two flags is enabled and tells it which sinks to feed.
+ * This class therefore does not re-check the feature flags; if a flag is off, its sink is skipped.
*
*
* For each {@link ModelUsage}, up to three events are emitted, one per billable metric ({@link
* BillingEventType.Metric#TOTAL_TOKENS TOTAL_TOKENS}, {@link BillingEventType.Metric#EGRESS_BYTES
@@ -50,18 +58,35 @@ public class DefaultBilling implements Billing {
private final Set internalModelProviders;
private final Set enabledEventTypes;
- public DefaultBilling(BillingConfig config) {
+ /** Whether to emit events on the {@code billing.events} logger. */
+ private final boolean loggingEnabled;
+
+ /** Whether to buffer events in {@link #collectedEvents} for the response header. */
+ private final boolean responseEnabled;
+
+ // Events buffered for the BILLING_EVENTS_RESPONSE sink. Populated only when responseEnabled.
+ // emitEvent can be invoked from concurrent tasks within one request (async embedding / reranking
+ // calls), so the list is synchronized.
+ private final List collectedEvents =
+ Collections.synchronizedList(new ArrayList<>());
+
+ public DefaultBilling(BillingConfig config, boolean loggingEnabled, boolean responseEnabled) {
Objects.requireNonNull(config, "config must not be null");
this.product = requireNonBlank(config.product(), "billing.product");
this.resourceType = requireNonBlank(config.resourceType(), "billing.resource_type");
this.internalModelProviders = Set.copyOf(config.internalModelProviders());
this.enabledEventTypes =
config.enabledEventTypes().map(Set::copyOf).orElse(BillingEventType.ALL);
+ this.loggingEnabled = loggingEnabled; // gates the billing.events logger sink in emitEvent
+ this.responseEnabled = responseEnabled; // gates buffering into collectedEvents in emitEvent
}
/**
- * Emits billing events for the given aggregated model usage. The {@code billing.events} logger
- * level is checked first so we skip event construction when the logger is silenced at runtime.
+ * Builds billing events for the given aggregated model usage and dispatches them to whichever
+ * sinks are enabled: the {@code billing.events} logger ({@code loggingEnabled}) and/or the
+ * in-memory buffer read via {@link #collectedEvents()} ({@code responseEnabled}). The {@code
+ * billing.events} logger level is also checked so we skip the log sink when the logger is
+ * silenced at runtime; if no sink is active, event construction is skipped entirely.
*
* @param modelUsage usage data for the model call; must not be null. Callers are expected to
* ensure they have usage data before invoking.
@@ -69,16 +94,36 @@ public DefaultBilling(BillingConfig config) {
@Override
public void emitEvent(ModelUsage modelUsage) {
Objects.requireNonNull(modelUsage, "modelUsage must not be null");
- if (!BILLING_LOGGER.isInfoEnabled()) {
+ boolean shouldLog = loggingEnabled && BILLING_LOGGER.isInfoEnabled(); // flag on AND INFO enabled
+ if (!shouldLog && !responseEnabled) { // no active sink: skip building events altogether
return;
}
- for (var event : buildEvents(modelUsage)) {
- try {
- BILLING_LOGGER.info(OBJECT_WRITER.writeValueAsString(event));
- } catch (JacksonException e) {
- LOGGER.error("Failed to serialize billing event of type {}", event.eventType(), e);
+ var events = buildEvents(modelUsage); // built once; both sinks receive the same events
+ if (shouldLog) {
+ for (var event : events) {
+ try { // per-event try: one serialization failure does not stop the remaining events
+ BILLING_LOGGER.info(OBJECT_WRITER.writeValueAsString(event));
+ } catch (JacksonException e) {
+ LOGGER.error("Failed to serialize billing event of type {}", event.eventType(), e);
+ }
}
}
+ if (responseEnabled) { // buffered even when the log sink is off or an event failed to serialize
+ collectedEvents.addAll(events);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Returns an unmodifiable copy of the events buffered so far for the {@code
+ * BILLING_EVENTS_RESPONSE} sink; empty when {@code responseEnabled} is false.
+ */
+ @Override
+ public List collectedEvents() {
+ synchronized (collectedEvents) { // hold the synchronized list's own lock while copying
+ return List.copyOf(collectedEvents); // unmodifiable snapshot, detached from the live buffer
+ }
+ }
/**
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingResponseFilterTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingResponseFilterTest.java
new file mode 100644
index 0000000000..3a6851ae9b
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingResponseFilterTest.java
@@ -0,0 +1,124 @@
+package io.stargate.sgv2.jsonapi.service.provider;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.stargate.sgv2.jsonapi.TestConstants;
+import io.stargate.sgv2.jsonapi.api.request.RequestContext;
+import io.stargate.sgv2.jsonapi.config.BillingConfig;
+import io.stargate.sgv2.jsonapi.config.feature.ApiFeature;
+import io.stargate.sgv2.jsonapi.config.feature.ApiFeatures;
+import io.stargate.sgv2.jsonapi.config.feature.FeaturesConfig;
+import jakarta.ws.rs.container.ContainerResponseContext;
+import jakarta.ws.rs.core.MultivaluedHashMap;
+import jakarta.ws.rs.core.MultivaluedMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.junit.jupiter.api.Test;
+
+class BillingResponseFilterTest { // exercises BillingResponseFilter#addBillingHeader directly
+
+ private static final ObjectMapper MAPPER = new ObjectMapper(); // re-parses the emitted header
+ private static final TestConstants TEST_CONSTANTS = new TestConstants(); // supplies TENANT
+
+ private record BillingAndFeatures(Billing billing, ApiFeatures apiFeatures) {} // helper result
+
+ private static BillingAndFeatures newBillingWith(boolean logging, boolean response) {
+ BillingConfig config = mock(BillingConfig.class); // stubs what the DefaultBilling ctor reads
+ when(config.product()).thenReturn("serverless");
+ when(config.resourceType()).thenReturn("serverless_database");
+ when(config.internalModelProviders()).thenReturn(List.of("nvidia"));
+ when(config.enabledEventTypes()).thenReturn(Optional.empty());
+
+ FeaturesConfig featuresConfig = mock(FeaturesConfig.class);
+ Map flags = new HashMap<>(); // both billing flags are always present in the map
+ flags.put(ApiFeature.BILLING_EVENTS_LOGGING, String.valueOf(logging));
+ flags.put(ApiFeature.BILLING_EVENTS_RESPONSE, String.valueOf(response));
+ when(featuresConfig.flags()).thenReturn(flags);
+
+ ApiFeatures apiFeatures = ApiFeatures.fromConfigAndRequest(featuresConfig, null);
+ // Billing.create picks DefaultBilling when either flag is on (NO_OP only when both off) — the
+ // same dispatch the filter relies on in production.
+ return new BillingAndFeatures(Billing.create(config, apiFeatures), apiFeatures);
+ }
+
+ private static ModelUsage usage() { // fixture: a single NVIDIA embedding usage record
+ return new ModelUsage(
+ ModelProvider.NVIDIA, // presumably matches "nvidia" in the mocked internalModelProviders
+ ModelType.EMBEDDING,
+ "test-model",
+ TEST_CONSTANTS.TENANT,
+ ModelInputType.INDEX,
+ 10, // numeric arguments are arbitrary fixture values; see ModelUsage for their meaning
+ 20,
+ 100,
+ 200,
+ 1000L);
+ }
+
+ private static BillingResponseFilter filterFor(Billing billing, ApiFeatures apiFeatures) {
+ RequestContext rc = mock(RequestContext.class); // stubbed with the billing + features under test
+ when(rc.billing()).thenReturn(billing);
+ when(rc.apiFeatures()).thenReturn(apiFeatures);
+ return new BillingResponseFilter(rc);
+ }
+
+ private static ContainerResponseContext responseContextWithHeaders(
+ MultivaluedMap headers) {
+ ContainerResponseContext response = mock(ContainerResponseContext.class); // mock: lets tests verify calls
+ when(response.getHeaders()).thenReturn(headers);
+ return response;
+ }
+
+ @Test
+ void addsHeaderWhenFeatureOnAndEventsPresent() throws Exception {
+ BillingAndFeatures bf = newBillingWith(false, true); // LOGGING off, RESPONSE on
+ bf.billing().emitEvent(usage()); // fills the buffer the filter reads via collectedEvents()
+ BillingResponseFilter filter = filterFor(bf.billing(), bf.apiFeatures());
+
+ MultivaluedMap headers = new MultivaluedHashMap<>();
+ filter.addBillingHeader(responseContextWithHeaders(headers));
+
+ Object headerValue = headers.getFirst(BillingResponseFilter.BILLING_EVENTS_HEADER);
+ assertThat(headerValue).isNotNull();
+ JsonNode parsed = MAPPER.readTree(headerValue.toString()); // header value must be valid JSON
+ assertThat(parsed.isArray()).isTrue();
+ assertThat(parsed.size()).isEqualTo(3); // one emitEvent call produced three events
+ assertThat(parsed.get(0).get("event_type").asText()).isEqualTo("internal_model_total_tokens");
+ }
+
+ @Test
+ void skipsHeaderWhenFeatureOff() {
+ // RESPONSE off — header must not be added even if LOGGING was on for this request.
+ BillingAndFeatures bf = newBillingWith(true, false); // LOGGING on, RESPONSE off
+ bf.billing().emitEvent(usage());
+ BillingResponseFilter filter = filterFor(bf.billing(), bf.apiFeatures());
+
+ MultivaluedMap headers = new MultivaluedHashMap<>();
+ ContainerResponseContext response = responseContextWithHeaders(headers);
+ filter.addBillingHeader(response);
+
+ assertThat(headers.containsKey(BillingResponseFilter.BILLING_EVENTS_HEADER)).isFalse();
+ // We should never touch the headers either (early return saves the work).
+ verify(response, never()).getHeaders();
+ }
+
+ @Test
+ void skipsHeaderWhenNoEventsCollected() {
+ // RESPONSE on, but no emitEvent calls — header skipped because buffer is empty.
+ BillingAndFeatures bf = newBillingWith(false, true); // LOGGING off, RESPONSE on
+ BillingResponseFilter filter = filterFor(bf.billing(), bf.apiFeatures());
+
+ MultivaluedMap headers = new MultivaluedHashMap<>();
+ filter.addBillingHeader(responseContextWithHeaders(headers));
+
+ assertThat(headers.containsKey(BillingResponseFilter.BILLING_EVENTS_HEADER)).isFalse();
+ }
+}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingTest.java
index 399155d241..53d879337e 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/provider/BillingTest.java
@@ -13,6 +13,7 @@
import io.stargate.sgv2.jsonapi.config.feature.ApiFeatures;
import io.stargate.sgv2.jsonapi.config.feature.FeaturesConfig;
import io.vertx.core.MultiMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -65,6 +66,30 @@ void createReturnsNoOpWhenFeatureDisabled() {
assertThat(billing).isSameAs(Billing.NO_OP);
}
+ @Test
+ void createReturnsDefaultBillingWhenOnlyResponseEnabled() {
+ // RESPONSE on / LOGGING off must still produce a real Billing — events are needed for the
+ // response header even when nothing is logged.
+ var billing = Billing.create(validConfig(), features(false, true)); // (logging, response)
+
+ assertThat(billing).isInstanceOf(DefaultBilling.class);
+ }
+
+ @Test
+ void createReturnsDefaultBillingWhenBothEnabled() {
+ var billing = Billing.create(validConfig(), features(true, true)); // LOGGING on, RESPONSE on
+
+ assertThat(billing).isInstanceOf(DefaultBilling.class); // sink selection is not asserted here
+ }
+
+ @Test
+ void createReturnsNoOpWhenBothDisabled() {
+ // Config isn't consulted when both flags are off — pass null to assert that explicitly.
+ var billing = Billing.create(null, features(false, false));
+
+ assertThat(billing).isSameAs(Billing.NO_OP); // identity check: the shared NO_OP instance itself
+ }
+
// ============================================================
// Feature-flag precedence at dispatch time
// ============================================================
@@ -131,6 +156,16 @@ private static ApiFeatures featuresWithBilling(boolean enabled) {
return ApiFeatures.fromConfigAndRequest(config, null);
}
+ /** Resolves an {@link ApiFeatures} with both billing flags set explicitly. */
+ private static ApiFeatures features(boolean logging, boolean response) {
+ var config = mock(FeaturesConfig.class);
+ Map flags = new HashMap<>(); // both billing flags are always present in the map
+ flags.put(ApiFeature.BILLING_EVENTS_LOGGING, String.valueOf(logging));
+ flags.put(ApiFeature.BILLING_EVENTS_RESPONSE, String.valueOf(response));
+ when(config.flags()).thenReturn(flags);
+ return ApiFeatures.fromConfigAndRequest(config, null);
+ }
+
private ModelUsage stubUsage() {
return new ModelUsage(
ModelProvider.NVIDIA,
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBillingTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBillingTest.java
index 7657c48282..77ede800c2 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBillingTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/provider/DefaultBillingTest.java
@@ -221,6 +221,68 @@ public void close() {}
}
}
+ // ============================================================
+ // collectedEvents (BILLING_EVENTS_RESPONSE buffer)
+ // ============================================================
+
+ @Test
+ void buffersEventsWhenResponseEnabled() {
+ // LOGGING off, RESPONSE on — events still build and land in the buffer.
+ var billing = newBilling(false, true);
+ billing.emitEvent(usage(ModelProvider.NVIDIA, ModelType.EMBEDDING));
+
+ assertThat(billing.collectedEvents())
+ .extracting(BillingEvent::eventType)
+ .containsExactly( // containsExactly also pins the order of the three events
+ BillingEventType.INTERNAL_MODEL_TOTAL_TOKENS,
+ BillingEventType.INTERNAL_MODEL_EGRESS_BYTES,
+ BillingEventType.INTERNAL_MODEL_INGRESS_BYTES);
+ }
+
+ @Test
+ void doesNotBufferWhenOnlyLoggingEnabled() {
+ // LOGGING on, RESPONSE off — buffer stays empty so no memory is held when the response
+ // feature is off.
+ var billing = newBilling(true, false); // (loggingEnabled, responseEnabled)
+ billing.emitEvent(usage(ModelProvider.NVIDIA, ModelType.EMBEDDING));
+
+ assertThat(billing.collectedEvents()).isEmpty();
+ }
+
+ @Test
+ void buffersAcrossMultipleCalls() {
+ var billing = newBilling(false, true); // LOGGING off, RESPONSE on
+ billing.emitEvent(usage(ModelProvider.NVIDIA, ModelType.EMBEDDING));
+ billing.emitEvent(usage(ModelProvider.OPENAI, ModelType.EMBEDDING)); // different provider, same buffer
+
+ // 3 events per emitEvent call × 2 calls = 6 events total.
+ assertThat(billing.collectedEvents()).hasSize(6);
+ }
+
+ @Test
+ void collectedEventsReturnsImmutableSnapshot() {
+ var billing = newBilling(false, true); // LOGGING off, RESPONSE on
+ billing.emitEvent(usage(ModelProvider.NVIDIA, ModelType.EMBEDDING));
+
+ var snapshot = billing.collectedEvents(); // taken before the second emitEvent below
+ // Snapshot must not reflect later writes — it's a defensive copy.
+ int before = snapshot.size();
+ billing.emitEvent(usage(ModelProvider.OPENAI, ModelType.EMBEDDING));
+ assertThat(snapshot).hasSize(before);
+ // And the snapshot itself must not be modifiable.
+ assertThatThrownBy(snapshot::clear).isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ void collectedEventsEmptyWhenNeitherSinkActive() {
+ // create() never builds DefaultBilling with both flags off, but emitEvent must still be a
+ // no-op (and the buffer empty) if it ever happens.
+ var billing = newBilling(false, false); // the constructor itself accepts both sinks off
+ billing.emitEvent(usage(ModelProvider.NVIDIA, ModelType.EMBEDDING));
+
+ assertThat(billing.collectedEvents()).isEmpty();
+ }
+
// ============================================================
// Helpers
// ============================================================
@@ -230,14 +292,28 @@ private static DefaultBilling newBilling() {
return newBilling(INTERNAL_PROVIDERS, Optional.empty());
}
+ /** Logging on, response off — the default for buildEvents / logging tests. */
private static DefaultBilling newBilling(
List internalProviders, Optional> enabledEventTypes) {
+ return newBilling(internalProviders, enabledEventTypes, true, false); // logging on, response off
+ }
+
+ /** Selects which sinks are active, with the default INTERNAL_PROVIDERS / all-events config. */
+ private static DefaultBilling newBilling(boolean loggingEnabled, boolean responseEnabled) {
+ return newBilling(INTERNAL_PROVIDERS, Optional.empty(), loggingEnabled, responseEnabled); // empty => ALL event types
+ }
+
+ private static DefaultBilling newBilling(
+ List internalProviders,
+ Optional> enabledEventTypes,
+ boolean loggingEnabled,
+ boolean responseEnabled) { // the other overloads shown in this hunk delegate here
var config = mock(BillingConfig.class);
when(config.product()).thenReturn(PRODUCT);
when(config.resourceType()).thenReturn(RESOURCE_TYPE);
when(config.internalModelProviders()).thenReturn(internalProviders);
when(config.enabledEventTypes()).thenReturn(enabledEventTypes);
- return new DefaultBilling(config);
+ return new DefaultBilling(config, loggingEnabled, responseEnabled); // flags passed straight through
}
private ModelUsage usage(ModelProvider provider, ModelType modelType) {