Skynet 性能调优实战

深入讲解 Skynet 性能调优技术,包括服务拆分策略、消息队列优化、内存管理、并发控制和生产环境调优案例

性能调优是游戏服务器开发中的关键技能。Skynet 虽然本身性能优秀,但不当的使用方式仍可能导致性能瓶颈。本教程将从理论到实践,系统讲解 Skynet 性能调优的各个方面。

性能指标

关键性能指标

在开始优化之前,需要明确衡量标准:

指标说明目标值
QPS每秒查询数> 10,000
延迟请求响应时间< 50ms (P99)
并发连接同时在线用户数> 10,000
CPU 使用率CPU 占用百分比< 70%
内存使用内存占用量根据硬件配置
消息队列长度待处理消息数< 100

性能测试工具

-- lualib/benchmark.lua
local skynet = require "skynet"

local Benchmark = {}

function Benchmark.run(func, iterations, concurrency)
    iterations = iterations or 1000
    concurrency = concurrency or 10
    
    local results = {
        total_time = 0,
        min_time = math.huge,
        max_time = 0,
        success = 0,
        failed = 0,
    }
    
    local start_time = skynet.now()
    
    -- 并发执行
    for i = 1, concurrency do
        skynet.fork(function()
            for j = 1, iterations / concurrency do
                local iter_start = skynet.now()
                local ok, err = pcall(func)
                local iter_time = skynet.now() - iter_start
                
                if ok then
                    results.success = results.success + 1
                    results.min_time = math.min(results.min_time, iter_time)
                    results.max_time = math.max(results.max_time, iter_time)
                else
                    results.failed = results.failed + 1
                end
            end
        end)
    end
    
    -- 等待所有协程完成
    skynet.sleep(iterations / concurrency / 100 + 10)
    
    results.total_time = skynet.now() - start_time
    results.avg_time = results.total_time / (results.success + results.failed)
    results.qps = (results.success + results.failed) / (results.total_time / 100)
    
    return results
end

function Benchmark.print_report(name, results)
    skynet.error(string.rep("=", 60))
    skynet.error(string.format("性能测试报告: %s", name))
    skynet.error(string.rep("=", 60))
    skynet.error(string.format("成功次数: %d", results.success))
    skynet.error(string.format("失败次数: %d", results.failed))
    skynet.error(string.format("总耗时: %.2f 秒", results.total_time / 100))
    skynet.error(string.format("平均耗时: %.2f ms", results.avg_time * 10))
    skynet.error(string.format("最小耗时: %.2f ms", results.min_time * 10))
    skynet.error(string.format("最大耗时: %.2f ms", results.max_time * 10))
    skynet.error(string.format("QPS: %.2f", results.qps))
    skynet.error(string.rep("=", 60))
end

return Benchmark

服务拆分策略

拆分原则

服务拆分是性能优化的基础。遵循以下原则:

  1. 单一职责:每个服务只负责一个明确的功能
  2. 高内聚低耦合:服务内部功能紧密相关,服务间依赖最小化
  3. 负载均衡:将热点功能分散到多个服务实例
  4. 资源隔离:CPU 密集型和 IO 密集型分开

拆分示例

不好的设计

-- service/monolith.lua - 单体服务
local CMD = {}

function CMD.login(...) end
function CMD.chat(...) end
function CMD.battle(...) end
function CMD.payment(...) end
function CMD.mail(...) end

-- 问题:所有功能在一个服务中,无法水平扩展

好的设计

-- 按功能拆分
-- service/login.lua - 登录服务
-- service/chat.lua - 聊天服务
-- service/battle.lua - 战斗服务(多实例)
-- service/payment.lua - 支付服务
-- service/mail.lua - 邮件服务

水平扩展

对于热点服务,可以创建多个实例:

-- service/battle_dispatcher.lua
local skynet = require "skynet"

local battle_instances = {}
local instance_count = 5

