在游戏服务器开发中,数据持久化是核心需求之一。Skynet 在 lualib/skynet/db 下自带了 MySQL、Redis、MongoDB 的 Lua 驱动(例如下文 Redis 服务中 require 的 skynet.db.redis),也可以通过 C 模块或第三方 Lua 库接入其他数据库。本教程将详细介绍如何在 Skynet 中使用 MySQL、Redis、MongoDB 等主流数据库。
数据库选择策略
在开始集成之前,需要了解不同数据库的特点和适用场景:
| 数据库 | 类型 | 适用场景 | 性能 |
|---|---|---|---|
| MySQL | 关系型 | 结构化数据、事务操作 | 中等 |
| Redis | 内存 | 缓存、排行榜、会话 | 极高 |
| MongoDB | 文档型 | 灵活结构、大数据 | 高 |
| PostgreSQL | 关系型 | 复杂查询、JSON 支持 | 中等 |
推荐方案:
- 核心业务数据 → MySQL/PostgreSQL
- 缓存和热点数据 → Redis
- 日志和分析数据 → MongoDB
MySQL 集成
安装 MySQL 驱动
Skynet 社区提供了多个 MySQL 驱动,推荐使用 skynet-fly/mysql 或 lua-resty-mysql:
# 下载 MySQL 驱动
cd ~/my-skynet-project
git clone https://github.com/dalinggang/lua-resty-mysql.git lualib/mysql
MySQL 服务封装
-- service/mysql_mgr.lua
local skynet = require "skynet"
local mysql = require "mysql"
-- Settings handed to mysql.connect() by this service.
-- NOTE(review): the credentials are hard-coded sample values; a real
-- deployment should read them from configuration instead.
local db_config = {
    charset = "utf8mb4",
    max_packet_size = 1024 * 1024,
    host = "127.0.0.1",
    port = 3306,
    user = "root",
    password = "password",
    database = "game_db",
}

-- Shared connection handle; remains nil until connect() succeeds.
local db_conn = nil
-- Open a connection with db_config and store it in db_conn.
-- Returns true on success. On failure the driver's error is logged through
-- skynet.error and false is returned.
local function connect()
    local handle, reason = mysql.connect(db_config)
    if handle then
        -- Set the session character set on the fresh connection.
        handle:query("SET NAMES utf8mb4")
        db_conn = handle
        return true
    end
    skynet.error("MySQL 连接失败:", reason)
    return false
end
-- Run a raw SQL statement on the shared connection.
--
-- Connects lazily when there is no connection yet. When the driver reports
-- an error whose text contains "closed" or "lost", the connection is dropped,
-- re-established once, and the statement is retried once.
--
-- @param sql  SQL text, sent to the driver unchanged
-- @return the driver's result on success; otherwise nil plus an error value
local function query(sql)
    if not db_conn and not connect() then
        return nil, "无法连接数据库"
    end

    local result, err = db_conn:query(sql)
    if result then
        return result
    end

    -- The driver may fail without a string message; normalise it before
    -- searching so string.find never receives nil or a table.
    local reason = type(err) == "string" and err or tostring(err)
    -- Plain-text search (4th argument true): these are literals, not patterns.
    if reason:find("closed", 1, true) or reason:find("lost", 1, true) then
        skynet.error("MySQL 连接断开,尝试重连")
        db_conn = nil
        if connect() then
            result, err = db_conn:query(sql)
            if result then
                return result
            end
        end
    end

    -- Always hand the caller a non-nil reason.
    if err == nil then
        err = "未知的 MySQL 错误"
    end
    return nil, err
end
-- Run a statement with positional parameters.
--
-- Each "?" in `sql` is replaced, in order, by the matching entry of `params`
-- rendered as an SQL literal: numbers via tostring, strings quoted and passed
-- through db_conn:escape, booleans as 1/0, missing entries as NULL.
--
-- NOTE(review): substitution is textual, so a "?" inside a quoted literal in
-- `sql` is replaced as well; keep literals out of the template.
--
-- @param sql     SQL template containing "?" placeholders
-- @param params  array of values (nil is treated as an empty array)
-- @return the result of query(), or nil plus an error message when a
--         parameter cannot be rendered or no connection is available
local function execute(sql, params)
    params = params or {}

    -- String parameters need a live connection for escaping, so make sure
    -- one exists before substituting.
    if not db_conn and not connect() then
        return nil, "无法连接数据库"
    end

    local index = 0
    local failure
    -- "%?" matches a literal question mark; a bare "?" is a pattern quantifier.
    local real_sql = string.gsub(sql, "%?", function()
        index = index + 1
        local value = params[index]
        local kind = type(value)
        if kind == "nil" then
            return "NULL"
        elseif kind == "number" then
            -- NaN and infinities have no SQL literal form.
            if value ~= value or value == math.huge or value == -math.huge then
                failure = failure or string.format("参数 #%d 不是有效的数值", index)
                return nil
            end
            return tostring(value)
        elseif kind == "string" then
            return "'" .. db_conn:escape(value) .. "'"
        elseif kind == "boolean" then
            return value and "1" or "0"
        end
        -- Tables, functions, etc.: refuse rather than send a raw "?".
        failure = failure or string.format("参数 #%d 的类型不受支持: %s", index, kind)
        return nil
    end)

    if failure then
        return nil, failure
    end
    return query(real_sql)
