Class: Kafka::Pause

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/pause.rb

Overview

Manages the pause state of a partition.

The processing of messages in a partition can be paused, e.g. if there was an exception during processing. This could be caused by a downstream service not being available. A typical way of solving such an issue is to back off for a little while and then try again. In order to do that, pause the partition.

Instance Method Summary collapse

Constructor Details

#initialize(clock: Time) ⇒ Pause

Returns a new instance of Pause.

 12 13 14 15 16 17 18 19
# File 'lib/kafka/pause.rb', line 12 def initialize(clock: Time) @clock = clock @started_at = nil @pauses = 0 @timeout = nil @max_timeout = nil @exponential_backoff = false end 

Instance Method Details

#expired?Boolean

Whether the pause has expired.

Returns:

  • (Boolean)
 66 67 68 69 70 71 72
# File 'lib/kafka/pause.rb', line 66 def expired? # We never expire the pause if timeout is nil. return false if @timeout.nil? # Have we passed the end of the pause duration? @clock.now >= ends_at end 

#pause!(timeout: nil, max_timeout: nil, exponential_backoff: false) ⇒ Object

Mark the partition as paused.

If exponential backoff is enabled, each subsequent pause of a partition will cause a doubling of the actual timeout, i.e. for pause number n, the actual timeout will be 2^n * timeout.

Only when #reset! is called is this state cleared.

Parameters:

  • timeout (nil, Integer) (defaults to: nil)

    if specified, the partition will automatically resume after this many seconds.

  • exponential_backoff (Boolean) (defaults to: false)

    whether to enable exponential timeouts.

 32 33 34 35 36 37 38
# File 'lib/kafka/pause.rb', line 32 def pause!(timeout: nil, max_timeout: nil, exponential_backoff: false) @started_at = @clock.now @timeout = timeout @max_timeout = max_timeout @exponential_backoff = exponential_backoff @pauses += 1 end 

#pause_durationObject

 57 58 59 60 61 62 63
# File 'lib/kafka/pause.rb', line 57 def pause_duration if paused? Time.now - @started_at else 0 end end 

#paused?Boolean

Whether the partition is currently paused. The pause may have expired, in which case #expired? should be checked as well.

Returns:

  • (Boolean)
 52 53 54 55
# File 'lib/kafka/pause.rb', line 52 def paused? # This is nil if we're not currently paused. !@started_at.nil? end 

#reset!Object

Resets the pause state, ensuring that the next pause is not exponential.

 75 76 77
# File 'lib/kafka/pause.rb', line 75 def reset! @pauses = 0 end 

#resume!Object

Resumes the partition.

The number of pauses is still retained, and if the partition is paused again it may be with an exponential backoff.

 44 45 46 47 48
# File 'lib/kafka/pause.rb', line 44 def resume! @started_at = nil @timeout = nil @max_timeout = nil end