package main import ( "context" "encoding/json" "fmt" "log" "sync" "time" "github.com/redis/go-redis/v9" ) const ( redisChannel = "wicket:events" redisBufferKey = "wicket:buffer" ) type RedisBackend struct { client *redis.Client bufferSize int mu sync.Mutex closed bool } func NewRedisBackend(url string, bufferSize int) (*RedisBackend, error) { opts, err := redis.ParseURL(url) if err != nil { return nil, fmt.Errorf("parsing redis URL: %w", err) } client := redis.NewClient(opts) if err := client.Ping(context.Background()).Err(); err != nil { return nil, fmt.Errorf("connecting to redis: %w", err) } return &RedisBackend{ client: client, bufferSize: bufferSize, }, nil } func newRedisBackendFromClient(client *redis.Client, bufferSize int) *RedisBackend { return &RedisBackend{ client: client, bufferSize: bufferSize, } } func (r *RedisBackend) Publish(event *Event) error { data, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshaling event: %w", err) } ctx := context.Background() pipe := r.client.Pipeline() pipe.LPush(ctx, redisBufferKey, data) pipe.LTrim(ctx, redisBufferKey, 0, int64(r.bufferSize-1)) pipe.Expire(ctx, redisBufferKey, 24*time.Hour) pipe.Publish(ctx, redisChannel, data) _, err = pipe.Exec(ctx) return err } func (r *RedisBackend) Subscribe(ctx context.Context) <-chan *Event { ch := make(chan *Event, 256) pubsub := r.client.Subscribe(ctx, redisChannel) go func() { defer close(ch) defer pubsub.Close() msgCh := pubsub.Channel() for { select { case <-ctx.Done(): return case msg := <-msgCh: if msg == nil { return } var event Event if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { log.Printf("redis: unmarshal event: %v", err) continue } select { case ch <- &event: default: } } } }() return ch } func (r *RedisBackend) Since(lastEventID string, subscribePath string) []*Event { ctx := context.Background() vals, err := r.client.LRange(ctx, redisBufferKey, 0, -1).Result() if err != nil { log.Printf("redis: LRANGE: %v", err) return nil } // Redis list is newest-first (LPUSH), reverse to chronological order events := make([]*Event, 0, len(vals)) for i := len(vals) - 1; i >= 0; i-- { var e Event if err := json.Unmarshal([]byte(vals[i]), &e); err != nil { continue } events = append(events, &e) } found := false foundIdx := 0 for i, e := range events { if e.ID == lastEventID { found = true foundIdx = i + 1 break } } if !found { foundIdx = 0 } var result []*Event for i := foundIdx; i < len(events); i++ { if pathMatches(subscribePath, events[i].Path) { result = append(result, events[i]) } } return result } func (r *RedisBackend) Close() error { r.mu.Lock() defer r.mu.Unlock() if r.closed { return nil } r.closed = true return r.client.Close() }