2727
2828from google .cloud .bigtable .data ._cross_sync import CrossSync
2929
30- from . import TEST_FAMILY , TEST_FAMILY_2
30+ from . import TEST_FAMILY , TEST_FAMILY_2 , TEST_AGGREGATE_FAMILY
3131
3232
3333__CROSS_SYNC_OUTPUT__ = "tests.system.data.test_system_autogen"
@@ -76,6 +76,27 @@ async def add_row(
7676 await self .target .client ._gapic_client .mutate_row (request )
7777 self .rows .append (row_key )
7878
79+ @CrossSync .convert
80+ async def add_aggregate_row (
81+ self , row_key , * , family = TEST_AGGREGATE_FAMILY , qualifier = b"q" , input = 0
82+ ):
83+ request = {
84+ "table_name" : self .target .table_name ,
85+ "row_key" : row_key ,
86+ "mutations" : [
87+ {
88+ "add_to_cell" : {
89+ "family_name" : family ,
90+ "column_qualifier" : {"raw_value" : qualifier },
91+ "timestamp" : {"raw_timestamp_micros" : 0 },
92+ "input" : {"int_value" : input },
93+ }
94+ }
95+ ],
96+ }
97+ await self .target .client ._gapic_client .mutate_row (request )
98+ self .rows .append (row_key )
99+
79100 @CrossSync .convert
80101 async def delete_rows (self ):
81102 if self .rows :
@@ -132,7 +153,17 @@ def column_family_config(self):
132153 """
133154 from google .cloud .bigtable_admin_v2 import types
134155
135- return {TEST_FAMILY : types .ColumnFamily (), TEST_FAMILY_2 : types .ColumnFamily ()}
156+ int_aggregate_type = types .Type .Aggregate (
157+ input_type = types .Type (int64_type = {"encoding" : {"big_endian_bytes" : {}}}),
158+ sum = {},
159+ )
160+ return {
161+ TEST_FAMILY : types .ColumnFamily (),
162+ TEST_FAMILY_2 : types .ColumnFamily (),
163+ TEST_AGGREGATE_FAMILY : types .ColumnFamily (
164+ value_type = types .Type (aggregate_type = int_aggregate_type )
165+ ),
166+ }
136167
137168 @pytest .fixture (scope = "session" )
138169 def init_table_id (self ):
@@ -281,6 +312,37 @@ async def test_mutation_set_cell(self, target, temp_rows):
281312 # ensure cell is updated
282313 assert (await self ._retrieve_cell_value (target , row_key )) == new_value
283314
315+ @CrossSync .pytest
316+ @pytest .mark .usefixtures ("target" )
317+ @CrossSync .Retry (
318+ predicate = retry .if_exception_type (ClientError ), initial = 1 , maximum = 5
319+ )
320+ async def test_mutation_add_to_cell (self , target , temp_rows ):
321+ """
322+ Test add to cell mutation
323+ """
324+ from google .cloud .bigtable .data .mutations import AddToCell
325+
326+ row_key = b"add_to_cell"
327+ family = TEST_AGGREGATE_FAMILY
328+ qualifier = b"test-qualifier"
329+ # add row to temp_rows, for future deletion
330+ await temp_rows .add_aggregate_row (row_key , family = family , qualifier = qualifier )
331+ # set and check cell value
332+ await target .mutate_row (
333+ row_key , AddToCell (family , qualifier , 1 , timestamp_micros = 0 )
334+ )
335+ encoded_result = await self ._retrieve_cell_value (target , row_key )
336+ int_result = int .from_bytes (encoded_result , byteorder = "big" )
337+ assert int_result == 1
338+ # update again
339+ await target .mutate_row (
340+ row_key , AddToCell (family , qualifier , 9 , timestamp_micros = 0 )
341+ )
342+ encoded_result = await self ._retrieve_cell_value (target , row_key )
343+ int_result = int .from_bytes (encoded_result , byteorder = "big" )
344+ assert int_result == 10
345+
284346 @pytest .mark .skipif (
285347 bool (os .environ .get (BIGTABLE_EMULATOR )), reason = "emulator doesn't use splits"
286348 )
@@ -1123,7 +1185,7 @@ async def test_execute_query_simple(self, client, table_id, instance_id):
11231185 predicate = retry .if_exception_type (ClientError ), initial = 1 , maximum = 5
11241186 )
11251187 async def test_execute_against_target (
1126- self , client , instance_id , table_id , temp_rows
1188+ self , client , instance_id , table_id , temp_rows , column_family_config
11271189 ):
11281190 await temp_rows .add_row (b"row_key_1" )
11291191 result = await client .execute_query (
@@ -1138,14 +1200,19 @@ async def test_execute_against_target(
11381200 assert family_map [b"q" ] == b"test-value"
11391201 assert len (rows [0 ][TEST_FAMILY_2 ]) == 0
11401202 md = result .metadata
1141- assert len (md ) == 3
1203+ # we expect it to fetch each column family, plus _key
1204+ # add additional families here if column_family_config changes
1205+ assert len (md ) == len (column_family_config ) + 1
11421206 assert md ["_key" ].column_type == SqlType .Bytes ()
11431207 assert md [TEST_FAMILY ].column_type == SqlType .Map (
11441208 SqlType .Bytes (), SqlType .Bytes ()
11451209 )
11461210 assert md [TEST_FAMILY_2 ].column_type == SqlType .Map (
11471211 SqlType .Bytes (), SqlType .Bytes ()
11481212 )
1213+ assert md [TEST_AGGREGATE_FAMILY ].column_type == SqlType .Map (
1214+ SqlType .Bytes (), SqlType .Int64 ()
1215+ )
11491216
11501217 @pytest .mark .skipif (
11511218 bool (os .environ .get (BIGTABLE_EMULATOR )),
@@ -1248,7 +1315,7 @@ async def test_execute_query_params(self, client, table_id, instance_id):
12481315 predicate = retry .if_exception_type (ClientError ), initial = 1 , maximum = 5
12491316 )
12501317 async def test_execute_metadata_on_empty_response (
1251- self , client , instance_id , table_id , temp_rows
1318+ self , client , instance_id , table_id , temp_rows , column_family_config
12521319 ):
12531320 await temp_rows .add_row (b"row_key_1" )
12541321 result = await client .execute_query (
@@ -1258,11 +1325,16 @@ async def test_execute_metadata_on_empty_response(
12581325
12591326 assert len (rows ) == 0
12601327 md = result .metadata
1261- assert len (md ) == 3
1328+ # we expect it to fetch each column family, plus _key
1329+ # add additional families here if column_family_config change
1330+ assert len (md ) == len (column_family_config ) + 1
12621331 assert md ["_key" ].column_type == SqlType .Bytes ()
12631332 assert md [TEST_FAMILY ].column_type == SqlType .Map (
12641333 SqlType .Bytes (), SqlType .Bytes ()
12651334 )
12661335 assert md [TEST_FAMILY_2 ].column_type == SqlType .Map (
12671336 SqlType .Bytes (), SqlType .Bytes ()
12681337 )
1338+ assert md [TEST_AGGREGATE_FAMILY ].column_type == SqlType .Map (
1339+ SqlType .Bytes (), SqlType .Int64 ()
1340+ )
0 commit comments