|
1 | 1 | #include "datashard_impl.h" |
2 | 2 | #include "datashard_txs.h" |
3 | 3 | #include "datashard_locks_db.h" |
| 4 | +#include "memory_state_migration.h" |
4 | 5 | #include "probes.h" |
5 | 6 |
|
6 | 7 | #include <ydb/core/base/interconnect_channels.h> |
@@ -174,6 +175,15 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) |
174 | 175 | RegisterDataShardProbes(); |
175 | 176 | } |
176 | 177 |
|
| 178 | +TDataShard::~TDataShard() { |
| 179 | + if (InMemoryRestoreActor) { |
| 180 | + InMemoryRestoreActor->OnTabletDestroyed(); |
| 181 | + } |
| 182 | + if (InMemoryStateActor) { |
| 183 | + InMemoryStateActor->OnTabletDestroyed(); |
| 184 | + } |
| 185 | +} |
| 186 | + |
177 | 187 | void TDataShard::OnDetach(const TActorContext &ctx) { |
178 | 188 | Cleanup(ctx); |
179 | 189 | LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "OnDetach: " << TabletID()); |
@@ -265,6 +275,13 @@ void TDataShard::Cleanup(const TActorContext& ctx) { |
265 | 275 | } |
266 | 276 |
|
267 | 277 | void TDataShard::Die(const TActorContext& ctx) { |
| 278 | + if (InMemoryRestoreActor) { |
| 279 | + InMemoryRestoreActor->OnTabletDead(); |
| 280 | + } |
| 281 | + if (InMemoryStateActor) { |
| 282 | + InMemoryStateActor->OnTabletDead(); |
| 283 | + } |
| 284 | + |
268 | 285 | NTabletPipe::CloseAndForgetClient(SelfId(), SchemeShardPipe); |
269 | 286 | NTabletPipe::CloseAndForgetClient(SelfId(), StateReportPipe); |
270 | 287 | NTabletPipe::CloseAndForgetClient(SelfId(), DbStatsReportPipe); |
@@ -1620,6 +1637,29 @@ void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, bool value) const { |
1620 | 1637 | db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Uint64>(value ? 1 : 0)); |
1621 | 1638 | } |
1622 | 1639 |
|
| 1640 | +void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, const TActorId& value) const { |
| 1641 | + char buf[sizeof(ui64) * 2]; |
| 1642 | + WriteUnaligned<ui64>(buf, value.RawX1()); |
| 1643 | + WriteUnaligned<ui64>(buf + sizeof(ui64), value.RawX2()); |
| 1644 | + db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Bytes>(TString(buf, sizeof(ui64) * 2))); |
| 1645 | +} |
| 1646 | + |
| 1647 | +bool TDataShard::SysGetActorId(NIceDb::TNiceDb& db, ui64 key, TActorId& value) { |
| 1648 | + auto rowset = db.Table<Schema::Sys>().Key(key).Select<Schema::Sys::Bytes>(); |
| 1649 | + if (!rowset.IsReady()) { |
| 1650 | + return false; |
| 1651 | + } |
| 1652 | + if (rowset.IsValid()) { |
| 1653 | + TString buf = rowset.GetValue<Schema::Sys::Bytes>(); |
| 1654 | + Y_ABORT_UNLESS(buf.size() == sizeof(ui64) * 2, "Unexpected TActorId value size %" PRISZT, buf.size()); |
| 1655 | + const char* data = buf.data(); |
| 1656 | + ui64 x1 = ReadUnaligned<ui64>(data); |
| 1657 | + ui64 x2 = ReadUnaligned<ui64>(data + sizeof(ui64)); |
| 1658 | + value = TActorId(x1, x2); |
| 1659 | + } |
| 1660 | + return true; |
| 1661 | +} |
| 1662 | + |
1623 | 1663 | void TDataShard::PersistUserTable(NIceDb::TNiceDb& db, ui64 tableId, const TUserTable& tableInfo) { |
1624 | 1664 | db.Table<Schema::UserTables>().Key(tableId).Update( |
1625 | 1665 | NIceDb::TUpdate<Schema::UserTables::LocalTid>(tableInfo.LocalTid), |
@@ -2484,11 +2524,18 @@ void TDataShard::SendImmediateWriteResult( |
2484 | 2524 | const ui64 step = version.Step; |
2485 | 2525 | const ui64 observedStep = GetMaxObservedStep(); |
2486 | 2526 | if (step <= observedStep) { |
2487 | | - SnapshotManager.PromoteImmediateWriteEdgeReplied(version); |
2488 | | - if (!sessionId) { |
2489 | | - Send(target, event, 0, cookie, span.GetTraceId()); |
| 2527 | + // We avoid sending replies that would have promoted the replied edge |
| 2528 | + // when it's frozen. This prevents us replying and causing the next |
| 2529 | + // generation to potentially keep reading stale data. |
| 2530 | + if (Y_LIKELY(!InMemoryVarsFrozen) || version <= SnapshotManager.GetImmediateWriteEdgeReplied()) { |
| 2531 | + SnapshotManager.PromoteImmediateWriteEdgeReplied(version); |
| 2532 | + if (!sessionId) { |
| 2533 | + Send(target, event, 0, cookie, span.GetTraceId()); |
| 2534 | + } else { |
| 2535 | + SendViaSession(sessionId, target, SelfId(), event, 0, cookie, span.GetTraceId()); |
| 2536 | + } |
2490 | 2537 | } else { |
2491 | | - SendViaSession(sessionId, target, SelfId(), event, 0, cookie, span.GetTraceId()); |
| 2538 | + span.EndError("Dropped"); |
2492 | 2539 | } |
2493 | 2540 | return; |
2494 | 2541 | } |
@@ -2627,13 +2674,20 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo |
2627 | 2674 | } |
2628 | 2675 |
|
2629 | 2676 | if (step <= mediatorStep) { |
2630 | | - if (it->second.Span) { |
2631 | | - it->second.Span.Attribute("ActivateStep", std::to_string(mediatorStep)); |
| 2677 | + // We avoid sending replies that would have promoted the replied edge |
| 2678 | + // when it's frozen. This prevents us replying and causing the next |
| 2679 | + // generation to potentially keep reading stale data. |
| 2680 | + if (Y_LIKELY(!InMemoryVarsFrozen) || it->first <= SnapshotManager.GetImmediateWriteEdgeReplied()) { |
| 2681 | + if (it->second.Span) { |
| 2682 | + it->second.Span.Attribute("ActivateStep", std::to_string(mediatorStep)); |
| 2683 | + } |
| 2684 | + SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first); |
| 2685 | + SendViaSession(it->second.SessionId, |
| 2686 | + it->second.Target, SelfId(), it->second.Event.Release(), |
| 2687 | + 0, it->second.Cookie, it->second.Span.GetTraceId()); |
| 2688 | + } else { |
| 2689 | + it->second.Span.EndError("Dropped"); |
2632 | 2690 | } |
2633 | | - SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first); |
2634 | | - SendViaSession(it->second.SessionId, |
2635 | | - it->second.Target, SelfId(), it->second.Event.Release(), |
2636 | | - 0, it->second.Cookie, it->second.Span.GetTraceId()); |
2637 | 2691 | it = MediatorDelayedReplies.erase(it); |
2638 | 2692 | continue; |
2639 | 2693 | } |
@@ -2693,6 +2747,12 @@ bool TDataShard::NeedMediatorStateRestored() const { |
2693 | 2747 | return false; |
2694 | 2748 | } |
2695 | 2749 |
|
| 2750 | + if (InMemoryVarsRestored) { |
| 2751 | + // We have migrated in-memory vars from previous generations |
| 2752 | + // We don't need mediator state for correctness |
| 2753 | + return false; |
| 2754 | + } |
| 2755 | + |
2696 | 2756 | switch (State) { |
2697 | 2757 | case TShardState::Ready: |
2698 | 2758 | case TShardState::Readonly: |
|
0 commit comments