From e1318cba07d2933aeeaead290f296bbc5ffcb71b Mon Sep 17 00:00:00 2001 From: Oren N Date: Thu, 21 Mar 2024 23:39:50 +0200 Subject: [PATCH] Fix for python 3.7 generator..StopIteration (denissmirnov);Fix for partition offset reseting to BEGINNING each time causing consume() to re-read all messages;Workaround fix for errors after each message;Added troubleshooting;Version bump;README --- README.rst | 3 ++- kiel/__init__.py | 4 ++-- kiel/clients/consumer.py | 11 ++++++++--- kiel/clients/single.py | 6 +++++- kiel/cluster.py | 1 + kiel/iterables.py | 5 ++++- 6 files changed, 22 insertions(+), 8 deletions(-) diff --git a/README.rst b/README.rst index f792d3e..46a93c6 100644 --- a/README.rst +++ b/README.rst @@ -47,7 +47,8 @@ To install manually, first clone this here repo and: .. parsed-literal:: cd kiel - python setup.py install + # optionally update version in __init__.py + pip install . Documentation diff --git a/kiel/__init__.py b/kiel/__init__.py index a390954..375f84e 100644 --- a/kiel/__init__.py +++ b/kiel/__init__.py @@ -1,3 +1,3 @@ -version_info = (0, 9, 4) +version_info = (0, 9, 4, 1) -__version__ = ".".join(map(str, version_info)) + "-dev" +__version__ = ".".join(map(str, version_info)) #+ "-dev" diff --git a/kiel/clients/consumer.py b/kiel/clients/consumer.py index 209f4ee..98863ba 100644 --- a/kiel/clients/consumer.py +++ b/kiel/clients/consumer.py @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) - +# log.setLevel(logging.DEBUG) # uncomment for troubleshooting class BaseConsumer(Client): """ @@ -128,9 +128,11 @@ def consume(self, topic, start=None): ) results = yield self.send(requests) +# log.debug(results) raise gen.Return([ msg for messageset in results.values() for msg in messageset - if messageset + if msg # Skip empty messages + # BUG: troubleshoot why there are None values ]) def handle_fetch_response(self, response): @@ -195,6 +197,9 @@ def deserialize_messages(self, topic_name, partition): continue messages.append(value) - self.offsets[topic_name][partition.partition_id] = offset + 1 +# log.debug("offset %d => ", self.offsets[topic_name][partition.partition_id]) + if offset>=0: # FIX: offset may have special value (e.g., def. END(-1)) + self.offsets[topic_name][partition.partition_id] = offset + 1 +# log.debug("%d", self.offsets[topic_name][partition.partition_id]) return messages diff --git a/kiel/clients/single.py b/kiel/clients/single.py index 867b8dc..3f749e6 100644 --- a/kiel/clients/single.py +++ b/kiel/clients/single.py @@ -32,6 +32,7 @@ class SingleConsumer(BaseConsumer): BEGINNING = -2 #: special offset api value for 'very latest offset' END = -1 + #: For api ref. see https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/api/OffsetRequest.scala @property def allocation(self): @@ -104,7 +105,10 @@ def handle_offset_response(self, response): code = partition.error_code if code == errors.no_error: offset = partition.offsets[0] - self.offsets[topic][partition.partition_id] = offset +# log.debug("offset %d ==> ", self.offsets[topic][partition.partition_id]) + if offset>=0: + self.offsets[topic][partition.partition_id] = offset +# log.debug("%d", self.offsets[topic][partition.partition_id]) elif code in errors.retriable: self.heal_cluster = True self.synced_offsets.discard(topic) diff --git a/kiel/cluster.py b/kiel/cluster.py index a7d02c5..e399be3 100644 --- a/kiel/cluster.py +++ b/kiel/cluster.py @@ -137,6 +137,7 @@ def heal(self, response=None): missing_conns = yield self.process_brokers(response.brokers) missing_topics = self.process_topics(response.topics) + while missing_conns or missing_topics: response = yield self.get_metadata(topics=list(missing_topics)) missing_conns = yield self.process_brokers(response.brokers) diff --git a/kiel/iterables.py b/kiel/iterables.py index 547de6d..8918ab7 100644 --- a/kiel/iterables.py +++ b/kiel/iterables.py @@ -1,3 +1,5 @@ +from __future__ import generator_stop + def drain(iterable): """ Helper method that empties an iterable as it is iterated over. @@ -23,4 +25,5 @@ def next_item(coll): try: yield next_item(iterable) except (IndexError, KeyError): - raise StopIteration + return + # merge from fork https://github.com/denissmirnov/kiel/commit/fa80aa1ccd790c0fbbd8cc46a72162195e1aed69