Skip to content

Commit f724229

Browse files
committed
add skynet.watch for skynet.call, read http://blog.codingnow.com/2013/12/skynet_monitor.html
1 parent 340148c commit f724229

File tree

6 files changed

+128
-22
lines changed

6 files changed

+128
-22
lines changed

lualib/skynet.lua

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,56 @@ local session_coroutine_address = {}
2626
local wakeup_session = {}
2727
local sleep_session = {}
2828

29+
local watching_service = {}
30+
local watching_session = {}
31+
local exit_queue = {}
32+
33+
-- suspend is function
34+
local suspend
35+
2936
local trace_handle
3037
local trace_func = function() end
3138

39+
local function string_to_handle(str)
40+
return tonumber("0x" .. string.sub(str , 2))
41+
end
42+
43+
----- monitor exit
44+
45+
local function dispatch_exit()
46+
local session = table.remove(exit_queue,1)
47+
if session then
48+
local co = session_id_coroutine[session]
49+
session_id_coroutine[session] = nil
50+
return suspend(co, coroutine.resume(co, false))
51+
end
52+
end
53+
54+
local function _exit_dispatch(session, monitor, service)
55+
-- Don't remove from watching_service , because user may call dead service
56+
watching_service[service] = false
57+
for session, srv in pairs(watching_session) do
58+
if srv == service then
59+
table.insert(exit_queue, session)
60+
end
61+
end
62+
end
63+
64+
local watch_monitor
65+
66+
function skynet.watch(service)
67+
assert(type(service) == "number")
68+
if watch_monitor == nil then
69+
watch_monitor = string_to_handle(c.command("MONITOR"))
70+
assert(watch_monitor, "Need a monitor")
71+
end
72+
if watching_service[service] == nil then
73+
watching_service[service] = true
74+
-- read lualib/simplemonitor.lua
75+
assert(skynet.call(watch_monitor, "lua", "WATCH", service), "watch a dead service")
76+
end
77+
end
78+
3279
-- coroutine reuse
3380

3481
local coroutine_pool = {}
@@ -52,9 +99,6 @@ local function co_create(f)
5299
return co
53100
end
54101

55-
-- suspend is function
56-
local suspend
57-
58102
local function dispatch_wakeup()
59103
local co = next(wakeup_session)
60104
if co then
@@ -111,6 +155,7 @@ function suspend(co, result, command, param, size)
111155
end
112156
trace_count()
113157
dispatch_wakeup()
158+
dispatch_exit()
114159
end
115160

116161
function skynet.timeout(ti, func)
@@ -155,10 +200,6 @@ function skynet.name(name, handle)
155200
c.command("NAME", name .. " " .. handle)
156201
end
157202

158-
local function string_to_handle(str)
159-
return tonumber("0x" .. string.sub(str , 2))
160-
end
161-
162203
local self_handle
163204
function skynet.self()
164205
if self_handle then
@@ -210,6 +251,9 @@ end
210251

211252
function skynet.send(addr, typename, ...)
212253
local p = proto[typename]
254+
if watching_service[addr] == false then
255+
error("Service is dead")
256+
end
213257
return c.send(addr, p.id, 0 , p.pack(...))
214258
end
215259

@@ -231,10 +275,21 @@ skynet.pack = assert(c.pack)
231275
skynet.unpack = assert(c.unpack)
232276
skynet.tostring = assert(c.tostring)
233277

278+
local function yield_call(service, session)
279+
watching_session[session] = service
280+
local succ, msg, sz = coroutine_yield("CALL", session)
281+
watching_session[session] = nil
282+
assert(succ, "Service is dead")
283+
return msg,sz
284+
end
285+
234286
function skynet.call(addr, typename, ...)
235287
local p = proto[typename]
236-
local session = c.send(addr, p.id , nil , p.pack(...))
237-
return p.unpack(coroutine_yield("CALL", assert(session, "call to invalid address")))
288+
if watching_service[addr] == false then
289+
error("Service is dead")
290+
end
291+
local session = assert(c.send(addr, p.id , nil , p.pack(...)),"call to invalid address")
292+
return p.unpack(yield_call(addr, session))
238293
end
239294

240295
function skynet.blockcall(addr, typename , ...)
@@ -245,13 +300,13 @@ function skynet.blockcall(addr, typename , ...)
245300
c.command("UNLOCK")
246301
error("call to invalid address")
247302
end
248-
return p.unpack(coroutine_yield("CALL", session))
303+
return p.unpack(yield_call(addr, session))
249304
end
250305

251306
function skynet.rawcall(addr, typename, msg, sz)
252307
local p = proto[typename]
253-
local session = c.send(addr, p.id , nil , msg, sz)
254-
return coroutine_yield("CALL", assert(session, "call to invalid address"))
308+
local session = assert(c.send(addr, p.id , nil , msg, sz), "call to invalid address")
309+
return yield_call(addr, session)
255310
end
256311

257312
function skynet.ret(msg, sz)
@@ -306,7 +361,7 @@ local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
306361
else
307362
c.trace_switch(trace_handle, session)
308363
session_id_coroutine[session] = nil
309-
suspend(co, coroutine.resume(co, msg, sz))
364+
suspend(co, coroutine.resume(co, true, msg, sz))
310365
end
311366
else
312367
local p = assert(proto[prototype], prototype)
@@ -419,7 +474,6 @@ do
419474
local remote_call_func = setmetatable({}, weak_meta)
420475

