Skip to content

Commit d4dd01d

Browse files
authored
Add exposure deduping (#138)
* Add exposure deduping * feedback and cleanup
1 parent 09fe886 commit d4dd01d

File tree

4 files changed

+118
-19
lines changed

4 files changed

+118
-19
lines changed

statsig/statsig_logger.py

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import collections
22
import threading
3-
from typing import Optional
3+
from typing import Optional, Union
44

55
from .retryable_logs import RetryableLogs
66
from .evaluation_details import EvaluationDetails
77
from .evaluator import _ConfigEvaluation
88
from .statsig_event import StatsigEvent
9+
# from .statsig_user import StatsigUser
910
from .layer import Layer
1011
from .utils import logger
1112
from .thread_util import spawn_background_thread, THREAD_JOIN_TIMEOUT
@@ -14,10 +15,11 @@
1415
_LAYER_EXPOSURE_EVENT = "statsig::layer_exposure"
1516
_GATE_EXPOSURE_EVENT = "statsig::gate_exposure"
1617

18+
_IGNORED_METADATA_KEYS = {'serverTime', 'configSyncTime', 'initTime', 'reason'}
1719

1820
def _safe_add_evaluation_to_event(
19-
evaluation_details: EvaluationDetails, event: StatsigEvent):
20-
if evaluation_details is None:
21+
evaluation_details: Union[EvaluationDetails, None], event: StatsigEvent):
22+
if evaluation_details is None or event is None or event.metadata is None:
2123
return
2224