skynet.start(function()
    -- 创建多个战斗服务实例
    for i = 1, instance_count do
        battle_instances[i] = skynet.newservice("battle", i)
    end
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        if cmd == "start_battle" then
            local player_id = ...
            -- 根据玩家 ID 分配到不同实例
            local instance_id = (player_id % instance_count) + 1
            local instance = battle_instances[instance_id]
            
            if session ~= 0 then
                skynet.retpack(skynet.call(instance, "lua", cmd, ...))
            else
                skynet.send(instance, "lua", cmd, ...)
            end
        end
    end)
end)

消息队列优化

减少消息数量

-- 不好:频繁发送小消息
for i = 1, 100 do
    skynet.send(target, "lua", "update_item", i, items[i])
end

-- 好:批量发送
skynet.send(target, "lua", "update_items", items)

消息合并

-- service/data_aggregator.lua
local skynet = require "skynet"

local pending_updates = {}
local flush_timer = nil

local CMD = {}

function CMD.update(key, value)
    table.insert(pending_updates, {key = key, value = value})
    
    -- 达到阈值或定时器触发时批量写入
    if #pending_updates >= 100 then
        CMD.flush()
    elseif not flush_timer then
        flush_timer = skynet.timeout(100, function()
            CMD.flush()
            flush_timer = nil
        end)
    end
end

function CMD.flush()
    if #pending_updates == 0 then
        return
    end
    
    -- 批量写入数据库
    skynet.call(".db_service", "lua", "batch_update", pending_updates)
    pending_updates = {}
end

