Redis 集成:高性能缓存与数据存储

学习在 Go 中使用 Redis,掌握缓存、会话存储、消息队列等常见应用场景

Redis 集成:高性能缓存与数据存储

Redis 是一个高性能的内存数据库,广泛用于缓存、会话存储、消息队列等场景。在 Go 应用中集成 Redis,可以显著提升系统性能和响应速度。

本文将介绍如何使用 go-redis 库与 Redis 交互,并实现常见的应用场景。

安装 go-redis

go get github.com/redis/go-redis/v9

基础连接

package main

import (
    "context"
    "fmt"
    "log"
    
    "github.com/redis/go-redis/v9"
)

// ctx is the package-level background context handed to the Redis calls in this example.
var ctx = context.Background()

// main builds a Redis client for a local server and verifies the connection
// with a PING before printing the reply.
func main() {
    // Connection settings: local server, no password, default database.
    opts := &redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    }
    client := redis.NewClient(opts)

    // PING round-trips to the server; abort the program when it fails.
    reply, pingErr := client.Ping(ctx).Result()
    if pingErr != nil {
        log.Fatal(pingErr)
    }
    fmt.Println("Connected to Redis:", reply)
}

基本数据类型操作

String(字符串)

// SET
err := rdb.Set(ctx, "username", "alice", 0).Err()
if err != nil {
    panic(err)
}

// GET
val, err := rdb.Get(ctx, "username").Result()
if err == redis.Nil {
    fmt.Println("Key does not exist")
} else if err != nil {
    panic(err)
} else {
    fmt.Println("username:", val)
}

// SET with expiration (1 hour)
err = rdb.Set(ctx, "session:abc123", "user_data", 1*time.Hour).Err()

// SETNX (SET if Not eXists)
ok, err := rdb.SetNX(ctx, "counter", 1, 0).Result()
if ok {
    fmt.Println("Key was set")
}

// INCR / DECR
rdb.Set(ctx, "visits", 0, 0)
rdb.Incr(ctx, "visits")
rdb.IncrBy(ctx, "visits", 10)
visits, _ := rdb.Get(ctx, "visits").Int()
fmt.Println("Visits:", visits) // 11

Hash(哈希)

// HSET
rdb.HSet(ctx, "user:123", map[string]interface{}{
    "name":  "Alice",
    "email": "alice@example.com",
    "age":   30,
})

// HGET
name, err := rdb.HGet(ctx, "user:123", "name").Result()
fmt.Println("Name:", name)

// HGETALL
user, err := rdb.HGetAll(ctx, "user:123").Result()
fmt.Println("User:", user)

// HMGET (get multiple fields)
vals, err := rdb.HMGet(ctx, "user:123", "name", "email").Result()
fmt.Println("Values:", vals)

// HINCRBY
rdb.HIncrBy(ctx, "user:123", "login_count", 1)

List(列表)

// LPUSH (left push)
rdb.LPush(ctx, "queue:tasks", "task1", "task2", "task3")

// RPUSH (right push)
rdb.RPush(ctx, "queue:tasks", "task4")

// LPOP (left pop)
task, err := rdb.LPop(ctx, "queue:tasks").Result()
fmt.Println("Task:", task)

// RPOP (right pop)
task, err = rdb.RPop(ctx, "queue:tasks").Result()

// LRANGE (get range)
tasks, err := rdb.LRange(ctx, "queue:tasks", 0, -1).Result()
fmt.Println("All tasks:", tasks)

// LLEN (length)
length, err := rdb.LLen(ctx, "queue:tasks").Result()
fmt.Println("Queue length:", length)

// BRPOP (blocking pop, timeout 10 seconds)
result, err := rdb.BRPop(ctx, 10*time.Second, "queue:tasks").Result()
if err == redis.Nil {
    fmt.Println("Timeout")
} else {
    fmt.Println("Popped:", result[1])
}

Set(集合)

// SADD
rdb.SAdd(ctx, "tags:post:1", "go", "redis", "tutorial")

// SMEMBERS
tags, err := rdb.SMembers(ctx, "tags:post:1").Result()
fmt.Println("Tags:", tags)

// SISMEMBER
isMember, err := rdb.SIsMember(ctx, "tags:post:1", "go").Result()
fmt.Println("Has 'go' tag:", isMember)

// SINTER (intersection)
rdb.SAdd(ctx, "tags:post:2", "go", "web", "api")
common, err := rdb.SInter(ctx, "tags:post:1", "tags:post:2").Result()
fmt.Println("Common tags:", common)

