An experimental pub/sub client and server project.
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

update unsubscribe test

Will 5292aad8 de312a62

+55 -1
+55 -1
pubsub/subscriber_test.go
··· 96 96 err = sub.UnsubscribeToTopics([]string{"topic a"}) 97 97 require.NoError(t, err) 98 98 99 - // TODO: is there a way to check? Maybe start consuming and publish to the topic unsubscribed from?? 99 + ctx, cancel := context.WithCancel(context.Background()) 100 + t.Cleanup(func() { 101 + cancel() 102 + }) 103 + 104 + consumer := sub.Consume(ctx) 105 + require.NoError(t, err) 106 + 107 + var receivedMessages []messagebroker.Message 108 + consumerFinCh := make(chan struct{}) 109 + go func() { 110 + for msg := range consumer.Msgs { 111 + receivedMessages = append(receivedMessages, msg) 112 + } 113 + 114 + require.NoError(t, err) 115 + consumerFinCh <- struct{}{} 116 + }() 117 + 118 + // publish a message to both topics and check the subscriber only gets the message from the 1 topic 119 + // and not the unsubscribed topic 120 + 121 + publisher, err := NewPublisher("localhost:3000") 122 + require.NoError(t, err) 123 + t.Cleanup(func() { 124 + publisher.Close() 125 + }) 126 + 127 + msg := messagebroker.Message{ 128 + Topic: "topic a", 129 + Data: []byte("hello world"), 130 + } 131 + 132 + err = publisher.PublishMessage(msg) 133 + require.NoError(t, err) 134 + 135 + msg.Topic = "topic b" 136 + err = publisher.PublishMessage(msg) 137 + require.NoError(t, err) 138 + 139 + cancel() 140 + 141 + // give the consumer some time to read the messages -- TODO: make better! 142 + time.Sleep(time.Millisecond * 500) 143 + cancel() 144 + 145 + select { 146 + case <-consumerFinCh: 147 + break 148 + case <-time.After(time.Second): 149 + t.Fatal("timed out waiting for consumer to read messages") 150 + } 151 + 152 + assert.Len(t, receivedMessages, 1) 153 + assert.Equal(t, "topic b", receivedMessages[0].Topic) 100 154 } 101 155 102 156 func TestPublishAndSubscribe(t *testing.T) {