Skip to content

Commit 864c0cc

Browse files
authored
feat: Create subscriptions at a seek target (#383)
Support creating subscriptions at a nominated target location within the message backlog. A seek is performed for publish and event timestamps. Export subscriptions (pre-release feature) are also supported.
1 parent 966872f commit 864c0cc

File tree

4 files changed

+59
-9
lines changed

4 files changed

+59
-9
lines changed

google/cloud/pubsublite/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
from google.cloud.pubsublite_v1.types.admin import UpdateTopicRequest
7171
from google.cloud.pubsublite_v1.types.common import AttributeValues
7272
from google.cloud.pubsublite_v1.types.common import Cursor
73+
from google.cloud.pubsublite_v1.types.common import ExportConfig
7374
from google.cloud.pubsublite_v1.types.common import PubSubMessage
7475
from google.cloud.pubsublite_v1.types.common import Reservation
7576
from google.cloud.pubsublite_v1.types.common import SequencedMessage
@@ -131,6 +132,7 @@
131132
"CursorServiceClient",
132133
"DeleteSubscriptionRequest",
133134
"DeleteTopicRequest",
135+
"ExportConfig",
134136
"FlowControlRequest",
135137
"GetSubscriptionRequest",
136138
"GetTopicPartitionsRequest",

google/cloud/pubsublite/admin_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,10 @@ def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPa
114114
def create_subscription(
115115
self,
116116
subscription: Subscription,
117-
starting_offset: BacklogLocation = BacklogLocation.END,
117+
target: Union[BacklogLocation, PublishTime, EventTime] = BacklogLocation.END,
118+
starting_offset: Optional[BacklogLocation] = None,
118119
) -> Subscription:
119-
return self._impl.create_subscription(subscription, starting_offset)
120+
return self._impl.create_subscription(subscription, target, starting_offset)
120121

121122
@overrides
122123
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:

google/cloud/pubsublite/admin_client_interface.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16-
from typing import List, Union
16+
from typing import List, Optional, Union
1717

1818
from google.api_core.operation import Operation
1919
from google.cloud.pubsublite.types import (
@@ -71,11 +71,20 @@ def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPa
7171
def create_subscription(
7272
self,
7373
subscription: Subscription,
74-
starting_offset: BacklogLocation = BacklogLocation.END,
74+
target: Union[BacklogLocation, PublishTime, EventTime] = BacklogLocation.END,
75+
starting_offset: Optional[BacklogLocation] = None,
7576
) -> Subscription:
7677
"""Create a subscription, returns the created subscription. By default
7778
a subscription will only receive messages published after the
78-
subscription was created."""
79+
subscription was created.
80+
81+
`starting_offset` is deprecated. Use `target` to initialize the
82+
subscription to a target location within the message backlog instead.
83+
`starting_offset` has higher precedence if `target` is also set.
84+
85+
A seek is initiated if the target location is a publish or event time.
86+
If the seek fails, the created subscription is not deleted.
87+
"""
7988

8089
@abstractmethod
8190
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:

google/cloud/pubsublite/internal/wire/admin_client_impl.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import List, Union
15+
import logging
16+
from typing import List, Optional, Union
1617

1718
from google.api_core.exceptions import InvalidArgument
1819
from google.api_core.operation import Operation
@@ -38,8 +39,11 @@
3839
TimeTarget,
3940
SeekSubscriptionRequest,
4041
CreateSubscriptionRequest,
42+
ExportConfig,
4143
)
4244

45+
log = logging.getLogger(__name__)
46+
4347

4448
class AdminClientImpl(AdminClientInterface):
4549
_underlying: AdminServiceClient
@@ -85,17 +89,51 @@ def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPa
8589
def create_subscription(
8690
self,
8791
subscription: Subscription,
88-
starting_offset: BacklogLocation = BacklogLocation.END,
92+
target: Union[BacklogLocation, PublishTime, EventTime] = BacklogLocation.END,
93+
starting_offset: Optional[BacklogLocation] = None,
8994
) -> Subscription:
95+
if starting_offset:
96+
log.warning("starting_offset is deprecated. Use target instead.")
97+
target = starting_offset
9098
path = SubscriptionPath.parse(subscription.name)
91-
return self._underlying.create_subscription(
99+
requires_seek = isinstance(target, PublishTime) or isinstance(target, EventTime)
100+
requires_update = (
101+
requires_seek
102+
and subscription.export_config
103+
and subscription.export_config.desired_state == ExportConfig.State.ACTIVE
104+
)
105+
if requires_update:
106+
# Export subscriptions must be paused while seeking. The state is
107+
# later updated to active.
108+
subscription.export_config.desired_state = ExportConfig.State.PAUSED
109+
110+
# Request 1 - Create the subscription.
111+
skip_backlog = False
112+
if isinstance(target, BacklogLocation):
113+
skip_backlog = target == BacklogLocation.END
114+
response = self._underlying.create_subscription(
92115
request=CreateSubscriptionRequest(
93116
parent=str(path.to_location_path()),
94117
subscription=subscription,
95118
subscription_id=path.name,
96-
skip_backlog=(starting_offset == BacklogLocation.END),
119+
skip_backlog=skip_backlog,
97120
)
98121
)
122+
# Request 2 (optional) - seek the subscription.
123+
if requires_seek:
124+
self.seek_subscription(subscription_path=path, target=target)
125+
# Request 3 (optional) - make the export subscription active.
126+
if requires_update:
127+
response = self.update_subscription(
128+
subscription=Subscription(
129+
name=response.name,
130+
export_config=ExportConfig(
131+
desired_state=ExportConfig.State.ACTIVE,
132+
),
133+
),
134+
update_mask=FieldMask(paths=["export_config.desired_state"]),
135+
)
136+
return response
99137

100138
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
101139
return self._underlying.get_subscription(name=str(subscription_path))

0 commit comments

Comments
 (0)