Reference documentation and code samples for the Cloud Pub/Sub API class Google::Cloud::PubSub::Subscriber.
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen.
Inherits
- Object
Includes
- MonitorMixin
Example
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::PubSub.new

    sub = pubsub.subscription "my-topic-sub"

    subscriber = sub.listen do |received_message|
      # process message
      received_message.acknowledge!
    end

    # Start background threads that will call the block passed to listen.
    subscriber.start

    # Shut down the subscriber when ready to stop receiving messages.
    subscriber.stop!
Methods
#callback
def callback() -> Proc
The procedure that will handle the messages received from the subscription.
- (Proc) — the current value of callback
#callback_threads
def callback_threads() -> Integer
The number of threads used to handle the received messages. Default is 8.
- (Integer) — the current value of callback_threads
#deadline
def deadline() -> Numeric
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
- (Numeric) — the current value of deadline
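The documented bounds can be applied to a candidate value before it is used. The following is a minimal sketch, not part of the library: clamp_deadline is a hypothetical helper name, and whether the library itself clamps or rejects out-of-range values is not stated in this reference.

```ruby
# Hypothetical helper, not part of google-cloud-pubsub.
# The bounds and default come from the #deadline description:
# minimum 10, maximum 600, default 60 (all in seconds).
def clamp_deadline(seconds = nil, min: 10, max: 600, default: 60)
  # Fall back to the documented default when no value is given.
  return default if seconds.nil?

  # Comparable#clamp pins the value to the closed range [min, max].
  seconds.clamp(min, max)
end
```

For instance, clamp_deadline(5) yields the documented minimum and clamp_deadline(900) yields the documented maximum.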
#inventory
def inventory() -> Integer
The number of received messages to be collected by subscriber. Default is 1,000.
- (Integer) — The maximum number of messages.
#inventory_bytesize
def inventory_bytesize() -> Integer
The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).
- (Integer) — The maximum number of bytes.
#inventory_extension
def inventory_extension() -> Integer
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
- (Integer) — The maximum number of seconds.
#inventory_limit
def inventory_limit() -> Integer
The number of received messages to be collected by subscriber. Default is 1,000.
- (Integer) — The maximum number of messages.
#last_error
def last_error() -> Exception, nil
The most recent unhandled error to occur while listening to messages on the subscriber.
If an unhandled error has occurred, the subscriber will attempt to recover from the error and resume listening.
- (Exception, nil) — The most recent error raised, or nil if no error has been raised.
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::PubSub.new

    sub = pubsub.subscription "my-topic-sub"

    subscriber = sub.listen do |received_message|
      # process message
      received_message.acknowledge!
    end

    # Start listening for messages and errors.
    subscriber.start

    # If an error was raised, it can be retrieved here:
    subscriber.last_error #=> nil

    # Shut down the subscriber when ready to stop receiving messages.
    subscriber.stop!
#max_duration_per_lease_extension
def max_duration_per_lease_extension() -> Integer
The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
- (Integer) — The maximum number of seconds.
#max_outstanding_bytes
def max_outstanding_bytes() -> Integer
The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).
- (Integer) — The maximum number of bytes.
#max_outstanding_messages
def max_outstanding_messages() -> Integer
The number of received messages to be collected by subscriber. Default is 1,000.
- (Integer) — The maximum number of messages.
#max_total_lease_duration
def max_total_lease_duration() -> Integer
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
- (Integer) — The maximum number of seconds.
#message_ordering
def message_ordering() -> Boolean
Whether message ordering has been enabled.
- (Boolean) — the current value of message_ordering
#min_duration_per_lease_extension
def min_duration_per_lease_extension() -> Integer
The minimum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
- (Integer) — The minimum number of seconds.
#on_error
def on_error(&block) { |error| ... }
Register to be notified of errors when raised.
If an unhandled error has occurred, the subscriber will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
- (callback) — The block to be called when an error is raised.
- error (Exception) — The error raised.
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::PubSub.new

    sub = pubsub.subscription "my-topic-sub"

    subscriber = sub.listen do |received_message|
      # process message
      received_message.acknowledge!
    end

    # Register to be notified when unhandled errors occur.
    subscriber.on_error do |error|
      # log error
      puts error
    end

    # Start listening for messages and errors.
    subscriber.start

    # Shut down the subscriber when ready to stop receiving messages.
    subscriber.stop!
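Because multiple error handlers can be added, error reporting can be kept in a small reusable method. The following is a sketch only: report_subscriber_errors is a hypothetical helper, not part of the library, and it assumes nothing beyond the #on_error block interface described here.

```ruby
# Hypothetical helper, not part of google-cloud-pubsub.
# Works with any object that accepts a block through #on_error,
# such as the Subscriber documented on this page.
def report_subscriber_errors(subscriber, out = $stderr)
  subscriber.on_error do |error|
    # Write the error class and message as one line to the given IO.
    out.puts "#{error.class}: #{error.message}"
  end

  # Return the subscriber so the caller can keep chaining calls on it.
  subscriber
end
```

Passing a different IO object as the second argument, such as an open log file, redirects the output without changing the handler.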
#push_threads
def push_threads() -> Integer
The number of threads used to send acknowledgements (ReceivedMessage#ack!) and to delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.
- (Integer) — the current value of push_threads
#start
def start() -> Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
- (Subscriber) — returns self so calls can be chained.
#started?
def started?() -> boolean
Whether the subscriber has been started.
- (boolean) — true when started, false otherwise.
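The #started? predicate can guard a call to #start so that setup code is safe to run more than once. This is a minimal sketch: ensure_started is a hypothetical helper, not part of the library, and it makes no claim about whether a subscriber that has already been stopped can be started again.

```ruby
# Hypothetical helper, not part of google-cloud-pubsub.
# Calls #start only when #started? reports false.
def ensure_started(subscriber)
  subscriber.start unless subscriber.started?

  # Return the subscriber, mirroring how #start returns self.
  subscriber
end
```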
#stop
def stop() -> Subscriber
Immediately stops the subscriber. No new messages will be pulled from the subscription. Use #wait! to block until all received messages have been processed or released: all actions taken on received messages that have not yet been sent to the API will be sent, and all received but unprocessed messages will be released back to the API and redelivered.
- (Subscriber) — returns self so calls can be chained.
#stop!
def stop!(timeout = nil) -> Subscriber
Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
- timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.
- (Subscriber) — returns self so calls can be chained.
#stopped?
def stopped?() -> boolean
Whether the subscriber has been stopped.
- (boolean) — true when stopped, false otherwise.
#streams
def streams() -> Integer
The number of concurrent streams to open to pull messages from the subscription. Default is 4.
- (Integer) — the current value of streams
#subscription_name
def subscription_name() -> String
The name of the subscription the messages are pulled from.
- (String) — the current value of subscription_name
#use_legacy_flow_control?
def use_legacy_flow_control?() -> Boolean
Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.
- (Boolean) — true when only client side flow control is enforced, false when both client and server side flow control are enforced.
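The flow control and lease readers documented above can be gathered into one hash, for example to log the effective configuration at startup. This is a sketch under stated assumptions: flow_control_settings is a hypothetical helper, not part of the library, and it calls only the reader methods listed on this page.

```ruby
# Hypothetical helper, not part of google-cloud-pubsub.
# Collects the flow control and lease settings exposed by the
# reader methods documented on this page into a plain Hash.
def flow_control_settings(subscriber)
  {
    max_outstanding_messages:         subscriber.max_outstanding_messages,
    max_outstanding_bytes:            subscriber.max_outstanding_bytes,
    max_total_lease_duration:         subscriber.max_total_lease_duration,
    max_duration_per_lease_extension: subscriber.max_duration_per_lease_extension,
    min_duration_per_lease_extension: subscriber.min_duration_per_lease_extension,
    use_legacy_flow_control:          subscriber.use_legacy_flow_control?
  }
end
```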
#wait!
def wait!(timeout = nil) -> Subscriber
Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.
- timeout (Number, nil) — The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.
- (Subscriber) — returns self so calls can be chained.