调试是游戏服务器开发中不可或缺的技能。Skynet 的 Actor 模型和异步特性给调试带来了独特的挑战。本教程将系统介绍 Skynet 的各种调试技术和工具,帮助你快速定位和解决问题。
日志系统
基础日志
Skynet 内置了日志系统,日志文本通过 skynet.error 等函数输出(注意:标准 Skynet API 中没有 skynet.info,skynet.trace 用于消息追踪;下面示例中的分级输出函数需要项目自行封装):
local skynet = require "skynet"
-- Service entry point: emits a few sample log lines once the service starts.
skynet.start(function()
    -- Log lines of different severities
    skynet.error("错误日志:严重问题")
    -- The skynet API has no `info` function; calling skynet.info raises
    -- "attempt to call a nil value", so informational text uses skynet.error too.
    skynet.error("信息日志:一般信息")
    -- NOTE(review): skynet.trace appears to also tag the running coroutine for
    -- message tracing — confirm that side effect is wanted in this demo.
    skynet.trace("追踪日志:详细调试信息")

    -- Formatted output
    local user_id = 12345
    local action = "login"
    skynet.error(string.format("用户 %d 执行操作: %s", user_id, action))
end)
日志级别控制
-- service/logger_ctrl.lua
local skynet = require "skynet"
-- Numeric rank of each log level; a higher rank means more verbose output.
local LOG_LEVELS = {
    ERROR = 1,
    INFO = 2,
    TRACE = 3,
}

-- Currently active verbosity; messages ranked above it are dropped.
local current_level = LOG_LEVELS.INFO

-- Command table served over the "lua" protocol.
local CMD = {}

--- Change the active log level.
-- @param level level name ("ERROR", "INFO" or "TRACE"); letter case is ignored
-- @return true when the level was applied, false when the name is unknown
function CMD.set_level(level)
    local key = level
    if type(level) == "string" then
        key = level:upper()
    end

    local rank = LOG_LEVELS[key]
    if not rank then
        -- Report the bad name instead of silently keeping the old level.
        skynet.error("未知的日志级别:", tostring(level))
        return false
    end

    current_level = rank
    skynet.error("日志级别设置为:", key)
    return true
end
--- Look up the name of the active log level.
-- @return the key of LOG_LEVELS whose rank equals current_level;
--         returns nothing when no key matches
function CMD.get_level()
    local label, rank = next(LOG_LEVELS)
    while label ~= nil do
        if rank == current_level then
            return label
        end
        label, rank = next(LOG_LEVELS, label)
    end
end
--- Emit an ERROR line on behalf of another service.
-- @param service_id numeric address of the reporting service (formatted as 8 hex digits)
-- @param msg text to log
function CMD.error(service_id, msg)
    -- Drop the message when the active level is below ERROR.
    if current_level < LOG_LEVELS.ERROR then
        return
    end
    local line = string.format("[:%08x] ERROR: %s", service_id, msg)
    skynet.error(line)
end
--- Emit an INFO line on behalf of another service.
-- @param service_id numeric address of the reporting service (formatted as 8 hex digits)
-- @param msg text to log
function CMD.info(service_id, msg)
    if current_level >= LOG_LEVELS.INFO then
        -- The skynet API has no `info` function (calling it is a nil-call error);
        -- the level is carried by the "INFO" tag and the text goes through skynet.error.
        skynet.error(string.format("[:%08x] INFO: %s", service_id, msg))
    end
end
--- Emit a TRACE line on behalf of another service.
-- @param service_id numeric address of the reporting service (formatted as 8 hex digits)
-- @param msg text to log
function CMD.trace(service_id, msg)
    -- Only the most verbose level lets trace output through.
    if current_level < LOG_LEVELS.TRACE then
        return
    end
    local line = string.format("[:%08x] TRACE: %s", service_id, msg)
    -- NOTE(review): skynet.trace is presumably the message-tracing entry point
    -- rather than a plain log call — confirm it is the intended sink.
    skynet.trace(line)
end
-- Service bootstrap: publish the local name and route "lua" messages to CMD.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager module;
    -- without this require the call below hits a nil field.
    require "skynet.manager"
    skynet.register(".logger_ctrl")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local handler = assert(CMD[cmd], "unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(handler(...))
        else
            handler(...)
        end
    end)
end)
结构化日志
-- lualib/structured_log.lua
local skynet = require "skynet"
local cjson = require "cjson"
-- Module table; it also serves as the method lookup table for logger instances.
local StructuredLog = {}

--- Create a logger bound to the calling service.
-- @param service_name label stored in the `service` field of every entry
-- @return a new logger whose methods resolve through StructuredLog
function StructuredLog.new(service_name)
    local instance = {
        service = service_name,
        service_id = skynet.self(),
    }
    local meta = { __index = StructuredLog }
    return setmetatable(instance, meta)
end
--- Write one structured (JSON) log entry.
-- @param level severity label, e.g. "ERROR"
-- @param event short event name
-- @param data optional table of extra fields; defaults to an empty table
function StructuredLog:log(level, event, data)
    local log_entry = {
        timestamp = os.time(),
        level = level,
        service = self.service,
        service_id = string.format(":%08x", self.service_id),
        event = event,
        data = data or {},
    }

    -- JSON output keeps entries easy to collect and analyse. Encoding can raise
    -- on values that cannot be serialised, and a logging call must not take the
    -- caller down with it, so fall back step by step.
    local ok, encoded = pcall(cjson.encode, log_entry)
    if not ok then
        log_entry.data = { encode_error = tostring(encoded) }
        ok, encoded = pcall(cjson.encode, log_entry)
        if not ok then
            encoded = string.format("%s %s (unencodable log entry: %s)",
                tostring(level), tostring(event), tostring(encoded))
        end
    end
    skynet.error(encoded)
