Skynet 消息传递机制

深入解析 Skynet 消息传递机制,包括消息类型、消息队列、skynet.send/call、协程调度和消息序列化

Skynet 的消息传递机制是框架的核心,决定了服务之间如何通信。本教程将从底层原理到上层应用,系统讲解 Skynet 消息传递的方方面面。

消息传递概览

在 Skynet 中,所有服务间的通信都通过消息完成。消息传递具有以下特点:

  1. 异步非阻塞:发送消息后不会阻塞发送方
  2. 单向通信:消息只能从一个服务发往另一个服务
  3. 类型标识:每条消息都有类型标识,决定接收方如何处理
  4. 序列号机制:用于关联请求和响应

消息类型

Skynet 定义了多种消息类型,每种类型有不同的用途和处理方式。

类型列表

-- Skynet 内部定义的消息类型常量
local PTYPE_TEXT = 0       -- 纯文本消息
local PTYPE_RESPONSE = 1   -- 响应消息
local PTYPE_MULTICAST = 2  -- 多播消息
local PTYPE_CLIENT = 3     -- 客户端消息
local PTYPE_SYSTEM = 4     -- 系统消息
local PTYPE_HARBOR = 5     -- 节点消息(集群)
local PTYPE_SOCKET = 6     -- Socket 消息
local PTYPE_ERROR = 7      -- 错误消息
local PTYPE_RESERVED_QUEUE = 8  -- 保留队列
local PTYPE_DEBUG = 9      -- 调试消息
local PTYPE_TRACE = 10     -- 追踪消息
local PTYPE_LUA = 11       -- Lua 消息(最常用)
local PTYPE_SNLua = 12     -- snlua 服务初始化
local PTYPE_C = 13         -- C 消息

注册消息处理函数

local skynet = require "skynet"

skynet.start(function()
    -- 处理 Lua 消息
    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- session: 序列号(0 表示 send,非 0 表示 call)
        -- source: 发送方服务地址
        -- cmd: 命令名
        -- ...: 参数
    end)
    
    -- 处理 Socket 消息
    skynet.dispatch("socket", function(session, source, cmd, ...)
        -- 处理网络事件
    end)
    
    -- 处理自定义消息类型
    skynet.register_protocol({
        name = "myprotocol",
        id = 100,
        pack = function(...) return ... end,
        unpack = function(...) return ... end,
    })
    skynet.dispatch("myprotocol", function(session, source, ...)
        -- 处理自定义消息
    end)
end)

send 和 call 的区别

skynet.sendskynet.call 是 Skynet 中最核心的两个消息发送函数。

skynet.send:异步发送

local skynet = require "skynet"

skynet.start(function()
    local target = skynet.newservice("target")
    
    -- send 不等待响应,立即返回
    skynet.send(target, "lua", "notify", "data")
    
    -- 可以继续执行
    skynet.error("消息已发出,不等待响应")
end)

send 的特点:

  • 立即返回,不阻塞
  • 发送方不需要处理响应
  • session 始终为 0

skynet.call:同步调用

local skynet = require "skynet"

skynet.start(function()
    local target = skynet.newservice("target")
    
    -- call 会阻塞当前协程,等待响应
    local result = skynet.call(target, "lua", "query", "key")
    
    -- 直到收到响应才会执行到这里
    skynet.error("收到响应:", result)
end)

call 的特点:

  • 阻塞当前协程,等待响应
  • 发送方会收到响应结果
  • session 为非 0 值,用于匹配响应

底层实现

call 实际上是通过 send + 协程挂起实现的:

-- call 的底层实现逻辑(简化版)
function skynet.call(addr, typename, ...)
    local session = skynet.genid()  -- 生成唯一 session
    
    -- 挂起当前协程
    local co = coroutine.running()
    skynet_suspend(co)
    
    -- 发送消息
    skynet_send(addr, typename, session, ...)
    
    -- 当响应到来时,协程被恢复,返回结果
    return coroutine.yield()
end

消息响应

接收方处理完消息后,需要将结果返回给发送方。

skynet.ret

local skynet = require "skynet"

local CMD = {}

function CMD.query(key)
    return data[key]
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        local result = f(...)
        
        -- 如果是 call(session != 0),需要返回响应
        if session ~= 0 then
            skynet.ret(skynet.pack(result))
        end
    end)