2325
event.metadata["reason"] = evaluation_details.reason
@@ -29,10 +31,12 @@ def _safe_add_evaluation_to_event(
2931
class _StatsigLogger:
3032
_background_flush: Optional[threading.Thread]
3133
_background_retry: Optional[threading.Thread]
34+
_background_deduper: Optional[threading.Thread]
3235

3336
def __init__(self, net, shutdown_event, statsig_metadata, error_boundary, options):
3437
self._events = []
3538
self._retry_logs = collections.deque(maxlen=10)
39+
self._deduper = set()
3640
self._net = net
3741
self._statsig_metadata = statsig_metadata
3842
self._local_mode = options.local_mode
@@ -44,6 +48,7 @@ def __init__(self, net, shutdown_event, statsig_metadata, error_boundary, option
4448
self._shutdown_event = shutdown_event
4549
self._background_flush = None
4650
self._background_retry = None
51+
self._background_deduper = None
4752
self.spawn_bg_threads_if_needed()
4853

4954
def spawn_bg_threads_if_needed(self):
@@ -58,6 +63,10 @@ def spawn_bg_threads_if_needed(self):
5863
self._background_retry = spawn_background_thread(
5964
self._periodic_retry, (self._shutdown_event,), self._error_boundary)
6065

66+
if self._background_deduper is None or not self._background_deduper.is_alive():
67+
self._background_deduper = spawn_background_thread(
68+
self._periodic_dedupe_clear, (self._shutdown_event,), self._error_boundary)
69+
6170
def log(self, event):
6271
if self._local_mode:
6372
return
@@ -73,6 +82,9 @@ def log_gate_exposure(self, user, gate, value, rule_id, secondary_exposures,
7382
"gateValue": "true" if value else "false",
7483
"ruleID": rule_id,
7584
}
85+
if not self._is_unique_exposure(user, _GATE_EXPOSURE_EVENT, event.metadata):
86+
return
87+
7688
if is_manual_exposure:
7789
event.metadata["isManualExposure"] = "true"
7890

@@ -90,6 +102,8 @@ def log_config_exposure(self, user, config, rule_id, secondary_exposures,
90102
"config": config,
91103
"ruleID": rule_id,
92104
}
105+
if not self._is_unique_exposure(user, _CONFIG_EXPOSURE_EVENT, event.metadata):
106+
return
93107
if is_manual_exposure:
94108
event.metadata["isManualExposure"] = "true"
95109

@@ -111,20 +125,24 @@ def log_layer_exposure(self, user, layer: Layer, parameter_name: str,
111125
exposures = config_evaluation.secondary_exposures
112126
allocated_experiment = config_evaluation.allocated_experiment
113127

114-
event.metadata = {
128+
metadata = {
115129
"config": layer.name,
116130
"ruleID": layer.rule_id,
117131
"allocatedExperiment": allocated_experiment,
118132
"parameterName": parameter_name,
119133
"isExplicitParameter": "true" if is_explicit else "false"
120134
}
135+
if not self._is_unique_exposure(user, _LAYER_EXPOSURE_EVENT, metadata):
136+
return
137+
event.metadata = metadata
121138
if is_manual_exposure:
122139
event.metadata["isManualExposure"] = "true"
123140

124141
event._secondary_exposures = [] if exposures is None else exposures
125142

126143
_safe_add_evaluation_to_event(
127144
config_evaluation.evaluation_details, event)
145+
128146
self.log(event)
129147

130148
def flush(self):
@@ -157,11 +175,19 @@ def _periodic_flush(self, shutdown_event):
157175
except Exception as e:
158176
self._error_boundary.log_exception(e)
159177

178+
def _periodic_dedupe_clear(self, shutdown_event):
179+
while True:
180+
try:
181+
if shutdown_event.wait(self._logging_interval):
182+
break
183+
self._deduper = set()
184+
except Exception as e:
185+
self._error_boundary.log_exception(e)
186+
160187
def _periodic_retry(self, shutdown_event):
161188
while True:
162189
if shutdown_event.wait(self._retry_interval):
163190
break
164-
165191
length = len(self._retry_logs)
166192
for _i in range(length):
167193
try:
@@ -178,3 +204,24 @@ def _periodic_retry(self, shutdown_event):
178204
return
179205

180206
self._retry_logs.append(RetryableLogs(retry_logs.payload, retry_logs.retries))
207+
208+
def _is_unique_exposure(self, user, eventName: str, metadata: dict or None) -> bool:
209+
if user is None:
210+
return True
211+
if len(self._deduper) > 10000:
212+
self._deduper = set()
213+
custom_id_key = ''
214+
if user.custom_ids and isinstance(user.custom_ids, dict):
215+
custom_id_key = ','.join(user.custom_ids.values())
216+
217+
metadata_key = ''
218+
if metadata and isinstance(metadata, dict):
219+
metadata_key = ','.join(str(value) for key, value in metadata.items() if key not in _IGNORED_METADATA_KEYS)
220+
221+
key = ','.join(str(item) for item in [user.user_id, custom_id_key, eventName, metadata_key])
222+
223+
if key in self._deduper:
224+
return False
225+
226+
self._deduper.add(key)
227+
return True

statsig/statsig_server.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ def __init__(self) -> None:
3737
self._initialized = False
3838

3939
def initialize_with_timeout(self, sdkKey: str, options=None):
40-
thread = threading.Thread(target=self.initialize, args=(sdkKey, options))
40+
thread = threading.Thread(
41+
target=self.initialize, args=(sdkKey, options))
4142
thread.start()
4243
thread.join(timeout=options.init_timeout)
4344
if thread.is_alive():
@@ -62,7 +63,8 @@ def initialize(self, sdkKey: str, options=None):
6263
self._options = options
6364
self.__shutdown_event = threading.Event()
6465
self.__statsig_metadata = _StatsigMetadata.get()
65-
self._network = _StatsigNetwork(sdkKey, options, self._errorBoundary)
66+
self._network = _StatsigNetwork(
67+
sdkKey, options, self._errorBoundary)
6668
self._logger = _StatsigLogger(
6769
self._network, self.__shutdown_event, self.__statsig_metadata, self._errorBoundary,
6870
options)
@@ -81,7 +83,8 @@ def task():
8183
if not self._verify_inputs(user, gate_name):
8284
return False
8385

84-
result = self.__check_gate_server_fallback(user, gate_name, log_exposure)
86+
result = self.__check_gate_server_fallback(
87+
user, gate_name, log_exposure)
8588
return result.boolean_value
8689

8790
return self._errorBoundary.capture(task, lambda: False)
@@ -98,7 +101,8 @@ def task():
98101
if not self._verify_inputs(user, config_name):
99102
return DynamicConfig({}, config_name, "")
100103

101-
result = self.__get_config_server_fallback(user, config_name, log_exposure)
104+
result = self.__get_config_server_fallback(
105+
user, config_name, log_exposure)
102106
return DynamicConfig(
103107
result.json_value, config_name, result.rule_id)
104108

@@ -173,7 +177,8 @@ def task():
173177
self._errorBoundary.swallow(task)
174178

175179
def flush(self):
176-
self._logger.flush()
180+
if self._logger is not None:
181+
self._logger.flush()
177182

178183
def shutdown(self):
179184
def task():
@@ -272,8 +277,10 @@ def _verify_inputs(self, user: StatsigUser, variable_name: str):
272277
return True
273278

274279
def _verify_bg_threads_running(self):
275-
self._logger.spawn_bg_threads_if_needed()
276-
self._spec_store.spawn_bg_threads_if_needed()
280+
if self._logger is not None:
281+
self._logger.spawn_bg_threads_if_needed()
282+
if self._spec_store is not None:
283+
self._spec_store.spawn_bg_threads_if_needed()
277284

278285
def __check_gate_server_fallback(
279286
self, user: StatsigUser, gate_name: str, log_exposure=True):

tests/test_concurrency.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
import threading
23
import time
34
import os
@@ -92,22 +93,25 @@ def test_checking_and_updating_concurrently(self, mock_post, mock_get):
9293
for t in self.threads:
9394
t.join()
9495

95-
self.assertEqual(200, len(statsig.get_instance()._logger._events))
96+
self.assertEqual(201, len(statsig.get_instance()._logger._events))
9697
self.assertEqual(1600, self._event_count)
9798
statsig.shutdown()
9899

99100
self.assertEqual(0, len(statsig.get_instance()._logger._events))
100-
self.assertEqual(1800, self._event_count)
101+
self.assertEqual(1801, self._event_count)
101102

102103
def run_checks(self, interval, times):
103104
for x in range(times):
105+
salt = str(random.randint(1, 10000000000))
104106
user = StatsigUser(
105-
f'user_id_{x}', email="testuser@statsig.com", private_attributes={"test": 123})
107+
f'user_id_{x}', email="testuser@statsig.com", private_attributes={"test": 123}, custom_ids={'salt': salt})
106108
statsig.log_event(StatsigEvent(
107109
user, "test_event", 1, {"key": "value"}))
108110
self.assertEqual(True, statsig.check_gate(
109111
user, "on_for_statsig_email"))
110112
self.assertEqual(True, statsig.check_gate(user, "always_on_gate"))
113+
self.assertFalse(statsig.check_gate(
114+
StatsigUser(f'user_id_{salt}{x}'), "on_for_id_list"))
111115
self.assertTrue(statsig.check_gate(
112116
StatsigUser("regular_user_id"), "on_for_id_list"))
113117

tests/test_logger.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
import unittest
23
from unittest.mock import patch
34

@@ -36,23 +37,63 @@ def on_log(url: str, data: dict):
3637
@patch('requests.post', side_effect=_network_stub.mock)
3738
def test_log_size(self, mock_post):
3839
self._instance.check_gate(self._user, "a_gate")
39-
self._instance.check_gate(self._user, "a_gate")
40+
self._instance.check_gate(self._user, "b_gate")
4041

4142
self.assertEqual(len(self._events), 0)
4243

43-
self._instance.check_gate(self._user, "a_gate")
44+
self._instance.check_gate(self._user, "c_gate")
4445
self.assertEqual(len(self._events), 3)
4546

46-
self._instance.check_gate(self._user, "a_gate")
47+
self._instance.check_gate(self._user, "d_gate")
4748
self.assertEqual(len(self._events), 3)
49+
self._instance.check_gate(self._user, "e_gate")
50+
self._instance.check_gate(self._user, "f_gate")
51+
self.assertEqual(len(self._events), 6)
52+
53+
@patch('requests.post', side_effect=_network_stub.mock)
54+
def test_exposure_dedupe(self, mock_post):
4855
self._instance.check_gate(self._user, "a_gate")
4956
self._instance.check_gate(self._user, "a_gate")
50-
self.assertEqual(len(self._events), 6)
57+
58+
self.assertEqual(len(self._events), 0)
5159

5260
self._instance.check_gate(self._user, "a_gate")
61+
# doesnt flush yet, because they are deduped
62+
self.assertEqual(len(self._events), 0)
63+
64+
self._instance.check_gate(self._user, "b_gate")
65+
self.assertEqual(len(self._events), 0)
66+
self._instance.check_gate(self._user, "c_gate")
67+
self.assertEqual(len(self._events), 3)
68+
self._instance.check_gate(self._user, "a_gate")
69+
self._instance.check_gate(self._user, "b_gate")
70+
self._instance.check_gate(self._user, "c_gate")
71+
self.assertEqual(len(self._events), 3)
72+
73+
self._instance.get_config(self._user, "a_gate")
74+
self._instance.get_config(self._user, "b_gate")
75+
self._instance.get_config(self._user, "b_gate")
76+
self._instance.get_config(self._user, "b_gate")
77+
self._instance.get_config(self._user, "a_gate")
78+
self._instance.get_config(self._user, "c_gate")
79+
self.assertEqual(len(self._events), 6)
80+
81+
self._instance.check_gate(self._user, "d_gate")
5382
self._instance.flush()
5483
self.assertEqual(len(self._events), 7)
5584

85+
# get layer does not expose
86+
self._instance.get_layer(self._user, "a_gate")
87+
self._instance.get_layer(self._user, "b_gate")
88+
self._instance.get_layer(self._user, "c_gate")
89+
self.assertEqual(len(self._events), 7)
90+
91+
self._instance.get_experiment(StatsigUser(str(random.randint(1, 10000000000))), "a_gate")
92+
self._instance.get_experiment(StatsigUser(str(random.randint(1, 10000000000))), "a_gate")
93+
self._instance.get_experiment(StatsigUser(str(random.randint(1, 10000000000))), "a_gate")
94+
self.assertEqual(len(self._events), 10)
95+
96+
5697
@patch('requests.post', side_effect=_network_stub.mock)
5798
def test_log_content(self, mock_post):
5899
self._instance.check_gate(self._user, "a_gate")

0 commit comments

Comments
 (0)