Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ To install manually, first clone this here repo and:
.. parsed-literal::

cd kiel
python setup.py install
# optionally update version in __init__.py
pip install .


Documentation
Expand Down
4 changes: 2 additions & 2 deletions kiel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version_info = (0, 9, 4)
version_info = (0, 9, 4, 1)

__version__ = ".".join(map(str, version_info)) + "-dev"
__version__ = ".".join(map(str, version_info)) #+ "-dev"
11 changes: 8 additions & 3 deletions kiel/clients/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


log = logging.getLogger(__name__)

# log.setLevel(logging.DEBUG) # uncomment for troubleshooting

class BaseConsumer(Client):
"""
Expand Down Expand Up @@ -128,9 +128,11 @@ def consume(self, topic, start=None):
)

results = yield self.send(requests)
# log.debug(results)
raise gen.Return([
msg for messageset in results.values() for msg in messageset
if messageset
if msg # Skip empty messages
# BUG: troubleshoot why there are None values
])

def handle_fetch_response(self, response):
Expand Down Expand Up @@ -195,6 +197,9 @@ def deserialize_messages(self, topic_name, partition):
continue

messages.append(value)
self.offsets[topic_name][partition.partition_id] = offset + 1
# log.debug("offset %d => ", self.offsets[topic_name][partition.partition_id])
if offset>=0: # FIX: offset may have special value (e.g., def. END(-1))
self.offsets[topic_name][partition.partition_id] = offset + 1
# log.debug("%d", self.offsets[topic_name][partition.partition_id])

return messages
6 changes: 5 additions & 1 deletion kiel/clients/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class SingleConsumer(BaseConsumer):
BEGINNING = -2
#: special offset api value for 'very latest offset'
END = -1
#: For api ref. see https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/api/OffsetRequest.scala

@property
def allocation(self):
Expand Down Expand Up @@ -104,7 +105,10 @@ def handle_offset_response(self, response):
code = partition.error_code
if code == errors.no_error:
offset = partition.offsets[0]
self.offsets[topic][partition.partition_id] = offset
# log.debug("offset %d ==> ", self.offsets[topic][partition.partition_id])
if offset>=0:
self.offsets[topic][partition.partition_id] = offset
# log.debug("%d", self.offsets[topic][partition.partition_id])
elif code in errors.retriable:
self.heal_cluster = True
self.synced_offsets.discard(topic)
Expand Down
1 change: 1 addition & 0 deletions kiel/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def heal(self, response=None):

missing_conns = yield self.process_brokers(response.brokers)
missing_topics = self.process_topics(response.topics)

while missing_conns or missing_topics:
response = yield self.get_metadata(topics=list(missing_topics))
missing_conns = yield self.process_brokers(response.brokers)
Expand Down
5 changes: 4 additions & 1 deletion kiel/iterables.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import generator_stop

def drain(iterable):
"""
Helper method that empties an iterable as it is iterated over.
Expand All @@ -23,4 +25,5 @@ def next_item(coll):
try:
yield next_item(iterable)
except (IndexError, KeyError):
raise StopIteration
return
# merge from fork https://github.com/denissmirnov/kiel/commit/fa80aa1ccd790c0fbbd8cc46a72162195e1aed69