end
-- Commands reachable through this service's "lua" message dispatcher.
local CMD = {}

-- Forward a raw SQL statement to query().
CMD.query = function(sql)
    return query(sql)
end

-- Forward a statement with positional parameters to execute().
CMD.execute = function(sql, params)
    return execute(sql, params)
end
-- Insert one row built from the key/value pairs of `data`.
-- Keys become column names and values become "?" parameters for execute().
-- `table_name` and the keys are interpolated into the SQL text verbatim.
-- Returns result.insert_id on success, otherwise nil plus the error.
function CMD.insert(table_name, data)
    local columns, params = {}, {}
    local count = 0
    for column, value in pairs(data) do
        count = count + 1
        columns[count] = column
        params[count] = value
    end

    local sql = ("INSERT INTO %s (%s) VALUES (%s)"):format(
        table_name,
        table.concat(columns, ","),
        string.rep("?", count, ","))

    local result, err = execute(sql, params)
    if not result then
        return nil, err
    end
    return result.insert_id
end
-- Update rows of `table_name`.
--
-- @param table_name  table to update (interpolated verbatim)
-- @param data        column -> new value map; must contain at least one pair
-- @param where       optional column -> value map, combined with AND.
--                    nil updates every row; an empty table is rejected
--                    because it would produce a dangling "WHERE".
-- @return the result of execute(), or nil plus an error message
function CMD.update(table_name, data, where)
    if type(data) ~= "table" then
        return nil, "update 的 data 必须是表"
    end

    local sets, params = {}, {}
    for column, value in pairs(data) do
        sets[#sets + 1] = column .. "=?"
        params[#params + 1] = value
    end
    if #sets == 0 then
        -- "UPDATE t SET " is not valid SQL.
        return nil, "update 缺少要修改的字段"
    end

    local where_sql = ""
    if where then
        local conditions = {}
        for column, value in pairs(where) do
            conditions[#conditions + 1] = column .. "=?"
            -- WHERE parameters follow the SET parameters, matching the
            -- placeholder order in the final statement.
            params[#params + 1] = value
        end
        if #conditions == 0 then
            return nil, "update 的 where 条件为空"
        end
        where_sql = " WHERE " .. table.concat(conditions, " AND ")
    end

    local sql = string.format("UPDATE %s SET %s%s",
        table_name,
        table.concat(sets, ","),
        where_sql)
    return execute(sql, params)
end
-- Delete the rows of `table_name` that match every pair in `where`.
--
-- @param table_name  table to delete from (interpolated verbatim)
-- @param where       column -> value map, combined with AND; required and
--                    non-empty, so this command never issues an
--                    unconditional or malformed DELETE
-- @return the result of execute(), or nil plus an error message
function CMD.delete(table_name, where)
    if type(where) ~= "table" then
        return nil, "delete 必须提供 where 条件"
    end

    local params, conditions = {}, {}
    for column, value in pairs(where) do
        conditions[#conditions + 1] = column .. "=?"
        params[#params + 1] = value
    end
    if #conditions == 0 then
        -- "DELETE FROM t WHERE " is not valid SQL.
        return nil, "delete 的 where 条件为空"
    end

    local sql = string.format("DELETE FROM %s WHERE %s",
        table_name,
        table.concat(conditions, " AND "))
    return execute(sql, params)
end
-- Select rows from `table_name`.
--
-- @param table_name  table to read (interpolated verbatim)
-- @param where       optional column -> value map, combined with AND; nil or
--                    an empty table means no WHERE clause
-- @param fields      column list text, default "*" (interpolated verbatim)
-- @param order_by    optional ORDER BY expression (interpolated verbatim)
-- @param limit       optional row limit: a non-negative number (rendered as
--                    an integer) or a string of the form "N" / "N,M"
-- @return the result of execute(), or nil plus an error message
--
-- NOTE(review): table_name, fields and order_by are concatenated as-is and
-- must never come from untrusted input.
function CMD.select(table_name, where, fields, order_by, limit)
    fields = fields or "*"

    local params = {}
    local where_sql = ""
    if where then
        local conditions = {}
        for column, value in pairs(where) do
            conditions[#conditions + 1] = column .. "=?"
            params[#params + 1] = value
        end
        -- An empty filter table must not leave a dangling " WHERE ".
        if #conditions > 0 then
            where_sql = " WHERE " .. table.concat(conditions, " AND ")
        end
    end

    local order_sql = order_by and (" ORDER BY " .. order_by) or ""

    local limit_sql = ""
    if limit ~= nil then
        local n = tonumber(limit)
        if n and n >= 0 then
            -- Floats such as 100.0 would otherwise render as "LIMIT 100.0".
            limit_sql = string.format(" LIMIT %d", math.floor(n))
        elseif type(limit) == "string" then
            local offset, count = limit:match("^%s*(%d+)%s*,%s*(%d+)%s*$")
            if not offset then
                return nil, "select 的 limit 参数无效"
            end
            limit_sql = " LIMIT " .. offset .. "," .. count
        else
            return nil, "select 的 limit 参数无效"
        end
    end

    local sql = string.format("SELECT %s FROM %s%s%s%s",
        fields, table_name, where_sql, order_sql, limit_sql)
    return execute(sql, params)
end
-- Service entry point: connect, publish the local name, install dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; the base "skynet" module alone does not define it.
    require "skynet.manager"

    -- Establish the initial connection; abort startup when it fails.
    if not connect() then
        skynet.error("数据库连接失败,服务启动失败")
        skynet.exit()
        return
    end

    skynet.register(".mysql_mgr")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring() keeps the message buildable when cmd is not a string.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- Request expects a reply: pack and return the results.
            skynet.retpack(f(...))
        else
            -- One-way message: run the command and discard the results.
            f(...)
        end
    end)
end)
MySQL 连接池
对于高并发场景,单个连接可能成为瓶颈,需要使用连接池:
-- lualib/mysql_pool.lua
local skynet = require "skynet"
local mysql = require "mysql"
--- Fixed-capacity pool of MySQL connections.
-- Connections are created on demand up to `max_size`. Released connections
-- are kept in `free_list` and health-checked with "SELECT 1" before reuse.
local MySQLPool = {}
MySQLPool.__index = MySQLPool

--- Create an empty pool.
-- @param config    table passed unchanged to mysql.connect()
-- @param max_size  maximum number of connections owned at once (default 10)
function MySQLPool.new(config, max_size)
    return setmetatable({
        config = config,
        max_size = max_size or 10,
        connections = {}, -- every connection the pool currently owns
        free_list = {},   -- owned connections that are not checked out
    }, MySQLPool)
end

--- Open one new connection and set its session character set.
-- @return the connection, or nil plus the driver's error
function MySQLPool:create_connection()
    local conn, err = mysql.connect(self.config)
    if not conn then
        return nil, err
    end
    conn:query("SET NAMES utf8mb4")
    return conn
end

-- Remove a dead connection from the ownership list so its slot is reusable.
-- NOTE(review): the underlying socket is not closed here because the
-- driver's close method is not visible in this file.
local function discard_connection(pool, dead)
    local owned = pool.connections
    for i = #owned, 1, -1 do
        if owned[i] == dead then
            table.remove(owned, i)
            return
        end
    end
end

--- Check out a connection.
-- Idle connections are tried first; any that fail the "SELECT 1" probe are
-- discarded. If none is usable and capacity allows, a new one is opened.
-- @return a connection, or nil plus an error ("连接池已满" when at capacity)
function MySQLPool:acquire()
    while #self.free_list > 0 do
        local conn = table.remove(self.free_list)
        -- pcall: a broken connection may raise instead of returning nil.
        local ok, alive = pcall(conn.query, conn, "SELECT 1")
        if ok and alive then
            return conn
        end
        discard_connection(self, conn)
    end

    if #self.connections < self.max_size then
        local conn, err = self:create_connection()
        if conn then
            self.connections[#self.connections + 1] = conn
            return conn
        end
        return nil, err
    end

    -- The pool does not queue waiters; callers must retry themselves.
    return nil, "连接池已满"
end

--- Return a connection obtained from acquire() to the idle list.
function MySQLPool:release(conn)
    if conn == nil then
        return
    end
    self.free_list[#self.free_list + 1] = conn
end

--- Run one statement on a pooled connection.
-- The connection is released even when the driver raises; the error is then
-- re-raised to the caller.
-- @return the driver's result and error values, or nil plus the acquire error
function MySQLPool:query(sql)
    local conn, acquire_err = self:acquire()
    if not conn then
        return nil, acquire_err
    end

    local ok, result, query_err = pcall(conn.query, conn, sql)
    self:release(conn)
    if not ok then
        error(result, 0)
    end
    return result, query_err
end
return MySQLPool
MySQL 使用示例
-- service/user_service.lua
local skynet = require "skynet"
local mysql_mgr
-- Commands exposed by the user service. All of them forward to the
-- mysql_mgr service through skynet.call.
local CMD = {}

-- Insert a row into "users" and return the two values from the "insert"
-- command (id, err).
function CMD.create_user(name, level, gold)
    local row = {
        name = name,
        level = level,
        gold = gold,
        created_at = os.time(),
    }
    local id, err = skynet.call(mysql_mgr, "lua", "insert", "users", row)
    return id, err
end

-- Fetch the first "users" row whose id equals user_id.
-- Returns the row, or nil plus the error value when nothing was returned.
function CMD.get_user(user_id)
    local rows, err = skynet.call(mysql_mgr, "lua", "select",
        "users", { id = user_id })
    if not rows or #rows == 0 then
        return nil, err
    end
    return rows[1]
end

-- Overwrite the gold column of one user.
function CMD.update_gold(user_id, gold)
    local changes = { gold = gold }
    local filter = { id = user_id }
    return skynet.call(mysql_mgr, "lua", "update", "users", changes, filter)
end

-- List users ordered by gold, highest first; `limit` defaults to 100.
function CMD.get_top_players(limit)
    local count = limit or 100
    return skynet.call(mysql_mgr, "lua", "select",
        "users", nil, "*", "gold DESC", count)
end
-- Service entry point: locate mysql_mgr, publish the name, install dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; the base "skynet" module alone does not define it.
    require "skynet.manager"

    mysql_mgr = skynet.uniqueservice("mysql_mgr")
    skynet.register(".user_service")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            -- Request expects a reply: pack and return the results.
            skynet.retpack(f(...))
        else
            -- One-way message: run the command and discard the results.
            f(...)
        end
    end)
end)
Redis 集成
安装 Redis 驱动
cd ~/my-skynet-project
git clone https://github.com/zhandouxiaojiji/skynet-redis.git lualib/redis
Redis 服务封装
-- service/redis_mgr.lua
local skynet = require "skynet"
local redis = require "skynet.db.redis"
-- Shared Redis connection; assigned by connect().
local redis_conn = nil

-- Settings handed to redis.connect().
local config = {
    db = 0,
    auth = nil, -- no password configured
    port = 6379,
    host = "127.0.0.1",
}
-- Connect to Redis with `config` and store the handle in redis_conn.
-- redis.connect is invoked under pcall; on failure the error is logged and
-- false is returned, otherwise true.
local function connect()
    local ok, conn = pcall(redis.connect, config)
    if ok then
        redis_conn = conn
        return true
    end
    skynet.error("Redis 连接失败:", conn)
    return false
end
-- Commands reachable through this service's "lua" message dispatcher.
local CMD = {}

-- Basic string operations: thin forwards to the same-named redis_conn methods.
CMD.get = function(key)
    return redis_conn:get(key)
end

-- Store `value`; when `ex` is given the key is written with SETEX using `ex`
-- as the expiry argument, otherwise with a plain SET.
CMD.set = function(key, value, ex)
    if not ex then
        return redis_conn:set(key, value)
    end
    return redis_conn:setex(key, ex, value)
end

CMD.del = function(key)
    return redis_conn:del(key)
end

CMD.exists = function(key)
    return redis_conn:exists(key)
end
-- Hash operations.
CMD.hget = function(key, field)
    return redis_conn:hget(key, field)
end

CMD.hset = function(key, field, value)
    return redis_conn:hset(key, field, value)
end

CMD.hgetall = function(key)
    return redis_conn:hgetall(key)
end

-- Write several fields at once. `hash` is flattened into
-- key, field1, value1, field2, value2, ... with every value passed through
-- tostring(); field order follows pairs() and is therefore unspecified.
CMD.hmset = function(key, hash)
    local args = { key }
    local n = 1
    for field, value in pairs(hash) do
        args[n + 1] = field
        args[n + 2] = tostring(value)
        n = n + 2
    end
    return redis_conn:hmset(table.unpack(args))
end
-- List operations: thin forwards to the same-named redis_conn methods.
CMD.lpush = function(key, value)
    return redis_conn:lpush(key, value)
end

CMD.rpop = function(key)
    return redis_conn:rpop(key)
end

CMD.lrange = function(key, start, stop)
    return redis_conn:lrange(key, start, stop)
end
-- Set operations: thin forwards to the same-named redis_conn methods.
CMD.sadd = function(key, member)
    return redis_conn:sadd(key, member)
end

CMD.srem = function(key, member)
    return redis_conn:srem(key, member)
end

CMD.smembers = function(key)
    return redis_conn:smembers(key)
end
-- Sorted-set operations (used for leaderboards).
CMD.zadd = function(key, score, member)
    return redis_conn:zadd(key, score, member)
end

-- Range query; a truthy `withscores` appends the "WITHSCORES" argument.
CMD.zrange = function(key, start, stop, withscores)
    if not withscores then
        return redis_conn:zrange(key, start, stop)
    end
    return redis_conn:zrange(key, start, stop, "WITHSCORES")
end

-- Same as zrange but forwarded to zrevrange.
CMD.zrevrange = function(key, start, stop, withscores)
    if not withscores then
        return redis_conn:zrevrange(key, start, stop)
    end
    return redis_conn:zrevrange(key, start, stop, "WITHSCORES")
end

CMD.zrank = function(key, member)
    return redis_conn:zrank(key, member)
end

CMD.zscore = function(key, member)
    return redis_conn:zscore(key, member)
end
-- Key expiry.
CMD.expire = function(key, seconds)
    return redis_conn:expire(key, seconds)
end

CMD.ttl = function(key)
    return redis_conn:ttl(key)
end

-- Atomic counters.
CMD.incr = function(key)
    return redis_conn:incr(key)
end

CMD.incrby = function(key, increment)
    return redis_conn:incrby(key, increment)
end

CMD.decr = function(key)
    return redis_conn:decr(key)
end
-- Distributed lock: try to take `key` for `timeout` seconds.
--
-- The lock is written with a single "SET key value NX EX ttl" so that the
-- key can never exist without an expiry. A separate SETNX followed by EXPIRE
-- would leave a permanent lock if the second command were never sent.
--
-- @param key      lock key
-- @param value    owner token; pass the same token to release_lock
-- @param timeout  lifetime in seconds; must be a number >= 1
-- @return true when the lock was acquired, false otherwise (including an
--         invalid timeout)
function CMD.acquire_lock(key, value, timeout)
    local ttl = tonumber(timeout)
    if not ttl or ttl < 1 then
        return false
    end
    -- math.floor yields an integer, which Redis requires for EX.
    local reply = redis_conn:set(key, value, "NX", "EX", math.floor(ttl))
    -- NOTE(review): assumes the driver returns the status string "OK" on
    -- success and nil when NX prevented the write; confirm for this driver.
    return reply == "OK"
end
-- Release a lock only when it is still held with the given owner token.
-- The comparison and the delete run inside one server-side script so they
-- happen as a single step. Returns whatever redis_conn:eval returns.
function CMD.release_lock(key, value)
    local compare_and_delete = [[
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
]]
    local key_count = 1
    return redis_conn:eval(compare_and_delete, key_count, key, value)
end
-- Service entry point: connect, publish the local name, install dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; the base "skynet" module alone does not define it.
    require "skynet.manager"

    if not connect() then
        skynet.error("Redis 连接失败,服务启动失败")
        skynet.exit()
        return
    end

    skynet.register(".redis_mgr")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring() keeps the message buildable when cmd is not a string.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- Request expects a reply: pack and return the results.
            skynet.retpack(f(...))
        else
            -- One-way message: run the command and discard the results.
            f(...)
        end
    end)
end)
Redis 使用示例:游戏排行榜
-- service/leaderboard.lua
local skynet = require "skynet"
local cjson = require "cjson"
local redis_mgr
-- Sorted-set key that holds every player's score.
local LEADERBOARD_KEY = "leaderboard:score"

local CMD = {}

-- Record a player's score in the leaderboard and mirror it into the
-- per-user hash "user:<id>", whose expiry is reset to 30 days.
function CMD.update_score(user_id, score)
    local thirty_days = 86400 * 30
    local user_key = "user:" .. user_id

    -- Leaderboard entry.
    skynet.call(redis_mgr, "lua", "zadd", LEADERBOARD_KEY, score, user_id)
    -- Per-user detail hash and its expiry.
    skynet.call(redis_mgr, "lua", "hset", user_key, "score", score)
    skynet.call(redis_mgr, "lua", "expire", user_key, thirty_days)
end
-- Return the top `n` entries (default 100), highest score first.
-- The "zrevrange" reply is read as a flat list of member, score, member,
-- score, ... and converted into { rank, user_id, score } records with
-- 1-based ranks.
function CMD.get_top(n)
    n = n or 100
    local flat = skynet.call(redis_mgr, "lua", "zrevrange",
        LEADERBOARD_KEY, 0, n - 1, true)

    local top = {}
    local i = 1
    while i <= #flat do
        local entry = {
            rank = math.ceil(i / 2),
            user_id = flat[i],
            score = tonumber(flat[i + 1]),
        }
        top[#top + 1] = entry
        i = i + 2
    end
    return top
end
-- Return the "zrank" and "zscore" replies for one player, in that order.
-- NOTE(review): get_top reads the board with "zrevrange" (highest first,
-- 1-based rank) while this uses "zrank"; if the driver follows Redis ZRANK
-- semantics the rank here is 0-based and counted from the lowest score.
-- Confirm which ordering callers expect.
function CMD.get_rank(user_id)
    local position = skynet.call(redis_mgr, "lua", "zrank", LEADERBOARD_KEY, user_id)
    local points = skynet.call(redis_mgr, "lua", "zscore", LEADERBOARD_KEY, user_id)
    return position, points
end
-- Return the players within `range` positions (default 5) of `user_id`.
-- Produces an empty table when the player has no "zrank" reply.
-- NOTE(review): positions come from "zrank"/"zrange", whereas get_top uses
-- "zrevrange"; confirm the intended sort direction.
function CMD.get_around(user_id, range)
    range = range or 5

    local position = skynet.call(redis_mgr, "lua", "zrank", LEADERBOARD_KEY, user_id)
    if not position then
        return {}
    end

    -- Clamp the window at the first index.
    local start = math.max(0, position - range)
    local stop = position + range
    local flat = skynet.call(redis_mgr, "lua", "zrange",
        LEADERBOARD_KEY, start, stop, true)

    -- The reply is read as a flat list of member, score pairs.
    local around = {}
    local i = 1
    while i <= #flat do
        around[#around + 1] = {
            rank = start + math.ceil(i / 2),
            user_id = flat[i],
            score = tonumber(flat[i + 1]),
        }
        i = i + 2
    end
    return around
end
-- Service entry point: locate redis_mgr, publish the name, install dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; the base "skynet" module alone does not define it.
    require "skynet.manager"

    redis_mgr = skynet.uniqueservice("redis_mgr")
    skynet.register(".leaderboard")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            -- Request expects a reply: pack and return the results.
            skynet.retpack(f(...))
        else
            -- One-way message: run the command and discard the results.
            f(...)
        end
    end)
end)
MongoDB 集成
安装 MongoDB 驱动
cd ~/my-skynet-project
git clone https://github.com/Neopall/mongo-lua-driver.git lualib/mongo
MongoDB 服务封装
-- service/mongo_mgr.lua
local skynet = require "skynet"
local mongo = require "mongo"
-- Database handle; assigned by connect().
local db = nil

-- Where to find MongoDB and which database to open.
local config = {
    database = "game_db",
    port = 27017,
    host = "127.0.0.1",
}
-- Create a client for "mongodb://<host>:<port>" and open config.database
-- into `db`. Returns true on success; logs and returns false when
-- mongo.Client yields no client.
local function connect()
    local uri = string.format("mongodb://%s:%d", config.host, config.port)
    local client = mongo.Client(uri)
    if client then
        db = client:getDatabase(config.database)
        return true
    end
    skynet.error("MongoDB 连接失败")
    return false
end
-- Commands reachable through this service's "lua" message dispatcher.
local CMD = {}

-- Insert one document; returns result:inserted_id(), or the falsy result.
function CMD.insert(collection, document)
    local result = db:getCollection(collection):insert_one(document)
    if not result then
        return result
    end
    return result:inserted_id()
end

-- Insert several documents; returns result:inserted_ids(), or the falsy
-- result.
function CMD.insert_many(collection, documents)
    local result = db:getCollection(collection):insert_many(documents)
    if not result then
        return result
    end
    return result:inserted_ids()
end

-- Run a find and drain the cursor into an array.
-- `filter` and `options` both default to an empty table.
function CMD.find(collection, filter, options)
    local cursor = db:getCollection(collection):find(filter or {}, options or {})
    local docs = {}
    local count = 0
    for doc in cursor:iterator() do
        count = count + 1
        docs[count] = doc
    end
    return docs
end

-- Return the find_one reply for `filter` (default: empty filter).
function CMD.find_one(collection, filter)
    return db:getCollection(collection):find_one(filter or {})
end
-- Apply `update` to every document matching `filter` (update_many).
-- Returns result:modified_count(), or the falsy result.
function CMD.update(collection, filter, update, upsert)
    local options = { upsert = upsert }
    local result = db:getCollection(collection):update_many(filter, update, options)
    if not result then
        return result
    end
    return result:modified_count()
end

-- Same as CMD.update but forwarded to update_one.
function CMD.update_one(collection, filter, update, upsert)
    local options = { upsert = upsert }
    local result = db:getCollection(collection):update_one(filter, update, options)
    if not result then
        return result
    end
    return result:modified_count()
end

-- Delete every document matching `filter` (delete_many).
-- Returns result:deleted_count(), or the falsy result.
function CMD.delete(collection, filter)
    local result = db:getCollection(collection):delete_many(filter)
    if not result then
        return result
    end
    return result:deleted_count()
end

-- Return the count_documents reply for `filter` (default: empty filter).
function CMD.count(collection, filter)
    return db:getCollection(collection):count_documents(filter or {})
end
-- Run an aggregation pipeline and drain the cursor into an array.
function CMD.aggregate(collection, pipeline)
    local cursor = db:getCollection(collection):aggregate(pipeline)
    local docs = {}
    local count = 0
    for doc in cursor:iterator() do
        count = count + 1
        docs[count] = doc
    end
    return docs
end

-- Create an index on `keys`; `options` defaults to an empty table.
function CMD.create_index(collection, keys, options)
    return db:getCollection(collection):create_index(keys, options or {})
end
-- Service entry point: connect, publish the local name, install dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; the base "skynet" module alone does not define it.
    require "skynet.manager"

    if not connect() then
        skynet.error("MongoDB 连接失败,服务启动失败")
        skynet.exit()
        return
    end

    skynet.register(".mongo_mgr")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring() keeps the message buildable when cmd is not a string.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- Request expects a reply: pack and return the results.
            skynet.retpack(f(...))
        else
            -- One-way message: run the command and discard the results.
            f(...)
        end
    end)
end)
MongoDB 使用示例:游戏日志
-- service/game_log.lua
local skynet = require "skynet"
local mongo_mgr
-- Commands exposed by the game-log service. Each log_* command stores one
-- document through the mongo_mgr "insert" command and returns its reply.
local CMD = {}

-- Record a login in "login_logs", stamped with the current time and date.
function CMD.log_login(user_id, ip, device)
    local entry = {
        user_id = user_id,
        ip = ip,
        device = device,
        timestamp = os.time(),
        date = os.date("%Y-%m-%d"),
    }
    return skynet.call(mongo_mgr, "lua", "insert", "login_logs", entry)
end

-- Record a finished battle in "battle_logs".
function CMD.log_battle(user_id, battle_type, result, duration)
    local entry = {
        user_id = user_id,
        battle_type = battle_type,
        result = result,
        duration = duration,
        timestamp = os.time(),
    }
    return skynet.call(mongo_mgr, "lua", "insert", "battle_logs", entry)
end

-- Record a payment in "payment_logs"; new entries start as "pending".
function CMD.log_payment(user_id, order_id, amount, item)
    local entry = {
        user_id = user_id,
        order_id = order_id,
        amount = amount,
        item = item,
        timestamp = os.time(),
        status = "pending",
    }
    return skynet.call(mongo_mgr, "lua", "insert", "payment_logs", entry)
end
-- Fetch a user's entries from the "<log_type>_logs" collection, sorted by
-- timestamp descending; `limit` defaults to 100.
function CMD.get_user_logs(user_id, log_type, limit)
    local collection = log_type .. "_logs"
    local filter = { user_id = user_id }
    local options = { limit = limit or 100, sort = { timestamp = -1 } }
    return skynet.call(mongo_mgr, "lua", "find", collection, filter, options)
end
-- Aggregate "login_logs" for one day: match entries whose `date` equals the
-- argument, group them by user_id, then count the groups into "total".
--
-- Operator names start with "$", which is not a valid Lua identifier, so
-- they have to be written as bracketed string keys (["$match"] = ...).
--
-- @param date  date string as stored by log_login
-- @return the reply of the mongo_mgr "aggregate" command
function CMD.get_daily_stats(date)
    local pipeline = {
        { ["$match"] = { date = date } },
        { ["$group"] = {
            _id = "$user_id",
            count = { ["$sum"] = 1 },
        } },
        { ["$count"] = "total" },
    }
    return skynet.call(mongo_mgr, "lua", "aggregate", "login_logs", pipeline)
end
-- Service entry point: locate mongo_mgr, publish the name, create the
-- indexes, then install dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; the base "skynet" module alone does not define it.
    require "skynet.manager"

    mongo_mgr = skynet.uniqueservice("mongo_mgr")
    skynet.register(".game_log")

    -- Create the required indexes.
    skynet.call(mongo_mgr, "lua", "create_index", "login_logs",
        { user_id = 1, timestamp = -1 })
    skynet.call(mongo_mgr, "lua", "create_index", "battle_logs",
        { user_id = 1, timestamp = -1 })
    skynet.call(mongo_mgr, "lua", "create_index", "payment_logs",
        { order_id = 1 }, { unique = true })

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd])
        if session ~= 0 then
            -- Request expects a reply: pack and return the results.
            skynet.retpack(f(...))
        else
            -- One-way message: run the command and discard the results.
            f(...)
        end
    end)
