引言
GraphQL订阅为应用提供了实时数据推送能力,相比传统轮询方式更高效、更及时。本文将深入讲解GraphQL订阅的架构设计与实战实现。
GraphQL订阅核心概念
GraphQL订阅工作流:
┌─────────┐ ┌─────────┐
│ Client │ │ Server │
└────┬────┘ └────┬────┘
│ │
│ 1. WebSocket连接建立 │
│ ─────────────────────────▶ │
│ │
│ 2. 发送订阅请求 │
│ subscription { │
│ messageAdded { │
│ id content userId │
│ } │
│ } │
│ ─────────────────────────▶ │
│ │
│ 3. 服务器确认订阅 │
│ ◀───────────────────────── │
│ │
│ 4. 数据变更时推送 │
│ { data: { │
│ messageAdded: {...} │
│ }} │
│ ◀───────────────────────── │
│ │
│ 5. 继续推送... │
│ ◀───────────────────────── │
│ │
│ 6. 客户端取消订阅 │
│ ─────────────────────────▶ │
│ │
│ 7. 关闭连接 │
│ ─────────────────────────▶ │
Apollo Server实现
基础订阅设置
// schema.js
const { gql } = require('apollo-server');
/**
 * GraphQL schema for the chat example: message queries, the addMessage
 * mutation and the per-channel subscriptions.
 *
 * Every type referenced by a field must be defined in the schema, otherwise
 * schema construction fails with "Unknown type". User and Channel are
 * therefore declared here with a minimal shape; extend them as needed.
 */
const typeDefs = gql`
  type Message {
    id: ID!
    content: String!
    userId: ID!
    createdAt: String!
  }

  # Minimal definition so that Subscription.userTyping resolves to a known type.
  type User {
    id: ID!
  }

  # Minimal definition so that Subscription.channelUpdated resolves to a known type.
  type Channel {
    id: ID!
  }

  type Query {
    messages(channelId: ID!): [Message!]!
  }

  type Mutation {
    addMessage(channelId: ID!, content: String!): Message!
  }

  type Subscription {
    messageAdded(channelId: ID!): Message!
    userTyping(channelId: ID!): User!
    channelUpdated(channelId: ID!): Channel!
  }
`;

module.exports = typeDefs;
// resolvers.js
const { PubSub } = require('graphql-subscriptions');
// PubSub instance (from graphql-subscriptions) shared by the resolvers in this module.
const pubsub = new PubSub();
// Base event names. Where the resolvers use them, the topic string is built
// as `<EVENT>.<channelId>`.
const MESSAGE_ADDED = 'MESSAGE_ADDED';
const USER_TYPING = 'USER_TYPING';
const CHANNEL_UPDATED = 'CHANNEL_UPDATED';
/**
 * Ensures the requesting user may access a channel.
 *
 * @param {object|null|undefined} user - User taken from the GraphQL context.
 * @param {string} channelId - Channel being accessed.
 * @throws {Error} 'Unauthorized' when there is no user or access is denied.
 */
function assertChannelAccess(user, channelId) {
  if (!user || !user.hasAccessToChannel(channelId)) {
    throw new Error('Unauthorized');
  }
}

/**
 * Resolver map for the chat schema.
 *
 * - Query.messages loads a channel's messages from the messageAPI data source.
 * - Mutation.addMessage persists a message and publishes it on the channel topic.
 * - Subscription.* return async iterators over `<EVENT>.<channelId>` topics and
 *   reject callers that have no access to the channel.
 */
const resolvers = {
  Query: {
    messages: async (_, { channelId }, { dataSources }) => {
      return dataSources.messageAPI.getMessagesByChannel(channelId);
    },
  },
  Mutation: {
    addMessage: async (_, { channelId, content }, { user, dataSources }) => {
      // Fail with a clear error instead of a TypeError on `user.id`.
      if (!user) {
        throw new Error('Unauthorized');
      }
      const message = await dataSources.messageAPI.createMessage({
        channelId,
        content,
        userId: user.id,
      });
      // The message is already persisted at this point, so a failed publish
      // must not fail the mutation (a client retry would duplicate the
      // message). Await it so the rejection is handled rather than floating.
      try {
        await pubsub.publish(`${MESSAGE_ADDED}.${channelId}`, {
          messageAdded: message,
        });
      } catch (err) {
        console.error('Failed to publish MESSAGE_ADDED event', err);
      }
      return message;
    },
  },
  Subscription: {
    messageAdded: {
      subscribe: (_, { channelId }, { user }) => {
        assertChannelAccess(user, channelId);
        return pubsub.asyncIterator([`${MESSAGE_ADDED}.${channelId}`]);
      },
    },
    userTyping: {
      subscribe: (_, { channelId }, { user }) => {
        // Same access rule as messageAdded: typing activity is channel-private.
        assertChannelAccess(user, channelId);
        return pubsub.asyncIterator([`${USER_TYPING}.${channelId}`]);
      },
    },
    // Declared in the schema; without a subscribe resolver the field cannot
    // produce an event stream.
    channelUpdated: {
      subscribe: (_, { channelId }, { user }) => {
        assertChannelAccess(user, channelId);
        return pubsub.asyncIterator([`${CHANNEL_UPDATED}.${channelId}`]);
      },
    },
  },
};
module.exports = resolvers;
服务器配置
// server.js
const { ApolloServer } = require('apollo-server-express');
const { createServer } = require('http');
const { execute, subscribe } = require('graphql');
const { SubscriptionServer } = require('subscriptions-transport-ws');
const express = require('express');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');
/**
 * Boots the example server: an Express app with Apollo Server for HTTP
 * operations and a subscriptions-transport-ws SubscriptionServer for
 * WebSocket subscriptions, both on the same HTTP server and path.
 *
 * @returns {Promise<void>} Resolves once the HTTP server is listening;
 *   rejects if Apollo fails to start or the port cannot be bound.
 */
async function startServer() {
  const app = express();

  const server = new ApolloServer({
    typeDefs,
    resolvers,
    context: async ({ req, connection }) => {
      // HTTP request context. `await` keeps this correct whether the token
      // helper is synchronous or returns a promise.
      if (req) {
        return {
          user: await getUserFromToken(req.headers.authorization),
        };
      }
      // WebSocket connection context.
      if (connection) {
        return {
          user: connection.context.user,
        };
      }
      // Always return an object so resolvers can destructure the context.
      return { user: null };
    },
  });

  await server.start();
  server.applyMiddleware({ app });

  const httpServer = createServer(app);

  // WebSocket server for subscriptions.
  SubscriptionServer.create(
    {
      // NOTE(review): `server.schema` is not part of Apollo Server 3's
      // documented API; confirm it is populated for the installed version, or
      // build the schema once and pass it to both servers.
      schema: server.schema,
      execute,
      subscribe,
      onConnect: async (connectionParams) => {
        // Clients may connect without any params at all.
        const authToken = connectionParams ? connectionParams.authToken : undefined;
        if (!authToken) {
          throw new Error('Missing auth token');
        }
        const user = await validateToken(authToken);
        if (!user) {
          throw new Error('Invalid auth token');
        }
        // The returned object becomes the connection's context.
        return { user };
      },
      onDisconnect: () => {
        console.log('Client disconnected');
      },
    },
    {
      server: httpServer,
      path: server.graphqlPath,
    }
  );

  const PORT = process.env.PORT || 4000;
  // Adapt the callback-style listen() so bind errors (e.g. EADDRINUSE)
  // reject this function instead of surfacing as an uncaught 'error' event.
  await new Promise((resolve, reject) => {
    httpServer.once('error', reject);
    httpServer.listen(PORT, resolve);
  });
  console.log(`🚀 Server ready at http://localhost:${PORT}${server.graphqlPath}`);
  console.log(`🚀 Subscriptions ready at ws://localhost:${PORT}${server.graphqlPath}`);
}

// Never leave the startup promise floating: report the failure and exit non-zero.
startServer().catch((err) => {
  console.error('Failed to start server', err);
  process.exitCode = 1;
});
Redis PubSub扩展
分布式PubSub
// redis-pubsub.js
const { RedisPubSub } = require('graphql-redis-subscriptions');
const Redis = require('ioredis');
/**
 * Connection settings shared by the publisher and subscriber Redis clients.
 * Host and port come from REDIS_HOST / REDIS_PORT with local defaults.
 */
const options = {
  host: process.env.REDIS_HOST || 'localhost',
  port: process.env.REDIS_PORT || 6379,
  // Reconnect delay in ms: grows by 50 per attempt and is capped at 2000.
  retryStrategy(times) {
    const backoffMs = Math.min(times * 50, 2000);
    return backoffMs;
  },
};

// Two separate connections are created: one handed to RedisPubSub as
// publisher, one as subscriber.
const publisher = new Redis(options);
const subscriber = new Redis(options);

const pubsub = new RedisPubSub({ publisher, subscriber });

module.exports = pubsub;
// 使用Redis PubSub
const pubsub = require('./redis-pubsub');
// Base event name; the channel id is appended to form the topic.
const MESSAGE_ADDED = 'MESSAGE_ADDED';

/**
 * Subscription resolvers backed by the Redis PubSub instance.
 */
const resolvers = {
  Subscription: {
    messageAdded: {
      // Subscribes to the single exact topic of the requested channel.
      subscribe(_, { channelId }) {
        const topic = `${MESSAGE_ADDED}.${channelId}`;
        return pubsub.asyncIterator([topic]);
      },
      // Unwraps the published payload to the field value.
      resolve({ messageAdded }) {
        return messageAdded;
      },
    },
  },
};
Go实现
GraphQL订阅服务器
package main
import (
"context"
"log"
"net/http"
"sync"
"github.com/99designs/gqlgen/graphql/handler"
"github.com/99designs/gqlgen/graphql/handler/transport"
"github.com/99designs/gqlgen/graphql/playground"
"github.com/gorilla/websocket"
)
// PubSub is an in-process, topic-based publish/subscribe hub.
// Each subscriber owns a buffered channel; all access to the subscriber
// map is guarded by mu.
type PubSub struct {
	mu          sync.RWMutex
	subscribers map[string][]chan interface{}
}

// NewPubSub returns an empty, ready-to-use PubSub.
func NewPubSub() *PubSub {
	return &PubSub{
		subscribers: make(map[string][]chan interface{}),
	}
}

// Subscribe registers a new subscriber on topic and returns its channel.
// The channel is buffered (capacity 10) so Publish never has to block.
func (ps *PubSub) Subscribe(topic string) chan interface{} {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	ch := make(chan interface{}, 10)
	ps.subscribers[topic] = append(ps.subscribers[topic], ch)
	return ch
}

// Unsubscribe removes ch from topic and closes it. It is a no-op when ch is
// not registered on topic, so calling it twice is safe.
func (ps *PubSub) Unsubscribe(topic string, ch chan interface{}) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	subs := ps.subscribers[topic]
	for i, sub := range subs {
		if sub != ch {
			continue
		}
		// Shift the tail left and nil the vacated slot so the backing array
		// does not keep the removed channel reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		subs = subs[:last]

		if len(subs) == 0 {
			// Drop empty topics; per-channel topic names would otherwise
			// accumulate in the map forever.
			delete(ps.subscribers, topic)
		} else {
			ps.subscribers[topic] = subs
		}
		close(ch)
		return
	}
}

// Publish delivers data to every subscriber of topic without blocking.
// A subscriber whose buffer is full misses the event (best-effort delivery).
func (ps *PubSub) Publish(topic string, data interface{}) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	for _, ch := range ps.subscribers[topic] {
		select {
		case ch <- data:
		default:
			// Buffer full: skip this subscriber rather than stall the publisher.
		}
	}
}
// Resolver implementation.
// Resolver carries the PubSub hub used by its methods: MessageAdded
// subscribes to and unsubscribes from it, AddMessage publishes to it.
type Resolver struct {
pubsub *PubSub
}
// MessageAdded is the subscription resolver for new messages in a channel.
// It subscribes to the channel's topic and forwards every *Message event to
// the returned channel until ctx is cancelled. On exit the PubSub
// subscription is released and the returned channel is closed.
func (r *Resolver) MessageAdded(ctx context.Context, channelID string) (<-chan *Message, error) {
	topic := "message_added." + channelID
	ch := r.pubsub.Subscribe(topic)
	messageCh := make(chan *Message, 10)

	go func() {
		// Deferred calls run in reverse order: unsubscribe first, then close
		// the outgoing channel.
		defer close(messageCh)
		defer r.pubsub.Unsubscribe(topic, ch)

		for {
			select {
			case <-ctx.Done():
				return
			case data, open := <-ch:
				if !open {
					// Source closed elsewhere; stop instead of spinning on
					// zero values.
					return
				}
				msg, isMessage := data.(*Message)
				if !isMessage {
					continue
				}
				// Forward without risking a permanent block: if the consumer
				// stopped reading, cancellation still ends the goroutine.
				select {
				case messageCh <- msg:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return messageCh, nil
}
// AddMessage is the mutation resolver that creates a message authored by the
// user found in ctx, stores it via saveMessage and, on success, publishes it
// on the channel's "message_added.<channelID>" topic.
// It returns the stored message, or the error from saveMessage.
func (r *Resolver) AddMessage(ctx context.Context, channelID, content string) (*Message, error) {
	author := getUserFromContext(ctx)

	created := new(Message)
	created.ID = generateID()
	created.Content = content
	created.UserID = author.ID
	created.CreatedAt = time.Now()

	// Persist first; nothing is published when saving fails.
	err := saveMessage(created)
	if err != nil {
		return nil, err
	}

	// Notify subscribers of this channel.
	topic := "message_added." + channelID
	r.pubsub.Publish(topic, created)

	return created, nil
}
// 服务器启动
func main() {
pubsub := NewPubSub()
resolver := &Resolver{pubsub: pubsub}
srv := handler.New(gqlgen.NewExecutableSchema(gqlgen.Config{
Resolvers: resolver,
}))
// 添加WebSocket传输
srv.AddTransport(&transport.Websocket{
Upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
},
InitFunc: func(ctx context.Context, initPayload *transport.InitPayload) (context.Context, error) {
// 验证连接
token := initPayload.Get("authToken")
user, err := validateToken(token)
if err != nil {
return ctx, err
}
return context.WithValue(ctx, "user", user), nil
},
})
srv.AddTransport(transport.POST{})
http.Handle("/", playground.Handler("GraphQL", "/query"))
http.Handle("/query", srv)
log.Println("Server running on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
客户端实现
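React + Apollo Client
注意:下面的客户端使用 graphql-ws 库(graphql-transport-ws 子协议),它与前文 Node 示例中基于 subscriptions-transport-ws 的 SubscriptionServer 协议不兼容。客户端与服务端必须使用同一套协议:要么服务端改用 graphql-ws,要么客户端改用 @apollo/client/link/ws 中的 WebSocketLink。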
// client.js
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import { getMainDefinition } from '@apollo/client/utilities';
// HTTP transport for queries and mutations.
const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

// WebSocket transport for subscriptions (graphql-ws client).
// NOTE(review): graphql-ws speaks the graphql-transport-ws subprotocol; the
// server must use a compatible implementation. A subscriptions-transport-ws
// server will not accept this client.
const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    // A function is evaluated on every (re)connect, so the token stored at
    // that moment is sent. A plain object would freeze whatever token
    // existed when this module was first loaded.
    connectionParams: () => ({
      authToken: localStorage.getItem('token'),
    }),
  })
);

// Route subscription operations over WebSocket and everything else over HTTP.
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  wsLink,
  httpLink
);

const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});
// MessageList.jsx
import { useSubscription, useQuery, gql } from '@apollo/client';
// Query document: selects id, content, userId and createdAt of the messages
// of the channel given by $channelId.
const GET_MESSAGES = gql`
query GetMessages($channelId: ID!) {
messages(channelId: $channelId) {
id
content
userId
createdAt
}
}
`;
// Subscription document: selects the same four fields for each message added
// to the channel given by $channelId.
const MESSAGE_ADDED = gql`
subscription MessageAdded($channelId: ID!) {
messageAdded(channelId: $channelId) {
id
content
userId
createdAt
}
}
`;
function MessageList({ channelId }) {
const { data, loading } = useQuery(GET_MESSAGES, {
variables: { channelId },
});
useSubscription(MESSAGE_ADDED, {
variables: { channelId },
onData: ({ data: { messageAdded } }) => {
// 更新缓存
client.cache.modify({
fields: {
messages(existingMessages = []) {
const newMessageRef = client.cache.writeFragment({
data: messageAdded,
fragment: gql`
fragment NewMessage on Message {
id
content
userId
createdAt
}
`,
});
return [...existingMessages, newMessageRef];
},
},
});
},
});
if (loading) return <div>Loading...</div>;
return (
<div className="message-list">
{data.messages.map((message) => (
<div key={message.id} className="message">
<p>{message.content}</p>
<span>{message.createdAt}</span>
</div>
))}
</div>
);
}
高级特性
事件过滤
// Subscriptions with server-side event filtering.
/**
 * Both fields listen on one shared topic and use withFilter's predicate to
 * decide, per event and per subscriber, whether the event is delivered.
 */
const resolvers = {
  Subscription: {
    messageAdded: {
      subscribe: withFilter(
        () => pubsub.asyncIterator([MESSAGE_ADDED]),
        // Deliver only to subscribers of the channel the message belongs to.
        (payload, variables) =>
          payload.messageAdded.channelId === variables.channelId
      ),
    },
    // Filtering on several conditions.
    notificationReceived: {
      subscribe: withFilter(
        () => pubsub.asyncIterator([NOTIFICATION]),
        (payload, variables, context) => {
          // A connection without an authenticated user receives nothing,
          // rather than throwing a TypeError on every published event.
          const recipient = context && context.user;
          if (!recipient) {
            return false;
          }
          const notification = payload.notificationReceived;
          return (
            notification.userId === recipient.id &&
            notification.type === variables.type
          );
        }
      ),
    },
  },
};
订阅认证与授权
// Authentication / authorization middleware.
/**
 * Runs the wrapped resolver only for an authenticated, permitted user.
 *
 * @param {Function} resolve - The wrapped resolver; called with
 *   (parent, args, context, info).
 * @param {*} parent - Parent value passed through to `resolve`.
 * @param {object} args - Field arguments; also passed to checkPermission.
 * @param {object} context - Request context; `context.user` must be set.
 * @param {object} info - Resolve info; `info.fieldName` names the field.
 * @returns {Promise<*>} Whatever `resolve` returns.
 * @throws {AuthenticationError} When the context has no user.
 * @throws {ForbiddenError} When checkPermission denies the field.
 */
const authMiddleware = async (resolve, parent, args, context, info) => {
  const { user } = context;
  if (!user) {
    throw new AuthenticationError('Must authenticate');
  }

  // Ask whether this user may use this field with these arguments.
  const allowed = await checkPermission(user, info.fieldName, args);
  if (allowed) {
    return resolve(parent, args, context, info);
  }

  throw new ForbiddenError('Not authorized');
};
// Applying the middleware.
/**
 * authMiddleware expects the wrapped resolver as its FIRST argument, while
 * GraphQL invokes `subscribe` with (parent, args, context, info). Installing
 * the middleware directly as `subscribe` shifts every argument by one, so it
 * would look for `user` on `info` and always reject. The adapter below passes
 * the real subscribe function explicitly.
 */
const resolvers = {
  Subscription: {
    messageAdded: {
      subscribe: (parent, args, context, info) =>
        authMiddleware(
          (_parent, { channelId }) =>
            pubsub.asyncIterator([`${MESSAGE_ADDED}.${channelId}`]),
          parent,
          args,
          context,
          info
        ),
    },
  },
};
总结
GraphQL订阅最佳实践
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 聊天应用 | WebSocket + Redis PubSub | 实时性高,支持多实例 |
| 实时通知 | 事件过滤 + 权限控制 | 精准推送,安全可控 |
| 协作编辑 | CRDT + 订阅 | 冲突解决,实时同步 |
| 数据仪表板 | 节流 + 批量更新 | 减少推送频率 |
关键原则
- 使用分布式PubSub:Redis/Kafka支持多实例部署
- 实现认证授权:WebSocket连接和订阅都需要验证
- 事件过滤:避免无关数据推送
- 连接管理:监控活跃连接数,防止资源耗尽
- 错误处理:优雅处理连接断开和重连
- 性能优化:节流、批量更新、连接复用
延伸阅读
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。