Skip to content

Commit b9ac0d5

Browse files
committed
Reapply "fix(core): Make log_adapter state thread-local"
This reverts commit cf34585.
1 parent cf34585 commit b9ac0d5

File tree

2 files changed

+47
-26
lines changed

2 files changed

+47
-26
lines changed

bigframes/core/log_adapter.py

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
import functools
1616
import inspect
1717
import threading
18-
from typing import List, Optional
18+
from typing import Optional
1919

2020
from google.cloud import bigquery
2121
import pandas
2222

23-
_lock = threading.Lock()
23+
_thread_local_data = threading.local()
24+
_thread_local_data._api_methods = []
25+
_thread_local_data._call_stack = []
2426

2527
# The limit is 64 (https://cloud.google.com/bigquery/docs/labels-intro#requirements),
2628
# but leave a few spare for internal labels to be added.
@@ -30,12 +32,8 @@
3032
PANDAS_PARAM_TRACKING_TASK = "pandas_param_tracking"
3133
LOG_OVERRIDE_NAME = "__log_override_name__"
3234

33-
_api_methods: List = []
3435
_excluded_methods = ["__setattr__", "__getattr__"]
3536

36-
# Stack to track method calls
37-
_call_stack: List = []
38-
3937

4038
def submit_pandas_labels(
4139
bq_client: Optional[bigquery.Client],
@@ -172,11 +170,14 @@ def wrapper(*args, **kwargs):
172170
base_name = custom_base_name
173171

174172
full_method_name = f"{base_name.lower()}-{api_method_name}"
173+
if not hasattr(_thread_local_data, "_call_stack"):
174+
_thread_local_data._call_stack = []
175+
175176
# Track directly called methods
176-
if len(_call_stack) == 0:
177+
if len(_thread_local_data._call_stack) == 0:
177178
add_api_method(full_method_name)
178179

179-
_call_stack.append(full_method_name)
180+
_thread_local_data._call_stack.append(full_method_name)
180181

181182
try:
182183
return method(*args, **kwargs)
@@ -185,7 +186,7 @@ def wrapper(*args, **kwargs):
185186
# or not fully supported (NotImplementedError) in BigFrames.
186187
# Logging is currently supported only when we can access the bqclient through
187188
# _block.session.bqclient.
188-
if len(_call_stack) == 1:
189+
if len(_thread_local_data._call_stack) == 1:
189190
submit_pandas_labels(
190191
_get_bq_client(*args, **kwargs),
191192
base_name,
@@ -196,7 +197,7 @@ def wrapper(*args, **kwargs):
196197
)
197198
raise e
198199
finally:
199-
_call_stack.pop()
200+
_thread_local_data._call_stack.pop()
200201

201202
return wrapper
202203

@@ -214,19 +215,21 @@ def property_logger(prop):
214215
def shared_wrapper(prop):
215216
@functools.wraps(prop)
216217
def wrapped(*args, **kwargs):
218+
if not hasattr(_thread_local_data, "_call_stack"):
219+
_thread_local_data._call_stack = []
217220
qualname_parts = getattr(prop, "__qualname__", prop.__name__).split(".")
218221
class_name = qualname_parts[-2] if len(qualname_parts) > 1 else ""
219222
property_name = prop.__name__
220223
full_property_name = f"{class_name.lower()}-{property_name.lower()}"
221224

222-
if len(_call_stack) == 0:
225+
if len(_thread_local_data._call_stack) == 0:
223226
add_api_method(full_property_name)
224227

225-
_call_stack.append(full_property_name)
228+
_thread_local_data._call_stack.append(full_property_name)
226229
try:
227230
return prop(*args, **kwargs)
228231
finally:
229-
_call_stack.pop()
232+
_thread_local_data._call_stack.pop()
230233

231234
return wrapped
232235

@@ -251,23 +254,26 @@ def wrapper(func):
251254

252255

253256
def add_api_method(api_method_name):
254-
global _lock
255-
global _api_methods
256-
with _lock:
257-
# Push the method to the front of the _api_methods list
258-
_api_methods.insert(0, api_method_name.replace("<", "").replace(">", ""))
259-
# Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed)
260-
_api_methods = _api_methods[:MAX_LABELS_COUNT]
257+
if not hasattr(_thread_local_data, "_api_methods"):
258+
_thread_local_data._api_methods = []
259+
260+
# Push the method to the front of the _api_methods list
261+
_thread_local_data._api_methods.insert(
262+
0, api_method_name.replace("<", "").replace(">", "")
263+
)
264+
# Keep the list length within the maximum limit
265+
_thread_local_data._api_methods = _thread_local_data._api_methods[:MAX_LABELS_COUNT]
261266

262267

263268
def get_and_reset_api_methods(dry_run: bool = False):
264-
global _lock
265-
with _lock:
266-
previous_api_methods = list(_api_methods)
269+
if not hasattr(_thread_local_data, "_api_methods"):
270+
_thread_local_data._api_methods = []
271+
272+
previous_api_methods = list(_thread_local_data._api_methods)
267273

268-
# dry_run might not make a job resource, so only reset the log on real queries.
269-
if not dry_run:
270-
_api_methods.clear()
274+
# dry_run might not make a job resource, so only reset the log on real queries.
275+
if not dry_run:
276+
_thread_local_data._api_methods.clear()
271277
return previous_api_methods
272278

273279

bigframes/testing/mocks.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import bigframes
2828
import bigframes.clients
2929
import bigframes.core.global_session
30+
import bigframes.core.log_adapter
3031
import bigframes.dataframe
32+
import bigframes.session._io.bigquery
33+
from bigframes.session._io.bigquery import create_job_configs_labels
3134
import bigframes.session.clients
3235

3336
"""Utilities for creating test resources."""
@@ -90,6 +93,18 @@ def query_mock(
9093
job_config: Optional[google.cloud.bigquery.QueryJobConfig] = None,
9194
**kwargs,
9295
):
96+
job_config = (
97+
job_config
98+
if job_config is not None
99+
else google.cloud.bigquery.QueryJobConfig()
100+
)
101+
api_methods = bigframes.core.log_adapter.get_and_reset_api_methods(
102+
dry_run=job_config.dry_run
103+
)
104+
job_config.labels = create_job_configs_labels(
105+
job_configs_labels=job_config.labels,
106+
api_methods=api_methods,
107+
)
93108
queries.append(query)
94109
job_configs.append(copy.deepcopy(job_config))
95110
query_job = mock.create_autospec(google.cloud.bigquery.QueryJob, instance=True)

0 commit comments

Comments
 (0)