WebSocket 实时通信:构建交互式应用

学习使用 WebSocket 实现实时双向通信,构建聊天室、实时通知等交互式应用

WebSocket 实时通信:构建交互式应用

传统的 HTTP 请求-响应模型无法满足实时应用的需求。想象一下聊天室、股票行情、在线游戏这些场景,客户端需要即时收到服务器的消息,而不是不断轮询。

WebSocket 提供了一种在单个 TCP 连接上进行全双工通信的协议,让服务器可以主动向客户端推送数据。

HTTP vs WebSocket

HTTP 的局限性

客户端 → 请求 → 服务器
客户端 ← 响应 ← 服务器
(每次通信都需要客户端主动发起)

WebSocket 的优势

客户端 ←→ 持久连接 ←→ 服务器
(双方都可以随时发送数据)

特点:

  • 持久连接,无需重复建立
  • 全双工通信,双向数据传输
  • 低延迟,连接建立后每条消息不再携带 HTTP 头部
  • 服务器可以主动推送

WebSocket 协议

WebSocket 连接通过 HTTP 升级建立:

客户端请求:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务器响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

建立连接后,双方可以通过 WebSocket 帧发送数据。

gorilla/websocket 库

Go 标准库不包含 WebSocket 支持,这里使用第三方库 gorilla/websocket。先安装依赖:

go get github.com/gorilla/websocket

基础 Echo 服务器

package main

import (
    "log"
    "net/http"
    
    "github.com/gorilla/websocket"
)

// upgrader turns incoming HTTP requests into WebSocket connections.
// Its origin check accepts every request, which is convenient for a local
// demo; production code should restrict the allowed origins.
var upgrader = websocket.Upgrader{
    CheckOrigin: func(*http.Request) bool { return true },
}

// echoHandler upgrades the request to a WebSocket connection and then sends
// every message it reads straight back to the client, using the same message
// type it was received with. It returns (closing the connection) on the first
// read or write error.
func echoHandler(w http.ResponseWriter, r *http.Request) {
    ws, upgradeErr := upgrader.Upgrade(w, r, nil)
    if upgradeErr != nil {
        log.Println("upgrade error:", upgradeErr)
        return
    }
    defer ws.Close()

    log.Println("Client connected")

    for {
        kind, payload, readErr := ws.ReadMessage()
        if readErr != nil {
            log.Println("read error:", readErr)
            return
        }

        log.Printf("Received: %s", payload)

        // Echo the payload back unchanged.
        writeErr := ws.WriteMessage(kind, payload)
        if writeErr != nil {
            log.Println("write error:", writeErr)
            return
        }
    }
}

// main registers the echo handler on /ws and serves HTTP on port 8080.
func main() {
    http.HandleFunc("/ws", echoHandler)
    log.Println("Server starting on :8080")
    // ListenAndServe only returns when it fails (for example, the port is
    // already in use); report that instead of exiting silently.
    log.Fatal(http.ListenAndServe(":8080", nil))
}

客户端代码

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket Demo</title>
</head>
<body>
    <input type="text" id="message" placeholder="Enter message">
    <button onclick="sendMessage()">Send</button>
    <div id="output"></div>
    
    <script>
        // Open the WebSocket connection to the echo endpoint.
        const ws = new WebSocket('ws://localhost:8080/ws');

        // Show connection lifecycle events and incoming messages on the page.
        ws.onopen = () => {
            console.log('Connected');
            appendMessage('Connected to server');
        };

        ws.onmessage = (event) => {
            appendMessage(`Server: ${event.data}`);
        };

        ws.onclose = () => {
            appendMessage('Disconnected');
        };
        
        // Send the current input text to the server and show it locally.
        // Nothing is sent unless the socket is open: WebSocket.send() throws
        // while the connection is still being established, and data passed to
        // it after the socket has closed is discarded.
        function sendMessage() {
            if (ws.readyState !== WebSocket.OPEN) {
                appendMessage('Not connected');
                return;
            }
            const input = document.getElementById('message');
            const message = input.value;
            ws.send(message);
            appendMessage('You: ' + message);
            input.value = '';
        }
        
        // Append msg to the output area as its own paragraph.
        // The text is assigned through textContent so that any markup inside a
        // message is displayed literally instead of being parsed as HTML.
        function appendMessage(msg) {
            const output = document.getElementById('output');
            const paragraph = document.createElement('p');
            paragraph.textContent = msg;
            output.appendChild(paragraph);
        }
    </script>
