An experimental pub/sub client and server project.
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Unsubscribe

Will de312a62 731d8366

+68
+47
pubsub/subscriber.go
··· 82 82 return fmt.Errorf("received status %s - %s", resp, buf) 83 83 } 84 84 85 + // UnsubscribeToTopics will unsubscribe to the provided topics 86 + func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error { 87 + err := binary.Write(s.conn, binary.BigEndian, server.Unsubscribe) 88 + if err != nil { 89 + return fmt.Errorf("failed to unsubscribe: %w", err) 90 + } 91 + 92 + b, err := json.Marshal(topicNames) 93 + if err != nil { 94 + return fmt.Errorf("failed to marshal topic names: %w", err) 95 + } 96 + 97 + err = binary.Write(s.conn, binary.BigEndian, uint32(len(b))) 98 + if err != nil { 99 + return fmt.Errorf("failed to write topic data length: %w", err) 100 + } 101 + 102 + _, err = s.conn.Write(b) 103 + if err != nil { 104 + return fmt.Errorf("failed to unsubscribe to topics: %w", err) 105 + } 106 + 107 + var resp server.Status 108 + err = binary.Read(s.conn, binary.BigEndian, &resp) 109 + if err != nil { 110 + return fmt.Errorf("failed to read confirmation of unsubscription: %w", err) 111 + } 112 + 113 + if resp == server.Unsubscribed { 114 + return nil 115 + } 116 + 117 + var dataLen uint32 118 + err = binary.Read(s.conn, binary.BigEndian, &dataLen) 119 + if err != nil { 120 + return fmt.Errorf("received status %s:", resp) 121 + } 122 + 123 + buf := make([]byte, dataLen) 124 + _, err = s.conn.Read(buf) 125 + if err != nil { 126 + return fmt.Errorf("received status %s:", resp) 127 + } 128 + 129 + return fmt.Errorf("received status %s - %s", resp, buf) 130 + } 131 + 85 132 // Consumer allows the consumption of messages. It is thread safe to range over the Msgs channel to consume. If during the consumer 86 133 // receiving messages from the server an error occurs, it will be stored in Err 87 134 type Consumer struct {
+21
pubsub/subscriber_test.go
··· 78 78 require.NoError(t, err) 79 79 } 80 80 81 + func TestUnsubscribesFromTopic(t *testing.T) { 82 + createServer(t) 83 + 84 + sub, err := NewSubscriber(serverAddr) 85 + require.NoError(t, err) 86 + 87 + t.Cleanup(func() { 88 + sub.Close() 89 + }) 90 + 91 + topics := []string{"topic a", "topic b"} 92 + 93 + err = sub.SubscribeToTopics(topics) 94 + require.NoError(t, err) 95 + 96 + err = sub.UnsubscribeToTopics([]string{"topic a"}) 97 + require.NoError(t, err) 98 + 99 + // TODO: is there a way to check? Maybe start consuming and publish to the topic unsubscribed from?? 100 + } 101 + 81 102 func TestPublishAndSubscribe(t *testing.T) { 82 103 createServer(t) 83 104