Skip to content

Commit 08870b6

Browse files
authored
Merge pull request #431 from zendesk/dasch/fix-offset-cache
Clear cached offset when seeking to default
2 parents 3aeebc2 + 554f71d commit 08870b6

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

lib/kafka/offset_manager.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ def mark_as_processed(topic, partition, offset)
3636
end
3737

3838
def seek_to_default(topic, partition)
39+
# Remove any cached offset, in case things have changed broker-side.
40+
clear_resolved_offset(topic)
41+
3942
seek_to(topic, partition, -1)
4043
end
4144

@@ -104,6 +107,10 @@ def clear_offsets_excluding(excluded)
104107

105108
private
106109

110+
def clear_resolved_offset(topic)
111+
@resolved_offsets.delete(topic)
112+
end
113+
107114
def resolve_offset(topic, partition)
108115
@resolved_offsets[topic] ||= fetch_resolved_offsets(topic)
109116
@resolved_offsets[topic].fetch(partition)

0 commit comments

Comments
 (0)