421476
local _send = assert(c.send)
422-
local _yield = coroutine.yield
423477
local _pack = assert(c.pack)
424478
local _unpack = assert(c.unpack)
425479
local _local = skynet.self()
@@ -431,7 +485,7 @@ do
431485
local addr = remote_query(t.__remote)
432486
-- the proto is 11 (lua is 10)
433487
local session = assert(_send(addr, 11 , nil, _pack(t,method,...)), "call to invalid address")
434-
local msg, sz = _yield("CALL", session)
488+
local msg, sz = coroutine_yield(session)
435489
return select(2,assert(_unpack(msg,sz)))
436490
end
437491
remote_call_func[method] = f
@@ -463,13 +517,13 @@ do
463517

464518
local function remote_call(obj, method, ...)
465519
if type(obj) ~= "table" or type(method) ~= "string" then
466-
return _yield("RETURN", _pack(false, "Invalid call"))
520+
return coroutine_yield("RETURN", _pack(false, "Invalid call"))
467521
end
468522
local f = obj[method]
469523
if type(f) ~= "function" then
470-
return _yield("RETURN", _pack(false, "Object has not method " .. method))
524+
return coroutine_yield("RETURN", _pack(false, "Object has not method " .. method))
471525
end
472-
return _yield("RETURN", _pack(pcall(f,...)))
526+
return coroutine_yield("RETURN", _pack(pcall(f,...)))
473527
end
474528

475529
function skynet.remote_root()
@@ -575,6 +629,14 @@ do
575629
unpack = skynet.unpack,
576630
dispatch = _debug_dispatch,
577631
}
632+
633+
REG {
634+
name = "exit",
635+
id = 7,
636+
pack = skynet.pack,
637+
unpack = skynet.unpack,
638+
dispatch = _exit_dispatch,
639+
}
578640
end
579641

580642
local init_func = {}

service/pingserver.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ function command.HELLO()
1111
skynet.ret(skynet.pack("hello"))
1212
end
1313

14+
function command.EXIT()
15+
skynet.exit()
16+
end
17+
1418
skynet.start(function()
1519
skynet.dispatch("lua", function(session,addr, cmd, ...)
1620
command[cmd](...)

service/simplemonitor.lua

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,39 @@ local skynet = require "skynet"
22

33
-- It's a simple service exit monitor, you can do something more when a service exit.
44

5+
local service_map = {}
6+
57
skynet.register_protocol {
68
name = "client",
79
id = 3,
810
unpack = function() end,
911
dispatch = function(_, address)
12+
local w = service_map[address]
13+
if w then
14+
for watcher in pairs(w) do
15+
skynet.send(watcher, "exit", address)
16+
end
17+
service_map[address] = false
18+
end
1019
print(string.format("[:%x] exit", address))
1120
end
1221
}
1322

14-
skynet.start(function() end)
23+
local function monitor(session, watcher, command, service)
24+
assert(command, "WATCH")
25+
local w = service_map[service]
26+
if not w then
27+
if w == false then
28+
skynet.ret(skynet.pack(false))
29+
return
30+
end
31+
w = {}
32+
service_map[service] = w
33+
end
34+
w[watcher] = true
35+
skynet.ret(skynet.pack(true))
36+
end
37+
38+
skynet.start(function()
39+
skynet.dispatch("lua", monitor)
40+
end)

service/testping.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
local skynet = require "skynet"
2+
3+
skynet.start(function()
4+
local ps = skynet.uniqueservice("pingserver")
5+
skynet.watch(ps)
6+
print(skynet.call(ps, "lua", "PING", "hello"))
7+
skynet.send(ps, "lua", "EXIT")
8+
print(skynet.call(ps, "lua", "PING", "hay"))
9+
end)

skynet-src/skynet.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
#define PTYPE_SYSTEM 4
1212
#define PTYPE_HARBOR 5
1313
#define PTYPE_SOCKET 6
14-
// don't use these id
15-
#define PTYPE_RESERVED_0 7
14+
// read lualib/skynet.lua lualib/simplemonitor.lua
15+
#define PTYPE_EXIT 7
1616
// read lualib/skynet.lua lualib/mqueue.lua
1717
#define PTYPE_RESERVED_QUEUE 8
1818
#define PTYPE_RESERVED_DEBUG 9

skynet-src/skynet_server.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,12 @@ skynet_command(struct skynet_context * context, const char * cmd , const char *
540540
if (strcmp(cmd,"MONITOR") == 0) {
541541
uint32_t handle=0;
542542
if (param == NULL || param[0] == '\0') {
543-
handle = context->handle;
543+
if (G_NODE.monitor_exit) {
544+
// return current monitor serivce
545+
sprintf(context->result, ":%x", G_NODE.monitor_exit);
546+
return context->result;
547+
}
548+
return NULL;
544549
} else {
545550
if (param[0] == ':') {
546551
handle = strtoul(param+1, NULL, 16);

0 commit comments

Comments
 (0)