end
--- Log an entry at ERROR level.
-- @param event short event name
-- @param data optional table of extra fields
function StructuredLog:error(event, data)
    self.log(self, "ERROR", event, data)
end

--- Log an entry at INFO level.
-- @param event short event name
-- @param data optional table of extra fields
function StructuredLog:info(event, data)
    self.log(self, "INFO", event, data)
end

--- Log an entry at TRACE level.
-- @param event short event name
-- @param data optional table of extra fields
function StructuredLog:trace(event, data)
    self.log(self, "TRACE", event, data)
end
-- Usage example
-- NOTE(review): these statements sit before the module's return, so they run
-- every time the module is loaded — confirm that is intended outside a tutorial.
local log = StructuredLog.new("game_logic")

local login_fields = {
    user_id = 12345,
    ip = "192.168.1.100",
    device = "iOS",
}
log:info("player_login", login_fields)

local payment_fields = {
    user_id = 12345,
    order_id = "ORD_001",
    reason = "insufficient_balance",
}
log:error("payment_failed", payment_fields)

return StructuredLog
日志文件管理
-- service/log_writer.lua
local skynet = require "skynet"
-- Open handle of the current log file (nil until CMD.init succeeds).
local log_file = nil
-- Entries waiting to be written.
local log_buffer = {}
-- Number of buffered entries that forces an immediate flush.
local buffer_size = 100
-- True once the periodic flush loop has been started.
local flusher_started = false
-- Command table served over the "lua" protocol.
local CMD = {}

--- Open the log file and start the periodic flusher.
-- Calling init again switches to the new file without leaking the previous
-- handle and without starting a second flush loop.
-- @param file_path path of the file to append to
-- Raises an error (including the io.open reason) when the file cannot be opened.
function CMD.init(file_path)
    local handle, reason = io.open(file_path, "a")
    if not handle then
        error("无法打开日志文件: " .. tostring(file_path) .. " (" .. tostring(reason) .. ")")
    end

    if log_file then
        -- Re-initialisation: drain pending entries into the old file, then release it.
        CMD.flush()
        log_file:close()
    end
    log_file = handle

    if not flusher_started then
        flusher_started = true
        -- Flush the buffer on a timer
        skynet.fork(function()
            while true do
                skynet.sleep(100) -- once per second (skynet.sleep counts 1/100 s)
                CMD.flush()
            end
        end)
    end

    skynet.error("日志文件初始化:", file_path)
end
--- Queue one entry for writing.
-- @param log_entry text of the entry (a newline is appended when flushed)
function CMD.write(log_entry)
    log_buffer[#log_buffer + 1] = log_entry

    -- Below the threshold the periodic flush takes care of it.
    if #log_buffer < buffer_size then
        return
    end
    -- Buffer is full: write it out right away.
    CMD.flush()
end
--- Write every buffered entry to the log file and empty the buffer.
-- Does nothing when no file is open or the buffer is empty.
function CMD.flush()
    if not log_file or #log_buffer == 0 then
        return
    end

    local pending = log_buffer
    for index = 1, #pending do
        log_file:write(pending[index] .. "\n")
    end
    log_file:flush()

    -- Start over with a fresh buffer.
    log_buffer = {}
end
--- Switch logging to a new file.
-- The new file is opened first, so a failure leaves the current file in use
-- instead of leaving the service with no file (where entries would pile up in
-- the buffer and never be written).
-- @param new_file_path path of the file to append to from now on
-- @return true on success; false plus the io.open reason on failure
function CMD.rotate(new_file_path)
    local handle, reason = io.open(new_file_path, "a")
    if not handle then
        skynet.error("日志轮转失败:", tostring(reason))
        return false, reason
    end

    -- Drain and close the current file
    if log_file then
        CMD.flush()
        log_file:close()
    end

    -- Continue with the new file
    log_file = handle
    skynet.error("日志轮转:", new_file_path)
    return true
end
-- Service bootstrap: publish the local name and route "lua" messages to CMD.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager module;
    -- without this require the call below hits a nil field.
    require "skynet.manager"
    skynet.register(".log_writer")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local handler = assert(CMD[cmd], "unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(handler(...))
        else
            handler(...)
        end
    end)
end)
远程调试控制台
内置调试控制台
Skynet 提供了调试控制台服务,可以通过 telnet 连接:
-- config 文件配置
console = "127.0.0.1:8888" -- 调试控制台端口
连接调试控制台:
telnet 127.0.0.1 8888
常用命令:
-- 查看所有服务(调试控制台中的命令名为 list)
list
-- 查看服务信息
info <service_id>
-- 执行 Lua 代码
call <service_id> lua <command> [args...]
-- 查看所有服务的内存使用(mem 命令不需要参数)
mem
-- 查看所有服务的状态统计,包括消息队列长度(stat 命令不需要参数)
stat
-- 启动新服务(调试控制台中的命令名为 start)
start <service_name> [args...]
-- 终止服务
kill <service_id>
自定义调试命令
-- service/debug_console_ext.lua
local skynet = require "skynet"
-- Registry of debug commands, keyed by command name.
local debug_commands = {}

