Skynet 的消息传递机制是框架的核心,决定了服务之间如何通信。本教程将从底层原理到上层应用,系统讲解 Skynet 消息传递的方方面面。
消息传递概览
在 Skynet 中,所有服务间的通信都通过消息完成。消息传递具有以下特点:
- 异步非阻塞:发送消息后不会阻塞发送方
- 单向通信:消息只能从一个服务发往另一个服务
- 类型标识:每条消息都有类型标识,决定接收方如何处理
- 序列号机制:用于关联请求和响应
消息类型
Skynet 定义了多种消息类型,每种类型有不同的用途和处理方式。
类型列表
-- Skynet 内部定义的消息类型常量
local PTYPE_TEXT = 0 -- 纯文本消息
local PTYPE_RESPONSE = 1 -- 响应消息
local PTYPE_MULTICAST = 2 -- 多播消息
local PTYPE_CLIENT = 3 -- 客户端消息
local PTYPE_SYSTEM = 4 -- 系统消息
local PTYPE_HARBOR = 5 -- 节点消息(集群)
local PTYPE_SOCKET = 6 -- Socket 消息
local PTYPE_ERROR = 7 -- 错误消息
local PTYPE_RESERVED_QUEUE = 8 -- 保留队列
local PTYPE_DEBUG = 9 -- 调试消息
local PTYPE_TRACE = 10 -- 追踪消息
local PTYPE_LUA = 11 -- Lua 消息(最常用)
local PTYPE_SNLua = 12 -- snlua 服务初始化
local PTYPE_C = 13 -- C 消息
注册消息处理函数
local skynet = require "skynet"
skynet.start(function()
-- 处理 Lua 消息
skynet.dispatch("lua", function(session, source, cmd, ...)
-- session: 序列号(0 表示 send,非 0 表示 call)
-- source: 发送方服务地址
-- cmd: 命令名
-- ...: 参数
end)
-- 处理 Socket 消息
skynet.dispatch("socket", function(session, source, cmd, ...)
-- 处理网络事件
end)
-- 处理自定义消息类型
skynet.register_protocol({
name = "myprotocol",
id = 100,
pack = function(...) return ... end,
unpack = function(...) return ... end,
})
skynet.dispatch("myprotocol", function(session, source, ...)
-- 处理自定义消息
end)
end)
send 和 call 的区别
skynet.send 和 skynet.call 是 Skynet 中最核心的两个消息发送函数。
skynet.send:异步发送
local skynet = require "skynet"
skynet.start(function()
local target = skynet.newservice("target")
-- send 不等待响应,立即返回
skynet.send(target, "lua", "notify", "data")
-- 可以继续执行
skynet.error("消息已发出,不等待响应")
end)
send 的特点:
- 立即返回,不阻塞
- 发送方不需要处理响应
- session 始终为 0
skynet.call:同步调用
local skynet = require "skynet"
skynet.start(function()
local target = skynet.newservice("target")
-- call 会阻塞当前协程,等待响应
local result = skynet.call(target, "lua", "query", "key")
-- 直到收到响应才会执行到这里
skynet.error("收到响应:", result)
end)
call 的特点:
- 阻塞当前协程,等待响应
- 发送方会收到响应结果
- session 为非 0 值,用于匹配响应
底层实现
call 实际上是通过 send + 协程挂起实现的:
-- call 的底层实现逻辑(简化版)
function skynet.call(addr, typename, ...)
local session = skynet.genid() -- 生成唯一 session
-- 挂起当前协程
local co = coroutine.running()
skynet_suspend(co)
-- 发送消息
skynet_send(addr, typename, session, ...)
-- 当响应到来时,协程被恢复,返回结果
return coroutine.yield()
end
消息响应
接收方处理完消息后,需要将结果返回给发送方。
skynet.ret
local skynet = require "skynet"
local CMD = {}
function CMD.query(key)
return data[key]
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = assert(CMD[cmd])
local result = f(...)
-- 如果是 call(session != 0),需要返回响应
if session ~= 0 then
skynet.ret(skynet.pack(result))
end
end)
end)
skynet.pack / skynet.unpack
-- 打包数据
local packed = skynet.pack("hello", 123, {a = 1})
-- 解包数据
local str, num, tbl = skynet.unpack(packed)
skynet.retpack
skynet.retpack 是 skynet.ret + skynet.pack 的快捷方式:
local skynet = require "skynet"
local CMD = {}
function CMD.query(key)
return data[key]
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = assert(CMD[cmd])
if session ~= 0 then
-- 简化写法
skynet.retpack(f(...))
else
f(...)
end
end)
end)
消息序列化
Skynet 使用自定义的序列化方式,性能优于 JSON 和 MessagePack。
内部序列化机制
local skynet = require "skynet"
-- skynet.pack 使用 skynet.pack 序列化
-- 支持的类型:
-- - nil, boolean, number
-- - string
-- - table(支持嵌套)
-- - userdata(C 指针)
local data = {
name = "张三",
age = 25,
items = {1, 2, 3},
position = {x = 100, y = 200}
}
local packed = skynet.pack(data)
local unpacked = skynet.unpack(packed)
自定义序列化
local skynet = require "skynet"
-- 注册自定义协议
skynet.register_protocol({
name = "json",
id = skynet.PTYPE_TEXT,
pack = function(data)
return cjson.encode(data)
end,
unpack = function(data)
return cjson.decode(data)
end,
})
-- 发送 JSON 消息
skynet.send(target, "json", {type = "chat", msg = "hello"})
序列化性能对比
| 方式 | 编码速度 | 解码速度 | 大小 |
|---|---|---|---|
| skynet.pack | 最快 | 最快 | 中等 |
| JSON | 中等 | 中等 | 较大 |
| MessagePack | 较快 | 较快 | 较小 |
| Protobuf | 快 | 快 | 最小 |
协程调度与消息处理
Skynet 的协程调度机制是实现异步 IO 的关键。
单服务单协程
每个服务在同一时刻只处理一条消息,但可以在消息处理过程中挂起等待其他事件:
local skynet = require "skynet"
local CMD = {}
function CMD.process(data)
-- 1. 调用数据库服务(挂起当前协程)
local user = skynet.call(db_service, "lua", "get_user", data.user_id)
-- 2. 调用缓存服务(再次挂起)
local cache = skynet.call(cache_service, "lua", "get", "key")
-- 3. 处理完成后返回
return {user = user, cache = cache}
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = assert(CMD[cmd])
if session ~= 0 then
skynet.retpack(f(...))
else
f(...)
end
end)
end)
使用 skynet.fork 创建新协程
local skynet = require "skynet"
skynet.start(function()
-- 创建独立的协程执行后台任务
skynet.fork(function()
while true do
-- 后台任务:每秒执行一次
skynet.sleep(100)
do_background_work()
end
end)
-- 创建另一个独立协程
skynet.fork(function()
-- 等待某个事件
local result = skynet.call(service, "lua", "wait_event")
process_event(result)
end)
-- 主协程继续处理消息
skynet.dispatch("lua", message_handler)
end)
协程调度示意图
服务 A 的消息处理协程:
┌─────────────────────────────────────────┐
│ 收到消息 │
│ ↓ │
│ 调用 skynet.call(B) → 挂起 │
│ ↓ (等待 B 响应) │
│ 收到响应 → 恢复 │
│ ↓ │
│ 调用 skynet.call(C) → 挂起 │
│ ↓ (等待 C 响应) │
│ 收到响应 → 恢复 │
│ ↓ │
│ 返回结果 │
└─────────────────────────────────────────┘
消息队列机制
每个 Skynet 服务都有自己的消息队列,工作线程从队列中取出消息并处理。
消息队列的工作流程
1. 其他服务发送消息 → 消息进入目标服务的消息队列
2. 工作线程检查消息队列
3. 如果队列不为空,取出消息
4. 执行消息处理函数
5. 处理完成,继续处理下一条消息
队列满的处理
-- 在配置文件中设置消息队列大小
queue = 1024 -- 消息队列最大长度
-- 当队列满时,新消息会被丢弃,并产生错误日志
消息超时
长时间等待响应会导致资源浪费,可以使用超时机制。
使用 skynet.timeout
local skynet = require "skynet"
local CMD = {}
function CMD.query_with_timeout(target, timeout_sec)
local result
local timeout = false
-- 设置超时
skynet.timeout(timeout_sec * 100, function()
timeout = true
skynet.error("请求超时")
end)
-- 等待响应
result = skynet.call(target, "lua", "query")
if timeout then
return nil, "timeout"
end
return result
end
使用 pcall 包装
local skynet = require "skynet"
local function safe_call(addr, typename, ...)
local ok, result = pcall(skynet.call, addr, typename, ...)
if ok then
return result
else
skynet.error("调用失败:", result)
return nil, result
end
end
多播消息
Skynet 支持将消息同时发送给多个服务。
使用 multicast 服务
local skynet = require "skynet"
local mc = require "skynet.multicast"
skynet.start(function()
-- 创建多播频道
local channel = mc.new()
-- 订阅频道
channel:subscribe()
-- 其他服务也可以订阅
skynet.send(other_service, "lua", "subscribe", channel.channel)
-- 发布消息
channel:publish("hello everyone")
-- 接收多播消息
skynet.dispatch("lua", function(session, source, cmd, ...)
if cmd == "multicast" then
local channel_id, msg = ...
skynet.error("收到多播消息:", msg)
end
end)
end)
错误处理
消息传递过程中的错误处理非常重要。
处理 call 失败
local skynet = require "skynet"
skynet.start(function()
-- call 可能失败(目标服务崩溃、不存在等)
local ok, result = pcall(skynet.call, target, "lua", "query")
if not ok then
skynet.error("call 失败:", result)
-- 处理错误
else
-- 处理成功结果
skynet.error("结果:", result)
end
end)
处理消息处理函数中的错误
local skynet = require "skynet"
local CMD = {}
function CMD.safe_command(...)
-- 业务逻辑
return do_something(...)
end
skynet.start(function()
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = CMD[cmd]
if not f then
skynet.error("未知命令:", cmd)
if session ~= 0 then
skynet.ret(skynet.pack(nil, "unknown command"))
end
return
end
local ok, result = pcall(f, ...)
if session ~= 0 then
if ok then
skynet.ret(skynet.pack(result))
else
skynet.ret(skynet.pack(nil, result))
end
else
if not ok then
skynet.error("处理消息出错:", result)
end
end
end)
end)
实战:构建消息通信框架
请求路由器
-- lualib/router.lua
local skynet = require "skynet"
local Router = {}
Router.__index = Router
function Router.new()
return setmetatable({
handlers = {}
}, Router)
end
function Router:register(cmd, handler)
self.handlers[cmd] = handler
end
function Router:dispatch()
skynet.dispatch("lua", function(session, source, cmd, ...)
local handler = self.handlers[cmd]
if not handler then
skynet.error("未注册的命令:", cmd)
if session ~= 0 then
skynet.ret(skynet.pack(nil, "unknown command"))
end
return
end
local ok, result = pcall(handler, source, ...)
if session ~= 0 then
if ok then
skynet.ret(skynet.pack(result))
else
skynet.ret(skynet.pack(nil, result))
end
end
end)
end
return Router
使用路由器
-- service/my_service.lua
local skynet = require "skynet"
local Router = require "router"
local router = Router.new()
router:register("get_user", function(source, user_id)
return {id = user_id, name = "张三"}
end)
router:register("update_user", function(source, user_id, data)
-- 更新用户信息
return true
end)
skynet.start(function()
router:dispatch()
end)
总结
本教程详细介绍了 Skynet 的消息传递机制,包括:
- 消息类型:Lua、Text、Client、Socket 等
- send 与 call:异步发送和同步调用
- 消息响应:ret、pack、retpack
- 协程调度:异步 IO 的核心机制
- 消息队列:消息存储和处理的流程
- 错误处理:保证系统稳定性
在下一节教程中,我们将学习 Skynet 的 Lua API 参考手册,掌握所有常用 API 的用法。
参考资料
- Skynet 消息传递文档:https://github.com/cloudwu/skynet/wiki/Message
- Skynet 协程调度源码:skynet-src/skynet_mq.c
- Lua 协程文档:https://www.lua.org/manual/5.4/manual.html#2.6
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。