Skynet 数据库集成实践

全面介绍 Skynet 与各类数据库的集成方案,包括 MySQL、Redis、MongoDB 的使用方法和连接池管理

在游戏服务器开发中,数据持久化是核心需求之一。Skynet 核心框架不内置数据库功能,但其 lualib 自带了 skynet.db.mysql、skynet.db.redis、skynet.db.mongo 等 Lua 驱动,也可以通过 C 模块或第三方 Lua 库与各种数据库集成。本教程将详细介绍如何在 Skynet 中使用 MySQL、Redis、MongoDB 等主流数据库。

数据库选择策略

在开始集成之前,需要了解不同数据库的特点和适用场景:

数据库 | 类型 | 适用场景 | 性能
MySQL | 关系型 | 结构化数据、事务操作 | 中等
Redis | 内存 | 缓存、排行榜、会话 | 极高
MongoDB | 文档型 | 灵活结构、大数据 | (未标注)
PostgreSQL | 关系型 | 复杂查询、JSON 支持 | 中等

推荐方案

  • 核心业务数据 → MySQL/PostgreSQL
  • 缓存和热点数据 → Redis
  • 日志和分析数据 → MongoDB

MySQL 集成

安装 MySQL 驱动

Skynet 社区提供了多个 MySQL 驱动,推荐使用 skynet-fly/mysql 或 lua-resty-mysql。

# 下载 MySQL 驱动
cd ~/my-skynet-project
git clone https://github.com/dalinggang/lua-resty-mysql.git lualib/mysql

MySQL 服务封装

-- service/mysql_mgr.lua
local skynet = require "skynet"
local mysql = require "mysql"

-- Settings handed to mysql.connect(). The values are tutorial placeholders.
local db_config = {
    user = "root",
    password = "password",
    host = "127.0.0.1",
    port = 3306,
    database = "game_db",
    charset = "utf8mb4",
    max_packet_size = 1024 * 1024,
}

-- Shared connection handle; stays nil until connect() succeeds.
local db_conn

--- Open a MySQL connection with db_config and publish it as db_conn.
-- @treturn boolean true on success; false after logging the driver error
local function connect()
    local handle, reason = mysql.connect(db_config)
    if handle then
        -- Set the session character set before the handle is shared.
        handle:query("SET NAMES utf8mb4")
        db_conn = handle
        return true
    end

    skynet.error("MySQL 连接失败:", reason)
    return false
end

--- Run a raw SQL statement on the shared connection, connecting lazily.
-- If the driver reports an error whose message contains "closed" or "lost",
-- the connection is re-established once and the statement is sent again.
-- NOTE(review): the retry re-sends the statement as-is, so a non-idempotent
-- statement could be applied twice if the first attempt reached the server.
-- @tparam string sql statement text
-- @return the driver result on success; nil plus an error value on failure
local function query(sql)
    if not db_conn and not connect() then
        return nil, "无法连接数据库"
    end

    local result, err = db_conn:query(sql)
    if result then
        return result
    end

    -- Only inspect real strings: a driver may fail with a nil or table error,
    -- and string.find would raise on those. Plain search avoids pattern magic.
    local dropped = type(err) == "string"
        and (string.find(err, "closed", 1, true) or string.find(err, "lost", 1, true))
    if dropped then
        skynet.error("MySQL 连接断开,尝试重连")
        db_conn = nil
        if connect() then
            result, err = db_conn:query(sql)
        end
    end

    if not result then
        return nil, err
    end
    return result
end

--- Substitute positional `?` placeholders and run the resulting statement.
-- Numbers are inlined, strings are escaped with db_conn:escape() and quoted,
-- booleans become 1/0 and nil becomes NULL. Any other type is rejected rather
-- than leaving a raw `?` in the SQL text.
-- @tparam string sql statement containing `?` placeholders
-- @tparam[opt] table params positional values, one per placeholder
-- @return the query result on success; nil plus an error message on failure
local function execute(sql, params)
    params = params or {}

    -- Escaping needs a live connection handle, so connect before substituting.
    if not db_conn and not connect() then
        return nil, "无法连接数据库"
    end

    local param_index = 0
    local type_error
    -- "%?" matches a literal question mark; a bare "?" is a pattern quantifier.
    local real_sql = string.gsub(sql, "%?", function()
        param_index = param_index + 1
        local value = params[param_index]
        local kind = type(value)
        if kind == "number" then
            return tostring(value)
        elseif kind == "string" then
            return "'" .. db_conn:escape(value) .. "'"
        elseif kind == "boolean" then
            return value and "1" or "0"
        elseif value == nil then
            return "NULL"
        end
        -- Remember the first offending parameter; returning nothing keeps the
        -- placeholder, but the statement is never sent.
        type_error = type_error
            or string.format("参数 #%d 类型不受支持: %s", param_index, kind)
    end)

    if type_error then
        return nil, type_error
    end
    return query(real_sql)
