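Skynet Cluster Communication in Depth

An in-depth look at Skynet's cluster communication mechanism, covering cluster architecture, node registration and discovery, cross-node communication, load balancing, and failure handling.

Skynet's cluster module lets multiple Skynet nodes form a distributed system with cross-node service calls and message passing. This tutorial walks through the mechanism from architecture design to practical use.

Cluster Architecture Overview

Why Use a Cluster

A single Skynet node is limited by the hardware of the machine it runs on. As the workload grows, a cluster provides:

  1. Horizontal scaling: adding nodes increases total processing capacity
  2. High availability: the failure of one node does not take down the whole service
  3. Proximity deployment: services can run on nodes close to their users
  4. Workload isolation: different workloads can run on different nodes

Cluster Topology

A Skynet cluster uses a peer-to-peer (P2P) architecture in which every node can communicate with every other node: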

┌─────────────────────────────────────────┐
│              Skynet Cluster              │
│                                          │
│  ┌──────────┐    ┌──────────┐           │
│  │  Node 1  │◄──►│  Node 2  │           │
│  │ :7001    │    │ :7002    │           │
│  └────┬─────┘    └────┬─────┘           │
│       │               │                  │
│       │   ┌──────────┐│                  │
│       └──►│  Node 3  │◄┘                 │
│           │ :7003    │                   │
│           └──────────┘                   │
└─────────────────────────────────────────┘

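Service Address Format

A Skynet service address (handle) is a 32-bit integer: the high 8 bits hold the node ID and the low 24 bits hold the service ID within that node. This layout comes from Skynet's harbor (master/slave) mechanism. In cluster mode a node is selected by its name, and a numeric address passed to cluster.call is resolved on the target node.

Address: 0xNNSSSSSS
           │ │
           │ └── SSSSSS, low 24 bits: service ID
           └──── NN, high 8 bits: node ID (0 means the local node)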
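The 8/24-bit split described above can be checked with a few bit operations. This is a minimal sketch in plain Lua 5.3+ (no Skynet required); the function name `split_handle` is hypothetical and only illustrates the layout.

```lua
-- Split a 32-bit service address into node ID and service ID.
-- Assumes the layout described in the text: high 8 bits = node ID,
-- low 24 bits = service ID. split_handle is a hypothetical helper.
local function split_handle(handle)
    local node_id = (handle >> 24) & 0xFF   -- high 8 bits
    local service_id = handle & 0xFFFFFF    -- low 24 bits
    return node_id, service_id
end

local node_id, service_id = split_handle(0x01000005)
print(string.format("node=%d service=%d", node_id, service_id))  -- node=1 service=5
```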

Cluster Configuration

Configuration File

Add the cluster settings to the Skynet configuration file:

-- config.node1

-- Basic settings
thread = 8
start = "main"
luaservice = "./service/?.lua;./skynet/service/?.lua"
lua_path = "./lualib/?.lua;./skynet/lualib/?.lua"
lua_cpath = "./luaclib/?.so;./skynet/luaclib/?.so"

-- Cluster settings
cluster = "config.cluster"  -- path to the cluster configuration file
node = "node1"              -- name of the current node

Cluster Configuration File

-- config.cluster

-- node name → address:port
node1 = "127.0.0.1:7001"
node2 = "127.0.0.1:7002"
node3 = "127.0.0.1:7003"
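Each entry maps a node name to an "address:port" string. As a rough illustration of that format, the sketch below splits such a string in plain Lua; `parse_endpoint` is a hypothetical helper, not part of Skynet, and it only handles the IPv4/hostname form shown above.

```lua
-- Split an "address:port" string into host and numeric port.
-- parse_endpoint is a hypothetical helper used for illustration only.
local function parse_endpoint(endpoint)
    local host, port = endpoint:match("^(.-):(%d+)$")
    if not host or host == "" then
        return nil, "invalid endpoint: " .. tostring(endpoint)
    end
    return host, tonumber(port)
end

local host, port = parse_endpoint("127.0.0.1:7001")
print(host, port)
```

A check like this can catch typos in a hand-written configuration before it is passed on to the cluster module.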

Cluster API Reference

cluster.open(port)

Open the cluster listening port of the current node:

local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Open the cluster port
    cluster.open(7001)
    skynet.error("cluster port 7001 is open")
end)

cluster.reload(config)

Update the cluster configuration at runtime:

local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Initial configuration
    cluster.reload({
        node1 = "192.168.1.101:7001",
        node2 = "192.168.1.102:7002",
    })

    -- Nodes can be added later
    cluster.reload({
        node1 = "192.168.1.101:7001",
        node2 = "192.168.1.102:7002",
        node3 = "192.168.1.103:7003",  -- newly added node
    })
end)
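When configurations are swapped at runtime it can help to log what actually changed. The following is a sketch in plain Lua that compares two configuration tables of the shape used above; `diff_config` is a hypothetical helper and does not call into Skynet.

```lua
-- Compare two cluster configuration tables (node name -> "address:port").
-- diff_config is a hypothetical helper; it returns sorted lists of node
-- names that were added, removed, or whose address changed.
local function diff_config(old, new)
    local added, removed, changed = {}, {}, {}
    for name, addr in pairs(new) do
        if old[name] == nil then
            added[#added + 1] = name
        elseif old[name] ~= addr then
            changed[#changed + 1] = name
        end
    end
    for name in pairs(old) do
        if new[name] == nil then
            removed[#removed + 1] = name
        end
    end
    table.sort(added)
    table.sort(removed)
    table.sort(changed)
    return added, removed, changed
end

local before = { node1 = "192.168.1.101:7001", node2 = "192.168.1.102:7002" }
local after = {
    node1 = "192.168.1.101:7001",
    node2 = "192.168.1.102:7002",
    node3 = "192.168.1.103:7003",
}
local added = diff_config(before, after)
print("added: " .. table.concat(added, ", "))  -- added: node3
```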

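cluster.call(node, addr, …)

Call a service on a remote node (synchronous):

local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Call a named service on a remote node. Cluster requests are always
    -- delivered as "lua" messages, so no message type is passed here.
    local result = cluster.call("node2", ".user_service", "get_user", 1001)
    skynet.error("remote user info:", result)

    -- Call by numeric address; the address is resolved on node2
    local service_addr = 0x01000005
    local data = cluster.call("node2", service_addr, "query")
end)

Parameters

  • node: name of the target node
  • addr: service address or name
  • ...: arguments forwarded to the remote service (the message type is always "lua")

Return value

  • the response from the remote service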

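cluster.send(node, addr, …)

Send a message to a remote node (asynchronous, does not wait for a response):

local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Send asynchronously; as with cluster.call, no message type is passed
    cluster.send("node3", ".logger", "log", "hello from node1")

    -- Execution continues immediately without waiting for a response
    skynet.error("message sent")
end)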

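Node Registration and Discovery

Service Registry

Build a simple service registry:

-- service/registry.lua
local skynet = require "skynet"
require "skynet.manager"  -- provides skynet.register
local cluster = require "skynet.cluster"

-- Registry: node -> service_name -> address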
local registry = {}

local CMD = {}

function CMD.register(node, service_name, address)
    if not registry[node] then
        registry[node] = {}
    end
    registry[node][service_name] = address
    skynet.error(string.format("register service: %s/%s -> :%08x", 
        node, service_name, address))
    return true
end

function CMD.unregister(node, service_name)
    if registry[node] then
        registry[node][service_name] = nil
        skynet.error(string.format("unregister service: %s/%s", node, service_name))
    end
    return true
end

function CMD.lookup(node, service_name)
    if registry[node] then
        return registry[node][service_name]
    end
    return nil
end

function CMD.list_nodes()
    local nodes = {}
    for node in pairs(registry) do
        nodes[#nodes + 1] = node
    end
    return nodes
end

function CMD.list_services(node)
    return registry[node] or {}
end

skynet.start(function()
    skynet.error("service registry started")

    -- Register a node-local name (names starting with "." are local to this node)
    skynet.register(".registry")

    -- Open the cluster port
    cluster.open(7000)

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

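Automatic Registration at Node Startup

-- service/node_startup.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local node_name = ...  -- node name, taken from the launch arguments

skynet.start(function()
    -- Open the cluster port (assumes names of the form "nodeN", e.g. node1 -> 7001)
    local port = 7000 + tonumber(string.sub(node_name, 5))
    cluster.open(port)

    -- Name of the registry service on the registry node
    local registry = ".registry"

    -- Key services of this node to register
    local services = {
        ".user_service",
        ".game_service",
        ".chat_service",
    }

    -- "master" must be listed in the cluster configuration and point at the
    -- node that runs the registry (port 7000 in the registry example)
    for _, name in ipairs(services) do
        local addr = skynet.localname(name)
        if addr then
            cluster.send("master", registry, "register",
                node_name, name, addr)
        end
    end

    skynet.error(string.format("node %s registered", node_name))
end)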
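The startup script derives the port from the node name with string.sub, which fails with an unhelpful error if the name does not follow the "nodeN" pattern. A slightly more defensive variant might look like the sketch below; `port_for` and `BASE_PORT` are hypothetical names, and the 7000 + N convention is taken from the example above.

```lua
-- Derive a cluster port from a node name of the form "nodeN".
-- BASE_PORT and port_for are hypothetical; the 7000 + N convention
-- follows the startup example in the text.
local BASE_PORT = 7000

local function port_for(node_name)
    local digits = tostring(node_name):match("^node(%d+)$")
    if not digits then
        return nil, "unexpected node name: " .. tostring(node_name)
    end
    return BASE_PORT + tonumber(digits)
end

print(port_for("node1"))   -- 7001
print(port_for("node12"))  -- 7012
```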

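Cross-Node Communication in Practice

Example: Cross-Node User Lookup

User Service (Node 1)

-- service/user_service.lua
local skynet = require "skynet"
require "skynet.manager"  -- provides skynet.register

-- Mock user database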
local users = {
    [1001] = {id = 1001, name = "张三", level = 50, gold = 10000},
    [1002] = {id = 1002, name = "李四", level = 30, gold = 5000},
    [1003] = {id = 1003, name = "王五", level = 80, gold = 20000},
}

local CMD = {}

function CMD.get_user(user_id)
    return users[user_id]
end

function CMD.update_user(user_id, field, value)
    if users[user_id] then
        users[user_id][field] = value
        return true
    end
    return false
end

function CMD.list_users()
    local list = {}
    for id, user in pairs(users) do
        list[#list + 1] = user
    end
    return list
end

skynet.start(function()
    skynet.register(".user_service")
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Game Service (Node 2)

-- service/game_service.lua
local skynet = require "skynet"
require "skynet.manager"  -- provides skynet.register
local cluster = require "skynet.cluster"

local CMD = {}

function CMD.enter_game(user_id)
    -- Cross-node call: look up the user
    local user = cluster.call("node1", ".user_service", "get_user", user_id)
    
    if not user then
        return false, "user does not exist"
    end
    
    skynet.error(string.format("player %s(%s) entered the game", user.name, user_id))
    
    -- Update the user's status
    cluster.send("node1", ".user_service", 
        "update_user", user_id, "online", true)
    
    return true, user
end

function CMD.leave_game(user_id)
    cluster.send("node1", ".user_service", 
        "update_user", user_id, "online", false)
    
    skynet.error(string.format("player %s left the game", user_id))
    return true
end

function CMD.get_leaderboard()
    -- Fetch all users from the other node
    local users = cluster.call("node1", ".user_service", "list_users")
    
    -- Sort by level, highest first
    table.sort(users, function(a, b) return a.level > b.level end)
    
    -- Return the top 10
    local top = {}
    for i = 1, math.min(10, #users) do
        top[i] = {
            rank = i,
            name = users[i].name,
            level = users[i].level
        }
    end
    
    return top
end

skynet.start(function()
    skynet.register(".game_service")
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Chat Service (Node 3)

-- service/chat_service.lua
local skynet = require "skynet"
require "skynet.manager"  -- provides skynet.register
local cluster = require "skynet.cluster"

local online_users = {}  -- user_id -> node

local CMD = {}

function CMD.user_online(user_id, node)
    online_users[user_id] = node
    skynet.error(string.format("user %s is online (node: %s)", user_id, node))
end

function CMD.user_offline(user_id)
    online_users[user_id] = nil
    skynet.error(string.format("user %s is offline", user_id))
end

function CMD.send_message(from_id, to_id, message)
    -- Look up the sender (cross-node)
    local from_user = cluster.call("node1", ".user_service", 
        "get_user", from_id)
    
    if not from_user then
        return false, "sender does not exist"
    end
    
    local target_node = online_users[to_id]
    if not target_node then
        return false, "recipient is not online"
    end
    
    -- Forward the message to the recipient's node
    cluster.send(target_node, ".chat_service", 
        "deliver_message", from_user.name, to_id, message)
    
    return true
end

function CMD.deliver_message(from_name, to_id, message)
    skynet.error(string.format("[chat] %s -> %s: %s", 
        from_name, to_id, message))
    -- The message could be pushed to the client here
end

function CMD.broadcast(message)
    -- Broadcast to every online user
    for user_id, node in pairs(online_users) do
        cluster.send(node, ".chat_service", 
            "deliver_message", "system", user_id, message)
    end
end

skynet.start(function()
    skynet.register(".chat_service")
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Cluster Proxy Pattern

Wrapping remote calls in a local proxy object keeps business code simple:

-- lualib/cluster_proxy.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local ClusterProxy = {}
ClusterProxy.__index = ClusterProxy

function ClusterProxy.new(node, service_name)
    return setmetatable({
        node = node,
        service = service_name,
        cache = {},
    }, ClusterProxy)
end

function ClusterProxy:call(cmd, ...)
    return cluster.call(self.node, self.service, cmd, ...)
end

function ClusterProxy:send(cmd, ...)
    cluster.send(self.node, self.service, cmd, ...)
end

-- Cached query (only the first return value is cached)
function ClusterProxy:cached_call(cmd, cache_key, ttl, ...)
    local cached = self.cache[cache_key]
    if cached and skynet.time() - cached.time < ttl then
        return cached.data
    end
    
    local result = self:call(cmd, ...)
    self.cache[cache_key] = {
        data = result,
        time = skynet.time()
    }
    return result
end

-- Call with retries
function ClusterProxy:call_with_retry(cmd, max_retry, ...)
    local last_err
    for attempt = 1, max_retry do
        -- table.pack keeps every return value, not just the first one
        local ret = table.pack(pcall(cluster.call, 
            self.node, self.service, cmd, ...))
        if ret[1] then
            return table.unpack(ret, 2, ret.n)
        end
        last_err = ret[2]
        skynet.error(string.format("cluster call failed, retry %d/%d: %s", 
            attempt, max_retry, tostring(last_err)))
        skynet.sleep(100)  -- wait 1 second (the unit is 1/100 s)
    end
    error(string.format("cluster call failed after %d retries: %s/%s %s (%s)", 
        max_retry, self.node, self.service, cmd, tostring(last_err)))
end

return ClusterProxy
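The caching idea can be tried outside Skynet by injecting the clock and the fetch function. The sketch below is plain Lua; `TtlCache`, its `now` parameter, and the `fetch` callback are hypothetical names standing in for skynet.time and the remote call.

```lua
-- A small TTL cache with an injectable clock, so expiry can be tested
-- without waiting. TtlCache is a hypothetical helper; in a Skynet service
-- the clock would be skynet.time and fetch would wrap the remote call.
local TtlCache = {}
TtlCache.__index = TtlCache

function TtlCache.new(now)
    return setmetatable({ now = now or os.time, entries = {} }, TtlCache)
end

-- Returns the cached value if it is younger than ttl seconds, otherwise
-- calls fetch(), stores the result, and returns it. The second return
-- value reports whether the cache was hit.
function TtlCache:get(key, ttl, fetch)
    local entry = self.entries[key]
    local t = self.now()
    if entry and t - entry.time < ttl then
        return entry.data, true
    end
    local data = fetch()
    self.entries[key] = { data = data, time = t }
    return data, false
end

-- Usage with a fake clock
local clock = 0
local cache = TtlCache.new(function() return clock end)
local function fetch_user() return { id = 1001 } end

local _, hit1 = cache:get("user_1001", 60, fetch_user)
clock = 30
local _, hit2 = cache:get("user_1001", 60, fetch_user)
clock = 61
local _, hit3 = cache:get("user_1001", 60, fetch_user)
print(hit1, hit2, hit3)  -- false true false
```

Cached data can be stale for up to ttl seconds, so this fits read-mostly data better than values such as balances that must be exact.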

Using the Proxy

-- service/my_service.lua
local skynet = require "skynet"
local ClusterProxy = require "cluster_proxy"

local user_proxy = ClusterProxy.new("node1", ".user_service")
local game_proxy = ClusterProxy.new("node2", ".game_service")

local CMD = {}

function CMD.get_player_info(user_id)
    -- Calling through the proxy keeps the code short
    local user = user_proxy:cached_call("get_user", "user_" .. user_id, 60, user_id)
    return user
end

function CMD.enter_game(user_id)
    -- Call with retries
    return game_proxy:call_with_retry("enter_game", 3, user_id)
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Load Balancing

Simple Round-Robin Load Balancing

-- lualib/load_balancer.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local LoadBalancer = {}
LoadBalancer.__index = LoadBalancer

function LoadBalancer.new(nodes, service_name)
    return setmetatable({
        nodes = nodes,
        service = service_name,
        current = 0,
    }, LoadBalancer)
end

function LoadBalancer:next_node()
    self.current = (self.current % #self.nodes) + 1
    return self.nodes[self.current]
end

function LoadBalancer:call(cmd, ...)
    local node = self:next_node()
    return cluster.call(node, self.service, cmd, ...)
end

function LoadBalancer:send(cmd, ...)
    local node = self:next_node()
    cluster.send(node, self.service, cmd, ...)
end

return LoadBalancer

-- Usage example
local LB = require "load_balancer"
local game_lb = LB.new({"node1", "node2", "node3"}, ".game_service")

-- Requests are distributed across the nodes in turn
for i = 1, 10 do
    game_lb:send("process", "task_" .. i)
end
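The index arithmetic in next_node, (current % n) + 1, is what makes the selection wrap around with Lua's 1-based tables. The sketch below isolates it in plain Lua; `make_round_robin` is a hypothetical helper.

```lua
-- Round-robin selection over a 1-based list.
-- make_round_robin is a hypothetical helper that returns an iterator
-- function cycling through items forever.
local function make_round_robin(items)
    local current = 0
    return function()
        current = (current % #items) + 1  -- 0->1, 1->2, ..., n->1
        return items[current]
    end
end

local next_node = make_round_robin({ "node1", "node2", "node3" })
local picked = {}
for i = 1, 5 do
    picked[i] = next_node()
end
print(table.concat(picked, " "))  -- node1 node2 node3 node1 node2
```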

Load-Aware Balancing

-- lualib/smart_balancer.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local SmartBalancer = {}
SmartBalancer.__index = SmartBalancer

function SmartBalancer.new(nodes, service_name)
    return setmetatable({
        nodes = nodes,
        service = service_name,
        loads = {},  -- node -> load
    }, SmartBalancer)
end

function SmartBalancer:update_loads()
    for _, node in ipairs(self.nodes) do
        local ok, load = pcall(cluster.call, node, ".cluster_monitor", "get_load")
        if ok then
            -- get_load returns a table; use its message count as the load figure
            self.loads[node] = type(load) == "table" and load.message_count or load
        end
    end
end

function SmartBalancer:select_node()
    -- Pick the node with the lowest load
    local min_load = math.huge
    local best_node = self.nodes[1]
    
    for _, node in ipairs(self.nodes) do
        local load = self.loads[node] or 0
        if load < min_load then
            min_load = load
            best_node = node
        end
    end
    
    return best_node
end

function SmartBalancer:call(cmd, ...)
    local node = self:select_node()
    return cluster.call(node, self.service, cmd, ...)
end

return SmartBalancer
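The selection step is a plain minimum search and can be exercised without a cluster. In the sketch below, `pick_least_loaded` is a hypothetical helper; unlike select_node above, it skips nodes with no reported load instead of treating them as load 0, which is one possible design choice rather than the only one.

```lua
-- Pick the node with the smallest reported load.
-- pick_least_loaded is a hypothetical helper. Nodes without a reported
-- load are skipped; if no node has reported, the first node is returned.
local function pick_least_loaded(nodes, loads)
    local best_node, min_load = nil, math.huge
    for _, node in ipairs(nodes) do
        local load = loads[node]
        if load ~= nil and load < min_load then
            best_node, min_load = node, load
        end
    end
    return best_node or nodes[1]
end

local nodes = { "node1", "node2", "node3" }
print(pick_least_loaded(nodes, { node1 = 12, node2 = 3, node3 = 7 }))  -- node2
print(pick_least_loaded(nodes, {}))                                    -- node1
```

Treating an unknown load as 0 sends traffic to nodes that have never answered a load query, so skipping them is the more cautious default.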

Failure Handling

Node Heartbeat Detection

-- service/heartbeat.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local nodes = {...}  -- list of nodes to monitor
local node_status = {}

local function check_node(node)
    local ok, result = pcall(cluster.call, node, ".cluster_monitor", "ping")
    if ok then
        if node_status[node] ~= "online" then
            node_status[node] = "online"
            skynet.error(string.format("node %s is online", node))
        end
    else
        if node_status[node] ~= "offline" then
            node_status[node] = "offline"
            skynet.error(string.format("node %s is offline: %s", node, tostring(result)))
        end
    end
end

skynet.start(function()
    -- Initialize the status of every node
    for _, node in ipairs(nodes) do
        node_status[node] = "unknown"
    end
    
    -- Check heartbeats periodically
    skynet.fork(function()
        while true do
            for _, node in ipairs(nodes) do
                check_node(node)
            end
            skynet.sleep(500)  -- check every 5 seconds
        end
    end)
end)
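The heartbeat service only logs when a node's status changes, not on every probe. That edge-triggered behaviour can be sketched in plain Lua as follows; `make_tracker` and the `on_change` callback are hypothetical names.

```lua
-- Track node status and report only transitions.
-- make_tracker is a hypothetical helper; on_change(node, old, new) is
-- invoked when a probe result differs from the last recorded status.
local function make_tracker(on_change)
    local status = {}
    return function(node, alive)
        local new_status = alive and "online" or "offline"
        if status[node] ~= new_status then
            local old = status[node] or "unknown"
            status[node] = new_status
            on_change(node, old, new_status)
        end
    end
end

local events = {}
local report = make_tracker(function(node, old, new)
    events[#events + 1] = string.format("%s: %s -> %s", node, old, new)
end)

report("node2", true)   -- unknown -> online
report("node2", true)   -- no change, nothing reported
report("node2", false)  -- online -> offline
print(#events)          -- 2
```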

Failover

-- lualib/failover.lua
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local Failover = {}
Failover.__index = Failover

function Failover.new(primary, backup, service_name)
    return setmetatable({
        primary = primary,
        backup = backup,
        service = service_name,
        current = primary,
        failover_count = 0,
    }, Failover)
end

function Failover:call(cmd, ...)
    -- Try the currently active node first
    local ret = table.pack(pcall(cluster.call, 
        self.current, self.service, cmd, ...))
    
    if ret[1] then
        self.failover_count = 0
        return table.unpack(ret, 2, ret.n)
    end
    
    -- The active node failed: switch to the other node
    local other = (self.current == self.primary) and self.backup or self.primary
    skynet.error(string.format("node %s failed, switching to node %s", 
        self.current, other))
    
    self.current = other
    self.failover_count = self.failover_count + 1
    
    -- Retry on the other node
    ret = table.pack(pcall(cluster.call, 
        self.current, self.service, cmd, ...))
    
    if ret[1] then
        return table.unpack(ret, 2, ret.n)
    end
    
    error("all nodes are unavailable")
end

return Failover
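The same try-then-fall-back idea extends to an ordered list of candidates. The sketch below runs in plain Lua with a stubbed call function; `call_with_failover` and `fake_call` are hypothetical, and in a Skynet service the stub would be replaced by a function that wraps cluster.call.

```lua
-- Try each candidate node in order until one call succeeds.
-- call_with_failover is a hypothetical helper; do_call(node, ...) performs
-- the actual request and may raise an error.
local function call_with_failover(candidates, do_call, ...)
    local errors = {}
    for _, node in ipairs(candidates) do
        local ret = table.pack(pcall(do_call, node, ...))
        if ret[1] then
            -- Return the node that answered, followed by all results
            return node, table.unpack(ret, 2, ret.n)
        end
        errors[#errors + 1] = node .. ": " .. tostring(ret[2])
    end
    error("all nodes are unavailable: " .. table.concat(errors, "; "))
end

-- Stub standing in for a remote call: node1 is "down"
local function fake_call(node, cmd)
    if node == "node1" then
        error("connection refused")
    end
    return "pong"
end

local node, reply = call_with_failover({ "node1", "node2" }, fake_call, "ping")
print(node, reply)
```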

Cluster Monitoring Service

-- service/cluster_monitor.lua
local skynet = require "skynet"
require "skynet.manager"  -- provides skynet.register
local cluster = require "skynet.cluster"

local stats = {
    call_count = 0,
    call_errors = 0,
    send_count = 0,
    latency_sum = 0,
}

local CMD = {}

function CMD.ping()
    return "pong"
end

function CMD.get_stats()
    return stats
end

function CMD.get_load()
    -- Report load metrics for this service
    return {
        message_count = skynet.mqlen(),     -- messages waiting in this service's queue
        memory = collectgarbage("count"),   -- Lua memory used by this service, in KB
    }
end

function CMD.report_call(success, latency)
    stats.call_count = stats.call_count + 1
    if not success then
        stats.call_errors = stats.call_errors + 1
    end
    stats.latency_sum = stats.latency_sum + latency
end

skynet.start(function()
    skynet.register(".cluster_monitor")
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)
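The raw counters kept by the monitor become more useful once turned into rates. The sketch below derives an error rate and an average latency from a table shaped like stats above; `summarize` is a hypothetical helper, and the latency unit is whatever the caller passes to report_call.

```lua
-- Derive error rate and average latency from the monitor's counters.
-- summarize is a hypothetical helper operating on a table with the same
-- fields as stats in the service above.
local function summarize(stats)
    local calls = stats.call_count
    if calls == 0 then
        return { error_rate = 0, avg_latency = 0 }
    end
    return {
        error_rate = stats.call_errors / calls,
        avg_latency = stats.latency_sum / calls,
    }
end

local summary = summarize({ call_count = 4, call_errors = 1, latency_sum = 20 })
print(summary.error_rate, summary.avg_latency)  -- 0.25	5.0
```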

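Best Practices

1. Partition Services Sensibly

A recommended way to partition services:

Node 1 (login node):
  - Login service
  - Authentication service
  - Gateway service

Node 2 (game logic node):
  - Combat service
  - Scene service
  - NPC service

Node 3 (data node):
  - User data service
  - Leaderboard service
  - Save data service

Node 4 (social node):
  - Chat service
  - Friend service
  - Mail service

2. Reduce Cross-Node Calls

-- Bad: one cross-node call per item
for i = 1, 100 do
    cluster.call("node2", ".data", "get", i)
end

-- Good: a single batched request
local ids = {}
for i = 1, 100 do
    ids[i] = i
end
local result = cluster.call("node2", ".data", "batch_get", ids)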
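When the list of IDs is large, a single request can itself become too big, so one option is to split it into fixed-size batches. This is a sketch in plain Lua 5.3+; `chunk` is a hypothetical helper and the batch size of 30 is arbitrary.

```lua
-- Split a list into batches of at most `size` items.
-- chunk is a hypothetical helper; each batch could then be sent as one
-- batched request instead of one call per ID.
local function chunk(ids, size)
    local batches = {}
    for i = 1, #ids, size do
        local last = math.min(i + size - 1, #ids)
        batches[#batches + 1] = table.move(ids, i, last, 1, {})
    end
    return batches
end

local ids = {}
for i = 1, 100 do
    ids[i] = i
end

local batches = chunk(ids, 30)
print(#batches, #batches[4])  -- 4	10
```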

3. Use a Local Cache

local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- Cache remote data
local cache = {}
local cache_ttl = 60  -- entries expire after 60 seconds

local function get_user(user_id)
    local cached = cache[user_id]
    if cached and skynet.time() - cached.time < cache_ttl then
        return cached.data
    end
    
    local user = cluster.call("data_node", ".user_service", "get_user", user_id)
    cache[user_id] = {data = user, time = skynet.time()}
    return user
end

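Summary

This tutorial covered the main aspects of Skynet cluster communication:

  1. Cluster architecture: peer-to-peer network, address format
  2. Cluster configuration: configuration files and runtime updates
  3. Node communication: call and send
  4. Service registration: registry and automatic registration
  5. Load balancing: round-robin and load-based strategies
  6. Failure handling: heartbeat detection and failover

References

  1. Skynet cluster documentation: https://github.com/cloudwu/skynet/wiki/Cluster
  2. Skynet cluster source code: lualib/skynet/cluster.lua
  3. Principles of distributed system design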
