Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,37 +865,46 @@ def __init__(self, configuration: Mapping[str, Any]) -> None:
self.producer_name = configuration.get("client.id") or None
self.__metrics = get_metrics()
self.__produce_counters: MutableMapping[str, int] = defaultdict(int)
self.__callback_latency: list[float] = []
self.__reset_metrics()

def __metrics_delivery_callback(
    self,
    error: Optional[KafkaError],
    _message: ConfluentMessage,
    time_of_produce: float,
) -> None:
    """Buffer the outcome and latency of a single delivery report.

    :param error: The delivery error, or ``None`` when delivery succeeded.
    :param _message: The delivered message; not used by the metrics path.
    :param time_of_produce: ``time.time()`` value captured when the message
        was handed to ``produce``.
    """
    # Count the report under its status; the counters are emitted later
    # by the flush path rather than on every callback.
    outcome = "success" if error is None else "error"
    self.__produce_counters[outcome] += 1

    # Latency is measured from produce() to the moment this callback runs.
    elapsed = time.time() - time_of_produce
    self.__callback_latency.append(elapsed)

    self.__throttled_record()

def __delivery_callback(
    self,
    user_callback: Optional[DeliveryCallback],
    time_of_produce: float,
) -> DeliveryCallback:
    """Build the delivery callback passed to the underlying producer.

    The returned callable records delivery metrics and then invokes the
    caller-supplied callback, if any.

    :param user_callback: Optional callback supplied by the caller of
        ``produce``; invoked with ``(error, message)`` after metrics are
        recorded.
    :param time_of_produce: ``time.time()`` value captured when ``produce``
        was called; forwarded to the metrics callback so it can compute
        the callback latency.
    :return: A two-argument ``(error, message)`` delivery callback.
    """

    def wrapped(error: Optional[KafkaError], message: ConfluentMessage) -> None:
        # time_of_produce is captured by the closure, so the callback keeps
        # the plain (error, message) signature.
        self.__metrics_delivery_callback(error, message, time_of_produce)
        if user_callback is not None:
            user_callback(error, message)

    return wrapped

def produce(self, *args: Any, **kwargs: Any) -> None:
    """Produce a message, wrapping the delivery callback to record metrics.

    Accepts the same positional and keyword arguments as the parent
    class's ``produce``. Any ``callback`` / ``on_delivery`` keyword is
    removed from ``kwargs`` and replaced by a wrapper that records
    delivery metrics before calling the user's callback.
    """
    # callback and on_delivery are aliases, callback takes precedence over on_delivery
    callback = kwargs.pop("callback", None)
    on_delivery = kwargs.pop("on_delivery", None)
    user_callback = callback or on_delivery

    # Timestamp taken immediately before hand-off so the delivery callback
    # can report produce-to-callback latency.
    time_of_produce = time.time()
    wrapped_callback = self.__delivery_callback(user_callback, time_of_produce)
    super().produce(*args, **kwargs, on_delivery=wrapped_callback)  # type: ignore[misc]

def __flush_metrics(self) -> None:
Expand All @@ -908,6 +917,14 @@ def __flush_metrics(self) -> None:
value=count,
tags=tags,
)
for latency in self.__callback_latency:
self.__metrics.timing(
name="arroyo.producer.callback_latency",
value=latency,
tags={
"producer_name": self.producer_name if self.producer_name else "N/A"
},
)
Comment thread
bmckerry marked this conversation as resolved.
self.__reset_metrics()

def flush(self, timeout: float | None = -1) -> int:
Expand All @@ -917,6 +934,7 @@ def flush(self, timeout: float | None = -1) -> int:

def __reset_metrics(self) -> None:
    """Empty the buffered metrics and restart the record timer."""
    # Both buffers are cleared in place so existing references stay valid.
    for buffered in (self.__produce_counters, self.__callback_latency):
        buffered.clear()
    self.__last_record_time = time.time()

def __throttled_record(self) -> None:
Expand Down
3 changes: 3 additions & 0 deletions arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@
# Gauge: Producer message count metric from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.message_count",
# Time: Latency between when a message is produced and when its delivery callback is called
# Tagged by producer_name
"arroyo.producer.callback_latency",
# Gauge: Maximum producer message count from librdkafka statistics
# Tagged by producer_name
"arroyo.producer.librdkafka.message_count_max",
Expand Down
33 changes: 29 additions & 4 deletions tests/backends/test_confluent_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from confluent_kafka import Producer as ConfluentKafkaProducer

from arroyo.backends.kafka.consumer import ConfluentProducer
from tests.metrics import Increment, TestingMetricsBackend
from tests.metrics import Increment, TestingMetricsBackend, Timing


class TestConfluentProducer:
Expand All @@ -28,7 +28,7 @@ def test_metrics_callback_records_success(self) -> None:
{"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"}
)
mock_message = mock.Mock(spec=ConfluentMessage)
producer._ConfluentProducer__metrics_delivery_callback(None, mock_message) # type: ignore[attr-defined]
producer._ConfluentProducer__metrics_delivery_callback(None, mock_message, 1000.0) # type: ignore[attr-defined]
producer.flush() # Flush buffered metrics
assert (
Increment(
Expand All @@ -44,13 +44,38 @@ def test_metrics_callback_records_error(self) -> None:
producer = ConfluentProducer({"bootstrap.servers": "fake:9092"})
mock_error = mock.Mock(spec=KafkaError)
mock_message = mock.Mock(spec=ConfluentMessage)
producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message) # type: ignore[attr-defined]
producer._ConfluentProducer__metrics_delivery_callback(mock_error, mock_message, 1000.0) # type: ignore[attr-defined]
producer.flush() # Flush buffered metrics
assert (
Increment("arroyo.producer.produce_status", 1, {"status": "error"})
in TestingMetricsBackend.calls
)

@mock.patch("arroyo.backends.kafka.consumer.time.time")
def test_metrics_callback_records_callback_latency(
    self, mock_time: mock.Mock
) -> None:
    """The metrics callback should emit a callback_latency timing on flush."""
    # Freeze the clock half a second after the simulated produce time so
    # the expected latency is exactly 0.5.
    mock_time.return_value = 1000.5
    produced_at = 1000.0

    producer = ConfluentProducer(
        {"bootstrap.servers": "fake:9092", "client.id": "test-producer-name"}
    )
    record_delivery = producer._ConfluentProducer__metrics_delivery_callback  # type: ignore[attr-defined]
    record_delivery(
        None, mock.Mock(spec=ConfluentMessage), time_of_produce=produced_at
    )
    producer.flush()

    expected = Timing(
        "arroyo.producer.callback_latency",
        0.5,
        {"producer_name": "test-producer-name"},
    )
    assert expected in TestingMetricsBackend.calls

def test_delivery_callback_wraps_user_callback(self) -> None:
"""Test that the delivery callback wrapper calls both metrics and user callbacks"""
producer = ConfluentProducer(
Expand All @@ -63,7 +88,7 @@ def user_callback(
) -> None:
user_callback_invoked.append((error, message))

wrapped = producer._ConfluentProducer__delivery_callback(user_callback) # type: ignore[attr-defined]
wrapped = producer._ConfluentProducer__delivery_callback(user_callback, 1000.0) # type: ignore[attr-defined]
mock_message = mock.Mock(spec=ConfluentMessage)
wrapped(None, mock_message)
producer.flush() # Flush buffered metrics
Expand Down
Loading