Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ydb/core/tx/datashard/datashard_user_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ void TDataShardUserDb::CommitChanges(const TTableId& tableId, ui64 lockId, const
auto* info = Self.GetVolatileTxManager().FindByCommitTxId(txId);
if (info && info->State != EVolatileTxState::Aborting) {
if (VolatileDependencies.insert(txId).second && !VolatileTxId) {
if (GlobalTxId == 0) {
throw TNeedGlobalTxId();
}
SetVolatileTxId(GlobalTxId);
}
}
Expand Down Expand Up @@ -564,6 +567,9 @@ void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef<co
} else if (auto* cached = Self.GetConflictsCache().GetTableCache(localTableId).FindUncommittedWrites(keyCells)) {
for (ui64 txId : *cached) {
BreakWriteConflict(txId);
if (NeedGlobalTxId) {
throw TNeedGlobalTxId();
}
}
return;
} else {
Expand All @@ -581,6 +587,10 @@ void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef<co
nullptr, txObserver
);

if (NeedGlobalTxId) {
throw TNeedGlobalTxId();
}

if (res.Ready == NTable::EReady::Page) {
if (mustFindConflicts || LockTxId) {
// We must gather all conflicts
Expand All @@ -590,6 +600,9 @@ void TDataShardUserDb::CheckWriteConflicts(const TTableId& tableId, TArrayRef<co
// Upgrade to volatile ordered commit and ignore the page fault
if (!VolatileCommitOrdered) {
if (!VolatileTxId) {
if (GlobalTxId == 0) {
throw TNeedGlobalTxId();
}
SetVolatileTxId(GlobalTxId);
}
VolatileCommitOrdered = true;
Expand Down Expand Up @@ -634,6 +647,10 @@ void TDataShardUserDb::BreakWriteConflict(ui64 txId) {
// it into a real volatile transaction, it works as usual in
// every sense, only persistent commit order is affected by
// a dependency below.
if (GlobalTxId == 0) {
NeedGlobalTxId = true;
return;
}
SetVolatileTxId(GlobalTxId);
}
VolatileDependencies.insert(info->TxId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_user_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class TDataShardUserDb final
absl::flat_hash_set<ui64> CommittedLockChanges;
absl::flat_hash_map<TPathId, TIntrusivePtr<NTable::TDynamicTransactionMap>> TxMaps;
absl::flat_hash_map<TPathId, NTable::ITransactionObserverPtr> TxObservers;
bool NeedGlobalTxId = false;

absl::flat_hash_set<ui64> VolatileCommitTxIds;
YDB_ACCESSOR_DEF(absl::flat_hash_set<ui64>, VolatileDependencies);
Expand Down
81 changes: 80 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ using namespace NSchemeShard;
using namespace Tests;

Y_UNIT_TEST_SUITE(DataShardWrite) {

constexpr i32 operator""_i32(unsigned long long val) { return static_cast<i32>(val); }
constexpr ui32 operator""_ui32(unsigned long long val) { return static_cast<ui32>(val); }

const TString expectedTableState = "key = 0, value = 1\nkey = 2, value = 3\nkey = 4, value = 5\n";

std::tuple<TTestActorRuntime&, Tests::TServer::TPtr, TActorId> TestCreateServer(std::optional<TServerSettings> serverSettings = {}) {
Expand Down Expand Up @@ -1687,5 +1691,80 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}

} // Y_UNIT_TEST_SUITE
Y_UNIT_TEST(DelayedVolatileTxAndEvWrite) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetEnableDataShardVolatileTransactions(true);

auto [runtime, server, sender] = TestCreateServer(serverSettings);

TDisableDataShardLogBatching disableDataShardLogBatching;

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key int, a int, b int, c int, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
"SUCCESS");

const auto shards = GetTableShards(server, sender, "/Root/table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
const auto tableId = ResolveTableId(server, sender, "/Root/table");

auto [tablesMap, ownerId_] = GetTablesByPathId(server, shards.at(0));

// Start blocking readsets
TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime);

// Prepare a distributed upsert
Cerr << "... starting a distributed upsert" << Endl;
auto upsertFuture = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, a, b, c) VALUES (1, 2, 2, 2), (11, 12, 12, 12);
)");
runtime.WaitFor("blocked readsets", [&]{ return blockedReadSets.size() >= 4; });

// 1. Make an upsert to (key, b)
{
Cerr << "... making a write to " << shards.at(0) << Endl;
auto req = MakeWriteRequest(
std::nullopt,
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
tableId,
{ 1_ui32, 3_ui32 },
{ TCell::Make(1_i32), TCell::Make(3_i32) });
Write(runtime, sender, shards.at(0), std::move(req));
}

// 1. Make an upsert to (key, c)
{
Cerr << "... making a write to " << shards.at(0) << Endl;
auto req = MakeWriteRequest(
std::nullopt,
NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
tableId,
{ 1_ui32, 4_ui32 },
{ TCell::Make(1_i32), TCell::Make(4_i32) });
Write(runtime, sender, shards.at(0), std::move(req));
}

// Unblock readsets
blockedReadSets.Stop().Unblock();

runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Make a validating read, the volatile tx changes must not be lost
Cerr << "... validating table" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, a, b, c FROM `/Root/table` ORDER BY key;
)"),
"{ items { int32_value: 1 } items { int32_value: 2 } items { int32_value: 3 } items { int32_value: 4 } }, "
"{ items { int32_value: 11 } items { int32_value: 12 } items { int32_value: 12 } items { int32_value: 12 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardWrite)
} // namespace NKikimr
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/execute_write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
if (commitTxIds) {
TVector<ui64> participants(awaitingDecisions.begin(), awaitingDecisions.end());
DataShard.GetVolatileTxManager().PersistAddVolatileTx(
txId,
userDb.GetVolatileTxId(),
writeVersion,
commitTxIds,
userDb.GetVolatileDependencies(),
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2156,6 +2156,21 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<u
return evWrite;
}

std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const std::vector<ui32>& columnIds, const std::vector<TCell>& cells) {
UNIT_ASSERT((cells.size() % columnIds.size()) == 0);

TSerializedCellMatrix matrix(cells, cells.size() / columnIds.size(), columnIds.size());
TString blobData = matrix.ReleaseBuffer();

std::unique_ptr<NKikimr::NEvents::TDataEvents::TEvWrite> evWrite = txId
? std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(*txId, txMode)
: std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txMode);
ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
evWrite->AddOperation(operationType, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);

return evWrite;
}

std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequestOneKeyValue(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui64 key, ui64 value) {
UNIT_ASSERT_VALUES_EQUAL(columns.size(), 2);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/ut_common/datashard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ void ExecSQL(Tests::TServer::TPtr server,
TRowVersion AcquireReadSnapshot(TTestActorRuntime& runtime, const TString& databaseName, ui32 nodeIndex = 0);

std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 seed = 0);
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const std::vector<ui32>& columnIds, const std::vector<TCell>& cells);
std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequestOneKeyValue(std::optional<ui64> txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui64 key, ui64 value);

NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
Expand Down
Loading