end)

skynet.pack / skynet.unpack

-- 打包数据
local packed = skynet.pack("hello", 123, {a = 1})

-- 解包数据
local str, num, tbl = skynet.unpack(packed)

skynet.retpack

skynet.retpackskynet.ret + skynet.pack 的快捷方式:

local skynet = require "skynet"

local CMD = {}

function CMD.query(key)
    return data[key]
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            -- 简化写法
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

消息序列化

Skynet 使用自定义的序列化方式,性能优于 JSON 和 MessagePack。

内部序列化机制

local skynet = require "skynet"

-- skynet.pack 使用 skynet.pack 序列化
-- 支持的类型:
-- - nil, boolean, number
-- - string
-- - table(支持嵌套)
-- - userdata(C 指针)

local data = {
    name = "张三",
    age = 25,
    items = {1, 2, 3},
    position = {x = 100, y = 200}
}

local packed = skynet.pack(data)
local unpacked = skynet.unpack(packed)

自定义序列化

local skynet = require "skynet"

-- 注册自定义协议
skynet.register_protocol({
    name = "json",
    id = skynet.PTYPE_TEXT,
    pack = function(data)
        return cjson.encode(data)
    end,
    unpack = function(data)
        return cjson.decode(data)
    end,
})

-- 发送 JSON 消息
skynet.send(target, "json", {type = "chat", msg = "hello"})

序列化性能对比

方式编码速度解码速度大小
skynet.pack最快最快中等
JSON中等中等较大
MessagePack较快较快较小
Protobuf最小

协程调度与消息处理

Skynet 的协程调度机制是实现异步 IO 的关键。

单服务单协程

每个服务在同一时刻只处理一条消息,但可以在消息处理过程中挂起等待其他事件:

local skynet = require "skynet"

local CMD = {}

function CMD.process(data)
    -- 1. 调用数据库服务(挂起当前协程)
    local user = skynet.call(db_service, "lua", "get_user", data.user_id)
    
    -- 2. 调用缓存服务(再次挂起)
    local cache = skynet.call(cache_service, "lua", "get", "key")
    
    -- 3. 处理完成后返回
    return {user = user, cache = cache}
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

使用 skynet.fork 创建新协程

local skynet = require "skynet"

skynet.start(function()
    -- 创建独立的协程执行后台任务
    skynet.fork(function()
        while true do
            -- 后台任务:每秒执行一次
            skynet.sleep(100)
            do_background_work()
        end
    end)
    
    -- 创建另一个独立协程
    skynet.fork(function()
        -- 等待某个事件
        local result = skynet.call(service, "lua", "wait_event")
        process_event(result)
    end)
    
    -- 主协程继续处理消息
    skynet.dispatch("lua", message_handler)
end)

协程调度示意图

服务 A 的消息处理协程:
  ┌─────────────────────────────────────────┐
  │ 收到消息                                 │
  │   ↓                                      │
  │ 调用 skynet.call(B) → 挂起              │
  │   ↓ (等待 B 响应)                       │
  │ 收到响应 → 恢复                          │
  │   ↓                                      │
  │ 调用 skynet.call(C) → 挂起              │
  │   ↓ (等待 C 响应)                       │
  │ 收到响应 → 恢复                          │
  │   ↓                                      │
  │ 返回结果                                 │
  └─────────────────────────────────────────┘

消息队列机制

每个 Skynet 服务都有自己的消息队列,工作线程从队列中取出消息并处理。

消息队列的工作流程

1. 其他服务发送消息 → 消息进入目标服务的消息队列
2. 工作线程检查消息队列
3. 如果队列不为空,取出消息
4. 执行消息处理函数
5. 处理完成,继续处理下一条消息

队列满的处理

-- 在配置文件中设置消息队列大小
queue = 1024  -- 消息队列最大长度

-- 当队列满时,新消息会被丢弃,并产生错误日志

消息超时

长时间等待响应会导致资源浪费,可以使用超时机制。

使用 skynet.timeout

local skynet = require "skynet"

local CMD = {}

