- Notifications
You must be signed in to change notification settings - Fork 399
Watchers (aka one-time subscriptions) #6510
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
3966112 to 09adfc3 Compare
Gerold103 left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patchset! I didn't finish review of the last commit, will return later.
639d97f to a57635d Compare
Gerold103 left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fixes! Could you please not resolve my comments if possible? Otherwise I need to go an unhide them all to see what they were about, what was fixed how, etc.
Okay. |
Gerold103 left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fixes!
src/box/lua/net_box.lua Outdated
| -- The value was not updated while this watcher was running. | ||
| -- Append it to the list of ready watchers and send an ack to | ||
| -- the server unless we've already sent it. | ||
| table.insert(state.ready, self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
table.insert can fail with OOM. Then you won't send an ACK. But I don't know how to fix this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If table.insert fails with OOM, then the whole program will fall apart, because nobody handles OOM in Lua so I don't think this would be an issue. Anyway, if this function raises an exception, the connection will be closed with an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If table.insert fails with OOM, then the whole program will fall apart
Nope: https://www.lua.org/pil/24.3.1.html. Lua pcall() and its C API colleague lua_pcall() help you to catch even OOM errors (I have no idea how it handles OOM of the Lua stack during arguments push - it just doesn't probably). In particular, this is one of the reasons why coding in Lua I consider extra hard and dangerous - almost every single line can raise an exception. That significantly complicates doing several actions atomically. For instance, add something to a table or two and change some counters/flags. Or vice versa.
Ironically, when you want to raise an error like an assertion or an error injection - you can't do that properly either. Because you will pay for them even in release build.
As for your code, I did a simple test. For that you will need to apply this diff:
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 07a793b33..163ca0066 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -34,6 +34,10 @@ local E_UNKNOWN = box.error.UNKNOWN local E_NO_CONNECTION = box.error.NO_CONNECTION local E_PROC_LUA = box.error.PROC_LUA +local errinj = { + RAISE_BEFORE_WATCH = false, +} + -- Method types used internally by net.box. local M_PING = 0 local M_CALL_16 = 1 @@ -951,6 +955,11 @@ function watcher_methods:_run() -- The value was not updated while this watcher was running. -- Append it to the list of ready watchers and send an ack to -- the server unless we've already sent it. + if errinj.RAISE_BEFORE_WATCH then + errinj.RAISE_BEFORE_WATCH = false + log.error('watcher_methods:_run(): OOM during table.insert') + error('Out of memory') + end table.insert(state.ready, self) if not state.is_acknowledged then self._conn._transport.watch(state.key) @@ -1608,6 +1617,8 @@ this_module.self = { end } +this_module.errinj = errinj + setmetatable(this_module.self, { __index = function(self, key) if key == 'space' thenThen run this preparation:
netbox = require('net.box') box.cfg{listen = 3313} box.schema.user.grant('guest', 'super') box.broadcast('key', 1) c = netbox.connect(box.cfg.listen) do_fail_oom = false c:watch('key', function(k, v) print(v) netbox.errinj.RAISE_BEFORE_WATCH = do_fail_oom end)Now do this:
do_fail_oom = true box.broadcast('key', 2)You will see:
tarantool> 2 2021-10-28 00:00:29.317 [97132] main/122/lua net_box.lua:960 E> watcher_methods:_run(): OOM during table.insert 2021-10-28 00:00:29.317 [97132] main/122/lua utils.c:450 E> LuajitError: builtin/box/net_box.lua:961: Out of memory You might expect that when I do another broadcast, I will receive it and all will be fine.
box.broadcast('key', 3)But nothing happens on the client. The callback is never called again. The connection is active, c:ping() works just fine. But I will no longer receive events on this watcher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But nothing happens on the client. The callback is never called again. The connection is active.
You're right - I mistakenly thought that table.insert was called by the net.box worker fiber so that the oom error would break the worker loop, but it wouldn't, because it's done by the fiber running the callback.
Lua pcall() and its C API colleague lua_pcall() help you to catch even OOM errors
But I doubt anybody expects a memory error from an interpreted language with automatic garbage collection. A user application is likely to break somewhere in case of oom, e.g. because it fails to create a string on Lua stack. I bet most of our Lua code isn't ready for oom. That's why I think that the program will become unusable. A bad thing about handling oom at any point of code is that it's nearly impossible to properly test. That's why I think the best way to handle oom is to restart the service. AFAIU with GC64 Lua should never raise an oom error. With GC32, I think we should terminate the program in case of Lua failing to allocate memory.
That being said, I suggest to ignore oom errors for now in Lua and deal with the problem only if somebody actually complains about it (which I doubt) - the error is printed to the log, which should be enough to identify the problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A user application is likely to break somewhere in case of oom, e.g. because it fails to create a string on Lua stack.
It will raise an exception. Like most of our box methods do. User applications somehow deal with that. You can talk to solutions if you want to find more. AFAIK, they try to never fail even in case of OOM to avoid bad errors returned to users or not returning anything. On the top level there always should be a pcall which can convert the error into something more user-friendly than a crash and remote connections drop.
I bet most of our Lua code isn't ready for oom. That's why I think that the program will become unusable.
It will be usable. GC eventually will free more space. Especially if the offender's stack is unrolled and it contained a lot of garbage.
AFAIU with GC64 Lua should never raise an oom error.
AFAIK, it still is not enabled everywhere, is it?
With GC32, I think we should terminate the program in case of Lua failing to allocate memory.
I am not against that. But then it must be done before this patch, because otherwise there is a bug. I can live with names which I don't like, or some sort of code duplication, or other non-critical compromises, but can't upvote the bug. If you really-really don't want to fix it, I totally understand. Then you can merge it if you don't think there are more useful comments from me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced the table with a singly-linked list. Hope it addresses your concern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not fix the entire problem, because, for instance, self._conn._transport.watch(state.key) can also raise an exception. Because it calls netbox_watch_or_unwatch which uses luamp_error. But I rethought my approach and decided that I should not care that much. Besides, I don't know how to fix this in a good way, so I can't recommend anything except C.
Part of tarantool#6257 This commit introduces a C module that will later be used for implementation of the watchers API in both Lua and IPROTO. Key points: - A watcher callback can be registered for an arbitrary string key with box_register_watcher(). - Watcher callbacks are invoked in the background by the worker fiber. If a watcher is asynchronous (WATCHER_RUN_ASYNC flag set), the worker fiber will invoke the callback in a new fiber. - A newly registered watcher callback is scheduled for execution unconditionally after registration, and then whenever box_broadcast() is called for the specified key. - The caller may pass arbitrary data to box_broadcast() which will be stored internally and passed to registered callbacks. - A callback is not executed again until it acknowledges the last notification (explicitly by calling watcher_ack() or implicitly by returning if WATCHER_EXPLICIT_ACK is unset).
Part of tarantool#6257 @TarantoolBot document Title: Document box.watch and box.broadcast `box.watch(key, func)` registers a watcher for the given key and returns a watcher handle, which can be used to unregister the watcher (by calling the `unregister` method). A key is an arbitrary string. It's possible to register more than one watcher for the same key. Note, garbage collection of a watcher handle doesnt result in unregistering the watcher so it's okay to discard the result of `box.watch` if the watcher is never going to be unregistered. `box.broadcast(key, value)` updates the value of the given key and signals all watchers registered for it. A watcher callback is first invoked unconditionally after the watcher registration. Subsequent invocations are triggered by `box.broadcast()` called on the local host. A watcher callback is passed the name of the key the watcher was subscribed to and the current key value. A watcher callback is always executed in a new fiber so it's okay to yield inside it. A watcher callback never runs in parallel with itself: if the key to which a watcher is subscribed is updated while the watcher callback is running, the callback will be invoked again with the new value as soon as it returns. `box.watch` and `box.broadcast` may be used before `box.cfg`. Example usage: ```lua -- Broadcast value 123 for key 'foo'. box.broadcast('foo', 123) -- Subscribe to updates of key 'foo'. w = box.watch('foo', function(key, value) assert(key == 'foo') -- do something with value end) -- Unregister the watcher when it's no longer needed. w:unregister() ```
Part of tarantool#6257 @TarantoolBot document Title: Document IPROTO watchers There are three new commands to support asynchronous server->client notifications signaled with `box.broadcast()`: - `IPROTO_WATCH` (code 74). Registers a new watcher for the given notification key or acknowledges a notification if a watcher is already registered. The key name is passed in `IPROTO_EVENT_KEY` (code 0x56). The watcher will be notified unconditionally after registration and then every time the key is updated with `box.broadcast()` provided the last notification was acknowledged. The server doesn't reply to the request unless it fails to parse the packet. - `IPROTO_UNWATCH` (code 75). Unregisters a watcher registered for the given notification key. The key name is passed in `IPROTO_EVENT_KEY` (code 0x56). A server doesn't reply to the request unless it fails to parse the packet. - `IPROTO_EVENT` (code 76). Sent by the server to notify a client about a key update. The key name is passed in `IPROTO_EVENT_KEY` (code 0x56). The key data (optional) is passed in `IPROTO_EVENT_DATA` (code 0x57). When a connection is closed, all watchers registered for it are unregistered. Servers that support the new feature set the `IPROTO_FEATURE_WATCHERS` feature bit (bit 3) in reply to the `IPROTO_ID` command.
Part of tarantool#6257 @TarantoolBot document Title: Document net.box watchers Using the new `watch` method of a net.box connection, one can subscribe to events broadcasted by a remote host. The method has the same syntax as the `box.watch()` function, which is used for subscribing to events locally. It takes a key name (string) to subscribe to and a callback to invoke when the key value is updated. It returns a watcher handle that can be used to unregister the watcher. Note, garbage collection of a watcher handle doesnt result in unregistering the watcher so it's okay to discard the result of `box.watch` if the watcher is never going to be unregistered. A watcher callback is first invoked unconditionally after the watcher registration. Subsequent invocations are triggered by `box.broadcast()` called on the remote host. A watcher callback is passed the name of the key the watcher was subscribed to and the current key value. A watcher callback is always executed in a new fiber so it's okay to yield inside it. A watcher callback never runs in parallel with itself: if the key to which a watcher is subscribed is updated while the watcher callback is running, the callback will be invoked again with the new value as soon as it returns. Watchers survive reconnect (see `reconnect_after` connection option): all registered watchers are automatically resubscribed as soon as the connection is reestablished. If a remote host supports watchers, the 'watchers' key will be set in connection's `peer_protocol_features`. Example usage: * Server: ```lua -- Broadcast value 123 for key 'foo'. box.broadcast('foo', 123) ``` * Client: ```lua conn = net.box.connect(URI) -- Subscribe to updates of key 'foo'. w = conn:watch('foo', function(key, value) assert(key == 'foo') -- do something with value end) -- Unregister the watcher when it's no longer needed. w:unregister() ```
Gerold103 left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved, but there is a known problem in case a watcher after run won't be able to allocate memory. Then it seems it will be stuck until connection recreation. Don't know how to fix it in Lua in a good way.
A watcher is a callback invoked on a state change. A state is a key-value pair stored internally. A key is a string and a value is anything that can be encoded as msgpack. To update a state, use
box.broadcast(key, value). To create a local watcher, usebox.watch(key, func). To create a remote watcher, useconn:watch(key, func), whereconnis a net.box connection. Thewatch()function returns a watcher handle, which can be used to unregister the watcher if it's no longer needed by callingw:unregister()A watcher callback is passed the key for which it was registered and the current key data. A watcher callback is always invoked in a new fiber so it's okay to yield in it. A watcher callback is never executed in parallel with itself: if the key is updated while the callback is running, it will be invoked with the new value as soon as it returns.
Closes #6257