end)
数据缓存策略
多级缓存架构
请求 → Redis 缓存(L1)→ MySQL 数据库(L2)→ 回写缓存
-- lualib/cache_helper.lua
local skynet = require "skynet"
--- Read-through cache helper: Redis in front of the MySQL "users" table.
local CacheHelper = {}

-- Service addresses, resolved by CacheHelper.init().
local redis_mgr
local mysql_mgr

--- Resolve the redis_mgr and mysql_mgr services. Call once before use.
function CacheHelper.init()
    redis_mgr = skynet.uniqueservice("redis_mgr")
    mysql_mgr = skynet.uniqueservice("mysql_mgr")
end

--- Fetch a user, preferring the cache.
-- @param user_id  value of the users.id column
-- @return the user row, or nil when no row exists
--
-- NOTE(review): the key "user:<id>" is also written as a hash by the
-- leaderboard example's update_score; a string GET on a hash key fails in
-- Redis. Confirm the two are not deployed against the same keyspace.
function CacheHelper.get_user(user_id)
    -- cjson is not required at the top of this module, so load it here;
    -- require() caches the module after the first call.
    local cjson = require "cjson"
    local cache_key = "user:" .. user_id

    -- 1. Try the Redis cache first.
    local cached = skynet.call(redis_mgr, "lua", "get", cache_key)
    if cached then
        local ok, user = pcall(cjson.decode, cached)
        if ok then
            return user
        end
        -- Undecodable entry: treat as a miss and let step 3 overwrite it.
    end

    -- 2. Cache miss: query MySQL.
    local result = skynet.call(mysql_mgr, "lua", "select",
        "users", { id = user_id })
    if result and #result > 0 then
        local user = result[1]
        -- 3. Populate the cache with a 300-second (5 minute) expiry.
        skynet.call(redis_mgr, "lua", "set",
            cache_key, cjson.encode(user), 300)
        return user
    end
    return nil