// SUNION (union)
all, err := rdb.SUnion(ctx, "tags:post:1", "tags:post:2").Result()
fmt.Println("All tags:", all)

// SREM (remove)
rdb.SRem(ctx, "tags:post:1", "tutorial")

Sorted Set(有序集合)

// ZADD
rdb.ZAdd(ctx, "leaderboard", 
    redis.Z{Score: 100, Member: "alice"},
    redis.Z{Score: 200, Member: "bob"},
    redis.Z{Score: 150, Member: "charlie"},
)

// ZRANGE (get range by index, ascending)
leaders, err := rdb.ZRange(ctx, "leaderboard", 0, -1).Result()
fmt.Println("Leaders:", leaders)

// ZREVRANGE (get range by index, descending)
topLeaders, err := rdb.ZRevRange(ctx, "leaderboard", 0, 2).Result()
fmt.Println("Top 3:", topLeaders)

// ZRANGEBYSCORE (get range by score)
highScorers, err := rdb.ZRangeByScore(ctx, "leaderboard", &redis.ZRangeBy{
    Min: "150",
    Max: "+inf",
}).Result()
fmt.Println("High scorers:", highScorers)

// ZINCRBY (increment score)
rdb.ZIncrBy(ctx, "leaderboard", 50, "alice")

// ZRANK (get rank, 0-based)
rank, err := rdb.ZRank(ctx, "leaderboard", "bob").Result()
fmt.Println("Bob's rank:", rank+1)

实战应用

1. 缓存层

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    
    "github.com/redis/go-redis/v9"
)

// Cache is a JSON-serializing cache backed by a Redis client.
type Cache struct {
    rdb *redis.Client // client used for every cache operation
}

// NewCache returns a Cache that talks to Redis through rdb.
func NewCache(rdb *redis.Client) *Cache {
    c := new(Cache)
    c.rdb = rdb
    return c
}

// Get reads key from Redis and JSON-decodes the stored value into dest.
// It returns a "cache miss" error when the key does not exist, the Redis
// error when the read fails, or the decoding error from json.Unmarshal.
func (c *Cache) Get(ctx context.Context, key string, dest interface{}) error {
    raw, err := c.rdb.Get(ctx, key).Result()
    switch {
    case err == redis.Nil:
        // redis.Nil means the key is absent rather than a transport failure.
        return fmt.Errorf("cache miss")
    case err != nil:
        return err
    }

    return json.Unmarshal([]byte(raw), dest)
}

// Set JSON-encodes value and stores it under key with the given expiration.
// It returns the encoding error or the error reported by the Redis SET.
func (c *Cache) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
    payload, marshalErr := json.Marshal(value)
    if marshalErr != nil {
        return marshalErr
    }

    status := c.rdb.Set(ctx, key, payload, expiration)
    return status.Err()
}

// Delete removes key from Redis and returns the error of the DEL command, if any.
func (c *Cache) Delete(ctx context.Context, key string) error {
    deleted := c.rdb.Del(ctx, key)
    return deleted.Err()
}

// GetOrSet fills dest from the cache when possible; otherwise it calls loader,
// stores the loaded value under key with the given expiration, and decodes
// that value into dest.
//
// Any error from the cache read (a miss, a Redis failure, or a decode error)
// is treated as a miss and falls through to loader. Errors from loader, from
// JSON encoding, and from the cache write are returned to the caller.
func (c *Cache) GetOrSet(ctx context.Context, key string, dest interface{},
    loader func() (interface{}, error), expiration time.Duration) error {

    // Cache hit: dest has already been populated by Get.
    if err := c.Get(ctx, key, dest); err == nil {
        return nil
    }

    // Cache miss: load from the data source.
    value, err := loader()
    if err != nil {
        return err
    }

    // Encode once and reuse the bytes for both the cache write and dest,
    // so the value is not serialized twice and no Marshal error is dropped.
    data, err := json.Marshal(value)
    if err != nil {
        return err
    }

    if err := c.rdb.Set(ctx, key, data, expiration).Err(); err != nil {
        return err
    }

    // Round-trip through JSON so dest receives exactly what was cached.
    return json.Unmarshal(data, dest)
}

// Usage example.

// User is the record cached by the example; the struct tags fix its JSON field names.
type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

