diff --git a/arroyo/backends/kafka/consumer.py b/arroyo/backends/kafka/consumer.py index db30a9c2..9799dab8 100644 --- a/arroyo/backends/kafka/consumer.py +++ b/arroyo/backends/kafka/consumer.py @@ -865,37 +865,46 @@ def __init__(self, configuration: Mapping[str, Any]) -> None: self.producer_name = configuration.get("client.id") or None self.__metrics = get_metrics() self.__produce_counters: MutableMapping[str, int] = defaultdict(int) + self.__callback_latency: list[float] = [] self.__reset_metrics() def __metrics_delivery_callback( self, error: Optional[KafkaError], _message: ConfluentMessage, + time_of_produce: float, ) -> None: if error is not None: status = "error" else: status = "success" self.__produce_counters[status] += 1 + self.__callback_latency.append(time.time() - time_of_produce) self.__throttled_record() def __delivery_callback( self, user_callback: Optional[DeliveryCallback], + time_of_produce: float, ) -> DeliveryCallback: - def wrapped(error: Optional[KafkaError], message: ConfluentMessage) -> None: - self.__metrics_delivery_callback(error, message) + def wrapped( + error: Optional[KafkaError], + message: ConfluentMessage, + time_of_produce: float, + ) -> None: + self.__metrics_delivery_callback(error, message, time_of_produce) if user_callback is not None: user_callback(error, message) - return wrapped + return partial(wrapped, time_of_produce=time_of_produce) def produce(self, *args: Any, **kwargs: Any) -> None: # callback and on_delivery are aliases, callback takes precedence over on_delivery callback = kwargs.pop("callback", None) on_delivery = kwargs.pop("on_delivery", None) user_callback = callback or on_delivery - wrapped_callback = self.__delivery_callback(user_callback) + time_of_produce = time.time() + wrapped_callback = self.__delivery_callback(user_callback, time_of_produce) super().produce(*args, **kwargs, on_delivery=wrapped_callback) # type: ignore[misc] def __flush_metrics(self) -> None: @@ -908,6 +917,14 @@ def __flush_metrics(self) -> None: value=count, tags=tags, ) + for latency in self.__callback_latency: + self.__metrics.timing( + name="arroyo.producer.callback_latency", + value=latency, + tags={ + "producer_name": self.producer_name if self.producer_name else "N/A" + }, + ) self.__reset_metrics() def flush(self, timeout: float | None = -1) -> int: @@ -917,6 +934,7 @@ def flush(self, timeout: float | None = -1) -> int: def __reset_metrics(self) -> None: self.__produce_counters.clear() + self.__callback_latency.clear() self.__last_record_time = time.time() def __throttled_record(self) -> None: diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index a7917410..fda6c359 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -125,6 +125,9 @@ # Gauge: Producer message count metric from librdkafka statistics # Tagged by producer_name "arroyo.producer.librdkafka.message_count", + # Time: Latency between when a message is produced, and when its delivery callback is called + # Tagged by producer_name + "arroyo.producer.callback_latency", # Gauge: Maximum producer message count from librdkafka statistics # Tagged by producer_name "arroyo.producer.librdkafka.message_count_max", diff --git a/tests/backends/test_confluent_producer.py b/tests/backends/test_confluent_producer.py index 91fb88bb..e47acb2e 100644 --- a/tests/backends/test_confluent_producer.py +++ b/tests/backends/test_confluent_producer.py @@ -6,7 +6,7 @@ from confluent_kafka import Producer as ConfluentKafkaProducer from arroyo.backends.kafka.consumer import ConfluentProducer -from tests.metrics import Increment, TestingMetricsBackend +from tests.metrics import Increment, TestingMetricsBackend, Timing class TestConfluentProducer: @@ -28,7 +28,7 @@ def test_metrics_callback_records_success(self) -> None: {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} ) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) # type: ignore[attr-defined] + producer._ConfluentProducer__metrics_delivery_callback(None, mock_message, 1000.0) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment( @@ -44,13 +44,38 @@ def test_metrics_callback_records_error(self) -> None: producer = ConfluentProducer({"bootstrap.servers": "fake:9092"}) mock_error = mock.Mock(spec=KafkaError) mock_message = mock.Mock(spec=ConfluentMessage) - producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) # type: ignore[attr-defined] + producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message, 1000.0) # type: ignore[attr-defined] producer.flush() # Flush buffered metrics assert ( Increment("arroyo.producer.produce_status", 1, {"status": "error"}) in TestingMetricsBackend.calls ) + @mock.patch("arroyo.backends.kafka.consumer.time.time") + def test_metrics_callback_records_callback_latency( + self, mock_time: mock.Mock + ) -> None: + """Test that the metrics callback records a callback_latency timing metric""" + mock_time.return_value = 1000.5 + + producer = ConfluentProducer( + {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"} + ) + mock_message = mock.Mock(spec=ConfluentMessage) + producer._ConfluentProducer__metrics_delivery_callback( # type: ignore[attr-defined] + None, mock_message, time_of_produce=1000.0 + ) + producer.flush() + + assert ( + Timing( + "arroyo.producer.callback_latency", + 0.5, + {"producer_name": "test-producer-name"}, + ) + in TestingMetricsBackend.calls + ) + def test_delivery_callback_wraps_user_callback(self) -> None: """Test that the delivery callback wrapper calls both metrics and user callbacks""" producer = ConfluentProducer( @@ -63,7 +88,7 @@ def user_callback( ) -> None: user_callback_invoked.append((error, message)) - wrapped = producer._ConfluentProducer__delivery_callback(user_callback) # type: ignore[attr-defined] + wrapped = producer._ConfluentProducer__delivery_callback(user_callback, 1000.0) # type: ignore[attr-defined] mock_message = mock.Mock(spec=ConfluentMessage) wrapped(None, mock_message) producer.flush() # Flush buffered metrics