end

--- Update a user row and invalidate its cache entry.
-- @param user_id  value of the users.id column
-- @param data     column -> new value map passed to the "update" command
function CacheHelper.update_user(user_id, data)
    -- Write to the database first...
    skynet.call(mysql_mgr, "lua", "update", "users", data, { id = user_id })
    -- ...then drop the cached copy so the next read reloads it.
    skynet.call(redis_mgr, "lua", "del", "user:" .. user_id)
end
return CacheHelper
数据一致性保证
事务处理
local CMD = {}

-- Transfer `amount` gold from one user to another inside a transaction.
--
-- The mysql_mgr commands report failure by returning nil plus an error
-- rather than raising, so each step's result is checked explicitly and
-- turned into an error that aborts the protected block. The debit and the
-- credit must each change exactly one row; otherwise everything is rolled
-- back (for example on insufficient balance or an unknown user).
--
-- NOTE(review): START TRANSACTION / COMMIT only protect these statements if
-- mysql_mgr runs them on one connection with no other caller's statements
-- interleaved between the calls; confirm against the mysql_mgr design.
--
-- @param from_id  paying user's id
-- @param to_id    receiving user's id
-- @param amount   positive number of gold to move
-- @return true on success; false plus an error value otherwise
function CMD.transfer(from_id, to_id, amount)
    if type(amount) ~= "number" or amount <= 0 then
        return false, "转账金额必须为正数"
    end

    -- Begin the transaction.
    local began, begin_err = skynet.call(mysql_mgr, "lua", "query", "START TRANSACTION")
    if not began then
        return false, begin_err
    end

    local ok, err = pcall(function()
        -- Debit the sender; the gold >= ? guard prevents overdrafts.
        local debit, debit_err = skynet.call(mysql_mgr, "lua", "execute",
            "UPDATE users SET gold = gold - ? WHERE id = ? AND gold >= ?",
            { amount, from_id, amount })
        if not debit then
            error(debit_err or "扣款失败", 0)
        end
        if debit.affected_rows ~= 1 then
            error("余额不足或付款方不存在", 0)
        end

        -- Credit the receiver.
        local credit, credit_err = skynet.call(mysql_mgr, "lua", "execute",
            "UPDATE users SET gold = gold + ? WHERE id = ?",
            { amount, to_id })
        if not credit then
            error(credit_err or "入账失败", 0)
        end
        if credit.affected_rows ~= 1 then
            error("收款方不存在", 0)
        end

        -- Record the transaction log entry.
        local log_id, log_err = skynet.call(mysql_mgr, "lua", "insert", "transactions", {
            from_id = from_id,
            to_id = to_id,
            amount = amount,
            timestamp = os.time(),
        })
        if log_id == nil then
            error(log_err or "写入交易日志失败", 0)
        end
    end)

    if ok then
        -- Commit; a failed commit falls through to the rollback below.
        local committed, commit_err = skynet.call(mysql_mgr, "lua", "query", "COMMIT")
        if committed then
            return true
        end
        err = commit_err
    end

    -- Roll back on any failure.
    skynet.call(mysql_mgr, "lua", "query", "ROLLBACK")
    return false, err