--- Register a debug command; an existing entry with the same name is replaced.
-- @param name command name used with CMD.execute
-- @param func function invoked with the command's arguments
local function register_debug_cmd(name, func)
    debug_commands[name] = func
end
-- Built-in debug commands

-- online_count: ask the ".gate" service for its "get_online" value.
register_debug_cmd("online_count", function()
    local gate_addr = skynet.localname(".gate")
    if not gate_addr then
        return "gate 服务未启动"
    end
    return skynet.call(gate_addr, "lua", "get_online")
end)
-- memory_usage: collect the debug "MEM" value of every service the launcher lists.
-- Returns a table keyed by ":%08x"-style address.
register_debug_cmd("memory_usage", function()
    local launcher = skynet.localname(".launcher")
    local services = skynet.call(launcher, "lua", "list")
    local result = {}
    for addr in pairs(services) do
        -- The launcher reports addresses already formatted as ":%08x" strings;
        -- formatting such a string with %x raises, so only format real numbers.
        local key
        if type(addr) == "number" then
            key = string.format(":%08x", addr)
        else
            key = tostring(addr)
        end

        -- A service may exit between the listing and the query; keep going and
        -- record the failure instead of aborting the whole report.
        local ok, mem = pcall(skynet.call, addr, "debug", "MEM")
        if ok then
            result[key] = mem
        else
            result[key] = "error: " .. tostring(mem)
        end
    end
    return result
end)
-- message_queue: report queue length, cpu and message counters of one service.
-- @param service_id numeric address, numeric string, or ":%08x"-style string
register_debug_cmd("message_queue", function(service_id)
    local addr = tonumber(service_id)
    if not addr and type(service_id) == "string" then
        -- Accept the ":0000000a" form printed elsewhere in this file.
        local hex = service_id:match("^:(%x+)$")
        if hex then
            addr = tonumber(hex, 16)
        end
    end
    if not addr then
        return "无效的服务 ID"
    end

    local stat = skynet.call(addr, "debug", "STAT")
    return {
        -- The debug STAT reply names the queue length `mqlen`; `queue` is kept
        -- as a fallback in case a customised debug module provides it.
        queue = stat.mqlen or stat.queue,
        cpu = stat.cpu,
        message = stat.message,
    }
end)
-- gc: force a garbage collection in one service, or in all listed services.
-- @param service_id optional; numeric address, numeric string, or ":%08x"-style string
register_debug_cmd("gc", function(service_id)
    if service_id then
        local addr = tonumber(service_id)
        if not addr and type(service_id) == "string" then
            -- Accept the ":0000000a" form printed elsewhere in this file.
            local hex = service_id:match("^:(%x+)$")
            if hex then
                addr = tonumber(hex, 16)
            end
        end
        if not addr then
            -- Without this check an unparsable id reaches skynet.call as nil.
            return "无效的服务 ID"
        end
        skynet.call(addr, "debug", "GC")
        return "GC 完成: " .. tostring(service_id)
    end

    -- No id given: run GC in every service the launcher lists.
    local launcher = skynet.localname(".launcher")
    local services = skynet.call(launcher, "lua", "list")
    for addr in pairs(services) do
        -- Best effort: one failing service must not stop the sweep.
        pcall(skynet.call, addr, "debug", "GC")
    end
    return "所有服务 GC 完成"
end)
-- Command table served over the "lua" protocol.
local CMD = {}

--- Run a registered debug command.
-- @param cmd_name name passed to register_debug_cmd
-- @param ... arguments forwarded to the command
-- @return the command's first result, or a message string when the command
--         is unknown or raised an error
function CMD.execute(cmd_name, ...)
    local func = debug_commands[cmd_name]
    if not func then
        -- tostring keeps a nil/non-string name from raising during concatenation.
        return "未知命令: " .. tostring(cmd_name)
    end

    local ok, result = pcall(func, ...)
    if not ok then
        -- Error values are not always strings (e.g. error({...})).
        return "执行失败: " .. tostring(result)
    end
    return result
end
--- List the names of all registered debug commands.
-- @return array of names; the order follows pairs() and is unspecified
function CMD.list_commands()
    local names = {}
    local count = 0
    for name in pairs(debug_commands) do
        count = count + 1
        names[count] = name
    end
    return names
end
-- Service bootstrap: publish the local name and route "lua" messages to CMD.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager module;
    -- without this require the call below hits a nil field.
    require "skynet.manager"
    skynet.register(".debug_console_ext")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local handler = assert(CMD[cmd], "unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(handler(...))
        else
            handler(...)
        end
    end)
end)
使用调试控制台脚本
#!/usr/bin/env python3
# scripts/debug_console.py
import socket
import sys
import json
class DebugConsole:
    """Minimal line-based TCP client for the skynet debug console.

    Args:
        host: console address.
        port: console port.
        timeout: socket timeout in seconds, so a silent server cannot hang the client.
    """

    # Status lines the console prints after a command has finished.
    _END_MARKERS = (b"<CMD OK>", b"<CMD Error>")

    def __init__(self, host="127.0.0.1", port=8888, timeout=5.0):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sock = None

    def connect(self):
        """Open the TCP connection and discard the welcome banner."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(self.timeout)
        self.sock.connect((self.host, self.port))
        # Read (and drop) the welcome message
        self.sock.recv(1024)

    @staticmethod
    def _is_complete(buffer):
        """Return True once `buffer` ends with one of the console status lines."""
        if not buffer.endswith(b"\n"):
            return False
        last_line = buffer.rstrip().rsplit(b"\n", 1)[-1].strip()
        return last_line in DebugConsole._END_MARKERS

    def execute(self, command):
        """Send one command and return the full reply as stripped text.

        Reading continues until the console's status line arrives, the peer
        closes the connection, or the socket times out; stopping at the first
        newline would cut multi-line replies short.

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if self.sock is None:
            raise RuntimeError("not connected; call connect() first")

        self.sock.sendall((command + "\n").encode())
        response = b""
        while True:
            try:
                data = self.sock.recv(4096)
            except socket.timeout:
                # No terminator arrived in time: return whatever was received.
                break
            if not data:
                break
            response += data
            # Check the accumulated bytes: the marker may span two chunks.
            if self._is_complete(response):
                break
        return response.decode(errors="replace").strip()

    def close(self):
        """Close the connection if one is open; safe to call more than once."""
        if self.sock:
            self.sock.close()
            self.sock = None