skynet.start(function()
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

避免消息风暴

-- lualib/rate_limiter.lua
local skynet = require "skynet"

local RateLimiter = {}
RateLimiter.__index = RateLimiter

function RateLimiter.new(max_requests, time_window)
    return setmetatable({
        max_requests = max_requests,
        time_window = time_window,
        requests = {},
    }, RateLimiter)
end

function RateLimiter:allow(key)
    local now = skynet.time()
    
    -- 清理过期记录
    local valid_requests = {}
    for _, timestamp in ipairs(self.requests[key] or {}) do
        if now - timestamp < self.time_window then
            valid_requests[#valid_requests + 1] = timestamp
        end
    end
    
    -- 检查是否超过限制
    if #valid_requests >= self.max_requests then
        return false
    end
    
    -- 记录新请求
    valid_requests[#valid_requests + 1] = now
    self.requests[key] = valid_requests
    
    return true
end

-- 使用示例
local limiter = RateLimiter.new(100, 60)  -- 每分钟最多 100 次

function CMD.process_request(user_id, data)
    if not limiter:allow(user_id) then
        return false, "请求过于频繁"
    end
    
    -- 处理请求
    return do_process(data)
end

return RateLimiter

内存优化

对象池模式

-- lualib/object_pool.lua
local skynet = require "skynet"

local ObjectPool = {}
ObjectPool.__index = ObjectPool

function ObjectPool.new(create_func, reset_func, initial_size)
    local pool = setmetatable({
        create = create_func,
        reset = reset_func,
        objects = {},
    }, ObjectPool)
    
    -- 预创建对象
    for i = 1, initial_size or 10 do
        pool.objects[#pool.objects + 1] = create_func()
    end
    
    return pool
end

function ObjectPool:acquire()
    if #self.objects > 0 then
        return table.remove(self.objects)
    end
    return self.create()
end

function ObjectPool:release(obj)
    if self.reset then
        self.reset(obj)
    end
    self.objects[#self.objects + 1] = obj
end

function ObjectPool:size()
    return #self.objects
end

return ObjectPool

-- 使用示例:子弹对象池
local BulletPool = ObjectPool.new(
    function()
        return {x = 0, y = 0, vx = 0, vy = 0, damage = 0}
    end,
    function(bullet)
        bullet.x = 0
        bullet.y = 0
        bullet.vx = 0
        bullet.vy = 0
        bullet.damage = 0
    end,
    100  -- 预创建 100 个
)

-- 获取子弹
local bullet = BulletPool:acquire()
bullet.x = 100
bullet.y = 200
bullet.damage = 50

-- 回收子弹
BulletPool:release(bullet)

缓存策略

-- lualib/lru_cache.lua
local skynet = require "skynet"

local LRUCache = {}
LRUCache.__index = LRUCache

function LRUCache.new(max_size, ttl)
    return setmetatable({
        max_size = max_size or 1000,
        ttl = ttl or 300,  -- 5 分钟
        cache = {},
        access_order = {},
    }, LRUCache)
end

function LRUCache:get(key)
    local entry = self.cache[key]
    if not entry then
        return nil
    end
    
    -- 检查过期
    if skynet.time() - entry.timestamp > self.ttl then
        self:remove(key)
        return nil
    end
    
    -- 更新访问顺序
    self:update_access(key)
    
    return entry.value
end

function LRUCache:set(key, value)
    -- 如果已存在,更新
    if self.cache[key] then
        self.cache[key].value = value
        self.cache[key].timestamp = skynet.time()
        self:update_access(key)
        return
    end
    
    -- 如果已满,删除最久未使用的
    if #self.access_order >= self.max_size then
        local oldest_key = table.remove(self.access_order, 1)
        self.cache[oldest_key] = nil
    end
    
    -- 添加新条目
    self.cache[key] = {
        value = value,
        timestamp = skynet.time(),
    }
    self.access_order[#self.access_order + 1] = key
end

function LRUCache:remove(key)
    self.cache[key] = nil
    for i, k in ipairs(self.access_order) do
        if k == key then
            table.remove(self.access_order, i)
            break
        end
    end
end

function LRUCache:update_access(key)
    for i, k in ipairs(self.access_order) do
        if k == key then
            table.remove(self.access_order, i)
            break
        end
    end
    self.access_order[#self.access_order + 1] = key
end

function LRUCache:clear()
    self.cache = {}
    self.access_order = {}
end

return LRUCache

-- 使用示例
local user_cache = LRUCache.new(10000, 600)  -- 10000 用户,10 分钟过期

function get_user(user_id)
    local user = user_cache:get(user_id)
    if user then
        return user
    end
    
    -- 从数据库加载
    user = skynet.call(".db_service", "lua", "get_user", user_id)
    if user then
        user_cache:set(user_id, user)
    end
    
    return user
end

表优化

-- 不好:使用字符串键(哈希表,开销大)
local config = {}
config["max_players"] = 100
config["timeout"] = 30
config["retry_count"] = 3

-- 好:使用数字索引(数组,开销小)
local config = {100, 30, 3}
local MAX_PLAYERS, TIMEOUT, RETRY_COUNT = 1, 2, 3

-- 更好:使用局部变量
local max_players = 100
local timeout = 30
local retry_count = 3

字符串优化

-- 不好:循环中拼接字符串
local result = ""
for i = 1, 10000 do
    result = result .. "x"  -- 每次创建新字符串
end

-- 好:使用 table.concat
local parts = {}
for i = 1, 10000 do
    parts[i] = "x"
end
local result = table.concat(parts)

-- 好:使用 string.rep
local result = string.rep("x", 10000)

并发控制

避免热点服务

-- 不好:所有请求都经过一个服务
local db_service = skynet.uniqueservice("db_service")

-- 好:按数据类型分片
local db_services = {}
for i = 1, 5 do
    db_services[i] = skynet.newservice("db_service", i)
end

function get_db_service(user_id)
    local shard = (user_id % 5) + 1
    return db_services[shard]
end

异步处理

-- 不好:同步等待耗时操作
function CMD.process(data)
    local result = skynet.call(".slow_service", "lua", "compute", data)
    -- 阻塞当前协程
    return result
end

-- 好:异步处理,立即返回
function CMD.process(data)
    -- 立即返回任务 ID
    local task_id = generate_task_id()
    
    -- 后台处理
    skynet.fork(function()
        local result = skynet.call(".slow_service", "lua", "compute", data)
        -- 保存结果
        save_result(task_id, result)
        -- 通知客户端
        notify_client(task_id, result)
    end)
    
    return task_id
end

-- 客户端轮询结果
function CMD.get_result(task_id)
    return get_result(task_id)
end

读写分离

-- service/data_service.lua
local skynet = require "skynet"

-- 读服务(多实例)
local read_services = {}
-- 写服务(单实例)
local write_service = nil

skynet.start(function()
    -- 创建 3 个读服务
    for i = 1, 3 do
        read_services[i] = skynet.newservice("data_reader", i)
    end
    
    -- 创建 1 个写服务
    write_service = skynet.newservice("data_writer")
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        if cmd == "read" then
            -- 轮询分发读请求
            local idx = (source % 3) + 1
            if session ~= 0 then
                skynet.retpack(skynet.call(read_services[idx], "lua", cmd, ...))
            else
                skynet.send(read_services[idx], "lua", cmd, ...)
            end
        elseif cmd == "write" then
            -- 写请求发送到写服务
            if session ~= 0 then
                skynet.retpack(skynet.call(write_service, "lua", cmd, ...))
            else
                skynet.send(write_service, "lua", cmd, ...)
            end
        end
    end)
end)

CPU 优化

局部变量缓存

-- 不好:频繁访问全局变量
function process_data(data)
    for i = 1, #data do
        data[i] = math.sqrt(data[i])  -- math 是全局变量
    end
end

-- 好:缓存为局部变量
local sqrt = math.sqrt
local insert = table.insert
local concat = table.concat

function process_data(data)
    for i = 1, #data do
        data[i] = sqrt(data[i])
    end
end

避免重复计算

-- 不好:重复计算
function update_positions(entities)
    for i = 1, #entities do
        local e = entities[i]
        e.x = e.x + e.vx * get_delta_time()  -- 每次都调用
        e.y = e.y + e.vy * get_delta_time()
    end
end

-- 好:提前计算
function update_positions(entities)
    local dt = get_delta_time()  -- 只计算一次
    for i = 1, #entities do
        local e = entities[i]
        e.x = e.x + e.vx * dt
        e.y = e.y + e.vy * dt
    end
end

算法优化

-- 不好:O(n²) 查找
function find_entity(entities, target_id)
    for i = 1, #entities do
        if entities[i].id == target_id then
            return entities[i]
        end
    end
    return nil
end

-- 好:O(1) 查找(使用索引)
local entity_index = {}  -- id -> entity

function build_index(entities)
    for i = 1, #entities do
        entity_index[entities[i].id] = entities[i]
    end
end

function find_entity(target_id)
    return entity_index[target_id]
end

IO 优化

批量数据库操作

-- 不好:逐条插入
for i = 1, 1000 do
    skynet.call(".db", "lua", "insert", "users", users[i])
end

-- 好:批量插入
skynet.call(".db", "lua", "batch_insert", "users", users)

异步 IO

-- service/async_logger.lua
local skynet = require "skynet"

local log_buffer = {}
local buffer_size = 1000
local flush_interval = 100  -- 1 秒

local CMD = {}

function CMD.log(level, message)
    table.insert(log_buffer, {
        level = level,
        message = message,
        timestamp = os.time(),
    })
    
    -- 缓冲区满时立即刷新
    if #log_buffer >= buffer_size then
        CMD.flush()
    end
end

function CMD.flush()
    if #log_buffer == 0 then
        return
    end
    
    -- 批量写入文件
    local logs_to_write = log_buffer
    log_buffer = {}
    
    skynet.fork(function()
        write_to_file(logs_to_write)
    end)
end

skynet.start(function()
    -- 定时刷新
    skynet.fork(function()
        while true do
            skynet.sleep(flush_interval)
            CMD.flush()
        end
    end)
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

生产环境调优

配置文件优化

-- config.production

-- 线程数(根据 CPU 核心数)
thread = 16

-- 消息队列大小
queue = 32

-- 启动服务
start = "main"

-- 日志级别(生产环境用 INFO)
loglevel = "info"

-- 关闭调试控制台(生产环境)
-- console = nil

-- 集群配置
cluster = "config.cluster"
node = "node1"

启动优化

-- service/main.lua
local skynet = require "skynet"

skynet.start(function()
    skynet.error("服务器启动中...")
    
    -- 按依赖顺序启动服务
    -- 1. 基础服务
    skynet.uniqueservice("config_mgr")
    skynet.uniqueservice("db_service")
    skynet.uniqueservice("redis_service")
    
    -- 2. 业务服务
    skynet.uniqueservice("user_service")
    skynet.uniqueservice("game_service")
    
    -- 3. 网关服务(最后启动)
    local gate = skynet.newservice("gate")
    skynet.call(gate, "lua", "open", {
        port = 8888,
        maxclient = 10000,
    })
    
    skynet.error("服务器启动完成")
end)

监控和告警

-- service/monitor.lua
local skynet = require "skynet"

local thresholds = {
    cpu = 80,          -- CPU 使用率 80%
    memory = 85,       -- 内存使用率 85%
    queue = 1000,      -- 消息队列 1000
    latency = 100,     -- 延迟 100ms
}

local CMD = {}

function CMD.check()
    local launcher = skynet.localname(".launcher")
    local services = skynet.call(launcher, "lua", "list")
    
    local alerts = {}
    
    for addr in pairs(services) do
        local ok, stat = pcall(skynet.call, addr, "debug", "STAT")
        if ok then
            -- 检查消息队列
            if stat.queue > thresholds.queue then
                table.insert(alerts, {
                    type = "queue",
                    service = string.format(":%08x", addr),
                    value = stat.queue,
                    threshold = thresholds.queue,
                })
            end
            
            -- 检查内存
            local ok, mem = pcall(skynet.call, addr, "debug", "MEM")
            if ok and mem > 100 * 1024 * 1024 then  -- 100MB
                table.insert(alerts, {
                    type = "memory",
                    service = string.format(":%08x", addr),
                    value = mem,
                    threshold = 100 * 1024 * 1024,
                })
            end
        end
    end
    
    -- 发送告警
    if #alerts > 0 then
        skynet.call(".alert_service", "lua", "send_alerts", alerts)
    end
    
    return alerts
end

skynet.start(function()
    -- 每 10 秒检查一次
    skynet.fork(function()
        while true do
            skynet.sleep(1000)
            pcall(CMD.check)
        end
    end)
    
    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

性能调优清单

检查清单

□ 服务拆分是否合理
□ 是否存在热点服务
□ 消息队列是否堆积
□ 内存使用是否正常
□ CPU 使用率是否过高
□ 是否存在内存泄漏
□ 数据库查询是否优化
□ 缓存策略是否有效
□ 并发控制是否合理
□ 是否有不必要的同步等待

优化优先级

  1. 架构优化:服务拆分、负载均衡
  2. 算法优化:减少复杂度、使用索引
  3. IO 优化:批量操作、异步处理
  4. 内存优化:对象池、缓存、GC 调优
  5. 代码优化:局部变量、避免重复计算

总结

本教程全面介绍了 Skynet 性能调优技术:

  1. 性能指标:明确优化目标和衡量标准
  2. 服务拆分:合理的架构是性能的基础
  3. 消息优化:减少消息数量、批量处理
  4. 内存优化:对象池、缓存、表优化
  5. 并发控制:避免热点、异步处理
  6. CPU 优化:局部变量、算法优化
  7. IO 优化:批量操作、异步 IO

性能优化是一个持续的过程,需要:

  • 建立完善的监控体系
  • 定期进行性能测试
  • 及时发现和解决瓶颈
  • 持续优化和改进

参考资料

  1. Skynet 性能优化讨论:https://github.com/cloudwu/skynet/wiki
  2. Lua 性能优化技巧:http://www.lua.org/gems/
  3. 游戏服务器性能优化实践

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页