Skip to content

Commit 6fa03be

Browse files
feat: add BigQuery configuration for subscriptions (#685)
* feat: add BigQuery configuration for subscriptions PiperOrigin-RevId: 449031535 Source-Link: googleapis/googleapis@feec34d Source-Link: googleapis/googleapis-gen@89664e9 Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiODk2NjRlOTcwOGMxOWQ1MzJjNjNmN2ExNmZkNzljYjYzMWQ4N2FhMSJ9 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 8513f53 commit 6fa03be

File tree

8 files changed

+117
-8
lines changed

8 files changed

+117
-8
lines changed

google/pubsub/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from google.pubsub_v1.services.subscriber.async_client import SubscriberAsyncClient
2525

2626
from google.pubsub_v1.types.pubsub import AcknowledgeRequest
27+
from google.pubsub_v1.types.pubsub import BigQueryConfig
2728
from google.pubsub_v1.types.pubsub import CreateSnapshotRequest
2829
from google.pubsub_v1.types.pubsub import DeadLetterPolicy
2930
from google.pubsub_v1.types.pubsub import DeleteSnapshotRequest
@@ -88,6 +89,7 @@
8889
"SubscriberClient",
8990
"SubscriberAsyncClient",
9091
"AcknowledgeRequest",
92+
"BigQueryConfig",
9193
"CreateSnapshotRequest",
9294
"DeadLetterPolicy",
9395
"DeleteSnapshotRequest",

google/pubsub_v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from .services.subscriber import SubscriberAsyncClient
2323

2424
from .types.pubsub import AcknowledgeRequest
25+
from .types.pubsub import BigQueryConfig
2526
from .types.pubsub import CreateSnapshotRequest
2627
from .types.pubsub import DeadLetterPolicy
2728
from .types.pubsub import DeleteSnapshotRequest
@@ -83,6 +84,7 @@
8384
"SchemaServiceAsyncClient",
8485
"SubscriberAsyncClient",
8586
"AcknowledgeRequest",
87+
"BigQueryConfig",
8688
"CreateSchemaRequest",
8789
"CreateSnapshotRequest",
8890
"DeadLetterPolicy",

google/pubsub_v1/services/subscriber/async_client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,9 @@ async def sample_create_subscription():
295295
should not be set.
296296
push_config (:class:`google.pubsub_v1.types.PushConfig`):
297297
If push delivery is used with this subscription, this
298-
field is used to configure it. An empty ``pushConfig``
299-
signifies that the subscriber will pull and ack messages
298+
field is used to configure it. Either ``pushConfig`` or
299+
``bigQueryConfig`` can be set, but not both. If both are
300+
empty, then the subscriber will pull and ack messages
300301
using API methods.
301302
302303
This corresponds to the ``push_config`` field

google/pubsub_v1/services/subscriber/client.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,9 @@ def sample_create_subscription():
578578
should not be set.
579579
push_config (google.pubsub_v1.types.PushConfig):
580580
If push delivery is used with this subscription, this
581-
field is used to configure it. An empty ``pushConfig``
582-
signifies that the subscriber will pull and ack messages
581+
field is used to configure it. Either ``pushConfig`` or
582+
``bigQueryConfig`` can be set, but not both. If both are
583+
empty, then the subscriber will pull and ack messages
583584
using API methods.
584585
585586
This corresponds to the ``push_config`` field

google/pubsub_v1/types/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from .pubsub import (
1919
AcknowledgeRequest,
20+
BigQueryConfig,
2021
CreateSnapshotRequest,
2122
DeadLetterPolicy,
2223
DeleteSnapshotRequest,
@@ -87,6 +88,7 @@
8788
__all__ = (
8889
"TimeoutType",
8990
"AcknowledgeRequest",
91+
"BigQueryConfig",
9092
"CreateSnapshotRequest",
9193
"DeadLetterPolicy",
9294
"DeleteSnapshotRequest",

google/pubsub_v1/types/pubsub.py

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"DeadLetterPolicy",
4747
"ExpirationPolicy",
4848
"PushConfig",
49+
"BigQueryConfig",
4950
"ReceivedMessage",
5051
"GetSubscriptionRequest",
5152
"UpdateSubscriptionRequest",
@@ -581,9 +582,16 @@ class Subscription(proto.Message):
581582
deleted.
582583
push_config (google.pubsub_v1.types.PushConfig):
583584
If push delivery is used with this subscription, this field
584-
is used to configure it. An empty ``pushConfig`` signifies
585-
that the subscriber will pull and ack messages using API
586-
methods.
585+
is used to configure it. Either ``pushConfig`` or
586+
``bigQueryConfig`` can be set, but not both. If both are
587+
empty, then the subscriber will pull and ack messages using
588+
API methods.
589+
bigquery_config (google.pubsub_v1.types.BigQueryConfig):
590+
If delivery to BigQuery is used with this subscription, this
591+
field is used to configure it. Either ``pushConfig`` or
592+
``bigQueryConfig`` can be set, but not both. If both are
593+
empty, then the subscriber will pull and ack messages using
594+
API methods.
587595
ack_deadline_seconds (int):
588596
The approximate amount of time (on a best-effort basis)
589597
Pub/Sub waits for the subscriber to acknowledge receipt
@@ -700,8 +708,18 @@ class Subscription(proto.Message):
700708
subscribers. See the ``message_retention_duration`` field in
701709
``Topic``. This field is set only in responses from the
702710
server; it is ignored if it is set in any requests.
711+
state (google.pubsub_v1.types.Subscription.State):
712+
Output only. An output-only field indicating
713+
whether or not the subscription can receive
714+
messages.
703715
"""
704716

717+
class State(proto.Enum):
718+
r"""Possible states for a subscription."""
719+
STATE_UNSPECIFIED = 0
720+
ACTIVE = 1
721+
RESOURCE_ERROR = 2
722+
705723
name = proto.Field(
706724
proto.STRING,
707725
number=1,
@@ -715,6 +733,11 @@ class Subscription(proto.Message):
715733
number=4,
716734
message="PushConfig",
717735
)
736+
bigquery_config = proto.Field(
737+
proto.MESSAGE,
738+
number=18,
739+
message="BigQueryConfig",
740+
)
718741
ack_deadline_seconds = proto.Field(
719742
proto.INT32,
720743
number=5,
@@ -769,6 +792,11 @@ class Subscription(proto.Message):
769792
number=17,
770793
message=duration_pb2.Duration,
771794
)
795+
state = proto.Field(
796+
proto.ENUM,
797+
number=19,
798+
enum=State,
799+
)
772800

773801

774802
class RetryPolicy(proto.Message):
@@ -980,6 +1008,67 @@ class OidcToken(proto.Message):
9801008
)
9811009

9821010

1011+
class BigQueryConfig(proto.Message):
1012+
r"""Configuration for a BigQuery subscription.
1013+
1014+
Attributes:
1015+
table (str):
1016+
The name of the table to which to write data,
1017+
of the form {projectId}:{datasetId}.{tableId}
1018+
use_topic_schema (bool):
1019+
When true, use the topic's schema as the
1020+
columns to write to in BigQuery, if it exists.
1021+
write_metadata (bool):
1022+
When true, write the subscription name, message_id,
1023+
publish_time, attributes, and ordering_key to additional
1024+
columns in the table. The subscription name, message_id, and
1025+
publish_time fields are put in their own columns while all
1026+
other message properties (other than data) are written to a
1027+
JSON object in the attributes column.
1028+
drop_unknown_fields (bool):
1029+
When true and use_topic_schema is true, any fields that are
1030+
a part of the topic schema that are not part of the BigQuery
1031+
table schema are dropped when writing to BigQuery.
1032+
Otherwise, the schemas must be kept in sync and any messages
1033+
with extra fields are not written and remain in the
1034+
subscription's backlog.
1035+
state (google.pubsub_v1.types.BigQueryConfig.State):
1036+
Output only. An output-only field that
1037+
indicates whether or not the subscription can
1038+
receive messages.
1039+
"""
1040+
1041+
class State(proto.Enum):
1042+
r"""Possible states for a BigQuery subscription."""
1043+
STATE_UNSPECIFIED = 0
1044+
ACTIVE = 1
1045+
PERMISSION_DENIED = 2
1046+
NOT_FOUND = 3
1047+
SCHEMA_MISMATCH = 4
1048+
1049+
table = proto.Field(
1050+
proto.STRING,
1051+
number=1,
1052+
)
1053+
use_topic_schema = proto.Field(
1054+
proto.BOOL,
1055+
number=2,
1056+
)
1057+
write_metadata = proto.Field(
1058+
proto.BOOL,
1059+
number=3,
1060+
)
1061+
drop_unknown_fields = proto.Field(
1062+
proto.BOOL,
1063+
number=4,
1064+
)
1065+
state = proto.Field(
1066+
proto.ENUM,
1067+
number=5,
1068+
enum=State,
1069+
)
1070+
1071+
9831072
class ReceivedMessage(proto.Message):
9841073
r"""A message and its corresponding acknowledgment ID.
9851074

scripts/fixup_pubsub_v1_keywords.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class pubsubCallTransformer(cst.CSTTransformer):
4242
'acknowledge': ('subscription', 'ack_ids', ),
4343
'create_schema': ('parent', 'schema', 'schema_id', ),
4444
'create_snapshot': ('name', 'subscription', 'labels', ),
45-
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', ),
45+
'create_subscription': ('name', 'topic', 'push_config', 'bigquery_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', 'enable_exactly_once_delivery', 'topic_message_retention_duration', 'state', ),
4646
'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', 'schema_settings', 'satisfies_pzs', 'message_retention_duration', ),
4747
'delete_schema': ('name', ),
4848
'delete_snapshot': ('snapshot', ),

tests/unit/gapic/pubsub_v1/test_subscriber.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
665665
filter="filter_value",
666666
detached=True,
667667
enable_exactly_once_delivery=True,
668+
state=pubsub.Subscription.State.ACTIVE,
668669
)
669670
response = client.create_subscription(request)
670671

@@ -683,6 +684,7 @@ def test_create_subscription(request_type, transport: str = "grpc"):
683684
assert response.filter == "filter_value"
684685
assert response.detached is True
685686
assert response.enable_exactly_once_delivery is True
687+
assert response.state == pubsub.Subscription.State.ACTIVE
686688

687689

688690
def test_create_subscription_empty_call():
@@ -731,6 +733,7 @@ async def test_create_subscription_async(
731733
filter="filter_value",
732734
detached=True,
733735
enable_exactly_once_delivery=True,
736+
state=pubsub.Subscription.State.ACTIVE,
734737
)
735738
)
736739
response = await client.create_subscription(request)
@@ -750,6 +753,7 @@ async def test_create_subscription_async(
750753
assert response.filter == "filter_value"
751754
assert response.detached is True
752755
assert response.enable_exactly_once_delivery is True
756+
assert response.state == pubsub.Subscription.State.ACTIVE
753757

754758

755759
@pytest.mark.asyncio
@@ -963,6 +967,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
963967
filter="filter_value",
964968
detached=True,
965969
enable_exactly_once_delivery=True,
970+
state=pubsub.Subscription.State.ACTIVE,
966971
)
967972
response = client.get_subscription(request)
968973