def main():
    """Run one console command taken from the command line and print the reply.

    Exits with status 1 after printing a usage line when no command is given.
    """
    if len(sys.argv) < 2:
        print("Usage: debug_console.py <command> [args...]")
        sys.exit(1)

    console = DebugConsole()
    try:
        console.connect()
        command = " ".join(sys.argv[1:])
        result = console.execute(command)
        print(result)
    finally:
        # Release the socket even when connect/execute raises.
        console.close()


if __name__ == "__main__":
    main()
性能分析
服务性能监控
-- service/perf_monitor.lua
local skynet = require "skynet"
-- Per-service bookkeeping of the last sample (keyed by service id).
local stats = {}
-- Per-service list of recorded samples (keyed by service id).
local history = {}
-- Command table served over the "lua" protocol.
local CMD = {}

--- Start sampling a service periodically.
-- Calling it again for the same service restarts the monitor; the previous
-- sampling loop notices that its entry was replaced and ends, so loops do not
-- pile up.
-- @param service_id address of the service to sample
-- @param interval sampling period in 1/100 s; defaults to 100 (one second)
-- @return true
function CMD.start_monitor(service_id, interval)
    interval = interval or 100

    local now = skynet.time()
    local entry = {
        start_time = now,
        last_check = now,
        cpu_total = 0,
        message_total = 0,
    }
    stats[service_id] = entry

    -- Sampling loop; it lives only while `entry` is still the registered one.
    skynet.fork(function()
        while stats[service_id] == entry do
            skynet.sleep(interval)
            if stats[service_id] == entry then
                CMD.check(service_id)
            end
        end
    end)

    return true
end
--- Take one sample of a monitored service and append it to its history.
-- @param service_id address passed to CMD.start_monitor
-- @return table {cpu, message_rate, queue, memory} on success;
--         nil plus a message when the service is not monitored or unreachable
function CMD.check(service_id)
    local stat_info = stats[service_id]
    if not stat_info then
        return nil, "未监控该服务"
    end

    -- Query the service's debug statistics
    local ok, stat = pcall(skynet.call, service_id, "debug", "STAT")
    if not ok then
        return nil, "无法获取服务状态"
    end

    local now = skynet.time()
    local dt = now - stat_info.last_check

    -- Counters missing from the reply leave the totals unchanged.
    local cpu_now = stat.cpu or stat_info.cpu_total
    local message_now = stat.message or stat_info.message_total

    -- Per-second rates; two samples at the same timestamp would divide by zero.
    local cpu_rate, message_rate = 0, 0
    if dt > 0 then
        cpu_rate = (cpu_now - stat_info.cpu_total) / dt
        message_rate = (message_now - stat_info.message_total) / dt
    end

    -- The debug STAT reply names the queue length `mqlen`; `queue` is a fallback.
    local queue_len = stat.mqlen or stat.queue
    -- NOTE(review): the stock STAT reply does not seem to carry a `memory`
    -- field, so this may always be nil — confirm, or query debug "MEM".
    local memory = stat.memory

    -- Record the sample
    local samples = history[service_id]
    if not samples then
        samples = {}
        history[service_id] = samples
    end
    samples[#samples + 1] = {
        timestamp = now,
        cpu = cpu_rate,
        message_rate = message_rate,
        queue = queue_len,
        memory = memory,
    }

    -- Keep the most recent 3600 samples (one hour at the default 1 s interval)
    if #samples > 3600 then
        table.remove(samples, 1)
    end

    -- Remember the totals for the next delta
    stat_info.last_check = now
    stat_info.cpu_total = cpu_now
    stat_info.message_total = message_now

    return {
        cpu = cpu_rate,
        message_rate = message_rate,
        queue = queue_len,
        memory = memory,
    }
end
--- Return the most recent samples of a service.
-- @param service_id address passed to CMD.start_monitor
-- @param limit maximum number of samples to return; defaults to 100
-- @return array of samples, oldest first; empty when nothing was recorded
function CMD.get_history(service_id, limit)
    limit = limit or 100

    local samples = history[service_id]
    if not samples then
        return {}
    end

    local total = #samples
    local first = math.max(1, total - limit + 1)
    local tail = {}
    for offset = 0, total - first do
        tail[#tail + 1] = samples[first + offset]
    end
    return tail
end
--- Stop sampling a service.
-- Removing the entry from `stats` is what makes the sampling loop end; the
-- recorded history is left in place.
-- @param service_id address passed to CMD.start_monitor
-- @return true
function CMD.stop_monitor(service_id)
    stats[service_id] = nil

    return true
end
-- Service bootstrap: publish the local name and route "lua" messages to CMD.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager module;
    -- without this require the call below hits a nil field.
    require "skynet.manager"
    skynet.register(".perf_monitor")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local handler = assert(CMD[cmd], "unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(handler(...))
        else
            handler(...)
        end
    end)
end)
代码性能分析
-- lualib/profiler.lua
local skynet = require "skynet"
-- Module table.
local Profiler = {}
-- Aggregated timings, keyed by function name.
local profiles = {}
-- One stack of open call records per coroutine.
local call_stacks = {}