// getUserFromDB stands in for a database query: it waits 100ms and then
// returns a fixed user carrying the requested id. The error is always nil.
func getUserFromDB(id int) (*User, error) {
    const simulatedLatency = 100 * time.Millisecond
    time.Sleep(simulatedLatency)

    u := new(User)
    u.ID = id
    u.Name = "Alice"
    u.Email = "alice@example.com"
    return u, nil
}

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    cache := NewCache(rdb)
    ctx := context.Background()
    
    var user User
    err := cache.GetOrSet(ctx, "user:123", &user, func() (interface{}, error) {
        return getUserFromDB(123)
    }, 1*time.Hour)
    
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("User: %+v\n", user)
}

2. 会话存储

package main

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "encoding/json"
    "net/http"
    "time"
    
    "github.com/redis/go-redis/v9"
)

// Session is the per-login record that is JSON-encoded and stored in Redis.
type Session struct {
    UserID    int       `json:"user_id"`    // ID of the logged-in user
    Username  string    `json:"username"`   // name shown by the profile handler
    CreatedAt time.Time `json:"created_at"` // set to time.Now() when the session is created
}

// SessionStore persists sessions in Redis.
type SessionStore struct {
    rdb *redis.Client // client used for all session reads and writes
}

// NewSessionStore returns a SessionStore that uses rdb.
func NewSessionStore(rdb *redis.Client) *SessionStore {
    store := new(SessionStore)
    store.rdb = rdb
    return store
}

// generateSessionID returns a new session identifier: 32 bytes from
// crypto/rand, hex-encoded into a 64-character string.
//
// It panics if the random source fails. Continuing would hex-encode whatever
// the buffer holds (possibly all zeros) and hand out a predictable session ID.
func (s *SessionStore) generateSessionID() string {
    buf := make([]byte, 32)
    if _, err := rand.Read(buf); err != nil {
        panic("session: reading random bytes: " + err.Error())
    }
    return hex.EncodeToString(buf)
}

// CreateSession stores a new session for the given user under
// "session:<id>" with a 24-hour expiration and returns the generated ID.
// On an encoding or Redis error it returns an empty ID and that error.
func (s *SessionStore) CreateSession(ctx context.Context, userID int, username string) (string, error) {
    id := s.generateSessionID()

    payload, err := json.Marshal(Session{
        UserID:    userID,
        Username:  username,
        CreatedAt: time.Now(),
    })
    if err != nil {
        return "", err
    }

    if err := s.rdb.Set(ctx, "session:"+id, payload, 24*time.Hour).Err(); err != nil {
        return "", err
    }

    return id, nil
}

// GetSession loads the session stored under "session:<sessionID>".
// It returns (nil, nil) when the key does not exist and (nil, err) on a Redis
// or decoding error. A successful lookup also resets the key's expiration
// to 24 hours.
func (s *SessionStore) GetSession(ctx context.Context, sessionID string) (*Session, error) {
    redisKey := "session:" + sessionID

    raw, err := s.rdb.Get(ctx, redisKey).Result()
    switch {
    case err == redis.Nil:
        return nil, nil
    case err != nil:
        return nil, err
    }

    found := new(Session)
    if err := json.Unmarshal([]byte(raw), found); err != nil {
        return nil, err
    }

    // Sliding expiration: the result of EXPIRE is not inspected, so a failed
    // refresh does not fail the lookup.
    s.rdb.Expire(ctx, redisKey, 24*time.Hour)

    return found, nil
}

// DeleteSession removes the session stored under "session:<sessionID>" and
// returns the error of the DEL command, if any.
func (s *SessionStore) DeleteSession(ctx context.Context, sessionID string) error {
    deleted := s.rdb.Del(ctx, "session:"+sessionID)
    return deleted.Err()
}

// sessionStore is the package-level store used by the HTTP handlers; main assigns it before serving.
var sessionStore *SessionStore

// sessionMiddleware looks up the session named by the "session_id" cookie
// and, when one is found, attaches it to the request context under the key
// "session". Requests without a cookie or without a stored session are passed
// to next unchanged.
func sessionMiddleware(next http.Handler) http.Handler {
    handle := func(w http.ResponseWriter, r *http.Request) {
        if cookie, cookieErr := r.Cookie("session_id"); cookieErr == nil {
            session, lookupErr := sessionStore.GetSession(r.Context(), cookie.Value)
            if lookupErr == nil && session != nil {
                // Expose the session to downstream handlers via the context.
                r = r.WithContext(context.WithValue(r.Context(), "session", session))
            }
        }

        next.ServeHTTP(w, r)
    }
    return http.HandlerFunc(handle)
}

