Skip to content

Commit fc648ae

Browse files
authored
feat: Support seek subscription in AdminClient (#176)
Seek subscription performs an out-of-band seek for a subscription to a specified target, which may be a backlog location, publish timestamp or event timestamp.
1 parent 3b9b717 commit fc648ae

File tree

8 files changed

+197
-4
lines changed

8 files changed

+197
-4
lines changed

google/cloud/pubsublite/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
from google.cloud.pubsublite_v1.types.admin import ListTopicSubscriptionsResponse
6363
from google.cloud.pubsublite_v1.types.admin import ListTopicsRequest
6464
from google.cloud.pubsublite_v1.types.admin import ListTopicsResponse
65+
from google.cloud.pubsublite_v1.types.admin import OperationMetadata
66+
from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionRequest
67+
from google.cloud.pubsublite_v1.types.admin import SeekSubscriptionResponse
6568
from google.cloud.pubsublite_v1.types.admin import TopicPartitions
6669
from google.cloud.pubsublite_v1.types.admin import UpdateSubscriptionRequest
6770
from google.cloud.pubsublite_v1.types.admin import UpdateTopicRequest
@@ -70,6 +73,7 @@
7073
from google.cloud.pubsublite_v1.types.common import PubSubMessage
7174
from google.cloud.pubsublite_v1.types.common import SequencedMessage
7275
from google.cloud.pubsublite_v1.types.common import Subscription
76+
from google.cloud.pubsublite_v1.types.common import TimeTarget
7377
from google.cloud.pubsublite_v1.types.common import Topic
7478
from google.cloud.pubsublite_v1.types.cursor import CommitCursorRequest
7579
from google.cloud.pubsublite_v1.types.cursor import CommitCursorResponse
@@ -148,6 +152,7 @@
148152
"MessagePublishRequest",
149153
"MessagePublishResponse",
150154
"MessageResponse",
155+
"OperationMetadata",
151156
"PartitionAssignment",
152157
"PartitionAssignmentAck",
153158
"PartitionAssignmentRequest",
@@ -159,6 +164,8 @@
159164
"PublishResponse",
160165
"PublisherServiceAsyncClient",
161166
"PublisherServiceClient",
167+
"SeekSubscriptionRequest",
168+
"SeekSubscriptionResponse",
162169
"SeekRequest",
163170
"SeekResponse",
164171
"SequencedCommitCursorRequest",
@@ -171,6 +178,7 @@
171178
"SubscriberServiceAsyncClient",
172179
"SubscriberServiceClient",
173180
"Subscription",
181+
"TimeTarget",
174182
"Topic",
175183
"TopicPartitions",
176184
"TopicStatsServiceAsyncClient",

google/cloud/pubsublite/admin_client.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Optional, List
15+
from typing import Optional, List, Union
1616

1717
from overrides import overrides
1818
from google.api_core.client_options import ClientOptions
19+
from google.api_core.operation import Operation
1920
from google.auth.credentials import Credentials
2021
from google.protobuf.field_mask_pb2 import FieldMask
2122

@@ -31,6 +32,8 @@
3132
LocationPath,
3233
TopicPath,
3334
BacklogLocation,
35+
PublishTime,
36+
EventTime,
3437
)
3538
from google.cloud.pubsublite.types.paths import ReservationPath
3639
from google.cloud.pubsublite_v1 import (
@@ -129,6 +132,14 @@ def update_subscription(
129132
) -> Subscription:
130133
return self._impl.update_subscription(subscription, update_mask)
131134

135+
@overrides
136+
def seek_subscription(
137+
self,
138+
subscription_path: SubscriptionPath,
139+
target: Union[BacklogLocation, PublishTime, EventTime],
140+
) -> Operation:
141+
return self._impl.seek_subscription(subscription_path, target)
142+
132143
@overrides
133144
def delete_subscription(self, subscription_path: SubscriptionPath):
134145
return self._impl.delete_subscription(subscription_path)

google/cloud/pubsublite/admin_client_interface.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16-
from typing import List
16+
from typing import List, Union
1717

18+
from google.api_core.operation import Operation
1819
from google.cloud.pubsublite.types import (
1920
CloudRegion,
2021
TopicPath,
2122
LocationPath,
2223
SubscriptionPath,
2324
BacklogLocation,
25+
PublishTime,
26+
EventTime,
2427
)
2528
from google.cloud.pubsublite.types.paths import ReservationPath
2629
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
@@ -88,6 +91,24 @@ def update_subscription(
8891
) -> Subscription:
8992
"""Update the masked fields of the provided subscription."""
9093

94+
@abstractmethod
95+
def seek_subscription(
96+
self,
97+
subscription_path: SubscriptionPath,
98+
target: Union[BacklogLocation, PublishTime, EventTime],
99+
) -> Operation:
100+
"""Initiate an out-of-band seek for a subscription to a specified target.
101+
102+
The seek target may be timestamps or named positions within the message
103+
backlog See https://cloud.google.com/pubsub/lite/docs/seek for more
104+
information.
105+
106+
Returns:
107+
google.api_core.operation.Operation with:
108+
result type: google.cloud.pubsublite.SeekSubscriptionResponse
109+
metadata type: google.cloud.pubsublite.OperationMetadata
110+
"""
111+
91112
@abstractmethod
92113
def delete_subscription(self, subscription_path: SubscriptionPath):
93114
"""Delete a subscription and all associated messages."""

google/cloud/pubsublite/internal/wire/admin_client_impl.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import List
15+
from typing import List, Union
1616

17+
from google.api_core.exceptions import InvalidArgument
18+
from google.api_core.operation import Operation
1719
from google.protobuf.field_mask_pb2 import FieldMask
1820

1921
from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
@@ -23,6 +25,8 @@
2325
LocationPath,
2426
TopicPath,
2527
BacklogLocation,
28+
PublishTime,
29+
EventTime,
2630
)
2731
from google.cloud.pubsublite.types.paths import ReservationPath
2832
from google.cloud.pubsublite_v1 import (
@@ -31,6 +35,8 @@
3135
AdminServiceClient,
3236
TopicPartitions,
3337
Reservation,
38+
TimeTarget,
39+
SeekSubscriptionRequest,
3440
)
3541