end
分布式锁
-- Transfer gold while holding a Redis lock on the pair of accounts.
--
-- The lock key orders the two ids so that A->B and B->A contend for the
-- same lock. The lock is always released, even if CMD.transfer raises; in
-- that case the error is re-raised after the release.
--
-- NOTE(review): the lock expires after 5 seconds; a transfer that runs
-- longer loses its protection before it finishes.
--
-- @return the results of CMD.transfer, or false plus "获取锁失败" when the
--         lock could not be taken
function CMD.safe_transfer(from_id, to_id, amount)
    local lock_key = "lock:transfer:" .. math.min(from_id, to_id) .. ":" .. math.max(from_id, to_id)
    -- Owner token: service handle + time + random part, so two callers in
    -- the same instant do not share a token and release each other's lock.
    local lock_value = string.format("%x:%s:%d",
        skynet.self(), skynet.time(), math.random(0, 0x7fffffff))

    -- Take the lock (5 second expiry).
    local locked = skynet.call(redis_mgr, "lua", "acquire_lock", lock_key, lock_value, 5)
    if not locked then
        return false, "获取锁失败"
    end

    -- Run the transfer under pcall so the lock is released on every path.
    local called, ok, err = pcall(CMD.transfer, from_id, to_id, amount)

    -- Release the lock.
    skynet.call(redis_mgr, "lua", "release_lock", lock_key, lock_value)

    if not called then
        -- `ok` holds the error object when pcall failed.
        error(ok, 0)
    end
    return ok, err
end
总结
本教程介绍了 Skynet 与主流数据库的集成方案:
- MySQL:适合结构化数据,支持事务
- Redis:适合缓存和热点数据
- MongoDB:适合灵活结构和大数据
最佳实践:
- 封装数据库访问为独立服务
- 使用连接池提高并发能力
- 合理设计缓存策略
- 注意数据一致性和事务处理
参考资料
- Skynet MySQL 驱动:https://github.com/dalinggang/lua-resty-mysql
- Skynet Redis 驱动:https://github.com/zhandouxiaojiji/skynet-redis
- Redis 官方文档:https://redis.io/documentation
- MySQL 官方文档:https://dev.mysql.com/doc/
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。