</body>
</html>

消息类型

WebSocket 支持多种消息类型:

// WebSocket message types.
const (
    // Data messages.
    TextMessage   = 0x1 // text message
    BinaryMessage = 0x2 // binary message

    // Control messages.
    CloseMessage = 0x8 // close the connection
    PingMessage  = 0x9 // heartbeat probe
    PongMessage  = 0xA // heartbeat reply
)

// Send a text message. The return values of WriteMessage are discarded in
// these snippets; real code should check the returned error.
conn.WriteMessage(websocket.TextMessage, []byte("Hello"))

// Send JSON as a text message. The json.Marshal error is dropped here via
// the blank identifier.
data := map[string]string{"message": "Hello"}
jsonData, _ := json.Marshal(data)
conn.WriteMessage(websocket.TextMessage, jsonData)

// Send binary data.
conn.WriteMessage(websocket.BinaryMessage, imageData)

心跳检测

通过定期发送 Ping 并等待 Pong 来保持连接活跃、检测已断开的连接(下面用到的 Client 与 hub 定义见后文的聊天室示例):

// Timing and size limits used by the read and write pumps.
const (
    // writeWait is added to the current time to form the deadline set
    // before each write.
    writeWait = 10 * time.Second

    // pongWait is how far the read deadline is pushed out, initially and
    // again every time a pong arrives.
    pongWait = time.Minute

    // pingPeriod is the interval between pings: nine tenths of pongWait.
    pingPeriod = pongWait * 9 / 10

    // maxMessageSize is the read limit passed to SetReadLimit.
    maxMessageSize = 512
)

// readPump reads messages from the connection and forwards each one to the
// hub's broadcast channel. It stops at the first read error; on the way out
// it unregisters the client from the hub and closes the connection.
func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

    // Every pong pushes the read deadline out by another pongWait.
    extendDeadline := func(string) error {
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    }

    c.conn.SetReadLimit(maxMessageSize)
    c.conn.SetReadDeadline(time.Now().Add(pongWait))
    c.conn.SetPongHandler(extendDeadline)

    for {
        _, payload, err := c.conn.ReadMessage()
        if err == nil {
            c.hub.broadcast <- payload
            continue
        }
        // Only log closes that are neither "going away" nor abnormal closures.
        if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
            log.Printf("error: %v", err)
        }
        return
    }
}

// writePump writes messages from the client's send channel to the connection
// and sends a ping every pingPeriod. It returns when the send channel is
// closed or when any write fails; on the way out it stops the ticker and
// closes the connection.
func (c *Client) writePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(writeWait))
            if !ok {
                // The send channel was closed. Sending the close frame is
                // best effort: the connection is closed right after either
                // way, so the error is deliberately ignored.
                _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            // WriteMessage obtains a writer, writes the payload and closes
            // the writer, reporting an error from any of those steps, so a
            // failed write is no longer silently ignored.
            if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
                return
            }
        case <-ticker.C:
            c.conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

实战:多用户聊天室

package main

import (
    "log"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

// Client represents a single chat client.
type Client struct {
    // hub is the Hub the client unregisters from and broadcasts through.
    hub  *Hub
    // conn is the client's WebSocket connection.
    conn *websocket.Conn
    // send carries outbound messages: the hub pushes to it and writePump
    // drains it onto the connection.
    send chan []byte
    // name is the display name; serveWs takes it from the "name" query
    // parameter and readPump prefixes it to each message.
    name string
}

// Hub keeps track of all active clients.
type Hub struct {
    // clients is the set of currently registered clients.
    clients    map[*Client]bool
    // broadcast receives messages to deliver to every registered client.
    broadcast  chan []byte
    // register receives clients that should be added to clients.
    register   chan *Client
    // unregister receives clients that should be removed from clients.
    unregister chan *Client
    // mu is locked by run around its accesses to clients.
    mu         sync.RWMutex
}

// newHub builds a Hub with an empty client set and unbuffered broadcast,
// register and unregister channels.
func newHub() *Hub {
    h := new(Hub)
    h.clients = make(map[*Client]bool)
    h.broadcast = make(chan []byte)
    h.register = make(chan *Client)
    h.unregister = make(chan *Client)
    return h
}

// run is the hub's event loop. It adds and removes clients as they arrive on
// the register and unregister channels, announces joins and departures, and
// fans every message received on the broadcast channel out to all clients.
// It never returns and is meant to run in its own goroutine.
func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            total := len(h.clients)
            h.mu.Unlock()
            log.Printf("Client %s joined, total: %d", client.name, total)

            // Announce the join by delivering directly. Sending on
            // h.broadcast from here would block forever: the channel is
            // unbuffered and this goroutine is its only receiver.
            h.deliver([]byte(client.name + " joined the chat"))

        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                close(client.send)
            }
            total := len(h.clients)
            h.mu.Unlock()
            log.Printf("Client %s left, total: %d", client.name, total)

            // Announce the departure (delivered directly, see above).
            h.deliver([]byte(client.name + " left the chat"))

        case message := <-h.broadcast:
            h.deliver(message)
        }
    }
}