3642

@@ -105,6 +111,25 @@ def update_subscription(
105111
subscription=subscription, update_mask=update_mask
106112
)
107113

114+
def seek_subscription(
115+
self,
116+
subscription_path: SubscriptionPath,
117+
target: Union[BacklogLocation, PublishTime, EventTime],
118+
) -> Operation:
119+
request = SeekSubscriptionRequest(name=str(subscription_path))
120+
if isinstance(target, PublishTime):
121+
request.time_target = TimeTarget(publish_time=target.value)
122+
elif isinstance(target, EventTime):
123+
request.time_target = TimeTarget(event_time=target.value)
124+
elif isinstance(target, BacklogLocation):
125+
if target == BacklogLocation.END:
126+
request.named_target = SeekSubscriptionRequest.NamedTarget.HEAD
127+
else:
128+
request.named_target = SeekSubscriptionRequest.NamedTarget.TAIL
129+
else:
130+
raise InvalidArgument("A valid seek target must be specified.")
131+
return self._underlying.seek_subscription(request=request)
132+
108133
def delete_subscription(self, subscription_path: SubscriptionPath):
109134
self._underlying.delete_subscription(name=str(subscription_path))
110135

google/cloud/pubsublite/types/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .paths import LocationPath, TopicPath, SubscriptionPath
1919
from .message_metadata import MessageMetadata
2020
from .flow_control_settings import FlowControlSettings, DISABLED_FLOW_CONTROL
21-
from .backlog_location import BacklogLocation
21+
from .backlog_location import BacklogLocation, PublishTime, EventTime
2222

2323
__all__ = (
2424
"CloudRegion",
@@ -30,4 +30,6 @@
3030
"SubscriptionPath",
3131
"TopicPath",
3232
"BacklogLocation",
33+
"PublishTime",
34+
"EventTime",
3335
)

google/cloud/pubsublite/types/backlog_location.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414

1515
import enum
16+
from datetime import datetime
17+
from typing import NamedTuple
1618

1719

1820
class BacklogLocation(enum.Enum):
@@ -22,3 +24,15 @@ class BacklogLocation(enum.Enum):
2224

2325
BEGINNING = 0
2426
END = 1
27+
28+
29+
class PublishTime(NamedTuple):
30+
"""The publish timestamp of a message."""
31+
32+
value: datetime
33+
34+
35+
class EventTime(NamedTuple):
36+
"""A user-defined event timestamp of a message."""
37+
38+
value: datetime

