◆はじめに
揮発性のある軽量な通知機能をWebアプリに実装する必要があり、時系列でメッセージを記録可能で既読処理も単体で実現できるRedis Streamsが適当だと思い採用した。以下、その時行ったサーバー側実装を備忘録的にまとめる。
(※実際の名称などは変えています)
◆構成概要
通知発生
[フロントエンド] ----> [APIサーバー] ----> [Redis]
「ユーザーAが◯◯の△△を更新」などのアクションきっかけで、◯◯に関連のあるユーザーBに対して「更新されました」メッセージを作成しRedisに書き込む。
配信
[フロントエンド] <--(WebSocket)-- [APIサーバー] <---- [Redis]
APIサーバー上でRedisを監視するスレッドを起動させ、一定間隔で未送信メッセージを取得する。宛先のユーザーがWebSocketをopenしていた場合にマッチングし送信を行う。
◆使用コマンド詳細
実装上はクライアントライブラリを使用しているが、ここでは汎用的にCLIベースで記載する。
通知発生
・通知書き込み(XADD)
XADD {streamName} MINID ~ {minId} * {userId} {message}
【{streamName}】任意のストリーム名(固定)
【MINID ~ {minId}】自動採番のメッセージIDがUNIXタイムスタンプベースであるのを利用し、minIdに過去日時のIDを指定(e.g. 1720170000000-0)して、書き込むと同時に古いメッセージを自動削除
【* {userId} {message}】メッセージIDを自動採番(*)し、一意のユーザーID(userId)をキー、通知内容(message)を値として登録
・滞留コレクション書き込み(ZADD)
XADDと同時に行う。
ZADD {pendingCollectionName} {score} {messageId}
【{pendingCollectionName}】ユーザーID + 固定文字列をコレクション名とする
【{score}】コレクション内でソート基準となるスコア。メッセージIDから抽出したタイムスタンプを設定
【{messageId}】コレクションに追加するメッセージID
配信
・新着メッセージ取得(XREADGROUP)
XREADGROUP GROUP {groupName} {consumerName} COUNT {count} BLOCK {milliseconds} STREAMS {streamName} >
【{groupName}】任意のコンシューマグループ名(固定)
【{consumerName}】任意のコンシューマ名(ランダム生成)。起動しているサーバーインスタンス毎にユニーク
【COUNT {count}】取得するメッセージ数
【BLOCK {milliseconds}】ストリームにメッセージがまだない場合に待機する時間(ミリ秒)
【STREAMS {streamName} >】書き込み時のストリーム名。">"は、コンシューマが 新しいメッセージだけを読むように指定する記号
・滞留メッセージ取得(ZRANGE)
セッションが存在するユーザー毎の滞留メッセージIDを取得する。
ZRANGE {pendingCollectionName} 0 -1
【{pendingCollectionName}】ZADDで指定したコレクション名
【0 -1】コレクションの先頭(0)から末尾(-1)まで
・滞留メッセージ再割り当て(XCLAIM)
滞留しているメッセージを処理中のコンシューマで再配信可能にする。
XCLAIM {streamName} {groupName} {consumerName} {minIdleTimeMs} [{messageId} ...]
【{streamName}】書き込み時のストリーム名
【{groupName}】XREADGROUPと同一
【{consumerName}】XREADGROUPと同一
【{minIdleTimeMs}】対象とするメッセージの滞留時間(ミリ秒)
【[{messageId} ...]】ZRANGEで取得したメッセージID(複数)
配信済にする
・既読(XACK)
送信に成功した場合に使用。
XACK {streamName} {groupName} {messageId}
【{streamName}】書き込み時のストリーム名
【{groupName}】XREADGROUPと同一
【{messageId}】対象メッセージID
・滞留コレクションから削除(ZREM)
XACKと同時に行う。
ZREM {pendingCollectionName} {messageId}
【{pendingCollectionName}】ZADDで指定したコレクション名
【{messageId}】削除するメッセージID