this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

start on relay test framework

+356
+147
cmd/relayered/testing/consumer.go
··· 1 + package testing 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "sync" 8 + "time" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/events" 12 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 13 + 14 + "github.com/gorilla/websocket" 15 + ) 16 + 17 + // testing helper which receives a set of firehose events 18 + type Consumer struct { 19 + Host string 20 + Events []*events.XRPCStreamEvent 21 + LastSeq int64 22 + Timeout time.Duration 23 + eventsLk sync.Mutex 24 + cancel func() 25 + } 26 + 27 + func NewConsumer(host string) *Consumer { 28 + c := Consumer{ 29 + Host: host, 30 + Timeout: time.Second * 3, 31 + } 32 + return &c 33 + } 34 + 35 + func (c *Consumer) eventCallbacks() *events.RepoStreamCallbacks { 36 + rsc := &events.RepoStreamCallbacks{ 37 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 38 + c.eventsLk.Lock() 39 + defer c.eventsLk.Unlock() 40 + c.Events = append(c.Events, &events.XRPCStreamEvent{RepoCommit: evt}) 41 + c.LastSeq = evt.Seq 42 + return nil 43 + }, 44 + RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 45 + c.eventsLk.Lock() 46 + defer c.eventsLk.Unlock() 47 + c.Events = append(c.Events, &events.XRPCStreamEvent{RepoSync: evt}) 48 + c.LastSeq = evt.Seq 49 + return nil 50 + }, 51 + RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 52 + c.eventsLk.Lock() 53 + defer c.eventsLk.Unlock() 54 + c.Events = append(c.Events, &events.XRPCStreamEvent{RepoIdentity: evt}) 55 + c.LastSeq = evt.Seq 56 + return nil 57 + }, 58 + RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 59 + c.eventsLk.Lock() 60 + defer c.eventsLk.Unlock() 61 + c.Events = append(c.Events, &events.XRPCStreamEvent{RepoAccount: evt}) 62 + c.LastSeq = evt.Seq 63 + return nil 64 + }, 65 + // NOTE: this is included to test that the events are *not* passed through; can be removed in the near future 66 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 67 + c.eventsLk.Lock() 68 + defer c.eventsLk.Unlock() 69 + c.Events = append(c.Events, &events.XRPCStreamEvent{RepoHandle: evt}) 70 + c.LastSeq = evt.Seq 71 + return nil 72 + }, 73 + } 74 + return rsc 75 + } 76 + 77 + func (c *Consumer) Connect(ctx context.Context, cursor int) error { 78 + 79 + u := c.Host + "/xrpc/com.atproto.sync.subscribeRepos" 80 + if cursor >= 0 { 81 + u = u + fmt.Sprintf("?cursor=%d", cursor) 82 + } 83 + 84 + dialer := websocket.Dialer{} 85 + conn, resp, err := dialer.Dial(u, nil) 86 + if err != nil { 87 + return err 88 + } 89 + 90 + if resp.StatusCode != 101 { 91 + return fmt.Errorf("expected HTTP 101 for websocket: %d", resp.StatusCode) 92 + } 93 + 94 + ctx, cancel := context.WithCancel(ctx) 95 + c.cancel = cancel 96 + 97 + go func() { 98 + <-ctx.Done() 99 + conn.Close() 100 + }() 101 + 102 + seqScheduler := sequential.NewScheduler("test", c.eventCallbacks().EventHandler) 103 + go func() { 104 + if err := events.HandleRepoStream(ctx, conn, seqScheduler, nil); err != nil { 105 + slog.Error("consumer failed processing event", "err", err) 106 + cancel() 107 + } 108 + }() 109 + time.Sleep(time.Millisecond * 2) // XXX: is this good? 110 + return nil 111 + } 112 + 113 + func (c *Consumer) Count() int { 114 + c.eventsLk.Lock() 115 + defer c.eventsLk.Unlock() 116 + return len(c.Events) 117 + } 118 + 119 + func (c *Consumer) Clear() { 120 + c.eventsLk.Lock() 121 + defer c.eventsLk.Unlock() 122 + c.Events = []*events.XRPCStreamEvent{} 123 + } 124 + 125 + func (c *Consumer) Shutdown() { 126 + if c.cancel != nil { 127 + c.cancel() 128 + } 129 + } 130 + 131 + // connects to host and consumes 'count' events, then returns them. will try up to 'c.Timeout', and error if not enough events are seen 132 + // 133 + // cursor: pass -1 to consume from current 134 + func (c *Consumer) ConsumeEvents(count int) ([]*events.XRPCStreamEvent, error) { 135 + // poll until we have enough events 136 + start := time.Now() 137 + for { 138 + if c.Count() >= count { 139 + break 140 + } 141 + if time.Since(start) > c.Timeout { 142 + return nil, fmt.Errorf("test stream consumer timeout: %s", c.Timeout) 143 + } 144 + time.Sleep(time.Millisecond * 5) 145 + } 146 + return c.Events, nil 147 + }
+47
cmd/relayered/testing/framework_test.go
··· 1 + package testing 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + comatproto "github.com/bluesky-social/indigo/api/atproto" 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + "github.com/bluesky-social/indigo/events" 10 + 11 + "github.com/stretchr/testify/assert" 12 + ) 13 + 14 + // meta test for the testing framework itself. simply connects the consumer to the producer 15 + func TestFramework(t *testing.T) { 16 + assert := assert.New(t) 17 + ctx := context.Background() // XXX 18 + 19 + p := NewProducer(":9900") 20 + p.Listen() 21 + defer p.Shutdown() 22 + 23 + c := NewConsumer("ws://localhost:9900") 24 + err := c.Connect(ctx, -1) 25 + if err != nil { 26 + t.Fatal(err) 27 + } 28 + defer c.Shutdown() 29 + 30 + h := "example.atbin.dev" 31 + e1 := events.XRPCStreamEvent{ 32 + RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 33 + Did: "did:web:example.atbin.dev", 34 + Handle: &h, 35 + Seq: 1234, 36 + Time: syntax.DatetimeNow().String(), 37 + }, 38 + } 39 + p.Emit(&e1) 40 + 41 + evts, err := c.ConsumeEvents(1) 42 + if err != nil { 43 + t.Fatal(err) 44 + } 45 + assert.Equal(1, len(evts)) 46 + assert.Equal(e1.RepoIdentity, evts[0].RepoIdentity) 47 + }
+162
cmd/relayered/testing/producer.go
··· 1 + package testing 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "sync" 9 + 10 + "github.com/bluesky-social/indigo/events" 11 + 12 + "github.com/gorilla/websocket" 13 + ) 14 + 15 + // testing helper which outputs a sequence of events over a websocket 16 + type Producer struct { 17 + Bind string 18 + BufferSize int 19 + mux *http.ServeMux 20 + subs []*Subscriber 21 + subsLk sync.Mutex 22 + } 23 + 24 + type Subscriber struct { 25 + outgoing chan *events.XRPCStreamEvent 26 + done chan struct{} 27 + } 28 + 29 + func NewProducer(bind string) *Producer { 30 + mux := http.NewServeMux() 31 + p := Producer{ 32 + Bind: bind, 33 + BufferSize: 1024, 34 + mux: mux, 35 + } 36 + mux.HandleFunc("/xrpc/com.atproto.sync.subscribeRepos", p.handleSubscribeRepos) 37 + return &p 38 + } 39 + 40 + func (p *Producer) handleSubscribeRepos(resp http.ResponseWriter, req *http.Request) { 41 + slog.Info("XXX: subscribeRepos") 42 + 43 + ctx, cancel := context.WithCancel(req.Context()) 44 + defer cancel() 45 + 46 + conn, err := websocket.Upgrade(resp, req, nil, 1024, 1024) 47 + if err != nil { 48 + slog.Error("websocket upgrade", "err", err) 49 + return 50 + } 51 + 52 + // read messages from the client and discard them 53 + go func() { 54 + for { 55 + _, _, err := conn.ReadMessage() 56 + if err != nil { 57 + slog.Warn("failed to read message from client", "err", err) 58 + cancel() 59 + return 60 + } 61 + } 62 + }() 63 + 64 + evts, err := p.AddSubscriber(ctx) 65 + if err != nil { 66 + slog.Error("websocket new subscriber", "err", err) 67 + return 68 + } 69 + 70 + // pull events from channel and send over websocket 71 + for { 72 + select { 73 + case evt, ok := <-evts: 74 + if !ok { 75 + slog.Error("event stream closed unexpectedly") 76 + return 77 + } 78 + 79 + wc, err := conn.NextWriter(websocket.BinaryMessage) 80 + if err != nil { 81 + slog.Error("failed to get next writer", "err", err) 82 + return 83 + } 84 + 85 + if evt.Preserialized != nil { 86 + _, err = wc.Write(evt.Preserialized) 87 + } else { 88 + err = evt.Serialize(wc) 89 + } 90 + if err != nil { 91 + slog.Error("failed to write event", "err", err) 92 + return 93 + } 94 + 95 + if err := wc.Close(); err != nil { 96 + slog.Warn("failed to flush-close our event write", "err", err) 97 + return 98 + } 99 + case <-ctx.Done(): 100 + return 101 + } 102 + slog.Info("XXX: emitted event") 103 + } 104 + } 105 + 106 + func (p *Producer) Listen() { 107 + go func() { 108 + err := http.ListenAndServe(p.Bind, p.mux) 109 + if err != nil { 110 + slog.Error("test producer shutdown", "err", err) 111 + } 112 + }() 113 + } 114 + 115 + func (p *Producer) Shutdown() { 116 + p.subsLk.Lock() 117 + defer p.subsLk.Unlock() 118 + for _, sub := range p.subs { 119 + close(sub.done) 120 + close(sub.outgoing) 121 + } 122 + } 123 + 124 + func (p *Producer) AddSubscriber(ctx context.Context) (<-chan *events.XRPCStreamEvent, error) { 125 + 126 + slog.Info("XXX: adding subscriber") 127 + sub := &Subscriber{ 128 + outgoing: make(chan *events.XRPCStreamEvent, p.BufferSize), 129 + done: make(chan struct{}), 130 + } 131 + 132 + p.subsLk.Lock() 133 + defer p.subsLk.Unlock() 134 + p.subs = append(p.subs, sub) 135 + 136 + return sub.outgoing, nil 137 + } 138 + 139 + func (p *Producer) Emit(evt *events.XRPCStreamEvent) error { 140 + if err := evt.Preserialize(); err != nil { 141 + return err 142 + } 143 + 144 + p.subsLk.Lock() 145 + defer p.subsLk.Unlock() 146 + 147 + if len(p.subs) == 0 { 148 + slog.Warn("sending event, but no subscribers") 149 + } 150 + for _, s := range p.subs { 151 + slog.Info("XXX: outgoing") 152 + select { 153 + case s.outgoing <- evt: 154 + // sent evt on this subscriber's chan! yay! 155 + case <-s.done: 156 + // this subscriber is closing, quickly do nothing 157 + default: 158 + return fmt.Errorf("test firehose producer channel blocked") 159 + } 160 + } 161 + return nil 162 + }