引言
实时通信是现代应用的核心需求,从聊天系统到协作工具、股票行情到IoT设备监控,都需要服务端主动向客户端推送数据。本文将系统对比三大实时通信技术,并提供生产级实现方案。
技术对比
| 特性 | WebSocket | SSE | 长轮询 |
|---|---|---|---|
| 通信方式 | 双向 | 服务端→客户端 | 客户端请求 |
| 协议 | WS/WSS | HTTP | HTTP |
| 连接 | 持久 | 持久 | 周期性 |
| 浏览器支持 | 99%+ | 98%+ | 100% |
| 代理兼容 | 中等 | 好 | 好 |
| 自动重连 | 需手动 | 浏览器内置 | 不需要 |
| 适用场景 | 聊天、游戏 | 推送、通知 | 简单推送 |
WebSocket深入实现
连接管理
package realtime
import (
"context"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 生产环境应该校验Origin
origin := r.Header.Get("Origin")
return allowedOrigins[origin]
},
}
type Client struct {
ID string
UserID string
conn *websocket.Conn
send chan []byte
hub *Hub
ctx context.Context
cancel context.CancelFunc
lastSeen time.Time
mu sync.Mutex
}
type Hub struct {
clients map[string]*Client // clientID -> client
userClients map[string]map[string]bool // userID -> set of clientIDs
rooms map[string]map[string]bool // roomID -> set of clientIDs
register chan *Client
unregister chan *Client
broadcast chan *BroadcastMessage
mu sync.RWMutex
}
type BroadcastMessage struct {
RoomID string
Data []byte
Exclude string // 排除的clientID
}
func NewHub() *Hub {
return &Hub{
clients: make(map[string]*Client),
userClients: make(map[string]map[string]bool),
rooms: make(map[string]map[string]bool),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan *BroadcastMessage, 256),
}
}
func (h *Hub) Run(ctx context.Context) {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client.ID] = client
// 按用户索引
if _, ok := h.userClients[client.UserID]; !ok {
h.userClients[client.UserID] = make(map[string]bool)
}
h.userClients[client.UserID][client.ID] = true
h.mu.Unlock()
log.Infof("Client connected: %s (user: %s)", client.ID, client.UserID)
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client.ID]; ok {
delete(h.clients, client.ID)
delete(h.userClients[client.UserID], client.ID)
close(client.send)
}
h.mu.Unlock()
log.Infof("Client disconnected: %s", client.ID)
case msg := <-h.broadcast:
h.mu.RLock()
if clients, ok := h.rooms[msg.RoomID]; ok {
for clientID := range clients {
if clientID == msg.Exclude {
continue
}
if client, ok := h.clients[clientID]; ok {
select {
case client.send <- msg.Data:
default:
// 发送队列满,断开连接
close(client.send)
delete(h.clients, clientID)
}
}
}
}
h.mu.RUnlock()
case <-ctx.Done():
return
}
}
}
心跳保活
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10 // 54秒
maxMessageSize = 512
)
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.mu.Lock()
c.lastSeen = time.Now()
c.mu.Unlock()
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Warnf("WebSocket error: %v", err)
}
return
}
// 处理消息
c.handleMessage(message)
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
case <-c.ctx.Done():
return
}
}
}
房间订阅
func (c *Client) handleMessage(message []byte) {
var msg struct {
Type string `json:"type"`
RoomID string `json:"room_id,omitempty"`
Payload json.RawMessage `json:"payload,omitempty"`
}
if err := json.Unmarshal(message, &msg); err != nil {
c.sendError("invalid message format")
return
}
switch msg.Type {
case "join_room":
c.hub.mu.Lock()
if _, ok := c.hub.rooms[msg.RoomID]; !ok {
c.hub.rooms[msg.RoomID] = make(map[string]bool)
}
c.hub.rooms[msg.RoomID][c.ID] = true
c.hub.mu.Unlock()
c.send <- []byte(`{"type":"room_joined","room_id":"` + msg.RoomID + `"}`)
case "leave_room":
c.hub.mu.Lock()
if clients, ok := c.hub.rooms[msg.RoomID]; ok {
delete(clients, c.ID)
}
c.hub.mu.Unlock()
case "message":
c.hub.broadcast <- &BroadcastMessage{
RoomID: msg.RoomID,
Data: message,
Exclude: c.ID, // 不回传给自己
}
}
}
SSE(Server-Sent Events)
Go实现
type SSEBroker struct {
clients map[string]chan []byte
mu sync.RWMutex
}
func NewSSEBroker() *SSEBroker {
return &SSEBroker{
clients: make(map[string]chan []byte),
}
}
func (b *SSEBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
clientID := uuid.New().String()
messageChan := make(chan []byte, 10)
b.mu.Lock()
b.clients[clientID] = messageChan
b.mu.Unlock()
defer func() {
b.mu.Lock()
delete(b.clients, clientID)
close(messageChan)
b.mu.Unlock()
}()
// 监听客户端断开
ctx := r.Context()
for {
select {
case msg := <-messageChan:
fmt.Fprintf(w, "data: %s\n\n", msg)
flusher.Flush()
case <-ctx.Done():
return
}
}
}
func (b *SSEBroker) Broadcast(event string, data interface{}) {
b.mu.RLock()
defer b.mu.RUnlock()
json, _ := json.Marshal(map[string]interface{}{
"event": event,
"data": data,
"time": time.Now(),
})
for _, ch := range b.clients {
select {
case ch <- json:
default:
// 跳过慢客户端
}
}
}
客户端实现(自动重连)
class SSEClient {
constructor(url, options = {}) {
this.url = url;
this.retryCount = 0;
this.maxRetries = options.maxRetries || 5;
this.retryDelay = options.retryDelay || 1000;
this.handlers = new Map();
this.connect();
}
connect() {
this.eventSource = new EventSource(this.url);
this.eventSource.onopen = () => {
console.log('SSE connected');
this.retryCount = 0;
};
this.eventSource.onerror = (error) => {
console.error('SSE error:', error);
if (this.retryCount < this.maxRetries) {
this.retryCount++;
const delay = this.retryDelay * Math.pow(2, this.retryCount);
console.log(`Retrying in ${delay}ms (attempt ${this.retryCount})`);
setTimeout(() => this.connect(), delay);
} else {
console.error('Max retries reached');
this.eventSource.close();
}
};
// 绑定事件处理器
for (const [event, handler] of this.handlers) {
this.eventSource.addEventListener(event, handler);
}
}
on(event, handler) {
this.handlers.set(event, handler);
if (this.eventSource) {
this.eventSource.addEventListener(event, handler);
}
}
close() {
if (this.eventSource) {
this.eventSource.close();
}
}
}
// 使用示例
const client = new SSEClient('/api/events');
client.on('order.update', (event) => {
const data = JSON.parse(event.data);
console.log('Order updated:', data);
});
长轮询实现
type LongPollingBroker struct {
subscribers map[string]chan interface{}
mu sync.RWMutex
}
func (b *LongPollingBroker) Subscribe(clientID string) <-chan interface{} {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan interface{}, 10)
b.subscribers[clientID] = ch
return ch
}
func (b *LongPollingBroker) Unsubscribe(clientID string) {
b.mu.Lock()
defer b.mu.Unlock()
if ch, ok := b.subscribers[clientID]; ok {
close(ch)
delete(b.subscribers, clientID)
}
}
func (b *LongPollingBroker) Poll(ctx context.Context, clientID string, timeout time.Duration) (interface{}, error) {
b.mu.RLock()
ch, ok := b.subscribers[clientID]
b.mu.RUnlock()
if !ok {
return nil, errors.New("not subscribed")
}
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case msg := <-ch:
return msg, nil
case <-timer.C:
return nil, nil // 超时,返回空响应
case <-ctx.Done():
return nil, ctx.Err()
}
}
// HTTP处理器
func (h *LongPollingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
clientID := r.URL.Query().Get("client_id")
timeout := 30 * time.Second
msg, err := h.broker.Poll(r.Context(), clientID, timeout)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if msg == nil {
w.WriteHeader(http.StatusNoContent)
return
}
json.NewEncoder(w).Encode(msg)
}
扩展:分布式消息分发
type DistributedHub struct {
localHub *Hub
pubsub *redis.Client
nodeID string
}
func NewDistributedHub(hub *Hub, redis *redis.Client) *DistributedHub {
return &DistributedHub{
localHub: hub,
pubsub: redis,
nodeID: uuid.New().String(),
}
}
func (h *DistributedHub) Start(ctx context.Context) {
// 订阅Redis频道,接收其他节点的消息
sub := h.pubsub.Subscribe(ctx, "ws:broadcast")
ch := sub.Channel()
go func() {
for msg := range ch {
var payload struct {
SourceNode string
RoomID string
Data []byte
}
json.Unmarshal([]byte(msg.Payload), &payload)
// 避免回环
if payload.SourceNode == h.nodeID {
continue
}
// 分发给本地客户端
h.localHub.broadcast <- &BroadcastMessage{
RoomID: payload.RoomID,
Data: payload.Data,
}
}
}()
}
func (h *DistributedHub) Broadcast(roomID string, data []byte) {
// 分发给本地客户端
h.localHub.broadcast <- &BroadcastMessage{
RoomID: roomID,
Data: data,
}
// 发布到其他节点
payload, _ := json.Marshal(map[string]interface{}{
"source_node": h.nodeID,
"room_id": roomID,
"data": data,
})
h.pubsub.Publish(context.Background(), "ws:broadcast", payload)
}
总结
实时通信技术选择:
- WebSocket:双向通信、聊天、多人协作、实时游戏
- SSE:单向推送、股票行情、通知、日志流
- 长轮询:兼容老旧浏览器、简单推送场景
生产环境关键考虑:
- 心跳保活防止连接泄漏
- 断线自动重连机制
- 消息确认和重传
- 分布式环境下的消息路由(Redis Pub/Sub)
- 连接数监控和限流
延伸阅读
- RFC 6455: WebSocket Protocol
- Server-Sent Events - MDN
- Socket.IO vs WebSocket
- Centrifugo - Real-time messaging server
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。