From 5845b8d996162bf4fd1e7758ac1e90e53fd573f8 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 18 Jun 2026 14:56:31 -0400 Subject: [PATCH 1/3] feat(producer): add metrics for callback execution latency --- arroyo/backends/kafka/consumer.py | 46 ++++++++++++++++++++--- arroyo/utils/metric_defs.py | 3 ++ tests/backends/test_confluent_producer.py | 27 ++++++++++++- tests/backends/test_kafka_producer.py | 35 ++++++++++++++++- 4 files changed, 104 insertions(+), 7 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 7a84dd43..06e8d486 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -767,6 +767,7 @@ def __init__( self.producer_name = configuration.get("client.id") or None self.__metrics = get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) + self.__callback_latency: list[float] = [] self.__reset_metrics() # The worker must execute in a separate thread to ensure that callbacks @@ -795,8 +796,10 @@ def __delivery_callback( payload: KafkaPayload, error: KafkaError, message: ConfluentMessage, + time_of_produce: float, ) -> None: self.__produce_counters["error" if error is not None else "success"] += 1 + self.__callback_latency.append(time.time() - time_of_produce) self.__throttled_record() if error is not None: @@ -844,11 +847,17 @@ def produce( future = Future() future.set_running_or_notify_cancel() + time_of_produce = time.time() produce( value=payload.value, key=payload.key, headers=list(payload.headers), - on_delivery=partial(self.__delivery_callback, future, payload), + on_delivery=partial( + self.__delivery_callback, + future, + payload, + time_of_produce=time_of_produce, + ), ) return future @@ -866,10 +875,19 @@ def __flush_metrics(self) -> None: value=count, tags=tags, ) + for latency in self.__callback_latency: + self.__metrics.timing( + name="arroyo.producer.callback_latency", + value=latency, + tags={ + "producer_name": self.producer_name if self.producer_name else "N/A" + }, + ) self.__reset_metrics() def __reset_metrics(self) -> None: self.__produce_counters.clear() + self.__callback_latency.clear() self.__last_record_time = time.time() def __throttled_record(self) -> None: @@ -894,37 +912,46 @@ def __init__(self, configuration: Mapping[str, Any]) -> None: self.producer_name = configuration.get("client.id") or None self.__metrics = get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) + self.__callback_latency: list[float] = [] self.__reset_metrics() def __metrics_delivery_callback( self, error: Optional[KafkaError], _message: ConfluentMessage, + time_of_produce: float, ) -> None: if error is not None: status = "error" else: status = "success" self.__produce_counters[status] += 1 + self.__callback_latency.append(time.time() - time_of_produce) self.__throttled_record() def __delivery_callback( self, user_callback: Optional[DeliveryCallback], + time_of_produce: float, ) -> DeliveryCallback: - def wrapped(error: Optional[KafkaError], message: ConfluentMessage) -> None: - self.__metrics_delivery_callback(error, message) + def wrapped( + error: Optional[KafkaError], + message: ConfluentMessage, + time_of_produce: float, + ) -> None: + self.__metrics_delivery_callback(error, message, time_of_produce) if user_callback is not None: user_callback(error, message) - return wrapped + return partial(wrapped, time_of_produce=time_of_produce) def produce(self, *args: Any, **kwargs: Any) -> None: # callback and on_delivery are aliases, callback takes precedence over on_delivery callback = kwargs.pop("callback", None) on_delivery = kwargs.pop("on_delivery", None) user_callback = callback or on_delivery - wrapped_callback = self.__delivery_callback(user_callback) + time_of_produce = time.time() + wrapped_callback = self.__delivery_callback(user_callback, time_of_produce) super().produce(*args, **kwargs, on_delivery=wrapped_callback) # type: ignore[misc] def __flush_metrics(self) -> None: @@ -937,6 +964,14 @@ def __flush_metrics(self) -> None: value=count, tags=tags, ) + for latency in self.__callback_latency: + self.__metrics.timing( + name="arroyo.producer.callback_latency", + value=latency, + tags={ + "producer_name": self.producer_name if self.producer_name else "N/A" + }, + ) self.__reset_metrics() def flush(self, timeout: float | None = -1) -> int: @@ -946,6 +981,7 @@ def flush(self, timeout: float | None = -1) -> int: def __reset_metrics(self) -> None: self.__produce_counters.clear() + self.__callback_latency.clear() self.__last_record_time = time.time() def __throttled_record(self) -> None: diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index a7917410..fda6c359 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -125,6 +125,9 @@ # Gauge: Producer message count metric from librdkafka statistics # Tagged by producer_name "arroyo.producer.librdkafka.message_count", + # Time: Latency between when a message is produced, and when its delivery callback is called + # Tagged by producer_name + "arroyo.producer.callback_latency", # Gauge: Maximum producer message count from librdkafka statistics # Tagged by producer_name "arroyo.producer.librdkafka.message_count_max", diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 91fb88bb..57df7cfe 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -6,7 +6,7 @@ from confluent_kafka import Producer as ConfluentKafkaProducer from arroyo.backends.kafka.consumer import ConfluentProducer -from tests.metrics import Increment, TestingMetricsBackend +from tests.metrics import Increment, TestingMetricsBackend, Timing class TestConfluentProducer: @@ -51,6 +51,31 @@ def test_metrics_callback_records_error(self) -> None: in TestingMetricsBackend.calls ) + @mock.patch("arroyo.backends.kafka.consumer.time.time") + def test_metrics_callback_records_callback_latency( + self, mock_time: mock.Mock + ) -> None: + """Test that the metrics callback records a callback_latency timing metric""" + mock_time.return_value = 1000.5 + + producer = ConfluentProducer( + {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} + ) + mock_message = mock.Mock(spec=ConfluentMessage) + producer._ConfluentProducer__metrics_delivery_callback( # type: ignore[attr-defined] + None, mock_message, time_of_produce=1000.0 + ) + producer.flush() + + assert ( + Timing( + "arroyo.producer.callback_latency", + 0.5, + {"producer_name": "test-producer-name"}, + ) + in TestingMetricsBackend.calls + ) + def test_delivery_callback_wraps_user_callback(self) -> None: """Test that the delivery callback wrapper calls both metrics and user callbacks""" producer = ConfluentProducer( diff --git a/tests/backends/test_kafka_producer.py b/tests/backends/test_kafka_producer.py index 7d1164b6..a0a02559 100644 --- a/tests/backends/test_kafka_producer.py +++ b/tests/backends/test_kafka_producer.py @@ -8,7 +8,7 @@ from arroyo.backends.kafka.configuration import producer_stats_callback from arroyo.backends.kafka.consumer import KafkaPayload, KafkaProducer from arroyo.types import BrokerValue -from tests.metrics import Increment, TestingMetricsBackend +from tests.metrics import Increment, TestingMetricsBackend, Timing @mock.patch("arroyo.backends.kafka.configuration.get_metrics") @@ -159,3 +159,36 @@ def test_delivery_callback_records_error(self) -> None: Increment("arroyo.producer.produce_status", 1, {"status": "error"}) in TestingMetricsBackend.calls ) + + @mock.patch("arroyo.backends.kafka.consumer.time.time") + def test_delivery_callback_records_callback_latency( + self, mock_time: mock.Mock + ) -> None: + """The delivery callback records a callback_latency timing metric""" + mock_time.return_value = 1000.5 + + producer = KafkaProducer( + {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} + ) + payload = KafkaPayload(None, b"value", []) + future: Future[BrokerValue[KafkaPayload]] = Future() + + mock_message = mock.Mock(spec=ConfluentMessage) + mock_message.timestamp.return_value = (TIMESTAMP_CREATE_TIME, 1234567890000) + mock_message.topic.return_value = "test-topic" + mock_message.partition.return_value = 0 + mock_message.offset.return_value = 0 + + producer._KafkaProducer__delivery_callback( # type: ignore[attr-defined] + future, payload, None, mock_message, time_of_produce=1000.0 + ) + producer._KafkaProducer__flush_metrics() # type: ignore[attr-defined] + + assert ( + Timing( + "arroyo.producer.callback_latency", + 0.5, + {"producer_name": "test-producer-name"}, + ) + in TestingMetricsBackend.calls + ) From 2de8e7b665858d9ac9ce01c8f1874ffc2136d187 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 18 Jun 2026 15:12:31 -0400 Subject: [PATCH 2/3] just use ConfluentProducer --- arroyo/backends/kafka/consumer.py | 20 +---------------- tests/backends/test_confluent_producer.py | 27 +---------------------- 2 files changed, 2 insertions(+), 45 deletions(-) diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index 06e8d486..019a284a 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -767,7 +767,6 @@ def __init__( self.producer_name = configuration.get("client.id") or None self.__metrics = get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) - self.__callback_latency: list[float] = [] self.__reset_metrics() # The worker must execute in a separate thread to ensure that callbacks @@ -796,10 +795,8 @@ def __delivery_callback( payload: KafkaPayload, error: KafkaError, message: ConfluentMessage, - time_of_produce: float, ) -> None: self.__produce_counters["error" if error is not None else "success"] += 1 - self.__callback_latency.append(time.time() - time_of_produce) self.__throttled_record() if error is not None: @@ -847,17 +844,11 @@ def produce( future = Future() future.set_running_or_notify_cancel() - time_of_produce = time.time() produce( value=payload.value, key=payload.key, headers=list(payload.headers), - on_delivery=partial( - self.__delivery_callback, - future, - payload, - time_of_produce=time_of_produce, - ), + on_delivery=partial(self.__delivery_callback, future, payload), ) return future @@ -875,19 +866,10 @@ def __flush_metrics(self) -> None: value=count, tags=tags, ) - for latency in self.__callback_latency: - self.__metrics.timing( - name="arroyo.producer.callback_latency", - value=latency, - tags={ - "producer_name": self.producer_name if self.producer_name else "N/A" - }, - ) self.__reset_metrics() def __reset_metrics(self) -> None: self.__produce_counters.clear() - self.__callback_latency.clear() self.__last_record_time = time.time() def __throttled_record(self) -> None: diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 57df7cfe..91fb88bb 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -6,7 +6,7 @@ from confluent_kafka import Producer as ConfluentKafkaProducer from arroyo.backends.kafka.consumer import ConfluentProducer -from tests.metrics import Increment, TestingMetricsBackend, Timing +from tests.metrics import Increment, TestingMetricsBackend class TestConfluentProducer: @@ -51,31 +51,6 @@ def test_metrics_callback_records_error(self) -> None: in TestingMetricsBackend.calls ) - @mock.patch("arroyo.backends.kafka.consumer.time.time") - def test_metrics_callback_records_callback_latency( - self, mock_time: mock.Mock - ) -> None: - """Test that the metrics callback records a callback_latency timing metric""" - mock_time.return_value = 1000.5 - - producer = ConfluentProducer( - {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} - ) - mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback( # type: ignore[attr-defined] - None, mock_message, time_of_produce=1000.0 - ) - producer.flush() - - assert ( - Timing( - "arroyo.producer.callback_latency", - 0.5, - {"producer_name": "test-producer-name"}, - ) - in TestingMetricsBackend.calls - ) - def test_delivery_callback_wraps_user_callback(self) -> None: """Test that the delivery callback wrapper calls both metrics and user callbacks""" producer = ConfluentProducer( From 753e864f4dc9d83a58fbfe1e2a9a2c19f74d0cdc Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 18 Jun 2026 15:16:29 -0400 Subject: [PATCH 3/3] add test --- tests/backends/test_confluent_producer.py | 33 ++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 91fb88bb..e47acb2e 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -6,7 +6,7 @@ from confluent_kafka import Producer as ConfluentKafkaProducer from arroyo.backends.kafka.consumer import ConfluentProducer -from tests.metrics import Increment, TestingMetricsBackend +from tests.metrics import Increment, TestingMetricsBackend, Timing class TestConfluentProducer: @@ -28,7 +28,7 @@ def test_metrics_callback_records_success(self) -> None: {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} ) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) # type: ignore[attr-defined] + producer._ConfluentProducer__metrics_delivery_callback(None, mock_message, 1000.0) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment( @@ -44,13 +44,38 @@ def test_metrics_callback_records_error(self) -> None: producer = ConfluentProducer({"bootstrap.servers": "fake:9092"}) mock_error = mock.Mock(spec=KafkaError) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) # type: ignore[attr-defined] + producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message, 1000.0) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment("arroyo.producer.produce_status", 1, {"status": "error"}) in TestingMetricsBackend.calls ) + @mock.patch("arroyo.backends.kafka.consumer.time.time") + def test_metrics_callback_records_callback_latency( + self, mock_time: mock.Mock + ) -> None: + """Test that the metrics callback records a callback_latency timing metric""" + mock_time.return_value = 1000.5 + + producer = ConfluentProducer( + {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} + ) + mock_message = mock.Mock(spec=ConfluentMessage) + producer._ConfluentProducer__metrics_delivery_callback( # type: ignore[attr-defined] + None, mock_message, time_of_produce=1000.0 + ) + producer.flush() + + assert ( + Timing( + "arroyo.producer.callback_latency", + 0.5, + {"producer_name": "test-producer-name"}, + ) + in TestingMetricsBackend.calls + ) + def test_delivery_callback_wraps_user_callback(self) -> None: """Test that the delivery callback wrapper calls both metrics and user callbacks""" producer = ConfluentProducer( @@ -63,7 +88,7 @@ def user_callback( ) -> None: user_callback_invoked.append((error, message)) - wrapped = producer._ConfluentProducer__delivery_callback(user_callback) # type: ignore[attr-defined] + wrapped = producer._ConfluentProducer__delivery_callback(user_callback, 1000.0) # type: ignore[attr-defined] mock_message = mock.Mock(spec=ConfluentMessage) wrapped(None, mock_message) producer.flush() # Flush buffered metrics