// loginHandler creates a session for a hard-coded user (credential checking
// is omitted in this example) and returns its ID in the "session_id" cookie.
// It answers 500 when the session cannot be created.
func loginHandler(w http.ResponseWriter, r *http.Request) {
    // Credential verification is simplified away: the user is fixed.
    const (
        userID   = 123
        username = "alice"
    )

    sessionID, err := sessionStore.CreateSession(r.Context(), userID, username)
    if err != nil {
        http.Error(w, "Failed to create session", http.StatusInternalServerError)
        return
    }

    cookie := http.Cookie{
        Name:     "session_id",
        Value:    sessionID,
        Path:     "/",
        HttpOnly: true,
        Secure:   true,
        MaxAge:   86400, // 24 hours
    }
    http.SetCookie(w, &cookie)

    w.Write([]byte("Login successful"))
}

// profileHandler greets the user whose session is stored in the request
// context under the key "session", and answers 401 when there is none.
func profileHandler(w http.ResponseWriter, r *http.Request) {
    // Use the two-value assertion: Value returns a nil interface when no
    // session is attached, and a single-value assertion on it would panic
    // instead of reaching the 401 branch.
    session, ok := r.Context().Value("session").(*Session)
    if !ok || session == nil {
        http.Error(w, "Not authenticated", http.StatusUnauthorized)
        return
    }

    // Plain concatenation yields the same text as the former fmt.Sprintf call
    // without needing the fmt package, which this program does not import.
    w.Write([]byte("Welcome, " + session.Username + "!"))
}

// logoutHandler deletes the session named by the "session_id" cookie, if the
// request carries one, and then clears that cookie on the client.
func logoutHandler(w http.ResponseWriter, r *http.Request) {
    if cookie, err := r.Cookie("session_id"); err == nil {
        sessionStore.DeleteSession(r.Context(), cookie.Value)
    }

    // A negative MaxAge makes the client drop the cookie.
    expired := &http.Cookie{
        Name:   "session_id",
        Value:  "",
        Path:   "/",
        MaxAge: -1,
    }
    http.SetCookie(w, expired)

    w.Write([]byte("Logged out"))
}

// main wires the session store to a local Redis server, registers the login,
// logout and profile routes, and serves HTTP on :8080. Only /profile is
// wrapped in sessionMiddleware.
func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    sessionStore = NewSessionStore(rdb)

    mux := http.NewServeMux()
    mux.HandleFunc("/login", loginHandler)
    mux.HandleFunc("/logout", logoutHandler)
    mux.Handle("/profile", sessionMiddleware(http.HandlerFunc(profileHandler)))

    // ListenAndServe always returns a non-nil error; surface it so that a
    // failure to start does not end the program silently.
    if err := http.ListenAndServe(":8080", mux); err != nil {
        panic(err)
    }
}

3. 分布式锁

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/redis/go-redis/v9"
)

// RedisLock implements a simple lock on top of a Redis client.
type RedisLock struct {
    rdb *redis.Client // client used to acquire and release locks
}

// NewRedisLock returns a RedisLock that uses rdb.
func NewRedisLock(rdb *redis.Client) *RedisLock {
    l := new(RedisLock)
    l.rdb = rdb
    return l
}

// Lock tries to acquire the lock by setting key to value only if key does not
// exist yet, with the given expiration. It reports whether the key was set.
func (l *RedisLock) Lock(ctx context.Context, key string, value string, expiration time.Duration) (bool, error) {
    acquired, err := l.rdb.SetNX(ctx, key, value, expiration).Result()
    return acquired, err
}

// Unlock 释放锁
func (l *RedisLock) Unlock(ctx context.Context, key string, value string) error {
    // 使用 Lua 脚本确保只释放自己的锁
    script := redis.NewScript(`
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `)
    
    result, err := script.Run(ctx, l.rdb, []string{key}, value).Int()
    if err != nil {
        return err
    }
    
    if result == 0 {
        return fmt.Errorf("lock not held or already expired")
    }
    
    return nil
}