end

-- Command table: handlers are looked up by name in the "lua" dispatcher.
local CMD = {}

--- Run a raw SQL statement; forwards to the local query().
CMD.query = function(sql)
    return query(sql)
end

--- Run a statement with `?` placeholders; forwards to the local execute().
CMD.execute = function(sql, params)
    return execute(sql, params)
end

--- Insert one row built from a column -> value table.
-- NOTE(review): table and column names are interpolated verbatim; only the
-- values go through execute()'s placeholder substitution.
-- @tparam string table_name target table
-- @tparam table data column name -> value
-- @return result.insert_id on success; nil plus an error value on failure
function CMD.insert(table_name, data)
    local columns, args = {}, {}
    for column, value in pairs(data) do
        table.insert(columns, column)
        table.insert(args, value)
    end

    -- One "?" per collected value, comma separated.
    local marks = string.rep("?", #args, ",")
    local column_list = table.concat(columns, ",")
    local sql = string.format("INSERT INTO %s (%s) VALUES (%s)",
        table_name, column_list, marks)

    local result, err = execute(sql, args)
    if not result then
        return nil, err
    end
    return result.insert_id
end

--- Update rows matching an equality filter.
-- Passing where = nil updates every row (as before). An empty `data` table or
-- an empty `where` table is rejected up front instead of producing malformed
-- SQL ("SET " with no assignments, or a dangling " WHERE ").
-- NOTE(review): table and column names are interpolated verbatim.
-- @tparam string table_name target table
-- @tparam table data column name -> new value
-- @tparam[opt] table where column name -> required value, combined with AND
-- @return the execute() result; nil plus an error message on failure
function CMD.update(table_name, data, where)
    if type(data) ~= "table" then
        return nil, "update 需要提供字段表"
    end

    local sets = {}
    local params = {}

    for k, v in pairs(data) do
        sets[#sets + 1] = k .. "=?"
        params[#params + 1] = v
    end
    if #sets == 0 then
        return nil, "没有需要更新的字段"
    end

    local where_sql = ""
    if where then
        local where_parts = {}
        for k, v in pairs(where) do
            where_parts[#where_parts + 1] = k .. "=?"
            params[#params + 1] = v
        end
        if #where_parts == 0 then
            return nil, "where 条件不能为空表"
        end
        where_sql = " WHERE " .. table.concat(where_parts, " AND ")
    end

    local sql = string.format("UPDATE %s SET %s%s",
        table_name,
        table.concat(sets, ","),
        where_sql)

    return execute(sql, params)
end

--- Delete rows matching an equality filter.
-- A filter is mandatory: a nil `where` used to raise inside pairs() and an
-- empty table produced "DELETE FROM t WHERE " with nothing after it. Both now
-- return nil plus an error message without touching the database.
-- NOTE(review): table and column names are interpolated verbatim.
-- @tparam string table_name target table
-- @tparam table where column name -> required value, combined with AND
-- @return the execute() result; nil plus an error message on failure
function CMD.delete(table_name, where)
    if type(where) ~= "table" or next(where) == nil then
        return nil, "delete 必须提供非空的 where 条件"
    end

    local params = {}
    local where_parts = {}

    for k, v in pairs(where) do
        where_parts[#where_parts + 1] = k .. "=?"
        params[#params + 1] = v
    end

    local sql = string.format("DELETE FROM %s WHERE %s",
        table_name,
        table.concat(where_parts, " AND "))

    return execute(sql, params)
end

--- Select rows with an optional equality filter, ordering and limit.
-- An empty `where` table is treated like nil (no filter); previously it
-- produced a dangling " WHERE " and a syntax error.
-- NOTE(review): table_name, fields, order_by and limit are interpolated
-- verbatim; only the filter values go through placeholder substitution.
-- @tparam string table_name source table
-- @tparam[opt] table where column name -> required value, combined with AND
-- @tparam[opt="*"] string fields column list
-- @tparam[opt] string order_by text placed after ORDER BY
-- @tparam[opt] number|string limit text placed after LIMIT
-- @return the execute() result; nil plus an error value on failure
function CMD.select(table_name, where, fields, order_by, limit)
    fields = fields or "*"
    local params = {}
    local where_sql = ""

    if where then
        local where_parts = {}
        for k, v in pairs(where) do
            where_parts[#where_parts + 1] = k .. "=?"
            params[#params + 1] = v
        end
        -- Emit the clause only when there is at least one condition.
        if #where_parts > 0 then
            where_sql = " WHERE " .. table.concat(where_parts, " AND ")
        end
    end

    local order_sql = order_by and (" ORDER BY " .. order_by) or ""
    local limit_sql = limit and (" LIMIT " .. limit) or ""

    local sql = string.format("SELECT %s FROM %s%s%s%s",
        fields, table_name, where_sql, order_sql, limit_sql)

    return execute(sql, params)
end

-- Service entry point: connect, register the local name, install the dispatcher.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; without this require the call below hits a nil field.
    require "skynet.manager"

    -- Establish the initial connection; give up if the database is unreachable.
    if not connect() then
        skynet.error("数据库连接失败,服务启动失败")
        skynet.exit()
        return
    end

    skynet.register(".mysql_mgr")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring() keeps the message buildable when cmd is nil or not a string.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

MySQL 连接池

对于高并发场景,单个连接可能成为瓶颈,需要使用连接池:

-- lualib/mysql_pool.lua
local skynet = require "skynet"
local mysql = require "mysql"

--- A small MySQL connection pool.
-- `connections` holds every live connection created by the pool (idle or
-- checked out); `free_list` holds the idle ones.
local MySQLPool = {}
MySQLPool.__index = MySQLPool

-- Remove a connection from the pool's bookkeeping so its slot can be reused.
local function forget_connection(pool, conn)
    local all = pool.connections
    for i = #all, 1, -1 do
        if all[i] == conn then
            table.remove(all, i)
            return
        end
    end
end

--- Create a pool.
-- @tparam table config settings passed to mysql.connect()
-- @tparam[opt=10] number max_size maximum number of connections
function MySQLPool.new(config, max_size)
    return setmetatable({
        config = config,
        max_size = max_size or 10,
        connections = {},
        free_list = {},
    }, MySQLPool)
end

--- Open one new connection and set its session character set.
-- @return the connection; nil plus the driver error on failure
function MySQLPool:create_connection()
    local conn, err = mysql.connect(self.config)
    if not conn then
        return nil, err
    end
    conn:query("SET NAMES utf8mb4")
    return conn
end

--- Check out a connection.
-- Idle connections are probed with "SELECT 1". One that fails the probe is
-- dropped from the pool so it no longer counts against max_size (the original
-- probed an undefined `pconn` and never removed dead connections).
-- @return a connection; nil plus an error value when none can be provided
function MySQLPool:acquire()
    while #self.free_list > 0 do
        local conn = table.remove(self.free_list)
        -- pcall: a broken handle may raise instead of returning nil.
        local ok, alive = pcall(conn.query, conn, "SELECT 1")
        if ok and alive then
            return conn
        end
        forget_connection(self, conn)
    end

    -- No usable idle connection: open a new one if there is room.
    if #self.connections < self.max_size then
        local conn, err = self:create_connection()
        if conn then
            self.connections[#self.connections + 1] = conn
            return conn
        end
        return nil, err
    end

    -- The pool does not block waiting for a release; the caller must retry.
    return nil, "连接池已满"
end

--- Return a checked-out connection to the idle list.
function MySQLPool:release(conn)
    self.free_list[#self.free_list + 1] = conn
end

--- Run one statement on a pooled connection.
-- The connection is released even when the driver raises; the error is then
-- re-raised unchanged.
-- @tparam string sql statement text
-- @return the driver's result and error values
function MySQLPool:query(sql)
    local conn, acquire_err = self:acquire()
    if not conn then
        return nil, acquire_err
    end

    local ok, result, err = pcall(conn.query, conn, sql)
    self:release(conn)
    if not ok then
        error(result, 0)
    end
    return result, err
end

return MySQLPool

MySQL 使用示例

-- service/user_service.lua
local skynet = require "skynet"

-- Address of the mysql_mgr service; assigned inside skynet.start.
local mysql_mgr

-- Command table used by the "lua" dispatcher.
local CMD = {}

--- Insert a row into "users" via mysql_mgr's "insert" command.
-- @return the two values returned by the call (new id, error)
function CMD.create_user(name, level, gold)
    local row = {
        name = name,
        level = level,
        gold = gold,
        created_at = os.time(),
    }
    local new_id, reason = skynet.call(mysql_mgr, "lua", "insert", "users", row)
    return new_id, reason
end

--- Fetch one row from "users" by id.
-- @return the first matching row; nil plus the call's second value otherwise
function CMD.get_user(user_id)
    local filter = { id = user_id }
    local rows, reason = skynet.call(mysql_mgr, "lua", "select", "users", filter)
    if not rows or #rows <= 0 then
        return nil, reason
    end
    return rows[1]
end

--- Overwrite the gold column of one user via mysql_mgr's "update" command.
-- @return whatever the "update" call returns
function CMD.update_gold(user_id, gold)
    local changes = { gold = gold }
    local filter = { id = user_id }
    return skynet.call(mysql_mgr, "lua", "update", "users", changes, filter)
end

--- List users ordered by gold, highest first.
-- @tparam[opt=100] number limit maximum number of rows
-- @return whatever the "select" call returns
function CMD.get_top_players(limit)
    local max_rows = limit or 100
    return skynet.call(mysql_mgr, "lua", "select",
        "users", nil, "*", "gold DESC", max_rows)
end

-- Service entry point: resolve mysql_mgr, register the local name, dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; without this require the call below hits a nil field.
    require "skynet.manager"

    mysql_mgr = skynet.uniqueservice("mysql_mgr")
    skynet.register(".user_service")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- Name the offending command so a bad call is diagnosable.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Redis 集成

安装 Redis 驱动

cd ~/my-skynet-project
git clone https://github.com/zhandouxiaojiji/skynet-redis.git lualib/redis

Redis 服务封装

-- service/redis_mgr.lua
local skynet = require "skynet"
local redis = require "skynet.db.redis"

-- Shared connection handle; stays nil until connect() succeeds.
local redis_conn

-- Settings passed to redis.connect().
local config = {
    host = "127.0.0.1",
    port = 6379,
    auth = nil,
    db = 0,
}

--- Connect to Redis and publish the handle as redis_conn.
-- redis.connect is invoked under pcall, so a raised error is logged instead
-- of propagating.
-- @treturn boolean true on success, false on failure
local function connect()
    local succeeded, handle_or_err = pcall(redis.connect, config)
    if succeeded then
        redis_conn = handle_or_err
        return true
    end

    skynet.error("Redis 连接失败:", handle_or_err)
    return false
end

-- Command table used by the "lua" dispatcher.
local CMD = {}

-- Basic string commands. Each handler forwards to the same-named method on
-- redis_conn and returns its result unchanged.

CMD.get = function(key)
    return redis_conn:get(key)
end

--- Store a value; when `ex` is given the key is written with SETEX so it
-- expires after `ex` seconds.
CMD.set = function(key, value, ex)
    if not ex then
        return redis_conn:set(key, value)
    end
    return redis_conn:setex(key, ex, value)
end

CMD.del = function(key)
    return redis_conn:del(key)
end

CMD.exists = function(key)
    return redis_conn:exists(key)
end

-- Hash commands. Handlers forward to the same-named method on redis_conn.

CMD.hget = function(key, field)
    return redis_conn:hget(key, field)
end

CMD.hset = function(key, field, value)
    return redis_conn:hset(key, field, value)
end

CMD.hgetall = function(key)
    return redis_conn:hgetall(key)
end

--- Write several fields at once.
-- The table is flattened to key, field1, value1, field2, value2, ... with
-- every value passed through tostring().
CMD.hmset = function(key, hash)
    local argv = { key }
    for field, value in pairs(hash) do
        table.insert(argv, field)
        table.insert(argv, tostring(value))
    end
    return redis_conn:hmset(table.unpack(argv))
end

-- List commands. Handlers forward to the same-named method on redis_conn.

CMD.lpush = function(key, value)
    return redis_conn:lpush(key, value)
end

CMD.rpop = function(key)
    return redis_conn:rpop(key)
end

CMD.lrange = function(key, start, stop)
    return redis_conn:lrange(key, start, stop)
end

-- Set commands.

CMD.sadd = function(key, member)
    return redis_conn:sadd(key, member)
end

CMD.srem = function(key, member)
    return redis_conn:srem(key, member)
end

CMD.smembers = function(key)
    return redis_conn:smembers(key)
end

-- Sorted-set commands (used for leaderboards).

CMD.zadd = function(key, score, member)
    return redis_conn:zadd(key, score, member)
end

--- Range by ascending index; a truthy `withscores` appends "WITHSCORES".
CMD.zrange = function(key, start, stop, withscores)
    if not withscores then
        return redis_conn:zrange(key, start, stop)
    end
    return redis_conn:zrange(key, start, stop, "WITHSCORES")
end

--- Range by descending index; a truthy `withscores` appends "WITHSCORES".
CMD.zrevrange = function(key, start, stop, withscores)
    if not withscores then
        return redis_conn:zrevrange(key, start, stop)
    end
    return redis_conn:zrevrange(key, start, stop, "WITHSCORES")
end

CMD.zrank = function(key, member)
    return redis_conn:zrank(key, member)
end

CMD.zscore = function(key, member)
    return redis_conn:zscore(key, member)
end

-- Expiry commands.

CMD.expire = function(key, seconds)
    return redis_conn:expire(key, seconds)
end

CMD.ttl = function(key)
    return redis_conn:ttl(key)
end

-- Atomic counter commands.

CMD.incr = function(key)
    return redis_conn:incr(key)
end

CMD.incrby = function(key, increment)
    return redis_conn:incrby(key, increment)
end

CMD.decr = function(key)
    return redis_conn:decr(key)
end

--- Try to take a lock (distributed-lock helper).
-- Uses a single `SET key value NX EX timeout`, so creating the key and
-- attaching its TTL happen atomically. The previous SETNX followed by EXPIRE
-- could leave a lock with no expiry if the service stopped between the two
-- commands.
-- @tparam string key lock key
-- @tparam string value owner token, checked again by release_lock
-- @tparam number timeout lock lifetime in seconds
-- @treturn boolean true when the lock was acquired
function CMD.acquire_lock(key, value, timeout)
    -- NOTE(review): assumes the driver returns the status string "OK" on
    -- success and nil when NX prevented the write; confirm for the driver used.
    local reply = redis_conn:set(key, value, "NX", "EX", timeout)
    return reply == "OK"
end

--- Release a lock only if it still holds the caller's token.
-- The compare-and-delete runs as one server-side script so the check and the
-- delete cannot be separated by another client's command.
-- @return whatever eval returns (the script yields the DEL result or 0)
function CMD.release_lock(key, value)
    local compare_and_delete = [[
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    ]]
    local key_count = 1
    return redis_conn:eval(compare_and_delete, key_count, key, value)
end

-- Service entry point: connect, register the local name, install the dispatcher.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; without this require the call below hits a nil field.
    require "skynet.manager"

    if not connect() then
        skynet.error("Redis 连接失败,服务启动失败")
        skynet.exit()
        return
    end

    skynet.register(".redis_mgr")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring() keeps the message buildable when cmd is nil or not a string.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Redis 使用示例:游戏排行榜

-- service/leaderboard.lua
local skynet = require "skynet"
local cjson = require "cjson"

-- Address of the redis_mgr service; assigned inside skynet.start.
local redis_mgr

-- Sorted-set key holding every player's score.
local LEADERBOARD_KEY = "leaderboard:score"

-- Command table used by the "lua" dispatcher.
local CMD = {}

--- Record a player's score.
-- Writes the score to the leaderboard sorted set, mirrors it into the
-- per-user hash "user:<id>", and refreshes that hash's 30-day expiry.
function CMD.update_score(user_id, score)
    local function redis(...)
        return skynet.call(redis_mgr, "lua", ...)
    end

    -- Leaderboard entry first.
    redis("zadd", LEADERBOARD_KEY, score, user_id)

    -- Then the per-user detail hash and its expiry (86400 * 30 seconds).
    local profile_key = "user:" .. user_id
    redis("hset", profile_key, "score", score)
    redis("expire", profile_key, 86400 * 30)
end

--- Return the top entries, highest score first.
-- The zrevrange reply is consumed as a flat member, score, member, score, ...
-- list and turned into {rank, user_id, score} records with 1-based ranks.
-- @tparam[opt=100] number n number of entries
-- @treturn table array of records
function CMD.get_top(n)
    local count = n or 100
    local flat = skynet.call(redis_mgr, "lua", "zrevrange",
        LEADERBOARD_KEY, 0, count - 1, true)

    local top = {}
    local position = 0
    for i = 1, #flat, 2 do
        position = position + 1
        top[position] = {
            rank = position,
            user_id = flat[i],
            score = tonumber(flat[i + 1]),
        }
    end
    return top
end

--- Look up one player's rank and score.
-- NOTE(review): this forwards "zrank", whereas get_top reads the set with
-- "zrevrange" — confirm which ordering (and rank base) callers expect.
-- @return rank, score as returned by the two calls
function CMD.get_rank(user_id)
    local function lookup(command)
        return skynet.call(redis_mgr, "lua", command, LEADERBOARD_KEY, user_id)
    end
    local rank = lookup("zrank")
    local score = lookup("zscore")
    return rank, score
end

--- Return the entries surrounding a player.
-- Uses "zrank" and "zrange" on the leaderboard key; the window spans `range`
-- positions on each side and is clamped at index 0.
-- NOTE(review): get_top reads the set with "zrevrange" — confirm the
-- intended ordering here.
-- @tparam[opt=5] number range positions to include on each side
-- @treturn table array of {rank, user_id, score}; empty if the player has no rank
function CMD.get_around(user_id, range)
    local radius = range or 5
    local rank = skynet.call(redis_mgr, "lua", "zrank", LEADERBOARD_KEY, user_id)
    if not rank then
        return {}
    end

    local first = math.max(0, rank - radius)
    local last = rank + radius
    local flat = skynet.call(redis_mgr, "lua", "zrange",
        LEADERBOARD_KEY, first, last, true)

    -- The reply is consumed as member, score pairs.
    local around = {}
    local offset = 0
    for i = 1, #flat, 2 do
        offset = offset + 1
        around[offset] = {
            rank = first + offset,
            user_id = flat[i],
            score = tonumber(flat[i + 1]),
        }
    end
    return around
end

-- Service entry point: resolve redis_mgr, register the local name, dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; without this require the call below hits a nil field.
    require "skynet.manager"

    redis_mgr = skynet.uniqueservice("redis_mgr")
    skynet.register(".leaderboard")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- Name the offending command so a bad call is diagnosable.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

MongoDB 集成

安装 MongoDB 驱动

cd ~/my-skynet-project
git clone https://github.com/Neopall/mongo-lua-driver.git lualib/mongo

MongoDB 服务封装

-- service/mongo_mgr.lua
local skynet = require "skynet"
local mongo = require "mongo"

-- Database handle; stays nil until connect() succeeds.
local db

-- Connection settings used to build the MongoDB URI.
local config = {
    host = "127.0.0.1",
    port = 27017,
    database = "game_db",
}

--- Create a client for the configured host/port and select the database.
-- @treturn boolean true on success; false when mongo.Client returns a falsy value
local function connect()
    local uri = string.format("mongodb://%s:%d", config.host, config.port)
    local client = mongo.Client(uri)
    if client then
        db = client:getDatabase(config.database)
        return true
    end

    skynet.error("MongoDB 连接失败")
    return false
end

-- Command table used by the "lua" dispatcher.
local CMD = {}

--- Insert one document.
-- @return result:inserted_id(), or the falsy result itself
CMD.insert = function(collection, document)
    local outcome = db:getCollection(collection):insert_one(document)
    if not outcome then
        return outcome
    end
    return outcome:inserted_id()
end

--- Insert several documents.
-- @return result:inserted_ids(), or the falsy result itself
CMD.insert_many = function(collection, documents)
    local outcome = db:getCollection(collection):insert_many(documents)
    if not outcome then
        return outcome
    end
    return outcome:inserted_ids()
end

--- Run a query and collect every document from the cursor into an array.
-- `filter` and `options` default to empty tables.
CMD.find = function(collection, filter, options)
    local cursor = db:getCollection(collection):find(filter or {}, options or {})

    local results, count = {}, 0
    for doc in cursor:iterator() do
        count = count + 1
        results[count] = doc
    end
    return results
end

--- Return the result of find_one for the filter (default: empty table).
CMD.find_one = function(collection, filter)
    return db:getCollection(collection):find_one(filter or {})
end

--- Apply `update` via update_many.
-- @return result:modified_count(), or the falsy result itself
CMD.update = function(collection, filter, update, upsert)
    local options = { upsert = upsert }
    local outcome = db:getCollection(collection):update_many(filter, update, options)
    if not outcome then
        return outcome
    end
    return outcome:modified_count()
end

--- Apply `update` via update_one.
-- @return result:modified_count(), or the falsy result itself
CMD.update_one = function(collection, filter, update, upsert)
    local options = { upsert = upsert }
    local outcome = db:getCollection(collection):update_one(filter, update, options)
    if not outcome then
        return outcome
    end
    return outcome:modified_count()
end

--- Remove documents via delete_many.
-- @return result:deleted_count(), or the falsy result itself
CMD.delete = function(collection, filter)
    local outcome = db:getCollection(collection):delete_many(filter)
    if not outcome then
        return outcome
    end
    return outcome:deleted_count()
end

--- Return the result of count_documents for the filter (default: empty table).
CMD.count = function(collection, filter)
    return db:getCollection(collection):count_documents(filter or {})
end

--- Run an aggregation pipeline and collect every document from the cursor.
CMD.aggregate = function(collection, pipeline)
    local cursor = db:getCollection(collection):aggregate(pipeline)

    local results, count = {}, 0
    for doc in cursor:iterator() do
        count = count + 1
        results[count] = doc
    end
    return results
end

--- Create an index on the collection; `options` defaults to an empty table.
CMD.create_index = function(collection, keys, options)
    return db:getCollection(collection):create_index(keys, options or {})
end

-- Service entry point: connect, register the local name, install the dispatcher.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; without this require the call below hits a nil field.
    require "skynet.manager"

    if not connect() then
        skynet.error("MongoDB 连接失败,服务启动失败")
        skynet.exit()
        return
    end

    skynet.register(".mongo_mgr")

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- tostring() keeps the message buildable when cmd is nil or not a string.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

MongoDB 使用示例:游戏日志

-- service/game_log.lua
local skynet = require "skynet"

-- Address of the mongo_mgr service; assigned inside skynet.start.
local mongo_mgr

-- Command table used by the "lua" dispatcher.
local CMD = {}

--- Append a login record to "login_logs".
-- Stores the epoch timestamp plus a "YYYY-MM-DD" date string.
-- @return whatever the "insert" call returns
function CMD.log_login(user_id, ip, device)
    local entry = {
        user_id = user_id,
        ip = ip,
        device = device,
        timestamp = os.time(),
        date = os.date("%Y-%m-%d"),
    }
    return skynet.call(mongo_mgr, "lua", "insert", "login_logs", entry)
end

--- Append a battle record to "battle_logs".
-- @return whatever the "insert" call returns
function CMD.log_battle(user_id, battle_type, result, duration)
    local entry = {
        user_id = user_id,
        battle_type = battle_type,
        result = result,
        duration = duration,
        timestamp = os.time(),
    }
    return skynet.call(mongo_mgr, "lua", "insert", "battle_logs", entry)
end

--- Append a payment record to "payment_logs" with status "pending".
-- @return whatever the "insert" call returns
function CMD.log_payment(user_id, order_id, amount, item)
    local entry = {
        user_id = user_id,
        order_id = order_id,
        amount = amount,
        item = item,
        timestamp = os.time(),
        status = "pending",
    }
    return skynet.call(mongo_mgr, "lua", "insert", "payment_logs", entry)
end

--- Fetch a user's logs from "<log_type>_logs", sorted by timestamp descending.
-- @tparam[opt=100] number limit maximum number of documents
-- @return whatever the "find" call returns
function CMD.get_user_logs(user_id, log_type, limit)
    local collection = log_type .. "_logs"
    local filter = { user_id = user_id }
    local options = { limit = limit or 100, sort = { timestamp = -1 } }
    return skynet.call(mongo_mgr, "lua", "find", collection, filter, options)
end

--- Count distinct users who logged in on a given date.
-- Pipeline: keep the day's login records, group them by user_id, then count
-- the groups into a field named "total".
-- Operator keys start with "$", which is not a valid Lua identifier, so they
-- must use the ["..."] key form; the bare `$match = ...` form does not compile.
-- @tparam string date value matched against the stored "date" field
-- @return whatever the "aggregate" call returns
function CMD.get_daily_stats(date)
    return skynet.call(mongo_mgr, "lua", "aggregate", "login_logs", {
        { ["$match"] = { date = date } },
        { ["$group"] = {
            _id = "$user_id",
            count = { ["$sum"] = 1 },
        } },
        { ["$count"] = "total" },
    })
end

-- Service entry point: resolve mongo_mgr, register, create indexes, dispatch.
skynet.start(function()
    -- skynet.register is added to the skynet table by the skynet.manager
    -- module; without this require the call below hits a nil field.
    require "skynet.manager"

    mongo_mgr = skynet.uniqueservice("mongo_mgr")
    skynet.register(".game_log")

    -- Create the indexes used by the queries in this service.
    skynet.call(mongo_mgr, "lua", "create_index", "login_logs",
        {user_id = 1, timestamp = -1})
    skynet.call(mongo_mgr, "lua", "create_index", "battle_logs",
        {user_id = 1, timestamp = -1})
    skynet.call(mongo_mgr, "lua", "create_index", "payment_logs",
        {order_id = 1}, {unique = true})

    skynet.dispatch("lua", function(session, source, cmd, ...)
        -- Name the offending command so a bad call is diagnosable.
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            -- A non-zero session means the sender is waiting for a reply.
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

数据缓存策略

多级缓存架构

请求 → Redis 缓存(L1)→ MySQL 数据库(L2)→ 回写缓存
-- lualib/cache_helper.lua
local skynet = require "skynet"

-- Module table returned to callers.
local CacheHelper = {}

-- Service addresses; populated by CacheHelper.init().
local redis_mgr, mysql_mgr

--- Resolve the redis_mgr and mysql_mgr services.
-- Must run before the other helpers, which read these two addresses.
function CacheHelper.init()
    redis_mgr, mysql_mgr =
        skynet.uniqueservice("redis_mgr"),
        skynet.uniqueservice("mysql_mgr")
end

--- Read a user with cache-first lookup.
-- Order: Redis key "user:<id>" (JSON text) -> MySQL "users" row -> write the
-- row back to Redis with a 300-second expiry.
-- @return the user table, or nil when no row matches
function CacheHelper.get_user(user_id)
    -- This module never required cjson at file level, so `cjson` was an
    -- undefined global here; require it locally (require caches the module).
    local cjson = require "cjson"

    local cache_key = "user:" .. user_id

    -- 1. Try the Redis cache first.
    local cached = skynet.call(redis_mgr, "lua", "get", cache_key)
    if cached then
        return cjson.decode(cached)
    end

    -- 2. Cache miss: read from MySQL.
    local result = skynet.call(mysql_mgr, "lua", "select",
        "users", {id = user_id})

    if result and #result > 0 then
        local user = result[1]

        -- 3. Populate the cache (expires after 5 minutes).
        skynet.call(redis_mgr, "lua", "set",
            cache_key, cjson.encode(user), 300)

        return user
    end

    return nil
end

--- Update a user row, then invalidate its cache entry.
-- The database write is issued first; afterwards "user:<id>" is deleted from
-- Redis so a later read reloads the row.
function CacheHelper.update_user(user_id, data)
    local filter = { id = user_id }
    skynet.call(mysql_mgr, "lua", "update", "users", data, filter)

    local cache_key = "user:" .. user_id
    skynet.call(redis_mgr, "lua", "del", cache_key)
end

return CacheHelper

数据一致性保证

事务处理

local CMD = {}

--- Move gold between two users inside a MySQL transaction.
-- Every step's result is checked: mysql_mgr reports failures as a nil result
-- plus an error value rather than raising, so pcall alone never noticed them.
-- The debit must change exactly one row; the previous version credited the
-- recipient even when the `gold >= ?` guard matched nothing.
-- NOTE(review): mysql_mgr serves all callers over one connection, so
-- statements from other services can land between START TRANSACTION and
-- COMMIT — confirm transactions get exclusive use of the connection.
-- NOTE(review): assumes the driver's result table exposes `affected_rows`.
-- @tparam number from_id paying user id
-- @tparam number to_id receiving user id
-- @tparam number amount positive amount of gold
-- @return true on success; false plus an error value on failure
function CMD.transfer(from_id, to_id, amount)
    -- A zero, negative or non-numeric amount would turn the debit into a credit.
    if type(amount) ~= "number" or not (amount > 0) then
        return false, "转账金额必须为正数"
    end

    -- Begin the transaction.
    local began, begin_err = skynet.call(mysql_mgr, "lua", "query", "START TRANSACTION")
    if not began then
        return false, begin_err
    end

    local ok, err = pcall(function()
        -- Debit the sender; the WHERE clause refuses to overdraw.
        local debit, debit_err = skynet.call(mysql_mgr, "lua", "execute",
            "UPDATE users SET gold = gold - ? WHERE id = ? AND gold >= ?",
            {amount, from_id, amount})
        if not debit then
            error(debit_err or "扣款失败", 0)
        end
        if debit.affected_rows ~= 1 then
            error("余额不足或转出用户不存在", 0)
        end

        -- Credit the receiver.
        local credit, credit_err = skynet.call(mysql_mgr, "lua", "execute",
            "UPDATE users SET gold = gold + ? WHERE id = ?",
            {amount, to_id})
        if not credit then
            error(credit_err or "入账失败", 0)
        end
        if credit.affected_rows ~= 1 then
            error("转入用户不存在", 0)
        end

        -- Record the transaction.
        local log_id, log_err = skynet.call(mysql_mgr, "lua", "insert", "transactions", {
            from_id = from_id,
            to_id = to_id,
            amount = amount,
            timestamp = os.time(),
        })
        if not log_id then
            error(log_err or "交易日志写入失败", 0)
        end
    end)

    if not ok then
        -- Roll back.
        skynet.call(mysql_mgr, "lua", "query", "ROLLBACK")
        return false, err
    end

    -- Commit.
    local committed, commit_err = skynet.call(mysql_mgr, "lua", "query", "COMMIT")
    if not committed then
        return false, commit_err
    end
    return true
end

分布式锁

-- Per-service counter that keeps lock tokens distinct within one time tick.
local lock_seq = 0

--- Transfer gold while holding a Redis lock on the pair of accounts.
-- The lock key orders the two ids, so A->B and B->A share one lock.
-- The transfer runs under pcall so the lock is released even when it raises;
-- previously an error left the lock held until its TTL ran out.
-- NOTE(review): the lock lives for 5 seconds; a slower transfer loses its
-- protection — confirm this bound is acceptable.
-- @tparam number from_id paying user id
-- @tparam number to_id receiving user id
-- @tparam number amount amount of gold
-- @return the results of CMD.transfer; false plus an error value when the
--   lock cannot be taken or the transfer raises
function CMD.safe_transfer(from_id, to_id, amount)
    local lock_key = "lock:transfer:" .. math.min(from_id, to_id) .. ":" .. math.max(from_id, to_id)

    -- The token must identify this holder: the time alone can repeat across
    -- services or calls, which would let one holder release another's lock.
    lock_seq = lock_seq + 1
    local lock_value = string.format("%s:%s:%d",
        tostring(skynet.self()), tostring(skynet.time()), lock_seq)

    -- Take the lock (5-second lifetime).
    local locked = skynet.call(redis_mgr, "lua", "acquire_lock", lock_key, lock_value, 5)
    if not locked then
        return false, "获取锁失败"
    end

    -- Run the transfer.
    local called, ok, err = pcall(CMD.transfer, from_id, to_id, amount)

    -- Always release the lock.
    skynet.call(redis_mgr, "lua", "release_lock", lock_key, lock_value)

    if not called then
        -- `ok` carries the raised error value here.
        return false, ok
    end
    return ok, err
end

总结

本教程介绍了 Skynet 与主流数据库的集成方案:

  1. MySQL:适合结构化数据,支持事务
  2. Redis:适合缓存和热点数据
  3. MongoDB:适合灵活结构和大数据

最佳实践

  • 封装数据库访问为独立服务
  • 使用连接池提高并发能力
  • 合理设计缓存策略
  • 注意数据一致性和事务处理

参考资料

  1. Skynet MySQL 驱动:https://github.com/dalinggang/lua-resty-mysql
  2. Skynet Redis 驱动:https://github.com/zhandouxiaojiji/skynet-redis
  3. Redis 官方文档:https://redis.io/documentation
  4. MySQL 官方文档:https://dev.mysql.com/doc/

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页