WebSocket API: Đẩy Số Click Xuống Dashboard Realtime
Bộ đếm click giờ chính xác, nhưng dashboard vẫn phải hỏi lại server mới biết số mới. Trải nghiệm "số nhảy ngay khi có người bấm link" cần chiều ngược lại: server đẩy dữ liệu xuống trình duyệt. HTTP request-response không làm được điều đó vì client phải hỏi trước. WebSocket thì giữ một kết nối hai chiều mở sẵn, và đó là thứ ta dựng trong bài này.
Mục tiêu
Dựng WebSocket API của API Gateway với route $connect và $disconnect, lưu mỗi kết nối vào DynamoDB gắn với link nó theo dõi, rồi cho aggregator đẩy số click mới xuống đúng các kết nối đang mở. Ta mở một kết nối thật, click link, và thấy số được đẩy về. WebSocket API tính tiền theo phút kết nối và số message, lượng test nhỏ không đáng kể.
WebSocket khác HTTP API ở đâu
Với HTTP API, mỗi request là một vòng độc lập: client hỏi, server đáp, kết nối đóng. Server không có cách nào chủ động gửi gì cho client sau đó. WebSocket lật mô hình: client mở một kết nối và nó mở liên tục, cả hai phía gửi message bất cứ lúc nào. API Gateway quản lý các kết nối đó và gọi Lambda khi có sự kiện kết nối.
WebSocket API có ba loại route. $connect chạy khi client mở kết nối, $disconnect chạy khi đóng, và các route tùy biến chạy theo nội dung message client gửi. Ta dùng $connect để nhớ ai đang theo dõi link nào, và $disconnect để quên đi khi họ rời. Việc đẩy dữ liệu xuống không phải là một route; nó là một lời gọi API ngược từ phía server, sẽ nói tới ở phần aggregator.
Nhớ kết nối trong DynamoDB
Khi một kết nối mở, API Gateway cấp cho nó một connectionId. Server muốn đẩy dữ liệu sau này thì phải nhớ id đó, và nhớ kết nối này đang theo dõi link nào. Handler $connect đọc code từ query string lúc kết nối và lưu vào single-table:
const connId = event.requestContext.connectionId;
const code = event.queryStringParameters?.code;
if (!code) return { statusCode: 400, body: "thieu code" };
await ddb.send(new PutCommand({
TableName: TABLE,
Item: {
PK: `CONN#${connId}`, SK: "META", code,
GSI1PK: `WSCODE#${code}`, GSI1SK: `CONN#${connId}`,
ttl: Math.floor(Date.now() / 1000) + 7200,
},
}));
Khóa chính CONN#<connId> cho phép $disconnect xóa kết nối khi chỉ biết connectionId. Còn GSI1PK = WSCODE#<code> tái dùng đúng index dựng ở bài 05, để aggregator tìm được mọi kết nối đang theo dõi một link bằng một query. TTL hai giờ là cái lưới an toàn: nếu vì lý do nào đó $disconnect không chạy, bản ghi kết nối vẫn tự hết hạn thay vì kẹt lại mãi.
Handler $disconnect chỉ cần xóa theo connectionId:
await ddb.send(new DeleteCommand({
TableName: TABLE,
Key: { PK: `CONN#${event.requestContext.connectionId}`, SK: "META" },
}));
Đẩy dữ liệu xuống: lời gọi ngược
Khi aggregator đếm xong một click, nó tìm các kết nối đang theo dõi link đó rồi gửi số mới xuống từng cái. Việc gửi này không qua WebSocket route mà qua một API quản lý kết nối riêng, ApiGatewayManagementApi, trỏ vào endpoint của chính WebSocket API:
const mgmt = new ApiGatewayManagementApiClient({ endpoint: process.env.WS_ENDPOINT });
export async function pushClickUpdate(code: string): Promise<void> {
const meta = await ddb.send(new GetCommand({ TableName: TABLE, Key: linkKey(code) }));
const clicks = (meta.Item?.clicks as number) ?? 0;
const conns = await ddb.send(new QueryCommand({
TableName: TABLE, IndexName: "GSI1",
KeyConditionExpression: "GSI1PK = :p",
ExpressionAttributeValues: { ":p": `WSCODE#${code}` },
}));
const data = Buffer.from(JSON.stringify({ code, clicks }));
await Promise.all((conns.Items ?? []).map(async (c) => {
const connId = (c.GSI1SK as string).replace("CONN#", "");
try {
await mgmt.send(new PostToConnectionCommand({ ConnectionId: connId, Data: data }));
} catch (e) {
const err = e as { name?: string };
if (err.name === "GoneException") {
await ddb.send(new DeleteCommand({ TableName: TABLE, Key: { PK: `CONN#${connId}`, SK: "META" } }));
}
}
}));
}
Một chi tiết thực tế quan trọng: kết nối có thể đã chết mà ta chưa kịp biết (trình duyệt đóng đột ngột, mạng rớt). Khi gửi tới một kết nối như vậy, API Gateway trả GoneException (HTTP 410). Ta bắt lỗi đó và xóa luôn bản ghi kết nối chết, để lần sau không gửi vào khoảng không. Aggregator gọi pushClickUpdate(code) ngay sau khi transaction đếm thành công.
GET /{code} ─▶ resolve ─PutEvents─▶ EventBridge ─▶ SQS ─▶ aggregator
│ 1. transaction dem
│ 2. query GSI1 WSCODE#<code>
│ 3. postToConnection tung connId
▼
trinh duyet ◀═══ WebSocket (mo san) ═══ WebSocketApi ◀──┘
(so click nhay) {code, clicks}
Quyền và endpoint
Aggregator cần hai thứ mới: địa chỉ để gửi ngược, và quyền gửi. Cả hai khai trong template:
Environment:
Variables:
WS_ENDPOINT: !Sub "https://${WebSocketApi}.execute-api.${AWS::Region}.amazonaws.com/prod"
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref Table
- Statement:
- Effect: Allow
Action: execute-api:ManageConnections
Resource: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${WebSocketApi}/*"
execute-api:ManageConnections là quyền cho phép PostToConnection. Endpoint dùng giao thức https, không phải wss, vì đây là lời gọi API quản lý chứ không phải mở một WebSocket khác.
Thử thật
Mở một kết nối WebSocket tới wss://.../prod?code=<code> bằng một client nhỏ, rồi click link ba lần, mỗi lần cách nhau vài giây. Client in ra mỗi message nhận được:
WS OPEN
WS RECV {"code":"XzVZoih","clicks":4}
WS RECV {"code":"XzVZoih","clicks":5}
WS RECV {"code":"XzVZoih","clicks":6}
Link này đã có sẵn 3 click từ trước nên số bắt đầu từ 4. Mỗi lần click link ở một nơi khác, số click mới được đẩy xuống kết nối đang mở gần như tức thì, không có request nào từ phía client hỏi. Đây là kiểu giao tiếp mà HTTP request-response không làm được: server chủ động gửi xuống. Một dashboard thật chỉ cần mở kết nối này lúc tải trang và cập nhật con số mỗi khi nhận message, không bao giờ phải poll.
🧹 Dọn dẹp
# xoa link + cac STAT + ban ghi ket noi con sot
aws dynamodb scan --table-name url-shortener --query 'Items[].{PK:PK.S,SK:SK.S}' --output text | \
while read pk sk; do
aws dynamodb delete-item --table-name url-shortener \
--key "{\"PK\":{\"S\":\"$pk\"},\"SK\":{\"S\":\"$sk\"}}"
done
aws cognito-idp admin-delete-user --user-pool-id "$POOL" --username ws@example.com
Giữ stack cho bài sau.
Tổng kết
Dashboard giờ realtime. WebSocket API giữ kết nối mở, $connect và $disconnect nhớ và quên kết nối trong DynamoDB qua chính single-table và GSI1, và aggregator đẩy số click mới xuống đúng các kết nối đang theo dõi link đó bằng PostToConnection. Kết nối chết bị dọn khi gặp GoneException. Toàn bộ chuỗi từ một cú click tới con số nhảy trên màn hình giờ chạy mà không ai phải hỏi lại server.
Phần lõi event-driven gần xong. Còn một dạng xử lý chưa đụng tới: những quy trình nhiều bước có thứ tự, chờ đợi, và rẽ nhánh, thứ mà nhồi hết vào một hàm Lambda sẽ rối và khó nhìn. Bài sau dùng Step Functions để điều phối một quy trình như vậy, lấy ví dụ kiểm duyệt link trước khi cho nó hoạt động, và bàn tới mẫu saga để hoàn tác khi một bước giữa chừng thất bại.