Skip to content
143 changes: 98 additions & 45 deletions apisix/discovery/nacos/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ local core = require('apisix.core')
local ipairs = ipairs
local pairs = pairs
local type = type
local math = math
local math_random = math.random
local ngx = ngx
local ngx_re = require('ngx.re')
Expand Down Expand Up @@ -164,10 +163,7 @@ local function get_signed_param(group_name, service_name)
end


local function get_base_uri()
local host = local_conf.discovery.nacos.host
-- TODO Add health check to get healthy nodes.
local url = host[math_random(#host)]
local function build_base_uri(url)
local auth_idx = core.string.rfind_char(url, '@')
local username, password
if auth_idx then
Expand Down Expand Up @@ -195,6 +191,18 @@ local function get_base_uri()
end


local function get_base_uri_by_index(index)
local host = local_conf.discovery.nacos.host

local url = host[index]
if not url then
return nil
end

return build_base_uri(url)
end


local function de_duplication(services, namespace_id, group_name, service_name, scheme)
for _, service in ipairs(services) do
if service.namespace_id == namespace_id and service.group_name == group_name
Expand Down Expand Up @@ -277,69 +285,114 @@ local function is_grpc(scheme)
end

local curr_service_in_use = {}
local function fetch_full_registry(premature)
if premature then
return
end

local base_uri, username, password = get_base_uri()

local function fetch_from_host(base_uri, username, password, services)
local token_param, err = get_token_param(base_uri, username, password)
if err then
log.error('get_token_param error:', err)
return
return false, err
end

local infos = get_nacos_services()
if #infos == 0 then
return
end
local service_names = {}
for _, service_info in ipairs(infos) do
local data, err
local nodes_cache = {}
local had_success = false

for _, service_info in ipairs(services) do
local namespace_id = service_info.namespace_id
local group_name = service_info.group_name
local scheme = service_info.scheme or ''
local namespace_param = get_namespace_param(service_info.namespace_id)
local group_name_param = get_group_name_param(service_info.group_name)
local signature_param = get_signed_param(service_info.group_name, service_info.service_name)
local namespace_param = get_namespace_param(namespace_id)
local group_name_param = get_group_name_param(group_name)
local signature_param = get_signed_param(group_name, service_info.service_name)
local query_path = instance_list_path .. service_info.service_name
.. token_param .. namespace_param .. group_name_param
.. signature_param
data, err = get_url(base_uri, query_path)
if err then
log.error('get_url:', query_path, ' err:', err)
goto CONTINUE
end
local data, req_err = get_url(base_uri, query_path)
if req_err then
log.error('failed to fetch instances for service [', service_info.service_name,
'] from ', base_uri, ', error: ', req_err)
else
had_success = true

local key = get_key(namespace_id, group_name, service_info.service_name)
service_names[key] = true

local nodes = {}
local key = get_key(namespace_id, group_name, service_info.service_name)
service_names[key] = true
for _, host in ipairs(data.hosts) do
local node = {
host = host.ip,
port = host.port,
weight = host.weight or default_weight,
}
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
node.port = host.metadata.gRPC_port
local hosts = data.hosts
if type(hosts) ~= 'table' then
hosts = {}
end

core.table.insert(nodes, node)
end
if #nodes > 0 then
local content = core.json.encode(nodes)
nacos_dict:set(key, content)
local nodes = {}
for _, host in ipairs(hosts) do
local node = {
host = host.ip,
port = host.port,
weight = host.weight or default_weight,
}
-- docs: https://github.com/yidongnan/grpc-spring-boot-starter/pull/496
if is_grpc(scheme) and host.metadata and host.metadata.gRPC_port then
node.port = host.metadata.gRPC_port
end

core.table.insert(nodes, node)
end

if #nodes > 0 then
nodes_cache[key] = nodes
end
end
::CONTINUE::
end
-- remove services that are not in use anymore

if not had_success then
return false, 'all nacos services fetch failed'
end

for key, nodes in pairs(nodes_cache) do
local content = core.json.encode(nodes)
nacos_dict:set(key, content)
end

for key, _ in pairs(curr_service_in_use) do
if not service_names[key] then
nacos_dict:delete(key)
end
end

curr_service_in_use = service_names
return true
end


local function fetch_full_registry(premature)
if premature then
return
end

local infos = get_nacos_services()
if #infos == 0 then
return
end

local host_list = local_conf.discovery.nacos.host
local host_count = #host_list
local start = math_random(host_count)

for i = 0, host_count - 1 do
local idx = (start + i - 1) % host_count + 1
local base_uri, username, password = get_base_uri_by_index(idx)

if not base_uri then
log.warn('nacos host at index ', idx, ' is invalid, skip')
else
local ok, err = fetch_from_host(base_uri, username, password, infos)
if ok then
return
end
log.error('fetch_from_host: ', base_uri, ' err:', err)
end
end

log.error('failed to fetch nacos registry from all hosts')
end


Expand Down
48 changes: 47 additions & 1 deletion t/discovery/nacos2.t
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ GET /hello
--- response_body_like eval
qr/server [1-2]/
--- error_log
err:status = 502
error: status = 502



Expand Down Expand Up @@ -340,3 +340,49 @@ discovery:
}
--- response_body
2



=== TEST 6: fallback to next nacos host when current host fails
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
nacos:
host:
- "http://127.0.0.1:20998"
- "http://127.0.0.1:8858"
prefix: "/nacos/v1/"
fetch_interval: 1
weight: 1
timeout:
connect: 2000
send: 2000
read: 5000
--- apisix_yaml
routes:
-
uri: /hello
upstream:
service_name: APISIX-NACOS
discovery_type: nacos
type: roundrobin
#END
--- http_config
server {
listen 20998;

location / {
return 502;
}
}
--- request
GET /hello
--- response_body_like eval
qr/server [1-2]/
--- error_log
fetch_from_host: http://127.0.0.1:20998/nacos/v1/ err:all nacos services fetch failed
Loading