samples/snippets/quickstart_test.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from google.api_core.exceptions import NotFound
2323
from google.cloud.pubsublite import AdminClient
2424
from google.cloud.pubsublite.types import (
25+
BacklogLocation,
2526
CloudRegion,
2627
CloudZone,
2728
SubscriptionPath,
@@ -259,6 +260,16 @@ def test_subscriber_example(topic_path, subscription_path, capsys):
259260
assert f"Received {message}" in out
260261

261262

263+
def test_seek_lite_subscription_example(capsys):
264+
import seek_lite_subscription_example
265+
266+
seek_lite_subscription_example.seek_lite_subscription(
267+
PROJECT_NUMBER, CLOUD_REGION, ZONE_ID, SUBSCRIPTION_ID, BacklogLocation.BEGINNING, False
268+
)
269+
out, _ = capsys.readouterr()
270+
assert "Seek operation" in out
271+
272+
262273
def test_delete_lite_subscription_example(subscription_path, capsys):
263274
import delete_lite_subscription_example
264275

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2021 Google Inc. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""This application demonstrates how to initiate an out-of-band seek for a
18+
subscription with the Pub/Sub Lite API. For more information, see the
19+
documentation at https://cloud.google.com/pubsub/lite/docs/seek.
20+
"""
21+
22+
import argparse
23+
24+
25+
def seek_lite_subscription(project_number, cloud_region, zone_id, subscription_id, seek_target, wait_for_operation):
26+
# [START pubsublite_seek_subscription]
27+
from google.api_core.exceptions import NotFound
28+
from google.cloud.pubsublite import AdminClient
29+
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath
30+
31+
# TODO(developer):
32+
# project_number = 1122334455
33+
# cloud_region = "us-central1"
34+
# zone_id = "a"
35+
# subscription_id = "your-subscription-id"
36+
# seek_target = BacklogLocation.BEGINNING
37+
# wait_for_operation = False
38+
39+
# Possible values for seek_target:
40+
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
41+
# messages.
42+
# - BacklogLocation.END: skips past all current published messages.
43+
# - PublishTime(<datetime>): delivers messages with publish time greater
44+
# than or equal to the specified timestamp.
45+
# - EventTime(<datetime>): seeks to the first message with event time
46+
# greater than or equal to the specified timestamp.
47+
48+
# Waiting for the seek operation to complete is optional. It indicates when
49+
# subscribers for all partitions are receiving messages from the seek
50+
# target. If subscribers are offline, the operation will complete once they
51+
# are online.
52+
53+
cloud_region = CloudRegion(cloud_region)
54+
location = CloudZone(cloud_region, zone_id)
55+
subscription_path = SubscriptionPath(project_number, location, subscription_id)
56+
57+
client = AdminClient(cloud_region)
58+
try:
59+
# Initiate an out-of-band seek for a subscription to the specified
60+
# target. If an operation is returned, the seek has been successfully
61+
# registered and will eventually propagate to subscribers.
62+
seek_operation = client.seek_subscription(subscription_path, seek_target)
63+
print(f"Seek operation: {seek_operation.operation.name}")
64+
except NotFound:
65+
print(f"{subscription_path} not found.")
66+
return
67+
68+
if wait_for_operation:
69+
print("Waiting for operation to complete...")
70+
seek_operation.result()
71+
print(f"Operation completed. Metadata:\n{seek_operation.metadata}")
72+
# [END pubsublite_seek_subscription]
73+
74+
75+
if __name__ == "__main__":
76+
from datetime import datetime
77+
from google.cloud.pubsublite.types import BacklogLocation, PublishTime
78+
79+
parser = argparse.ArgumentParser(
80+
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
81+
)
82+
parser.add_argument("project_number", help="Your Google Cloud Project Number")
83+
parser.add_argument("cloud_region", help="Your Cloud Region, e.g. 'us-central1'")
84+
parser.add_argument("zone_id", help="Your Zone ID, e.g. 'a'")
85+
parser.add_argument("subscription_id", help="Your subscription ID")
86+
parser.add_argument("--target", default="BEGINNING", help="Seek target, e.g. 'BEGINNING, 'END' or a timestamp")
87+
parser.add_argument("--wait_for_operation", help="Wait for the seek operation to complete")
88+
89+
args = parser.parse_args()
90+
91+
if args.target == "BEGINNING":
92+
seek_target = BacklogLocation.BEGINNING
93+
elif args.target == "END":
94+
seek_target = BacklogLocation.END
95+
else:
96+
seek_target = PublishTime(datetime.strptime(args.target, "%Y-%m-%d %H:%M:%S"))
97+
98+
seek_lite_subscription(
99+
args.project_number, args.cloud_region, args.zone_id,
100+
args.subscription_id, seek_target, args.wait_for_operation
101+
)

0 commit comments

Comments
 (0)