// deliver queues message on every registered client's send channel. A client
// whose channel is full is dropped: its channel is closed and it is removed
// from the set. The write lock is held because the set may be modified.
func (h *Hub) deliver(message []byte) {
    h.mu.Lock()
    defer h.mu.Unlock()

    for client := range h.clients {
        select {
        case client.send <- message:
        default:
            close(client.send)
            delete(h.clients, client)
        }
    }
}

// upgrader turns incoming HTTP requests into WebSocket connections.
// Its origin check accepts every request.
var upgrader = websocket.Upgrader{
    CheckOrigin: func(*http.Request) bool { return true },
}

// serveWs upgrades the request to a WebSocket connection, registers a new
// Client for it with hub and starts the client's write and read pumps.
// The client's name comes from the "name" query parameter and falls back to
// "Anonymous" when that is empty. A failed upgrade is logged and ends the call.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }

    nick := "Anonymous"
    if requested := r.URL.Query().Get("name"); requested != "" {
        nick = requested
    }

    c := &Client{hub: hub, conn: ws, send: make(chan []byte, 256), name: nick}
    c.hub.register <- c

    go c.writePump()
    go c.readPump()
}

// readPump reads messages from the connection, prefixes each with the
// client's name and forwards it to the hub's broadcast channel. It stops at
// the first read error; on the way out it unregisters the client from the
// hub and closes the connection.
func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

    // Every pong pushes the read deadline out by another pongWait.
    extendDeadline := func(string) error {
        c.conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    }

    c.conn.SetReadLimit(maxMessageSize)
    c.conn.SetReadDeadline(time.Now().Add(pongWait))
    c.conn.SetPongHandler(extendDeadline)

    for {
        _, payload, err := c.conn.ReadMessage()
        if err != nil {
            // Only log closes that are neither "going away" nor abnormal closures.
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                log.Printf("error: %v", err)
            }
            return
        }

        // Build "<name>: <message>" for the broadcast.
        line := append([]byte(c.name+": "), payload...)
        c.hub.broadcast <- line
    }
}

// writePump writes messages from the client's send channel to the connection
// and sends a ping every pingPeriod. It returns when the send channel is
// closed or when any write fails; on the way out it stops the ticker and
// closes the connection.
func (c *Client) writePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(writeWait))
            if !ok {
                // The hub closed the send channel. Sending the close frame
                // is best effort: the connection is closed right after
                // either way, so the error is deliberately ignored.
                _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }

            // WriteMessage obtains a writer, writes the payload and closes
            // the writer, reporting an error from any of those steps, so a
            // failed write is no longer silently ignored.
            if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
                return
            }
        case <-ticker.C:
            c.conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

// main starts the hub, serves WebSocket connections on /ws and the chat
// page on /, and listens on port 8080.
func main() {
    hub := newHub()
    go hub.run()

    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        serveWs(hub, w, r)
    })

    // Serve the HTML page.
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        http.ServeFile(w, r, "chat.html")
    })

    log.Println("Chat server starting on :8080")
    // ListenAndServe only returns when it fails (for example, the port is
    // already in use); report that instead of exiting silently.
    log.Fatal(http.ListenAndServe(":8080", nil))
}

聊天室前端

