ELW株式会社 テックブログ

リアルなログをそのままお届けします。

Redis Streamsを利用した通知アーキテクチャ

◆はじめに

揮発性のある軽量な通知機能をWebアプリに実装する必要があり、時系列でメッセージを記録可能で既読処理も単体で実現できるRedis Streamsが適当だと思い採用した。以下、その時行ったサーバー側実装を備忘録的にまとめる。

(※実際の名称などは変えています)

◆構成概要

通知発生

[フロントエンド] ----> [APIサーバー] ----> [Redis]

「ユーザーAが◯◯の△△を更新」などのアクションきっかけで、◯◯に関連のあるユーザーBに対して「更新されました」メッセージを作成しRedisに書き込む。

配信

[フロントエンド] <--(WebSocket)-- [APIサーバー] <---- [Redis]

APIサーバー上でRedisを監視するスレッドを起動させ、一定間隔で未送信メッセージを取得する。宛先のユーザーがWebSocketをopenしていた場合にマッチングし送信を行う。

◆使用コマンド詳細

実装上はクライアントライブラリを使用しているが、ここでは汎用的にCLIベースで記載する。

通知発生

・通知書き込み(XADD)
XADD {streamName} MINID ~ {minId} * {userId} {message}

【{streamName}】任意のストリーム名(固定)

【MINID ~ {minId}】自動採番のメッセージIDがUNIXタイムスタンプベースであるのを利用し、minIdに過去日時のIDを指定(e.g. 1720170000000-0)して、書き込むと同時に古いメッセージを自動削除

【* {userId} {message}】メッセージIDを自動採番(*)し、一意のユーザーID(userId)をキー、通知内容(message)を値として登録

・滞留コレクション書き込み(ZADD)

XADDと同時に行う。

ZADD {pendingCollectionName} {score} {messageId}

【{pendingCollectionName}】ユーザーID + 固定文字列をコレクション名とする

【{score}】コレクション内でソート基準となるスコア。メッセージIDから抽出したタイムスタンプを設定

【{messageId}】コレクションに追加するメッセージID

配信

・新着メッセージ取得(XREADGROUP)
XREADGROUP GROUP {groupName} {consumerName} COUNT {count} BLOCK {milliseconds} STREAMS {streamName} >

【{groupName}】任意のコンシューマグループ名(固定)

【{consumerName}】任意のコンシューマ名(ランダム生成)。起動しているサーバーインスタンス毎にユニーク

【COUNT {count}】取得するメッセージ数

【BLOCK {milliseconds}】ストリームにメッセージがまだない場合に待機する時間(ミリ秒)

【STREAMS {streamName} >】書き込み時のストリーム名。">"は、コンシューマが 新しいメッセージだけを読むように指定する記号

・滞留メッセージ取得(ZRANGE)

セッションが存在するユーザー毎の滞留メッセージIDを取得する。

ZRANGE {pendingCollectionName} 0 -1

【{pendingCollectionName}】ZADDで指定したコレクション名

【0 -1】コレクションの先頭(0)から末尾(-1)まで

・滞留メッセージ再割り当て(XCLAIM)

滞留しているメッセージを処理中のコンシューマで再配信可能にする。

XCLAIM {streamName} {groupName} {consumerName} {minIdleTimeMs} [{messageId} ...]

【{streamName}】書き込み時のストリーム名

【{groupName}】XREADGROUPと同一

【{consumerName}】XREADGROUPと同一

【{minIdleTimeMs}】対象とするメッセージの滞留時間(ミリ秒)

【[{messageId} ...]】ZRANGEで取得したメッセージID(複数)

配信済にする

・既読(XACK)

送信に成功した場合に使用。

XACK {streamName} {groupName} {messageId}

【{streamName}】書き込み時のストリーム名

【{groupName}】XREADGROUPと同一

【{messageId}】対象メッセージID

・滞留コレクションから削除(ZREM)

XACKと同時に行う。

ZREM {pendingCollectionName} {messageId}

【{pendingCollectionName}】ZADDで指定したコレクション名

【{messageId}】削除するメッセージID