
Coordinator: per node mediators instead of tablets #6030

@snaury


Testing a queue-like workload revealed a major drawback in the way mediator time is currently propagated. Tablets that belong to a mediator are split into buckets, and each bucket advances its mediator time only when all transactions in that bucket are acknowledged. Unfortunately, this acknowledgement may take unexpectedly long. For example, given a round-trip time between nodes of rtt:

  • @ 0.0rtt The KQP node receives a read request that needs a snapshot. The snapshot request takes 0.5 rtt to reach the coordinator.
  • @ 0.5rtt The coordinator plans a new step S with some transactions and sends it to mediators, which takes 0.5 rtt.
  • @ 0.5rtt The coordinator receives the snapshot request and replies with S, which takes 0.5 rtt to travel back to the KQP node.
  • @ 1.0rtt The KQP node receives the snapshot and sends a read request to shard 1, which takes 0.5 rtt.
  • @ 1.0rtt Plan step S arrives at the mediator. Since the step has transactions, mediator time is not updated; instead, the plans are forwarded to the participating shards, which takes 0.5 rtt.
  • @ 1.5rtt Shard 1 receives the read request at step S. S is not yet the current mediator time, so the shard starts waiting (the background request for this step is not important here).
  • @ 1.5rtt Shard 2 receives its transactions at step S and starts persisting the plan, which takes 1.0 rtt.
  • @ 2.5rtt Shard 2 finishes committing the plan and sends an acknowledgement to the mediator, which takes 0.5 rtt.
  • @ 3.0rtt The mediator receives the acknowledgements and advances mediator time, which is broadcast to nodes and reaches shard 1's node in 0.5 rtt.
  • @ 3.5rtt Shard 1 can finally see that step S has arrived and that it has no preceding transactions. Assuming the read is served from memory, the response is sent immediately and takes 0.5 rtt to travel.
  • @ 4.0rtt The KQP node receives the read response and replies to the client.
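The timeline is a critical-path computation: each event happens once every event it waits for has happened, plus its own delay. The minimal Python sketch below reproduces the numbers. The event names are hypothetical labels for the bullets above, and all delays are in rtt (one network hop is 0.5 rtt).

```python
from typing import Dict, List, Tuple

# event -> (events it waits for, delay in rtt after the last of them).
# Events with no dependencies simply start at their delay (absolute time).
EVENTS: Dict[str, Tuple[List[str], float]] = {
    "kqp_gets_request": ([], 0.0),
    "coordinator_gets_snapshot_request": (["kqp_gets_request"], 0.5),
    "coordinator_plans_step": ([], 0.5),  # step S happens to be planned at 0.5 rtt
    "kqp_gets_snapshot": (["coordinator_gets_snapshot_request"], 0.5),
    "shard1_gets_read": (["kqp_gets_snapshot"], 0.5),
    "mediator_gets_step": (["coordinator_plans_step"], 0.5),
    "shard2_gets_plan": (["mediator_gets_step"], 0.5),
    "shard2_persists_plan": (["shard2_gets_plan"], 1.0),
    "mediator_gets_ack": (["shard2_persists_plan"], 0.5),
    "shard1_sees_step": (["mediator_gets_ack"], 0.5),  # timecast broadcast
    # The read is served from memory, so shard 1 replies as soon as it has both
    # the request and the updated mediator time.
    "shard1_replies": (["shard1_gets_read", "shard1_sees_step"], 0.0),
    "kqp_gets_response": (["shard1_replies"], 0.5),
}


def event_times(events: Dict[str, Tuple[List[str], float]]) -> Dict[str, float]:
    """Earliest time of every event: max over its dependencies plus its own delay."""
    times: Dict[str, float] = {}

    def resolve(name: str) -> float:
        if name not in times:
            deps, delay = events[name]
            start = max((resolve(d) for d in deps), default=0.0)
            times[name] = start + delay
        return times[name]

    for name in events:
        resolve(name)
    return times


times = event_times(EVENTS)
print(times["kqp_gets_response"])                             # 4.0
print(times["shard1_sees_step"] - times["shard1_gets_read"])  # 2.0: shard 1 waiting
```

Changing the 1.0 rtt persist delay shows how directly the slowest participant's commit time ends up in an unrelated read's latency.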

At first glance mediators seem to add only 0.5 rtt, which is not much, but as it turns out they may block mediator time updates for a very long time: in the timeline above, shard 1 waits 2.0 rtt for step S and the read takes 4.0 rtt end to end. What we need is a way to stop using these "buckets" and use a personal mediator time for each participant.
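The difference between bucket time and per-participant time can be shown with a simplified model. The assumptions are that a tablet may observe every step strictly below its oldest unacknowledged step, and that a bucket advances only as fast as its slowest tablet. The function names are hypothetical, and shard 1 and shard 2 are placed in the same bucket for illustration.

```python
from typing import Dict, Iterable, Set


def tablet_time(delivered_step: int, pending: Set[int]) -> int:
    """Per-participant time: the latest step this tablet may treat as complete."""
    # Steps the tablet participates in must be acknowledged before its time passes them
    return min(pending) - 1 if pending else delivered_step


def bucket_time(delivered_step: int,
                pending_by_tablet: Dict[int, Set[int]],
                bucket: Iterable[int]) -> int:
    """Bucket time: every tablet in the bucket sees the slowest tablet's time."""
    return min(tablet_time(delivered_step, pending_by_tablet.get(t, set()))
               for t in bucket)


# Step S = 100 is delivered; shard 2 has not acknowledged it yet,
# while shard 1 has no transactions in it at all.
pending = {2: {100}}
bucket = [1, 2]
print(bucket_time(100, pending, bucket))        # 99: shard 1 is stuck behind shard 2
print(tablet_time(100, pending.get(1, set())))  # 100: shard 1 may serve reads at S
```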

To do that, we need to make mediators per-node: each node mediator subscribes to coordinators on behalf of its running tablets and then distributes the merged transactions to those tablets. Since this merging is effectively per-tablet, mediator time will also be per-tablet, although we need an optimization so that the majority of idle tablets can observe updated time through a single shared atomic (when possible).
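A node mediator that merges streams from several coordinators can release transactions using a watermark. This is a minimal sketch built on two assumptions: each coordinator delivers its steps in increasing order, and it also reports steps that carry no transactions. Under those assumptions, every transaction at or below the minimum last-seen step across all coordinators can safely be handed to its tablet. The class and its methods are hypothetical and are not the actual coordinator or mediator API.

```python
import heapq
from typing import Dict, Iterable, List, Tuple

# (step, coordinator, tablet, tx_id); ordering by step gives the merged order
PlannedTx = Tuple[int, int, int, int]


class NodeStreamMerger:
    """Merges per-coordinator plan streams on one node (hypothetical sketch)."""

    def __init__(self, coordinators: Iterable[int]) -> None:
        self.last_step: Dict[int, int] = {c: 0 for c in coordinators}
        self.heap: List[PlannedTx] = []

    def watermark(self) -> int:
        # Every coordinator has delivered all of its steps up to this value
        return min(self.last_step.values())

    def on_step(self, coordinator: int, step: int,
                txs: Iterable[Tuple[int, int]]) -> List[PlannedTx]:
        """Accept one step from a coordinator; return transactions now safe to deliver."""
        self.last_step[coordinator] = step
        for tablet, tx_id in txs:
            heapq.heappush(self.heap, (step, coordinator, tablet, tx_id))
        released: List[PlannedTx] = []
        while self.heap and self.heap[0][0] <= self.watermark():
            released.append(heapq.heappop(self.heap))
        return released
```

For example, a transaction planned by coordinator 1 at step 10 is held back until coordinator 2 also reports a step of at least 10. Only then is it certain that no earlier transaction for the same tablet can still arrive.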

The first step would be a way for actors (nodes) to subscribe to a subset of the transaction stream (filtered by tablet), with coordinators then streaming the relevant steps to these subscribers. As tablets become needed or stop being needed, nodes will add them to or remove them from the stream. We therefore need a way to inject additional events at the front of the queue, so that nodes can determine where historic events end and the complete stream begins.
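A filtered subscription with front-of-queue injection can be sketched as follows. When tablets are added, their historic steps are pushed ahead of already-queued live events and followed by an end-of-history marker. After that marker, the node knows it sees the complete stream for those tablets. `Subscription`, `HISTORY_END` and the shape of `history` are all hypothetical.

```python
from collections import deque
from typing import Deque, Iterable, List, Set, Tuple, Union

HISTORY_END = "history-end"  # hypothetical marker: live stream follows for these tablets

Event = Tuple[Union[int, str], List[int]]


class Subscription:
    """One node's filtered view of a coordinator's step stream (hypothetical)."""

    def __init__(self, tablets: Iterable[int]) -> None:
        self.tablets: Set[int] = set(tablets)
        self.queue: Deque[Event] = deque()

    def publish(self, step: int, tablets_in_step: Iterable[int]) -> None:
        # Live stream: forward a step only if it touches subscribed tablets
        relevant = sorted(t for t in tablets_in_step if t in self.tablets)
        if relevant:
            self.queue.append((step, relevant))

    def add_tablets(self, new_tablets: Iterable[int],
                    history: List[Tuple[int, List[int]]]) -> None:
        # history: steps planned before the subscription changed, oldest first
        new = set(new_tablets) - self.tablets
        self.tablets |= new
        backlog: List[Event] = []
        for step, tablets in history:
            relevant = sorted(t for t in tablets if t in new)
            if relevant:
                backlog.append((step, relevant))
        backlog.append((HISTORY_END, sorted(new)))
        # extendleft reverses its input, so reverse first to keep the backlog in order
        self.queue.extendleft(reversed(backlog))

    def remove_tablets(self, tablets: Iterable[int]) -> None:
        self.tablets -= set(tablets)
```

Injecting at the front means the node processes the backlog for newly added tablets before anything else in the queue. The marker then cleanly separates the historic events from the live ones.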

Challenges:

  • The number of tablets may be huge (millions), so a simple hash table for routing might consume a lot of memory. Subscribing to these millions of tablets may also be expensive. We probably need a way to batch these subscriptions and perhaps optimize them further, for example so that nodes receive streams based on hashes instead of individual tablets. This is likely not a real problem, though: even for millions of tablets, the hash table memory is on the order of dozens of megabytes.
  • The number of nodes to which coordinators need to broadcast steps may become very large (thousands). Realistically, however, this shouldn't be a problem, since mediators already have to broadcast timecast updates.
  • There will be two streams of transactions running in parallel, which may duplicate traffic (though this traffic is very small). Shards must handle duplicates gracefully, however.
  • We need a way for idle shards to use a common atomic variable, and also a way to reliably switch them between personal and common time. This avoids doing too many atomic updates for thousands of running idle shards, but it may be skipped in the first version (so that all atomics are "personal").
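The last two challenges can be sketched together: idle tablets share one time value, busy tablets switch to a personal one, and duplicate steps are ignored. This is a single-threaded Python sketch. `SharedTime` only stands in for a real atomic, and all class and method names are hypothetical. The key ordering assumption is that a tablet switches to personal time *before* the shared time is advanced past a step it participates in. The tablet switches back once all of its steps are acknowledged.

```python
from typing import Dict, Iterable, Optional, Set


class SharedTime:
    """Stand-in for the single atomic that every idle tablet on a node reads."""

    def __init__(self) -> None:
        self.value = 0


class TabletClock:
    def __init__(self, shared: SharedTime) -> None:
        self.shared = shared
        self.personal: Optional[int] = None  # None means "follow the shared time"

    def now(self) -> int:
        return self.shared.value if self.personal is None else self.personal


class NodeMediator:
    def __init__(self) -> None:
        self.shared = SharedTime()
        self.clocks: Dict[int, TabletClock] = {}
        self.pending: Dict[int, Set[int]] = {}  # tablet -> unacknowledged steps
        self.last_step = 0

    def clock(self, tablet: int) -> TabletClock:
        if tablet not in self.clocks:
            self.clocks[tablet] = TabletClock(self.shared)
        return self.clocks[tablet]

    def on_step(self, step: int, tablets_with_txs: Iterable[int]) -> bool:
        if step <= self.last_step:
            return False  # duplicate from the parallel stream: ignore it
        for tablet in tablets_with_txs:
            clock = self.clock(tablet)
            if clock.personal is None:
                # Switch to personal time before the shared time moves past this step
                clock.personal = step - 1
            self.pending.setdefault(tablet, set()).add(step)
        self.last_step = step
        self.shared.value = step  # a single update covers every idle tablet
        return True

    def on_ack(self, tablet: int, step: int) -> None:
        steps = self.pending.get(tablet)
        if not steps or step not in steps:
            return  # unknown or duplicate acknowledgement
        steps.discard(step)
        clock = self.clocks[tablet]
        if steps:
            clock.personal = min(steps) - 1  # still busy: stop before the oldest pending step
        else:
            del self.pending[tablet]
            clock.personal = None  # idle again: rejoin the shared time
```

In this model, only tablets that actually participate in a step ever hold a personal value. Every other tablet observes new steps through the one shared update.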

Labels: area/datashard (issues related to datashard tablets, i.e. relational table partitions)