Skip to content

Commit 46b9294

Browse files
correct scripts initialization (#14012)
1 parent 645f5bd commit 46b9294

File tree

5 files changed

+83
-34
lines changed

5 files changed

+83
-34
lines changed

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,64 @@ class TFetchingScript {
173173
ui32 Execute(const ui32 startStepIdx, const std::shared_ptr<IDataSource>& source) const;
174174
};
175175

176+
class TFetchingScriptOwner: TNonCopyable {
177+
private:
178+
TAtomic InitializationDetector = 0;
179+
std::shared_ptr<TFetchingScript> Script;
180+
181+
void FinishInitialization(std::shared_ptr<TFetchingScript>&& script) {
182+
AFL_VERIFY(AtomicCas(&InitializationDetector, 1, 2));
183+
Script = std::move(script);
184+
}
185+
186+
public:
187+
const std::shared_ptr<TFetchingScript>& GetScriptVerified() const {
188+
AFL_VERIFY(Script);
189+
return Script;
190+
}
191+
192+
TString DebugString() const {
193+
if (Script) {
194+
return TStringBuilder() << Script->DebugString() << Endl;
195+
} else {
196+
return TStringBuilder() << "NO_SCRIPT" << Endl;
197+
}
198+
}
199+
200+
bool HasScript() const {
201+
return !!Script;
202+
}
203+
204+
bool NeedInitialization() const {
205+
return AtomicGet(InitializationDetector) != 1;
206+
}
207+
208+
class TInitializationGuard: TNonCopyable {
209+
private:
210+
TFetchingScriptOwner& Owner;
211+
212+
public:
213+
TInitializationGuard(TFetchingScriptOwner& owner)
214+
: Owner(owner) {
215+
Owner.StartInitialization();
216+
}
217+
void InitializationFinished(std::shared_ptr<TFetchingScript>&& script) {
218+
Owner.FinishInitialization(std::move(script));
219+
}
220+
~TInitializationGuard() {
221+
AFL_VERIFY(!Owner.NeedInitialization());
222+
}
223+
};
224+
225+
std::optional<TInitializationGuard> StartInitialization() {
226+
if (AtomicCas(&InitializationDetector, 2, 0)) {
227+
return std::optional<TInitializationGuard>(*this);
228+
} else {
229+
return std::nullopt;
230+
}
231+
}
232+
};
233+
176234
class TColumnsAccumulator {
177235
private:
178236
TColumnsSetIds FetchingReadyColumns;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,18 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
5959
}
6060
}
6161
{
62-
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
62+
auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
6363
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
64-
if (!result) {
65-
TGuard<TMutex> wg(Mutex);
66-
result = CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
67-
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0];
68-
if (!result) {
69-
result = BuildColumnsFetchingPlan(
70-
needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions);
71-
CacheFetchingScripts[needSnapshots ? 1 : 0][isWholeExclusiveSource ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0]
72-
[needShardingFilter ? 1 : 0][hasDeletions ? 1 : 0] = result;
64+
if (result.NeedInitialization()) {
65+
TGuard<TMutex> g(Mutex);
66+
if (auto gInit = result.StartInitialization()) {
67+
gInit->InitializationFinished(BuildColumnsFetchingPlan(
68+
needSnapshots, isWholeExclusiveSource, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions));
7369
}
70+
AFL_VERIFY(!result.NeedInitialization());
7471
}
75-
AFL_VERIFY(result);
76-
if (*result) {
77-
return *result;
72+
if (result.HasScript()) {
73+
return result.GetScriptVerified();
7874
} else {
7975
std::shared_ptr<TFetchingScript> result = std::make_shared<TFetchingScript>(*this);
8076
result->SetBranchName("FAKE");
@@ -234,9 +230,9 @@ TString TSpecialReadContext::ProfileDebugString() const {
234230
};
235231

236232
for (ui32 i = 0; i < (1 << 6); ++i) {
237-
auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)];
238-
if (script && *script) {
239-
sb << (*script)->DebugString() << ";";
233+
auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)];
234+
if (script.HasScript()) {
235+
sb << script.DebugString() << ";";
240236
}
241237
}
242238
return sb;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class TSpecialReadContext: public NCommon::TSpecialReadContext {
2828
std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshotsFilter, const bool exclusiveSource,
2929
const bool partialUsageByPredicate, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
3030
TMutex Mutex;
31-
std::array<std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>, 2>
31+
std::array<std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2>, 2>
3232
CacheFetchingScripts;
3333
std::shared_ptr<TFetchingScript> AskAccumulatorsScript;
3434

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,17 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
4848
}
4949
}
5050
{
51-
auto result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
51+
auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
5252
[hasDeletions ? 1 : 0];
53-
if (!result) {
54-
TGuard<TMutex> wg(Mutex);
55-
result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
56-
[hasDeletions ? 1 : 0];
57-
if (!result) {
58-
result = BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions);
59-
CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
60-
[hasDeletions ? 1 : 0] = result;
53+
if (result.NeedInitialization()) {
54+
TGuard<TMutex> g(Mutex);
55+
if (auto gInit = result.StartInitialization()) {
56+
gInit->InitializationFinished(
57+
BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions));
6158
}
59+
AFL_VERIFY(!result.NeedInitialization());
6260
}
63-
AFL_VERIFY(result);
64-
AFL_VERIFY(*result);
65-
return *result;
61+
return result.GetScriptVerified();
6662
}
6763
}
6864

@@ -144,9 +140,9 @@ TString TSpecialReadContext::ProfileDebugString() const {
144140
};
145141

146142
for (ui32 i = 0; i < (1 << 5); ++i) {
147-
auto script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)];
148-
if (script && *script) {
149-
sb << (*script)->DebugString() << ";";
143+
auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)];
144+
if (script.HasScript()) {
145+
sb << script.DebugString() << ";";
150146
}
151147
}
152148
return sb;

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ class TSpecialReadContext: public NCommon::TSpecialReadContext {
2323
std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt,
2424
const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
2525
TMutex Mutex;
26-
std::array<std::array<std::array<std::array<std::array<std::optional<std::shared_ptr<TFetchingScript>>, 2>, 2>, 2>, 2>, 2>
27-
CacheFetchingScripts;
26+
std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;
2827
std::shared_ptr<TFetchingScript> AskAccumulatorsScript;
2928

3029
virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<NCommon::IDataSource>& source) override;

0 commit comments

Comments
 (0)