@@ -981,6 +986,7 @@ def test_get_subscription(request_type, transport: str = "grpc"):
981986
assert response.filter == "filter_value"
982987
assert response.detached is True
983988
assert response.enable_exactly_once_delivery is True
989+
assert response.state == pubsub.Subscription.State.ACTIVE
984990

985991

986992
def test_get_subscription_empty_call():
@@ -1025,6 +1031,7 @@ async def test_get_subscription_async(
10251031
filter="filter_value",
10261032
detached=True,
10271033
enable_exactly_once_delivery=True,
1034+
state=pubsub.Subscription.State.ACTIVE,
10281035
)
10291036
)
10301037
response = await client.get_subscription(request)
@@ -1044,6 +1051,7 @@ async def test_get_subscription_async(
10441051
assert response.filter == "filter_value"
10451052
assert response.detached is True
10461053
assert response.enable_exactly_once_delivery is True
1054+
assert response.state == pubsub.Subscription.State.ACTIVE
10471055

10481056

10491057
@pytest.mark.asyncio
@@ -1221,6 +1229,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
12211229
filter="filter_value",
12221230
detached=True,
12231231
enable_exactly_once_delivery=True,
1232+
state=pubsub.Subscription.State.ACTIVE,
12241233
)
12251234
response = client.update_subscription(request)
12261235

@@ -1239,6 +1248,7 @@ def test_update_subscription(request_type, transport: str = "grpc"):
12391248
assert response.filter == "filter_value"
12401249
assert response.detached is True
12411250
assert response.enable_exactly_once_delivery is True
1251+
assert response.state == pubsub.Subscription.State.ACTIVE
12421252

12431253

12441254
def test_update_subscription_empty_call():
@@ -1287,6 +1297,7 @@ async def test_update_subscription_async(
12871297
filter="filter_value",
12881298
detached=True,
12891299
enable_exactly_once_delivery=True,
1300+
state=pubsub.Subscription.State.ACTIVE,
12901301
)
12911302
)
12921303
response = await client.update_subscription(request)
@@ -1306,6 +1317,7 @@ async def test_update_subscription_async(
13061317
assert response.filter == "filter_value"
13071318
assert response.detached is True
13081319
assert response.enable_exactly_once_delivery is True
1320+
assert response.state == pubsub.Subscription.State.ACTIVE
13091321

13101322

13111323
@pytest.mark.asyncio

0 commit comments

Comments
 (0)