// 使用示例
func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    lock := NewRedisLock(rdb)
    ctx := context.Background()
    
    lockKey := "lock:order:123"
    lockValue := fmt.Sprintf("%d", time.Now().UnixNano())
    
    // 尝试获取锁
    acquired, err := lock.Lock(ctx, lockKey, lockValue, 10*time.Second)
    if err != nil {
        panic(err)
    }
    
    if !acquired {
        fmt.Println("Failed to acquire lock")
        return
    }
    
    defer func() {
        // 释放锁
        if err := lock.Unlock(ctx, lockKey, lockValue); err != nil {
            fmt.Println("Failed to release lock:", err)
        }
    }()
    
    // 执行受保护的操作
    fmt.Println("Processing order...")
    time.Sleep(5 * time.Second)
    fmt.Println("Order processed")
}

4. 消息队列

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/redis/go-redis/v9"
)

// MessageQueue wraps a Redis client's publish/subscribe calls.
type MessageQueue struct {
    rdb *redis.Client // client used to publish and subscribe
}

// NewMessageQueue returns a MessageQueue that uses rdb.
func NewMessageQueue(rdb *redis.Client) *MessageQueue {
    q := new(MessageQueue)
    q.rdb = rdb
    return q
}

// Publish sends message to channel and returns the error of the PUBLISH
// command, if any.
func (q *MessageQueue) Publish(ctx context.Context, channel string, message string) error {
    published := q.rdb.Publish(ctx, channel, message)
    return published.Err()
}

// Subscribe subscribes to the given channels and returns the PubSub handle
// produced by the client.
func (q *MessageQueue) Subscribe(ctx context.Context, channels ...string) *redis.PubSub {
    sub := q.rdb.Subscribe(ctx, channels...)
    return sub
}

// producer publishes ten numbered messages to the "notifications" channel,
// one per second. A failed publish is reported and skipped without the
// one-second pause.
func producer(rdb *redis.Client) {
    ctx := context.Background()
    mq := NewMessageQueue(rdb)

    for n := 0; n < 10; n++ {
        message := fmt.Sprintf("Message %d", n)

        if err := mq.Publish(ctx, "notifications", message); err != nil {
            fmt.Println("Publish error:", err)
            continue
        }

        fmt.Println("Published:", message)
        time.Sleep(1 * time.Second)
    }
}

// consumer subscribes to the "notifications" channel and prints the payload
// of every message it receives until the subscription's channel is closed.
// If the subscription cannot be established it reports the error and returns.
func consumer(rdb *redis.Client) {
    mq := NewMessageQueue(rdb)
    ctx := context.Background()

    sub := mq.Subscribe(ctx, "notifications")
    defer sub.Close()

    // Subscribe itself reports no error; Receive waits for the server's
    // reply, so a failure to subscribe is surfaced here instead of leaving
    // the loop below waiting on a subscription that never existed.
    if _, err := sub.Receive(ctx); err != nil {
        fmt.Println("Subscribe error:", err)
        return
    }

    ch := sub.Channel()

    for msg := range ch {
        fmt.Println("Received:", msg.Payload)
    }
}

// main runs the producer in a background goroutine and the consumer on the
// main goroutine, both sharing one Redis client.
func main() {
    opts := &redis.Options{Addr: "localhost:6379"}
    client := redis.NewClient(opts)

    // NOTE(review): the producer starts before the consumer has subscribed,
    // so the earliest messages may be published with no subscriber — confirm
    // whether that matters for this demo.
    go producer(client)
    consumer(client)
}

连接池配置

// Connection pool settings for the client.
rdb := redis.NewClient(&redis.Options{
    Addr:            "localhost:6379",
    Password:        "",
    DB:              0,
    PoolSize:        10,              // connection pool size
    MinIdleConns:    5,               // minimum number of idle connections
    MaxIdleConns:    10,              // maximum number of idle connections
    PoolTimeout:     4 * time.Second, // how long to wait for a connection from the pool
    ConnMaxIdleTime: 5 * time.Minute, // idle connection timeout (named IdleTimeout before go-redis v9)
})

总结

Redis 在 Go 应用中扮演着重要角色,主要应用场景包括:

  1. 缓存:减少数据库压力,提升响应速度
  2. 会话存储:支持分布式会话管理
  3. 消息队列:实现发布/订阅和任务队列
  4. 分布式锁:协调分布式系统中的并发操作
  5. 计数器:实时统计和限流
  6. 排行榜:有序集合实现排名系统

最佳实践:

  • 合理设置过期时间,避免无用的键长期占用内存、导致内存持续增长
  • 使用连接池提高性能
  • 实现缓存穿透、雪崩、击穿的防护
  • 监控 Redis 内存使用情况
  • 使用 Pipeline 批量操作提升性能

记住:Redis 是内存数据库,要注意数据持久化和内存管理。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页