--- Install a call/return debug hook that times every function call.
-- Timings come from skynet.now() and are accumulated into `profiles`.
function Profiler.start()
    -- Hook body. debug.getinfo must stay at level 2 here: level 1 is this hook,
    -- level 2 is the function that triggered the event.
    local function on_event(event, line)
        local info = debug.getinfo(2, "nS")
        local func_name = info.name or string.format("%s:%d", info.short_src, info.linedefined)
        local co = coroutine.running()

        if event == "call" then
            -- Function entry: push a record on this coroutine's stack.
            local stack = call_stacks[co]
            if not stack then
                stack = {}
            end
            call_stacks[co] = stack
            stack[#stack + 1] = {
                name = func_name,
                start_time = skynet.now(),
                children = {},
            }
            return
        end

        if event ~= "return" then
            return
        end

        -- Function exit: pop the matching record, if any.
        local stack = call_stacks[co]
        if not stack or #stack == 0 then
            return
        end
        local record = table.remove(stack)
        local elapsed = skynet.now() - record.start_time

        -- Fold the timing into the per-function profile.
        local profile = profiles[record.name]
        if not profile then
            profile = {
                count = 0,
                total_time = 0,
                max_time = 0,
                min_time = math.huge,
            }
            profiles[record.name] = profile
        end
        profile.count = profile.count + 1
        profile.total_time = profile.total_time + elapsed
        profile.max_time = math.max(profile.max_time, elapsed)
        profile.min_time = math.min(profile.min_time, elapsed)

        -- Attach the finished record to its caller's record.
        local parent = stack[#stack]
        if parent then
            table.insert(parent.children, record)
        end
    end

    debug.sethook(on_event, "cr")
end
--- Remove the profiling hook.
-- Collected data stays available to get_report/print_report.
function Profiler.stop()
    -- debug.sethook with no arguments turns the hook off.
    debug.sethook()
end
--- Build a summary of the collected timings.
-- @return table keyed by function name with count, total_time, avg_time,
--         max_time and min_time (same time unit as skynet.now())
function Profiler.get_report()
    local report = {}
    for func_name, data in pairs(profiles) do
        -- math.huge is the "never measured" placeholder; report it as 0.
        local min_time = data.min_time
        if min_time == math.huge then
            min_time = 0
        end

        local summary = {}
        summary.count = data.count
        summary.total_time = data.total_time
        summary.avg_time = data.total_time / data.count
        summary.max_time = data.max_time
        summary.min_time = min_time
        report[func_name] = summary
    end
    return report
end
--- Log the functions with the largest total time.
-- @param top_n number of rows to print; defaults to 20
function Profiler.print_report(top_n)
    top_n = top_n or 20

    -- Flatten the report so it can be ordered by total time, largest first.
    local rows = {}
    for name, data in pairs(Profiler.get_report()) do
        rows[#rows + 1] = { name = name, data = data }
    end
    table.sort(rows, function(lhs, rhs)
        return lhs.data.total_time > rhs.data.total_time
    end)

    -- Header
    local header = string.format("%-40s %10s %10s %10s",
        "函数", "调用次数", "总时间", "平均时间")
    skynet.error("===== 性能分析报告 =====")
    skynet.error(header)
    skynet.error(string.rep("-", 80))

    -- Top N rows; times are divided by 100 to turn 1/100 s into seconds.
    local shown = math.min(top_n, #rows)
    for rank = 1, shown do
        local row = rows[rank]
        local line = string.format("%-40s %10d %10.2f %10.2f",
            row.name,
            row.data.count,
            row.data.total_time / 100,
            row.data.avg_time / 100)
        skynet.error(line)
    end

    skynet.error("========================")
end
--- Discard all collected timings and any open call records.
function Profiler.reset()
    profiles, call_stacks = {}, {}
end

return Profiler
使用性能分析器
-- 示例:分析游戏逻辑性能
local skynet = require "skynet"
local Profiler = require "profiler"
-- Command table served over the "lua" protocol.
local CMD = {}

--- Profile the processing of the first 1000 items of `data` and log a report.
-- @param data array of numbers handed to process_item
-- @return true
-- Re-raises any error from the processing after the profiler hook is removed.
function CMD.process_heavy_task(data)
    -- Start profiling
    Profiler.start()

    -- Run the workload under pcall so the debug hook never stays installed
    -- when an item fails to process.
    local ok, err = pcall(function()
        for i = 1, 1000 do
            process_item(data[i])
        end
    end)

    -- Stop profiling (always)
    Profiler.stop()

    if not ok then
        error(err, 0)
    end

    -- Print the report
    Profiler.print_report(10)
    return true
end
--- Simulated workload: sum of sqrt(item * k) for k = 1..100.
-- Stays a global because CMD.process_heavy_task, defined earlier in the file,
-- refers to it by that global name.
-- @param item number
-- @return number
function process_item(item)
    local total = 0
    local step = 1
    while step <= 100 do
        total = total + math.sqrt(item * step)
        step = step + 1
    end
    return total
end
-- Service bootstrap: route "lua" messages to the CMD table.
skynet.start(function()
    local function on_lua_message(session, source, cmd, ...)
        local handler = assert(CMD[cmd])
        if session == 0 then
            -- Session 0: the sender does not expect a reply.
            handler(...)
            return
        end
        skynet.retpack(handler(...))
    end

    skynet.dispatch("lua", on_lua_message)
end)
内存泄漏检测
内存监控服务
-- service/memory_monitor.lua
local skynet = require "skynet"
-- Per-service list of memory samples (keyed by service id).
local memory_history = {}
-- Threshold alerts raised so far.
local alerts = {}
-- Command table served over the "lua" protocol.
local CMD = {}

--- Start sampling a service's memory once per minute.
-- Calling it again for the same service restarts the monitor; the previous
-- loop notices that its sample list was replaced and ends.
-- @param service_id numeric address of the service to watch
-- @param threshold_mb alert threshold in MB; defaults to 100
-- @return true
function CMD.start_monitor(service_id, threshold_mb)
    threshold_mb = threshold_mb or 100

    local samples = {}
    memory_history[service_id] = samples

    skynet.fork(function()
        while memory_history[service_id] == samples do
            skynet.sleep(6000) -- one minute (skynet.sleep counts 1/100 s)
            if memory_history[service_id] ~= samples then
                break
            end

            local ok, mem_kb = pcall(skynet.call, service_id, "debug", "MEM")
            if ok and type(mem_kb) == "number" then
                -- The debug MEM reply is in kilobytes, so a single division
                -- by 1024 yields MB (dividing twice understated usage 1024x).
                local mem_mb = mem_kb / 1024

                -- Record the sample
                samples[#samples + 1] = {
                    timestamp = os.time(),
                    memory = mem_mb,
                }

                -- Keep the most recent 1440 samples (24 hours)
                if #samples > 1440 then
                    table.remove(samples, 1)
                end

                -- Raise an alert when the threshold is exceeded
                if mem_mb > threshold_mb then
                    alerts[#alerts + 1] = {
                        service_id = service_id,
                        memory = mem_mb,
                        threshold = threshold_mb,
                        timestamp = os.time(),
                    }
                    -- %s instead of %d: %d raises for a fractional threshold.
                    skynet.error(string.format(
                        "内存警告:服务 :%08x 使用 %.2f MB,超过阈值 %s MB",
                        service_id, mem_mb, tostring(threshold_mb)))
                end
            end
        end
    end)

    return true
end
--- Compute the memory growth trend between the oldest and newest sample.
-- @param service_id address passed to CMD.start_monitor
-- @return nil with fewer than two samples; {growth_rate = 0} when both samples
--         share a timestamp; otherwise {current, growth_rate, time_span, samples}
--         with growth_rate in MB per hour
function CMD.get_trend(service_id)
    local samples = memory_history[service_id]
    if not samples then
        return nil
    end
    local count = #samples
    if count < 2 then
        return nil
    end

    -- Compare the two ends of the recorded window.
    local oldest, newest = samples[1], samples[count]
    local time_span = newest.timestamp - oldest.timestamp
    if time_span == 0 then
        return { growth_rate = 0 }
    end

    local growth = newest.memory - oldest.memory
    local trend = {}
    trend.current = newest.memory
    trend.growth_rate = growth / time_span * 3600 -- MB per hour
    trend.time_span = time_span
    trend.samples = count
    return trend
end
--- Flag a possible leak from the memory trend.
-- @param service_id address passed to CMD.start_monitor
-- @return nil plus a message when there is not enough data; otherwise a table
--         with possible_leak and growth_rate (plus current when a leak is suspected)
function CMD.detect_leak(service_id)
    local trend = CMD.get_trend(service_id)
    if not trend then
        return nil, "数据不足"
    end

    -- Growth above 1 MB per hour is treated as a possible leak.
    local verdict = {
        possible_leak = trend.growth_rate > 1,
        growth_rate = trend.growth_rate,
    }
    if verdict.possible_leak then
        verdict.current = trend.current
    end
    return verdict
end
--- Return every alert recorded since the last clear.
-- @return array of alert tables
function CMD.get_alerts()
    local recorded = alerts
    return recorded
end

--- Forget all recorded alerts by swapping in a fresh list.
function CMD.clear_alerts()
    local fresh = {}
    alerts = fresh
end
-- Service bootstrap: publish the local name and route "lua" messages to CMD.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager module;
    -- without this require the call below hits a nil field.
    require "skynet.manager"
    skynet.register(".memory_monitor")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local handler = assert(CMD[cmd], "unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(handler(...))
        else
            handler(...)
        end
    end)
end)
对象泄漏追踪
-- lualib/object_tracker.lua
local skynet = require "skynet"
-- Class table; instances look their methods up through it.
local ObjectTracker = {}
ObjectTracker.__index = ObjectTracker

-- Weak-keyed registry: an entry does not keep its object alive.
local tracked_objects = setmetatable({}, { __mode = "k" })
-- Per-type counters: { created = n, alive = n }.
local object_history = {}

--- Create a tracked object of the given type.
-- @param type_name label used to group objects in the statistics
-- @return the new object (carries type, created_at and created_by)
function ObjectTracker.new(type_name)
    local fields = {
        type = type_name,
        created_at = skynet.time(),
        created_by = debug.traceback(),
    }
    local obj = setmetatable(fields, ObjectTracker)

    -- Register through the weak table.
    local info = {
        type = type_name,
        created_at = skynet.time(),
        stack = debug.traceback(),
    }
    tracked_objects[obj] = info

    -- Update the per-type counters.
    local counters = object_history[type_name]
    if not counters then
        counters = { created = 0, alive = 0 }
        object_history[type_name] = counters
    end
    counters.created = counters.created + 1
    counters.alive = counters.alive + 1

    return obj
end
--- Finalizer: decrement the alive counter of the object's type.
-- Does nothing when the object has no registry entry or its type has no counters.
function ObjectTracker:__gc()
    local info = tracked_objects[self]
    if not info then
        return
    end
    local counters = object_history[info.type]
    if counters then
        counters.alive = counters.alive - 1
    end
end
--- Snapshot the per-type counters.
-- @return table keyed by type name with total_created and currently_alive
function ObjectTracker.get_stats()
    -- Run a full collection first so pending finalizers update the counters.
    collectgarbage("collect")

    local snapshot = {}
    for type_name, counters in pairs(object_history) do
        local entry = {}
        entry.total_created = counters.created
        entry.currently_alive = counters.alive
        snapshot[type_name] = entry
    end
    return snapshot
end
--- Log the per-type counters as a table.
function ObjectTracker.print_stats()
    local snapshot = ObjectTracker.get_stats()

    local header = string.format("%-30s %15s %15s",
        "类型", "总创建数", "当前存活")
    skynet.error("===== 对象追踪统计 =====")
    skynet.error(header)
    skynet.error(string.rep("-", 70))

    -- One row per type; the order follows pairs() and is unspecified.
    for type_name, entry in pairs(snapshot) do
        local row = string.format("%-30s %15d %15d",
            type_name, entry.total_created, entry.currently_alive)
        skynet.error(row)
    end

    skynet.error("========================")
end

return ObjectTracker
死锁和循环检测
消息循环检测
-- lualib/call_detector.lua
local skynet = require "skynet"
-- Module table.
local CallDetector = {}
local call_graph = {}
-- Calls currently in flight from this Lua state: caller address -> target address.
local pending_calls = {}

-- Render an address the same way everywhere in the warning text.
local function format_addr(addr)
    if type(addr) == "number" then
        return string.format(":%08x", addr)
    end
    return tostring(addr)
end

--- skynet.call wrapper that warns when the pending-call chain leads back to the target.
-- @param target_service address of the service to call
-- @param ... arguments forwarded to skynet.call (protocol name first)
-- @return every value returned by skynet.call
-- Re-raises the error of a failed call unchanged.
-- NOTE(review): pending_calls lives in this service's Lua state only, so chains
-- that run through other services are presumably invisible here, and concurrent
-- calls from several coroutines share one slot per caller — confirm the
-- intended scope.
function CallDetector.wrap_call(target_service, ...)
    local caller = skynet.self()

    -- Walk the pending-call chain starting at the caller. The `seen` set stops
    -- the walk if the recorded chain itself contains a cycle, which would
    -- otherwise spin forever.
    local path, seen = {}, {}
    local current = caller
    while current and not seen[current] do
        if current == target_service then
            path[#path + 1] = format_addr(target_service)
            skynet.error(string.format(
                "检测到可能的调用环: %s",
                table.concat(path, " -> ")))
            break
        end
        seen[current] = true
        path[#path + 1] = format_addr(current)
        current = pending_calls[current]
    end

    -- Record the call
    pending_calls[caller] = target_service

    -- Perform the real call, keeping every return value
    local results = table.pack(pcall(skynet.call, target_service, ...))

    -- Clear the record
    pending_calls[caller] = nil

    if not results[1] then
        -- Level 0 keeps the original message free of an extra position prefix.
        error(results[2], 0)
    end
    return table.unpack(results, 2, results.n)
end

return CallDetector
超时检测
-- lualib/timeout_wrapper.lua
local skynet = require "skynet"
-- Module table.
local TimeoutWrapper = {}

--- skynet.call with a deadline.
-- @param timeout_ms deadline in milliseconds
-- @param service address of the service to call
-- @param ... arguments forwarded to skynet.call (protocol name first)
-- @return every value returned by skynet.call
-- Raises "调用超时: N ms" when the deadline passes first, or the call's own
-- error when the call fails.
-- NOTE(review): after a timeout the forked skynet.call keeps waiting for its
-- reply in the background; only the caller stops waiting.
function TimeoutWrapper.call(timeout_ms, service, ...)
    local args = table.pack(...)
    -- Private wake-up token, so a late wake-up can never hit an unrelated wait.
    local token = {}
    local settled = false
    local outcome

    -- skynet.timeout counts in 1/100 s, so convert from milliseconds (rounding
    -- up). It returns no timer handle and skynet has no cancel function, so the
    -- `settled` flag is what neutralises a timer that fires late.
    local ticks = math.max(0, math.ceil(timeout_ms / 10))
    skynet.timeout(ticks, function()
        if not settled then
            settled = true
            outcome = { false, string.format("调用超时: %d ms", timeout_ms) }
            skynet.wakeup(token)
        end
    end)

    -- Perform the call in its own coroutine
    skynet.fork(function()
        local reply = table.pack(pcall(skynet.call, service, table.unpack(args, 1, args.n)))
        if not settled then
            settled = true
            outcome = reply
            skynet.wakeup(token)
        end
    end)

    -- Block until either side settles (no polling needed)
    skynet.wait(token)

    if not outcome[1] then
        error(outcome[2])
    end
    return table.unpack(outcome, 2, outcome.n)
end

return TimeoutWrapper
常见问题诊断
服务无响应
-- lualib/service_checker.lua
local skynet = require "skynet"
-- Module table.
local ServiceChecker = {}

--- Ping a service through the debug protocol, giving up after `timeout`.
-- @param service_id address of the service to ping
-- @param timeout deadline in 1/100 s; defaults to 100 (one second)
-- @return table with status ("ok", "error" or "timeout") and elapsed
--         (1/100 s), plus response on success or error otherwise
function ServiceChecker.ping(service_id, timeout)
    timeout = timeout or 100

    local start_time = skynet.now()
    -- Private wake-up token, so a late wake-up can never hit an unrelated wait.
    local token = {}
    local settled = false
    local timed_out = false
    local ok, result

    -- A hung service never answers, so the call runs in its own coroutine and
    -- the caller waits for whichever comes first: the reply or the deadline.
    skynet.fork(function()
        local call_ok, call_result = pcall(skynet.call, service_id, "debug", "PING")
        if not settled then
            settled = true
            ok, result = call_ok, call_result
            skynet.wakeup(token)
        end
    end)
    skynet.timeout(timeout, function()
        if not settled then
            settled = true
            timed_out = true
            skynet.wakeup(token)
        end
    end)
    skynet.wait(token)

    local elapsed = skynet.now() - start_time

    if timed_out then
        return {
            status = "timeout",
            error = "ping timed out",
            elapsed = elapsed,
        }
    end

    if not ok then
        return {
            status = "error",
            error = result,
            elapsed = elapsed,
        }
    end

    return {
        status = "ok",
        elapsed = elapsed,
        response = result,
    }
end
--- Collect ping, statistics and memory information for a service.
-- @param service_id address of the service to inspect
-- @return table with ping, and — when the service answers — stat, memory and
--         an optional warning about queue backlog
function ServiceChecker.health_check(service_id)
    local result = {
        ping = ServiceChecker.ping(service_id),
    }

    -- A service that did not answer the ping would also leave the blocking
    -- calls below waiting; report what is known and stop here.
    if result.ping.status ~= "ok" then
        return result
    end

    -- Service statistics
    local stat_ok, stat = pcall(skynet.call, service_id, "debug", "STAT")
    if stat_ok and type(stat) == "table" then
        result.stat = stat

        -- Queue backlog check. The debug STAT reply names the queue length
        -- `mqlen`; comparing the absent `queue` field (nil) with a number raises.
        local queue_len = stat.mqlen or stat.queue
        if queue_len and queue_len > 100 then
            result.warning = string.format(
                "消息队列堆积: %d 条消息", queue_len)
        end
    end

    -- Memory usage
    local mem_ok, mem = pcall(skynet.call, service_id, "debug", "MEM")
    if mem_ok then
        result.memory = mem
    end

    return result
end

return ServiceChecker
消息队列堆积诊断
-- lualib/queue_analyzer.lua
local skynet = require "skynet"
-- Module table.
local QueueAnalyzer = {}

--- Classify a service's message-queue backlog and suggest follow-ups.
-- @param service_id address of the service to inspect
-- @return table with queue_size, cpu_usage, message_count, severity
--         ("high", "medium" or "low"), possible_causes and suggestions
function QueueAnalyzer.analyze(service_id)
    -- Service statistics
    local stat = skynet.call(service_id, "debug", "STAT")

    -- The debug STAT reply names the queue length `mqlen`; the absent `queue`
    -- field would be nil and every comparison below would raise.
    local queue_len = stat.mqlen or stat.queue or 0
    local cpu = stat.cpu or 0

    local analysis = {
        queue_size = queue_len,
        cpu_usage = cpu,
        message_count = stat.message,
    }

    -- Likely causes by backlog size
    if queue_len > 1000 then
        analysis.severity = "high"
        analysis.possible_causes = {
            "服务处理速度过慢",
            "消息发送频率过高",
            "服务阻塞在某个操作上",
        }
    elseif queue_len > 100 then
        analysis.severity = "medium"
        analysis.possible_causes = {
            "服务负载较高",
            "某些消息处理耗时较长",
        }
    else
        analysis.severity = "low"
        analysis.possible_causes = {"正常范围"}
    end

    -- Suggestions
    analysis.suggestions = {}
    -- NOTE(review): stat.cpu looks like accumulated CPU time rather than a
    -- percentage — confirm what the 100 threshold is meant to express.
    if cpu > 100 then
        table.insert(analysis.suggestions, "CPU 使用率高,考虑优化算法")
    end
    if queue_len > 1000 then
        table.insert(analysis.suggestions, "消息队列过长,考虑水平扩展")
    end

    return analysis
end

return QueueAnalyzer
总结
本教程全面介绍了 Skynet 的调试技术:
- 日志系统:结构化日志、日志级别控制、文件管理
- 远程调试:调试控制台、自定义命令
- 性能分析:服务监控、代码性能分析
- 内存泄漏:内存监控、对象追踪
- 问题诊断:死锁检测、超时处理、队列分析
良好的调试能力是高效开发的关键。建议:
- 在生产环境启用结构化日志
- 部署性能监控和告警
- 定期进行内存泄漏检查
- 建立完善的调试工具和流程
参考资料
- Skynet 调试文档:https://github.com/cloudwu/skynet/wiki/Debug
- Lua 调试库:https://www.lua.org/manual/5.4/manual.html#6.10
- 性能优化指南:https://www.lua.org/gems/sample.pdf
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。