function CMD.query_with_timeout(target, timeout_sec)
    local result
    local timeout = false
    
    -- 设置超时
    skynet.timeout(timeout_sec * 100, function()
        timeout = true
        skynet.error("请求超时")
    end)
    
    -- 等待响应
    result = skynet.call(target, "lua", "query")
    
    if timeout then
        return nil, "timeout"
    end
    
    return result
end

使用 pcall 包装

local skynet = require "skynet"

local function safe_call(addr, typename, ...)
    local ok, result = pcall(skynet.call, addr, typename, ...)
    if ok then
        return result
    else
        skynet.error("调用失败:", result)
        return nil, result
    end
end

多播消息

Skynet 支持将消息同时发送给多个服务。

使用 multicast 服务

local skynet = require "skynet"
local mc = require "skynet.multicast"

skynet.start(function()
    -- 创建多播频道
    local channel = mc.new()
    
    -- 订阅频道
    channel:subscribe()
    
    -- 其他服务也可以订阅
    skynet.send(other_service, "lua", "subscribe", channel.channel)
    
    -- 发布消息
    channel:publish("hello everyone")
    
    -- 接收多播消息
    skynet.dispatch("lua", function(session, source, cmd, ...)
        if cmd == "multicast" then
            local channel_id, msg = ...
            skynet.error("收到多播消息:", msg)
        end
    end)
end)

错误处理

消息传递过程中的错误处理非常重要。

处理 call 失败

local skynet = require "skynet"

skynet.start(function()
    -- call 可能失败(目标服务崩溃、不存在等)
    local ok, result = pcall(skynet.call, target, "lua", "query")
    
    if not ok then
        skynet.error("call 失败:", result)
        -- 处理错误
    else
        -- 处理成功结果
        skynet.error("结果:", result)
    end
end)

处理消息处理函数中的错误

local skynet = require "skynet"

local CMD = {}

function CMD.safe_command(...)
    -- 业务逻辑
    return do_something(...)
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = CMD[cmd]
        if not f then
            skynet.error("未知命令:", cmd)
            if session ~= 0 then
                skynet.ret(skynet.pack(nil, "unknown command"))
            end
            return
        end
        
        local ok, result = pcall(f, ...)
        if session ~= 0 then
            if ok then
                skynet.ret(skynet.pack(result))
            else
                skynet.ret(skynet.pack(nil, result))
            end
        else
            if not ok then
                skynet.error("处理消息出错:", result)
            end
        end
    end)
end)

实战:构建消息通信框架

请求路由器

-- lualib/router.lua
local skynet = require "skynet"

local Router = {}
Router.__index = Router

function Router.new()
    return setmetatable({
        handlers = {}
    }, Router)
end

function Router:register(cmd, handler)
    self.handlers[cmd] = handler
end

function Router:dispatch()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local handler = self.handlers[cmd]
        if not handler then
            skynet.error("未注册的命令:", cmd)
            if session ~= 0 then
                skynet.ret(skynet.pack(nil, "unknown command"))
            end
            return
        end
        
        local ok, result = pcall(handler, source, ...)
        if session ~= 0 then
            if ok then
                skynet.ret(skynet.pack(result))
            else
                skynet.ret(skynet.pack(nil, result))
            end
        end
    end)
end

return Router

使用路由器

-- service/my_service.lua
local skynet = require "skynet"
local Router = require "router"

local router = Router.new()

router:register("get_user", function(source, user_id)
    return {id = user_id, name = "张三"}
end)

router:register("update_user", function(source, user_id, data)
    -- 更新用户信息
    return true
end)

skynet.start(function()
    router:dispatch()
end)

总结

本教程详细介绍了 Skynet 的消息传递机制,包括:

  1. 消息类型:Lua、Text、Client、Socket 等
  2. send 与 call:异步发送和同步调用
  3. 消息响应:ret、pack、retpack
  4. 协程调度:异步 IO 的核心机制
  5. 消息队列:消息存储和处理的流程
  6. 错误处理:保证系统稳定性

在下一节教程中,我们将学习 Skynet 的 Lua API 参考手册,掌握所有常用 API 的用法。

参考资料

  1. Skynet 消息传递文档:https://github.com/cloudwu/skynet/wiki/Message
  2. Skynet 协程调度源码:skynet-src/skynet_mq.c
  3. Lua 协程文档:https://www.lua.org/manual/5.4/manual.html#2.6

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页