Skynet 的集群模块允许多个 Skynet 节点组成分布式系统,实现跨节点的服务调用和消息传递。本教程将从架构设计到实战应用,全面讲解 Skynet 集群通信机制。
集群架构概述
为什么需要集群
单个 Skynet 节点的处理能力受限于单台机器的硬件资源。当业务规模扩大时,需要通过集群实现:
- 水平扩展:通过增加节点提升系统处理能力
- 高可用:单节点故障不影响整体服务
- 就近部署:将服务部署在靠近用户的节点上
- 业务隔离:将不同业务部署在不同节点上
集群拓扑结构
Skynet 集群采用对等网络(P2P)架构,每个节点都可以与其他节点通信:
┌─────────────────────────────────────────┐
│ Skynet 集群 │
│ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Node 1 │◄──►│ Node 2 │ │
│ │ :7001 │ │ :7002 │ │
│ └────┬─────┘ └────┬─────┘ │
│ │ │ │
│ │ ┌──────────┐│ │
│ └──►│ Node 3 │◄┘ │
│ │ :7003 │ │
│ └──────────┘ │
└─────────────────────────────────────────┘
集群地址格式
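Skynet 的服务地址(handle)是一个 32 位整数:高 8 位是 harbor ID(仅用于旧式 master/slave 多节点模式),低 24 位是节点内的服务编号。cluster 模式并不依赖 harbor 位来区分节点。跨节点访问时,由"节点名 + 远程服务地址(或服务名)"共同定位目标服务:
服务地址: 0xHHSSSSSS
  HH      高 8 位:harbor ID(为 0 表示未启用 harbor 机制)
  SSSSSS  低 24 位:节点内的服务编号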
集群配置
配置文件
在 Skynet 配置文件中添加集群相关配置:
-- config.node1
-- Skynet startup config for the node named "node1".
-- Basic settings
thread = 8
start = "main"
luaservice = "./service/?.lua;./skynet/service/?.lua"
lua_path = "./lualib/?.lua;./skynet/lualib/?.lua"
lua_cpath = "./luaclib/?.so;./skynet/luaclib/?.so"
-- Cluster settings
-- Path of the file that maps cluster node names to "host:port" addresses.
cluster = "config.cluster"
-- Name of this node (presumably read by services via skynet.getenv — verify).
node = "node1"
集群配置文件
-- config.cluster
-- Cluster node table: node name -> "host:port" of that node's cluster listener.
-- NOTE(review): snippets in this tutorial also call a node named "master"
-- (where registry.lua listens on port 7000); add an entry for it when using them.
node1 = "127.0.0.1:7001"
node2 = "127.0.0.1:7002"
node3 = "127.0.0.1:7003"
集群 API 详解
cluster.open(port)
开启当前节点的集群监听端口:
local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- Port on which this node accepts connections from other cluster nodes.
local CLUSTER_PORT = 7001

-- Service entry point: open the cluster listener and log it.
local function on_start()
    cluster.open(CLUSTER_PORT)
    skynet.error("集群端口 7001 已开启")
end

skynet.start(on_start)
cluster.reload(config)
动态更新集群配置:
local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Initial topology: two nodes.
    local initial_nodes = {
        node1 = "192.168.1.101:7001",
        node2 = "192.168.1.102:7002",
    }
    cluster.reload(initial_nodes)

    -- Later, reload with a superset of the table to add a node at runtime.
    local expanded_nodes = {
        node1 = "192.168.1.101:7001",
        node2 = "192.168.1.102:7002",
        node3 = "192.168.1.103:7003", -- newly added node
    }
    cluster.reload(expanded_nodes)
end)
cluster.call(node, addr, …)
调用远程节点的服务(同步):当前协程会挂起,直到收到响应为止。若目标节点不可达或远程处理出错,cluster.call 会抛出错误,可用 pcall 捕获:
local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Call a service on a remote node by name. cluster.call always packs its
    -- arguments with the "lua" protocol (skynet.pack), so no message type is
    -- passed: everything after the address goes to the remote dispatch as-is.
    local result = cluster.call("node2", ".user_service", "get_user", 1001)
    -- result is a table; log one of its fields rather than the table address.
    skynet.error("远程用户信息:", result and result.name)

    -- Call by numeric service address instead of by name.
    local service_addr = 0x01000005
    local data = cluster.call("node2", service_addr, "query")
end)
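参数:
- node:目标节点名称(需在集群配置中存在)
- addr:远程服务的数字地址,或服务名称(如 ".user_service")
- ...:传给远程服务的参数。cluster 固定使用 lua 协议(参数经 skynet.pack 打包),因此不需要、也不应该再传入消息类型(如 "lua")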
返回值:
- 远程服务的响应结果
cluster.send(node, addr, …)
发送消息到远程节点(异步,不等待响应):
local skynet = require "skynet"
local cluster = require "skynet.cluster"

skynet.start(function()
    -- Fire-and-forget send: no response is awaited. As with cluster.call the
    -- payload always uses the "lua" protocol, so the arguments start directly
    -- with the remote command name.
    cluster.send("node3", ".logger", "log", "hello from node1")
    -- Execution continues immediately.
    skynet.error("消息已发送")
end)
节点注册与发现
服务注册中心
构建一个简单的服务注册中心:
-- service/registry.lua
-- Minimal service registry. It records the address of each named service on
-- each cluster node and answers lookups sent over the "lua" protocol.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- registry[node][service_name] = address
local registry = {}

local CMD = {}

--- Record `address` as the address of `service_name` on `node`.
-- Overwrites any previous entry. Always returns true.
function CMD.register(node, service_name, address)
    local services = registry[node]
    if not services then
        services = {}
        registry[node] = services
    end
    services[service_name] = address
    skynet.error(string.format("注册服务: %s/%s -> :%08x",
        node, service_name, address))
    return true
end

--- Remove `service_name` from `node`'s entries (no-op if the node is unknown).
-- Always returns true.
function CMD.unregister(node, service_name)
    local services = registry[node]
    if services then
        services[service_name] = nil
        skynet.error(string.format("注销服务: %s/%s", node, service_name))
    end
    return true
end

--- Return the recorded address of `service_name` on `node`, or nil.
function CMD.lookup(node, service_name)
    local services = registry[node]
    if services then
        return services[service_name]
    end
    return nil
end

--- Return a list of every node that has registered at least once.
-- Order is unspecified (pairs traversal).
function CMD.list_nodes()
    local nodes = {}
    for node in pairs(registry) do
        nodes[#nodes + 1] = node
    end
    return nodes
end

--- Return the service_name -> address table of `node` (empty if unknown).
function CMD.list_services(node)
    return registry[node] or {}
end

skynet.start(function()
    skynet.error("服务注册中心启动")
    -- Install the dispatcher first. cluster.open below calls into the cluster
    -- daemon and may yield, and a request arriving before any dispatcher
    -- exists could not be handled.
    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring: a nil cmd must yield the intended message, not a concat error.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
    -- Names beginning with "." are node-local names, not global ones. Nodes in
    -- this tutorial reach this service as ".registry" via cluster.send.
    skynet.register(".registry")
    -- Listen for cluster traffic. Other nodes refer to this node as "master",
    -- so their cluster config must map "master" to this host and port 7000.
    cluster.open(7000)
end)
节点启动时自动注册
-- service/node_startup.lua
-- Boot-time helper. Opens this node's cluster port, then announces the node's
-- key services to the registry on the "master" node.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- Node name, passed as this service's startup argument (e.g. "node1").
local node_name = ...

skynet.start(function()
    assert(type(node_name) == "string",
        "node_startup: missing node name argument")
    -- Listen port = 7000 + the number after the 4-character prefix:
    -- "node1" -> 7001, "node2" -> 7002, ...
    local index = tonumber(string.sub(node_name, 5))
    assert(index, "node_startup: cannot derive port from node name " .. node_name)
    cluster.open(7000 + index)

    -- Registry service name on the master node. The cluster config must contain
    -- a "master" entry pointing at the registry's listener (port 7000).
    local registry = ".registry"

    -- Key services of this node; a service that is not running is skipped.
    local services = {
        ".user_service",
        ".game_service",
        ".chat_service",
    }
    for _, name in ipairs(services) do
        local addr = skynet.localname(name)
        if addr then
            -- cluster.send always uses the "lua" protocol: no type argument.
            cluster.send("master", registry, "register", node_name, name, addr)
        end
    end

    -- cluster.send is asynchronous, so this only means the requests were queued.
    skynet.error(string.format("节点 %s 注册完成", node_name))
end)
跨节点通信实战
示例:跨节点用户查询
用户服务(Node 1)
-- service/user_service.lua
-- In-memory user store (runs on node1), served under the local name ".user_service".
local skynet = require "skynet"

-- Mock user database: user_id -> record
local users = {
    [1001] = {id = 1001, name = "张三", level = 50, gold = 10000},
    [1002] = {id = 1002, name = "李四", level = 30, gold = 5000},
    [1003] = {id = 1003, name = "王五", level = 80, gold = 20000},
}

local CMD = {}

--- Return the record for `user_id`, or nil if it does not exist.
function CMD.get_user(user_id)
    return users[user_id]
end

--- Set `field` of an existing user to `value`.
-- @return true if the user exists, false otherwise
function CMD.update_user(user_id, field, value)
    local user = users[user_id]
    if user then
        user[field] = value
        return true
    end
    return false
end

--- Return every user record as a list (order unspecified: pairs traversal).
function CMD.list_users()
    local list = {}
    for _, user in pairs(users) do
        list[#list + 1] = user
    end
    return list
end

skynet.start(function()
    skynet.register(".user_service")
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)
游戏服务(Node 2)
-- service/game_service.lua
-- Game service (runs on node2). It reads and updates player data owned by
-- .user_service on node1 through the cluster.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- Node and service that own user data.
local USER_NODE = "node1"
local USER_SERVICE = ".user_service"
-- Number of entries returned by get_leaderboard.
local LEADERBOARD_SIZE = 10

local CMD = {}

--- Put a player into the game.
-- @return true, user on success; false, reason if the user does not exist
function CMD.enter_game(user_id)
    -- Cross-node lookup. cluster.call always uses the "lua" protocol, so the
    -- arguments start directly with the remote command name.
    local user = cluster.call(USER_NODE, USER_SERVICE, "get_user", user_id)
    if not user then
        return false, "用户不存在"
    end
    skynet.error(string.format("玩家 %s(%s) 进入游戏", user.name, user_id))
    -- Fire-and-forget status update.
    cluster.send(USER_NODE, USER_SERVICE, "update_user", user_id, "online", true)
    return true, user
end

--- Take a player out of the game. Always returns true.
function CMD.leave_game(user_id)
    cluster.send(USER_NODE, USER_SERVICE, "update_user", user_id, "online", false)
    skynet.error(string.format("玩家 %s 离开游戏", user_id))
    return true
end

--- Return up to LEADERBOARD_SIZE entries {rank, name, level}, highest level first.
function CMD.get_leaderboard()
    -- Fetch every user from the data node.
    local users = cluster.call(USER_NODE, USER_SERVICE, "list_users")
    -- Sort by level, descending (strict comparator as table.sort requires).
    table.sort(users, function(a, b) return a.level > b.level end)
    local top = {}
    for i = 1, math.min(LEADERBOARD_SIZE, #users) do
        top[i] = {
            rank = i,
            name = users[i].name,
            level = users[i].level,
        }
    end
    return top
end

skynet.start(function()
    skynet.register(".game_service")
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)
聊天服务(Node 3)
-- service/chat_service.lua
-- Chat service (runs on node3). It tracks which node each online user is on
-- and forwards messages to the .chat_service on that node.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- user_id -> name of the node the user is connected to
local online_users = {}

local CMD = {}

--- Record that `user_id` is online on `node`.
function CMD.user_online(user_id, node)
    online_users[user_id] = node
    skynet.error(string.format("用户 %s 上线 (节点: %s)", user_id, node))
end

--- Forget `user_id`'s online location.
function CMD.user_offline(user_id)
    online_users[user_id] = nil
    skynet.error(string.format("用户 %s 下线", user_id))
end

--- Send `message` from `from_id` to `to_id`.
-- @return true once forwarded; false, reason if the sender is unknown or the
--   receiver is offline
function CMD.send_message(from_id, to_id, message)
    -- Look up the sender on the user node. cluster calls always use the "lua"
    -- protocol, so no message-type argument is passed.
    local from_user = cluster.call("node1", ".user_service", "get_user", from_id)
    if not from_user then
        return false, "发送者不存在"
    end

    local target_node = online_users[to_id]
    if not target_node then
        return false, "接收者不在线"
    end

    -- Forward to the chat service on the receiver's node.
    cluster.send(target_node, ".chat_service",
        "deliver_message", from_user.name, to_id, message)
    return true
end

--- Deliver a message to a local user (currently only logged).
function CMD.deliver_message(from_name, to_id, message)
    skynet.error(string.format("[聊天] %s -> %s: %s",
        from_name, to_id, message))
    -- This is where the message would be pushed to the client.
end

--- Send a system message to every online user, on whichever node they are.
function CMD.broadcast(message)
    for user_id, node in pairs(online_users) do
        cluster.send(node, ".chat_service",
            "deliver_message", "系统", user_id, message)
    end
end

skynet.start(function()
    skynet.register(".chat_service")
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)
集群代理模式
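通过在本地封装一个代理对象来包装远程调用,以简化业务代码。注意这里的 ClusterProxy 是一个 Lua 模块,而不是独立的服务。如果需要真正的代理服务,Skynet 也内置了 cluster.proxy(node, addr),它会生成一个转发到远程服务的本地地址: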
-- lualib/cluster_proxy.lua
-- Thin wrapper object bound to one (node, service) pair, with optional result
-- caching and retrying. All remote calls use the "lua" protocol implicitly.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local ClusterProxy = {}
ClusterProxy.__index = ClusterProxy

--- Create a proxy for `service_name` on cluster node `node`.
function ClusterProxy.new(node, service_name)
    return setmetatable({
        node = node,
        service = service_name,
        cache = {}, -- cache_key -> { data = packed results, time = skynet.time() }
    }, ClusterProxy)
end

--- Synchronous remote call; returns every value the remote handler returns.
function ClusterProxy:call(cmd, ...)
    return cluster.call(self.node, self.service, cmd, ...)
end

--- Fire-and-forget remote send.
function ClusterProxy:send(cmd, ...)
    cluster.send(self.node, self.service, cmd, ...)
end

--- Call `cmd` with the result cached under `cache_key` for `ttl` seconds.
-- All return values are cached and returned, including nil results.
function ClusterProxy:cached_call(cmd, cache_key, ttl, ...)
    local entry = self.cache[cache_key]
    if entry and skynet.time() - entry.time < ttl then
        return table.unpack(entry.data, 1, entry.data.n)
    end
    -- table.pack keeps every return value (and trailing nils) via the .n field.
    local results = table.pack(self:call(cmd, ...))
    self.cache[cache_key] = {
        data = results,
        time = skynet.time(),
    }
    return table.unpack(results, 1, results.n)
end

--- Call `cmd`, making up to `max_retry` attempts in total, 1 second apart.
-- Returns every value of the first successful call.
-- Raises an error that includes the last failure once all attempts fail.
function ClusterProxy:call_with_retry(cmd, max_retry, ...)
    local last_err
    for attempt = 1, max_retry do
        -- Pack the whole pcall result: `local ok, result = pcall(...)` would keep
        -- only the first return value (e.g. drop `user` from `true, user`).
        local r = table.pack(pcall(cluster.call, self.node, self.service, cmd, ...))
        if r[1] then
            return table.unpack(r, 2, r.n)
        end
        last_err = r[2]
        skynet.error(string.format("集群调用失败,重试 %d/%d: %s",
            attempt, max_retry, tostring(last_err)))
        -- No point in waiting after the final attempt.
        if attempt < max_retry then
            skynet.sleep(100) -- wait 1 second (skynet.sleep counts 1/100 s ticks)
        end
    end
    error(string.format("集群调用失败(已重试 %d 次): %s/%s %s: %s",
        max_retry, self.node, self.service, cmd, tostring(last_err)))
end

return ClusterProxy
使用代理
-- service/my_service.lua
-- Example service that reaches remote services only through ClusterProxy objects.
local skynet = require "skynet"
local ClusterProxy = require "cluster_proxy"

-- Proxies bound to specific remote services.
local user_proxy = ClusterProxy.new("node1", ".user_service")
local game_proxy = ClusterProxy.new("node2", ".game_service")

-- Seconds a cached user record stays fresh.
local USER_CACHE_TTL = 60
-- Total attempts for enter_game before giving up.
local ENTER_GAME_ATTEMPTS = 3

local CMD = {}

--- Return the (possibly cached) user record for `user_id`.
function CMD.get_player_info(user_id)
    local cache_key = "user_" .. user_id
    local user = user_proxy:cached_call("get_user", cache_key, USER_CACHE_TTL, user_id)
    return user
end

--- Enter the game on node2, retrying on failure.
function CMD.enter_game(user_id)
    return game_proxy:call_with_retry("enter_game", ENTER_GAME_ATTEMPTS, user_id)
end

-- Route "lua" messages to CMD. Send a response only when the sender expects one.
local function dispatch(session, source, cmd, ...)
    local handler = assert(CMD[cmd])
    if session == 0 then
        handler(...)
    else
        skynet.retpack(handler(...))
    end
end

skynet.start(function()
    skynet.dispatch("lua", dispatch)
end)
负载均衡
简单轮询负载均衡
-- lualib/load_balancer.lua
-- Round-robin balancer that spreads requests for one service name across a
-- list of cluster nodes.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local LoadBalancer = {}
LoadBalancer.__index = LoadBalancer

--- Create a balancer.
-- @param nodes list of cluster node names
-- @param service_name service (name or address) to reach on each node
function LoadBalancer.new(nodes, service_name)
    return setmetatable({
        nodes = nodes,
        service = service_name,
        current = 0, -- index of the node used most recently; 0 = none yet
    }, LoadBalancer)
end

--- Advance to the next node and return it, wrapping after the last one.
function LoadBalancer:next_node()
    local count = #self.nodes
    -- An empty list would otherwise fail obscurely at the modulo below.
    assert(count > 0, "LoadBalancer: node list is empty")
    self.current = (self.current % count) + 1
    return self.nodes[self.current]
end

--- Synchronous call on the next node. cluster calls always use the "lua"
-- protocol, so no message-type argument is passed.
function LoadBalancer:call(cmd, ...)
    local node = self:next_node()
    return cluster.call(node, self.service, cmd, ...)
end

--- Fire-and-forget send to the next node.
function LoadBalancer:send(cmd, ...)
    local node = self:next_node()
    cluster.send(node, self.service, cmd, ...)
end

return LoadBalancer
-- Usage example. Put this in a separate service file, not in load_balancer.lua:
-- `return` must be the last statement of a Lua chunk.
local LoadBalancer = require "load_balancer"

local game_balancer = LoadBalancer.new({ "node1", "node2", "node3" }, ".game_service")

-- Successive sends rotate through the nodes in round-robin order.
local task_count = 10
for task_id = 1, task_count do
    local payload = "task_" .. task_id
    game_balancer:send("process", payload)
end
基于节点负载的均衡策略
-- lualib/smart_balancer.lua
-- Routes each call to the node with the lowest last-reported load. Load
-- figures come from a monitor service's "get_load" command on every node.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local SmartBalancer = {}
SmartBalancer.__index = SmartBalancer

--- Create a balancer.
-- @param nodes list of cluster node names
-- @param service_name service (name or address) to call on the chosen node
-- @param monitor_name optional monitor service name on each node; defaults to
--   ".cluster_monitor", the name the monitor service registers
function SmartBalancer.new(nodes, service_name, monitor_name)
    return setmetatable({
        nodes = nodes,
        service = service_name,
        monitor = monitor_name or ".cluster_monitor",
        loads = {}, -- node -> numeric load from the latest successful poll
    }, SmartBalancer)
end

-- Reduce a "get_load" reply to a number. The monitor replies with a table
-- (e.g. { message_count = ..., memory = ... }), and comparing that table
-- against a number with `<` would raise an error.
local function load_value(load)
    if type(load) == "number" then
        return load
    elseif type(load) == "table" then
        return tonumber(load.message_count) or 0
    end
    return 0
end

--- Poll every node's monitor for its load. A node whose poll fails keeps its
-- previous value. Callers must invoke this periodically (e.g. from a
-- skynet.fork loop); nothing here schedules it.
function SmartBalancer:update_loads()
    for _, node in ipairs(self.nodes) do
        local ok, load = pcall(cluster.call, node, self.monitor, "get_load")
        if ok then
            self.loads[node] = load_value(load)
        end
    end
end

--- Return the node with the smallest known load. Nodes never polled count as
-- 0; ties go to the earliest node in the list.
function SmartBalancer:select_node()
    local min_load = math.huge
    local best_node = self.nodes[1]
    for _, node in ipairs(self.nodes) do
        local load = self.loads[node] or 0
        if load < min_load then
            min_load = load
            best_node = node
        end
    end
    return best_node
end

--- Synchronous call on the least-loaded node. cluster calls always use the
-- "lua" protocol, so no message-type argument is passed.
function SmartBalancer:call(cmd, ...)
    local node = assert(self:select_node(), "SmartBalancer: node list is empty")
    return cluster.call(node, self.service, cmd, ...)
end

return SmartBalancer
故障处理
节点心跳检测
-- service/heartbeat.lua
-- Periodically pings the monitor service on each watched node and logs
-- online/offline transitions.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

-- Node names to watch, passed as this service's startup arguments.
local nodes = {...}
-- Monitor service answering "ping" on each node; matches the name that
-- cluster_monitor.lua registers.
local MONITOR = ".cluster_monitor"
-- Delay between rounds in skynet ticks (1/100 s): 500 = every 5 seconds.
local CHECK_INTERVAL = 500

-- node -> "unknown" | "online" | "offline"
local node_status = {}

-- Ping one node and log only when its status changes.
local function check_node(node)
    -- cluster.call always uses the "lua" protocol: no message-type argument.
    local ok, result = pcall(cluster.call, node, MONITOR, "ping")
    if ok then
        if node_status[node] ~= "online" then
            node_status[node] = "online"
            skynet.error(string.format("节点 %s 上线", node))
        end
    else
        if node_status[node] ~= "offline" then
            node_status[node] = "offline"
            skynet.error(string.format("节点 %s 离线: %s", node, result))
        end
    end
end

skynet.start(function()
    -- Every node starts as "unknown", so the first check always logs.
    for _, node in ipairs(nodes) do
        node_status[node] = "unknown"
    end
    -- Background loop: check all nodes in turn, then sleep.
    skynet.fork(function()
        while true do
            for _, node in ipairs(nodes) do
                check_node(node)
            end
            skynet.sleep(CHECK_INTERVAL)
        end
    end)
end)
故障转移
-- lualib/failover.lua
-- Primary/backup failover wrapper for calls to one service name.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local Failover = {}
Failover.__index = Failover

--- Create a failover wrapper. Calls initially go to `primary`.
-- @param primary node name tried first
-- @param backup node name switched to when the current node fails
-- @param service_name service (name or address) called on either node
function Failover.new(primary, backup, service_name)
    return setmetatable({
        primary = primary,
        backup = backup,
        service = service_name,
        current = primary,  -- node the next call goes to
        failover_count = 0, -- switches since the last success on the first try
    }, Failover)
end

--- Call `cmd` on the current node. On failure, switch to the other node (the
-- backup if we were on the primary, the primary if we were on the backup) and
-- try once more. Returns every value the remote handler returns.
-- Raises an error describing both failures when neither node succeeds.
function Failover:call(cmd, ...)
    -- table.pack keeps all return values; cluster calls need no type argument.
    local first = table.pack(pcall(cluster.call, self.current, self.service, cmd, ...))
    if first[1] then
        self.failover_count = 0
        return table.unpack(first, 2, first.n)
    end

    -- Switch to whichever node we were not using, so a failed backup falls
    -- back to the primary instead of "switching" to itself.
    local failed = self.current
    local other = self.backup
    if failed == self.backup then
        other = self.primary
    end
    skynet.error(string.format("节点 %s 调用失败,切换到节点 %s: %s",
        failed, other, tostring(first[2])))
    self.current = other
    self.failover_count = self.failover_count + 1

    local second = table.pack(pcall(cluster.call, self.current, self.service, cmd, ...))
    if second[1] then
        return table.unpack(second, 2, second.n)
    end
    error(string.format("所有节点均不可用: %s: %s; %s: %s",
        failed, tostring(first[2]), other, tostring(second[2])))
end

return Failover
集群监控服务
-- service/cluster_monitor.lua
-- Per-node monitor service. It answers liveness pings and load queries and
-- accumulates call statistics reported by other services.
local skynet = require "skynet"
local cluster = require "skynet.cluster"

local stats = {
    call_count = 0,
    call_errors = 0,
    send_count = 0,
    latency_sum = 0,
}

local CMD = {}

--- Liveness probe.
function CMD.ping()
    return "pong"
end

--- Return the accumulated statistics table.
function CMD.get_stats()
    return stats
end

--- Return load figures for this monitor service.
-- skynet.info_func is a setter for the debug INFO hook: calling it with no
-- argument clears that hook and returns nothing, so it cannot report load.
-- Read counters directly instead.
function CMD.get_load()
    return {
        -- messages this service has processed so far (cumulative)
        message_count = skynet.stat("message"),
        -- messages currently queued for this service
        mqlen = skynet.stat("mqlen"),
        -- Lua heap used by this service, in KB
        memory = collectgarbage("count"),
    }
end

--- Record the outcome of one remote call.
-- @param success truthy if the call succeeded
-- @param latency call duration (a nil latency is counted as 0)
function CMD.report_call(success, latency)
    stats.call_count = stats.call_count + 1
    if not success then
        stats.call_errors = stats.call_errors + 1
    end
    stats.latency_sum = stats.latency_sum + (latency or 0)
end

skynet.start(function()
    skynet.register(".cluster_monitor")
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)
最佳实践
1. 合理划分服务
推荐的服务划分方式:
节点 1(登录节点):
- 登录服务
- 认证服务
- 网关服务
节点 2(游戏逻辑节点):
- 战斗服务
- 场景服务
- NPC 服务
节点 3(数据节点):
- 用户数据服务
- 排行榜服务
- 存档服务
节点 4(社交节点):
- 聊天服务
- 好友服务
- 邮件服务
2. 减少跨节点调用
-- Bad: one cross-node round trip per item (100 calls)
for i = 1, 100 do
    cluster.call("node2", ".data", "get", i)
end

-- Good: collect the keys and fetch them all in one batched request.
-- (A literal like {1, 2, ..., 100} is not a range in Lua: `...` is the
-- chunk's vararg, so the keys are built explicitly.)
local ids = {}
for i = 1, 100 do
    ids[i] = i
end
local result = cluster.call("node2", ".data", "batch_get", ids)
3. 使用本地缓存
-- Cache remote data locally to avoid repeated cross-node calls.
-- (Assumes `skynet` and `cluster` are already required in this file.)
local cache = {}     -- user_id -> { data = record, time = skynet.time() }
local cache_ttl = 60 -- entries expire after 60 seconds

--- Return the user record for `user_id`, served from the cache while fresh.
-- A nil result is cached as well, so unknown ids are not re-queried within the TTL.
-- Declared local: a bare `function get_user` would create a global.
local function get_user(user_id)
    local entry = cache[user_id]
    if entry and skynet.time() - entry.time < cache_ttl then
        return entry.data
    end
    -- cluster.call always uses the "lua" protocol: no message-type argument.
    local user = cluster.call("data_node", ".user_service", "get_user", user_id)
    cache[user_id] = { data = user, time = skynet.time() }
    return user
end
总结
本教程详细介绍了 Skynet 集群通信的各个方面:
- 集群架构:对等网络、地址格式
- 集群配置:配置文件和动态更新
- 节点通信:call 和 send
- 服务注册:注册中心和自动注册
- 负载均衡:轮询和基于负载的策略
- 故障处理:心跳检测和故障转移
参考资料
- Skynet 集群文档:https://github.com/cloudwu/skynet/wiki/Cluster
- Skynet cluster 源码:lualib/skynet/cluster.lua
- 分布式系统设计原理
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。