<!DOCTYPE html>
<html>
<head>
    <title>Chat Room</title>
    <style>
        #messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; }
        .message { margin: 5px 0; }
        .system { color: #888; font-style: italic; }
    </style>
</head>
<body>
    <h1>Chat Room</h1>
    <div id="messages"></div>
    <input type="text" id="message" placeholder="Type your message...">
    <button onclick="sendMessage()">Send</button>
    
    <script>
        // Ask for a display name (falling back to "Anonymous") and connect,
        // passing the name URL-encoded in the "name" query parameter.
        const name = prompt("Enter your name:") || "Anonymous";
        const wsUrl = 'ws://localhost:8080/ws?name=' + encodeURIComponent(name);
        const ws = new WebSocket(wsUrl);
        
        // Render each incoming message as a new entry in the message list
        // and keep the list scrolled to the bottom.
        ws.onmessage = function(event) {
            const messages = document.getElementById('messages');
            const div = document.createElement('div');
            div.className = 'message';

            const text = event.data;
            // The server's join/leave notices end with these exact phrases.
            // Matching the suffix rather than any occurrence of "joined" or
            // "left" keeps ordinary chat lines that merely contain those
            // words from being styled as system messages. A chat line that
            // itself ends with one of the phrases is still indistinguishable;
            // telling those apart would need a typed message format.
            if (text.endsWith(' joined the chat') || text.endsWith(' left the chat')) {
                div.classList.add('system');
            }

            div.textContent = text;
            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        };
        
        // Send the trimmed input text to the server and clear the input.
        // Blank input is ignored.
        function sendMessage() {
            const field = document.getElementById('message');
            const text = field.value.trim();
            if (!text) {
                return;
            }
            ws.send(text);
            field.value = '';
        }
        
        // Pressing Enter in the input sends the message.
        document.getElementById('message').addEventListener('keypress', (e) => {
            if (e.key !== 'Enter') {
                return;
            }
            sendMessage();
        });
    </script>
</body>
</html>

房间功能

扩展支持多个聊天房间:

// Room is a single chat room with its own set of clients.
type Room struct {
    // name is the room's name.
    name    string
    // clients is the set of clients currently in the room.
    clients map[*Client]bool
    // hub points to a Hub — presumably the one that owns the room; nothing
    // in this snippet reads it.
    hub     *Hub
}

// Hub is the room-aware variant of the hub: it tracks rooms instead of a
// single flat client set.
type Hub struct {
    // rooms holds the known rooms; run looks them up by client.room and
    // msg.room.
    rooms      map[string]*Room
    // register receives clients that should be added to their room.
    register   chan *Client
    // unregister receives clients that should be removed from their room.
    unregister chan *Client
    // broadcast receives messages addressed to one room.
    broadcast  chan *Message
}

// Message is a payload addressed to one room.
type Message struct {
    // room is the key of the target room in Hub.rooms.
    room    string
    // message is the payload delivered to each client in that room.
    message []byte
}

// run is the room-aware hub event loop. It adds registering clients to their
// room, removes unregistering clients, and delivers each broadcast message
// to the clients of the room it is addressed to. It never returns.
func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            room := h.getOrCreateRoom(client.room)
            room.clients[client] = true

        case client := <-h.unregister:
            if room, ok := h.rooms[client.room]; ok {
                // Close the send channel only while the client is still a
                // member. The broadcast path below may already have removed
                // it and closed the channel, and closing a channel twice
                // panics.
                if _, member := room.clients[client]; member {
                    delete(room.clients, client)
                    close(client.send)
                }
            }

        case msg := <-h.broadcast:
            if room, ok := h.rooms[msg.room]; ok {
                for client := range room.clients {
                    select {
                    case client.send <- msg.message:
                    default:
                        // The client's send buffer is full: drop the client.
                        delete(room.clients, client)
                        close(client.send)
                    }
                }
            }
        }
    }
}

总结

WebSocket 为实时应用提供了强大的通信能力:

  1. 全双工通信:服务器和客户端都可以主动发送数据
  2. 低延迟:无需重复建立连接,无 HTTP 头部开销
  3. 持久连接:通过心跳保持连接活跃
  4. 消息类型:支持文本、二进制、控制帧等多种类型

常见应用场景:

  • 聊天室和即时通讯
  • 实时通知和推送
  • 在线游戏
  • 协作编辑
  • 实时数据展示(股票、天气等)

记住:WebSocket 虽然强大,但不是所有场景都需要。简单的轮询或 Server-Sent Events(SSE)可能更适合某些场景。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页