Skip to content
Merged
5 changes: 5 additions & 0 deletions .changeset/lemon-cars-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'svelte': patch
---

fix: rebase pending batches when other batches are committed
206 changes: 126 additions & 80 deletions packages/svelte/src/internal/client/reactivity/batch.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** @import { Derived, Effect, Source } from '#client' */
/** @import { Derived, Effect, Source, Value } from '#client' */
import {
BLOCK_EFFECT,
BRANCH_EFFECT,
Expand All @@ -10,10 +10,11 @@ import {
INERT,
RENDER_EFFECT,
ROOT_EFFECT,
MAYBE_DIRTY
MAYBE_DIRTY,
DERIVED
} from '#client/constants';
import { async_mode_flag } from '../../flags/index.js';
import { deferred, define_property } from '../../shared/utils.js';
import { deferred, define_property, noop } from '../../shared/utils.js';
import {
active_effect,
is_dirty,
Expand Down Expand Up @@ -97,22 +98,8 @@ export class Batch {
#deferred = null;

/**
* True if an async effect inside this batch resolved and
* its parent branch was already deleted
*/
#neutered = false;

/**
* Async effects (created inside `async_derived`) encountered during processing.
* These run after the rest of the batch has updated, since they should
* always have the latest values
* @type {Effect[]}
*/
#async_effects = [];

/**
* The same as `#async_effects`, but for effects inside a newly-created
* `<svelte:boundary>` — these do not prevent the batch from committing
* Async effects inside a newly-created `<svelte:boundary>`
* — these do not prevent the batch from committing
* @type {Effect[]}
*/
#boundary_async_effects = [];
Expand Down Expand Up @@ -165,40 +152,15 @@ export class Batch {

previous_batch = null;

/** @type {Map<Source, { v: unknown, wv: number }> | null} */
var current_values = null;

// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one
if (async_mode_flag && batches.size > 1) {
current_values = new Map();
batch_deriveds = new Map();

for (const [source, current] of this.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}

for (const batch of batches) {
if (batch === this) continue;

for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}
}
var revert = Batch.apply(this);

for (const root of root_effects) {
this.#traverse_effect_tree(root);
}

// if we didn't start any new async work, and no async work
// is outstanding from a previous flush, commit
if (this.#async_effects.length === 0 && this.#pending === 0) {
if (this.#pending === 0) {
this.#commit();

var render_effects = this.#render_effects;
Expand All @@ -210,7 +172,7 @@ export class Batch {

// If sources are written to, then work needs to happen in a separate batch, else prior sources would be mixed with
// newly updated sources, which could lead to infinite loops when effects run over and over again.
previous_batch = current_batch;
previous_batch = this;
current_batch = null;

flush_queued_effects(render_effects);
Expand All @@ -223,27 +185,12 @@ export class Batch {
this.#defer_effects(this.#block_effects);
}

if (current_values) {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}

batch_deriveds = null;
}

for (const effect of this.#async_effects) {
update_effect(effect);
}
revert();

for (const effect of this.#boundary_async_effects) {
update_effect(effect);
}

this.#async_effects = [];
this.#boundary_async_effects = [];
}

Expand Down Expand Up @@ -272,12 +219,8 @@ export class Batch {
} else if (async_mode_flag && (flags & RENDER_EFFECT) !== 0) {
this.#render_effects.push(effect);
} else if ((flags & CLEAN) === 0) {
if ((flags & ASYNC) !== 0) {
var effects = effect.b?.is_pending()
? this.#boundary_async_effects
: this.#async_effects;

effects.push(effect);
if ((flags & ASYNC) !== 0 && effect.b?.is_pending()) {
this.#boundary_async_effects.push(effect);
} else if (is_dirty(effect)) {
if ((effect.f & BLOCK_EFFECT) !== 0) this.#block_effects.push(effect);
update_effect(effect);
Expand Down Expand Up @@ -350,10 +293,6 @@ export class Batch {
}
}

neuter() {
this.#neutered = true;
}

flush() {
if (queued_root_effects.length > 0) {
this.activate();
Expand All @@ -374,13 +313,58 @@ export class Batch {
* Append and remove branches to/from the DOM
*/
#commit() {
if (!this.#neutered) {
for (const fn of this.#callbacks) {
fn();
}
for (const fn of this.#callbacks) {
fn();
}

this.#callbacks.clear();

// If there are other pending batches, they now need to be 'rebased' —
// in other words, we re-run block/async effects with the newly
// committed state, unless the batch in question has a more
// recent value for a given source
if (batches.size > 1) {
this.#previous.clear();

let is_earlier = true;

for (const batch of batches) {
if (batch === this) {
is_earlier = false;
continue;
}

for (const [source, value] of this.current) {
if (batch.current.has(source)) {
if (is_earlier) {
// bring the value up to date
batch.current.set(source, value);
} else {
// later batch has more recent value,
// no need to re-run these effects
continue;
}
}

mark_effects(source);
}

if (queued_root_effects.length > 0) {
current_batch = batch;
const revert = Batch.apply(batch);

for (const root of queued_root_effects) {
batch.#traverse_effect_tree(root);
}

queued_root_effects = [];
revert();
}
}

current_batch = null;
}

batches.delete(this);
}

Expand All @@ -402,9 +386,6 @@ export class Batch {
schedule_effect(e);
}

this.#render_effects = [];
this.#effects = [];

this.flush();
} else {
this.deactivate();
Expand Down Expand Up @@ -444,6 +425,51 @@ export class Batch {
static enqueue(task) {
queue_micro_task(task);
}

/**
* @param {Batch} current_batch
*/
static apply(current_batch) {
if (!async_mode_flag || batches.size === 1) {
return noop;
}

// if there are multiple batches, we are 'time travelling' —
// we need to undo the changes belonging to any batch
// other than the current one

/** @type {Map<Source, { v: unknown, wv: number }>} */
var current_values = new Map();
batch_deriveds = new Map();

for (const [source, current] of current_batch.current) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = current;
}

for (const batch of batches) {
if (batch === current_batch) continue;

for (const [source, previous] of batch.#previous) {
if (!current_values.has(source)) {
current_values.set(source, { v: source.v, wv: source.wv });
source.v = previous;
}
}
}

return () => {
for (const [source, { v, wv }] of current_values) {
// reset the source to the current value (unless
// it got a newer value as a result of effects running)
if (source.wv <= wv) {
source.v = v;
}
}

batch_deriveds = null;
};
}
}

/**
Expand Down Expand Up @@ -615,6 +641,26 @@ function flush_queued_effects(effects) {
eager_block_effects = null;
}

/**
* This is similar to `mark_reactions`, but it only marks async/block effects
* so that these can re-run after another batch has been committed
* @param {Value} value
*/
function mark_effects(value) {
if (value.reactions !== null) {
for (const reaction of value.reactions) {
const flags = reaction.f;

if ((flags & DERIVED) !== 0) {
mark_effects(/** @type {Derived} */ (reaction));
} else if ((flags & (ASYNC | BLOCK_EFFECT)) !== 0) {
set_signal_status(reaction, DIRTY);
schedule_effect(/** @type {Effect} */ (reaction));
}
}
}
}

/**
* @param {Effect} signal
* @returns {void}
Expand Down
Loading
Loading