Skip to content

Commit b59cc8d

Browse files
authored
docs: Add ingestion from GCS sample (#1273)
1 parent c240fde commit b59cc8d

File tree

3 files changed

+140
-1
lines changed

3 files changed

+140
-1
lines changed

samples/snippets/publisher.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,88 @@ def create_topic_with_kinesis_ingestion(
103103
# [END pubsub_create_topic_with_kinesis_ingestion]
104104

105105

106+
def create_topic_with_cloud_storage_ingestion(
107+
project_id: str,
108+
topic_id: str,
109+
bucket: str,
110+
input_format: str,
111+
text_delimiter: str,
112+
match_glob: str,
113+
minimum_object_create_time: str,
114+
) -> None:
115+
"""Create a new Pub/Sub topic with Cloud Storage Ingestion Settings."""
116+
# [START pubsub_create_topic_with_cloud_storage_ingestion]
117+
from google.cloud import pubsub_v1
118+
from google.protobuf import timestamp_pb2
119+
from google.pubsub_v1.types import Topic
120+
from google.pubsub_v1.types import IngestionDataSourceSettings
121+
122+
# TODO(developer)
123+
# project_id = "your-project-id"
124+
# topic_id = "your-topic-id"
125+
# bucket = "your-bucket"
126+
# input_format = "text" (can be one of "text", "avro", "pubsub_avro")
127+
# text_delimiter = "\n"
128+
# match_glob = "**.txt"
129+
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
130+
131+
publisher = pubsub_v1.PublisherClient()
132+
topic_path = publisher.topic_path(project_id, topic_id)
133+
134+
cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
135+
bucket=bucket,
136+
)
137+
if input_format == "text":
138+
cloud_storage_settings.text_format = (
139+
IngestionDataSourceSettings.CloudStorage.TextFormat(
140+
delimiter=text_delimiter
141+
)
142+
)
143+
elif input_format == "avro":
144+
cloud_storage_settings.avro_format = (
145+
IngestionDataSourceSettings.CloudStorage.AvroFormat()
146+
)
147+
elif input_format == "pubsub_avro":
148+
cloud_storage_settings.pubsub_avro_format = (
149+
IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
150+
)
151+
else:
152+
print(
153+
"Invalid input_format: "
154+
+ input_format
155+
+ "; must be in ('text', 'avro', 'pubsub_avro')"
156+
)
157+
return
158+
159+
if match_glob:
160+
cloud_storage_settings.match_glob = match_glob
161+
162+
if minimum_object_create_time:
163+
try:
164+
minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
165+
minimum_object_create_time_timestamp.FromJsonString(
166+
minimum_object_create_time
167+
)
168+
cloud_storage_settings.minimum_object_create_time = (
169+
minimum_object_create_time_timestamp
170+
)
171+
except ValueError:
172+
print("Invalid minimum_object_create_time: " + minimum_object_create_time)
173+
return
174+
175+
request = Topic(
176+
name=topic_path,
177+
ingestion_data_source_settings=IngestionDataSourceSettings(
178+
cloud_storage=cloud_storage_settings,
179+
),
180+
)
181+
182+
topic = publisher.create_topic(request=request)
183+
184+
print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")
185+
# [END pubsub_create_topic_with_cloud_storage_ingestion]
186+
187+
106188
def update_topic_type(
107189
project_id: str,
108190
topic_id: str,
@@ -615,6 +697,19 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
615697
create_topic_with_kinesis_ingestion_parser.add_argument("aws_role_arn")
616698
create_topic_with_kinesis_ingestion_parser.add_argument("gcp_service_account")
617699

700+
create_topic_with_cloud_storage_ingestion_parser = subparsers.add_parser(
701+
"create_cloud_storage_ingestion",
702+
help=create_topic_with_cloud_storage_ingestion.__doc__,
703+
)
704+
create_topic_with_cloud_storage_ingestion_parser.add_argument("topic_id")
705+
create_topic_with_cloud_storage_ingestion_parser.add_argument("bucket")
706+
create_topic_with_cloud_storage_ingestion_parser.add_argument("input_format")
707+
create_topic_with_cloud_storage_ingestion_parser.add_argument("text_delimiter")
708+
create_topic_with_cloud_storage_ingestion_parser.add_argument("match_glob")
709+
create_topic_with_cloud_storage_ingestion_parser.add_argument(
710+
"minimum_object_create_time"
711+
)
712+
618713
update_topic_type_parser = subparsers.add_parser(
619714
"update_kinesis_ingestion", help=update_topic_type.__doc__
620715
)
@@ -693,6 +788,16 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
693788
args.aws_role_arn,
694789
args.gcp_service_account,
695790
)
791+
elif args.command == "create_cloud_storage_ingestion":
792+
create_topic_with_cloud_storage_ingestion(
793+
args.project_id,
794+
args.topic_id,
795+
args.bucket,
796+
args.input_format,
797+
args.text_delimiter,
798+
args.match_glob,
799+
args.minimum_object_create_time,
800+
)
696801
elif args.command == "update_kinesis_ingestion":
697802
update_topic_type(
698803
args.project_id,

samples/snippets/publisher_test.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,40 @@ def test_create_topic_with_kinesis_ingestion(
162162
publisher_client.delete_topic(request={"topic": topic_path})
163163

164164

165+
def test_create_topic_with_cloud_storage_ingestion(
166+
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
167+
) -> None:
168+
# The scope of `topic_path` is limited to this function.
169+
topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID)
170+
171+
bucket = "pubsub-cloud-storage-bucket"
172+
input_format = "text"
173+
text_delimiter = ","
174+
match_glob = "**.txt"
175+
minimum_object_create_time = "1970-01-01T00:00:01Z"
176+
177+
try:
178+
publisher_client.delete_topic(request={"topic": topic_path})
179+
except NotFound:
180+
pass
181+
182+
publisher.create_topic_with_cloud_storage_ingestion(
183+
PROJECT_ID,
184+
TOPIC_ID,
185+
bucket,
186+
input_format,
187+
text_delimiter,
188+
match_glob,
189+
minimum_object_create_time,
190+
)
191+
192+
out, _ = capsys.readouterr()
193+
assert f"Created topic: {topic_path} with Cloud Storage Ingestion Settings" in out
194+
195+
# Clean up resource created for the test.
196+
publisher_client.delete_topic(request={"topic": topic_path})
197+
198+
165199
def test_update_topic_type(
166200
publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str]
167201
) -> None:

samples/snippets/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
google-cloud-pubsub==2.25.0
1+
google-cloud-pubsub==2.26.0
22
avro==1.12.0
33
protobuf===4.24.4; python_version == '3.7'
44
protobuf==5.28.0; python_version >= '3.8'

0 commit comments

Comments
 (0)