An experimental pub/sub client and server project.
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement publisher code and refactor / comments

Will 5f39d281 4eaa21e8

+177 -34
+63
example/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + 8 + "github.com/willdot/messagebroker" 9 + "github.com/willdot/messagebroker/pubsub" 10 + "github.com/willdot/messagebroker/server" 11 + ) 12 + 13 + func main() { 14 + server, err := server.New(context.Background(), ":3000") 15 + if err != nil { 16 + panic(err) 17 + } 18 + defer server.Shutdown() 19 + 20 + go sendMessages() 21 + 22 + sub, err := pubsub.NewSubscriber(":3000") 23 + if err != nil { 24 + panic(err) 25 + } 26 + defer sub.Close() 27 + 28 + sub.SubscribeToTopics([]string{"topic a"}) 29 + 30 + consumer := sub.Consume(context.Background()) 31 + if consumer.Err != nil { 32 + panic(err) 33 + } 34 + 35 + for msg := range consumer.Msgs { 36 + slog.Info("received message", "message", string(msg.Data)) 37 + } 38 + 39 + } 40 + 41 + func sendMessages() { 42 + publisher, err := pubsub.NewPublisher("localhost:3000") 43 + if err != nil { 44 + panic(err) 45 + } 46 + defer publisher.Close() 47 + 48 + // send some messages 49 + i := 0 50 + for { 51 + i++ 52 + msg := messagebroker.Message{ 53 + Topic: "topic a", 54 + Data: []byte(fmt.Sprintf("message %d", i)), 55 + } 56 + 57 + err = publisher.PublishMessage(msg) 58 + if err != nil { 59 + slog.Error("failed to publish message", "error", err) 60 + continue 61 + } 62 + } 63 + }
+1
message.go
··· 1 1 package messagebroker 2 2 3 + // Message represents a message that can be published or consumed 3 4 type Message struct { 4 5 Topic string `json:"topic"` 5 6 Data []byte `json:"data"`
+59
pubsub/publisher.go
··· 1 + package pubsub 2 + 3 + import ( 4 + "encoding/binary" 5 + "encoding/json" 6 + "fmt" 7 + "net" 8 + 9 + "github.com/willdot/messagebroker" 10 + "github.com/willdot/messagebroker/server" 11 + ) 12 + 13 + // Publisher allows messages to be published to a server 14 + type Publisher struct { 15 + conn net.Conn 16 + } 17 + 18 + // NewPublisher connects to the server at the given address and registers as a publisher 19 + func NewPublisher(addr string) (*Publisher, error) { 20 + conn, err := net.Dial("tcp", addr) 21 + if err != nil { 22 + return nil, fmt.Errorf("failed to dial: %w", err) 23 + } 24 + 25 + err = binary.Write(conn, binary.BigEndian, server.Publish) 26 + if err != nil { 27 + conn.Close() 28 + return nil, fmt.Errorf("failed to register publish to server: %w", err) 29 + } 30 + 31 + return &Publisher{ 32 + conn: conn, 33 + }, nil 34 + } 35 + 36 + // Close cleanly shuts down the publisher 37 + func (p *Publisher) Close() error { 38 + return p.conn.Close() 39 + } 40 + 41 + // Publish will publish the given message to the server 42 + func (p *Publisher) PublishMessage(message messagebroker.Message) error { 43 + b, err := json.Marshal(message) 44 + if err != nil { 45 + return fmt.Errorf("failed to marshal message: %w", err) 46 + } 47 + 48 + err = binary.Write(p.conn, binary.BigEndian, uint32(len(b))) 49 + if err != nil { 50 + return fmt.Errorf("failed to write message size to server") 51 + } 52 + 53 + _, err = p.conn.Write(b) 54 + if err != nil { 55 + return fmt.Errorf("failed to publish data to server") 56 + } 57 + 58 + return nil 59 + }
+4 -1
server/server.go
··· 21 21 Publish Action = 3 22 22 ) 23 23 24 + // Server accepts subscribe and publish connections and passes messages around 24 25 type Server struct { 25 26 addr string 26 27 lis net.Listener ··· 29 30 topics map[string]topic 30 31 } 31 32 33 + // New creates and starts a new server 32 34 func New(ctx context.Context, addr string) (*Server, error) { 33 35 lis, err := net.Listen("tcp", addr) 34 36 if err != nil { ··· 45 47 return srv, nil 46 48 } 47 49 50 + // Shutdown will cleanly shutdown the server 48 51 func (s *Server) Shutdown() error { 49 52 return s.lis.Close() 50 53 } ··· 231 234 t = newTopic(topicName) 232 235 } 233 236 234 - t.subscriptions[peer.addr()] = Subscriber{ 237 + t.subscriptions[peer.addr()] = subscriber{ 235 238 peer: peer, 236 239 currentOffset: 0, 237 240 }
+1 -1
server/server_test.go
··· 29 29 srv := createServer(t) 30 30 srv.topics[topicName] = topic{ 31 31 name: topicName, 32 - subscriptions: make(map[net.Addr]Subscriber), 32 + subscriptions: make(map[net.Addr]subscriber), 33 33 } 34 34 35 35 return srv
+2 -2
server/subscriber.go
··· 5 5 "fmt" 6 6 ) 7 7 8 - type Subscriber struct { 8 + type subscriber struct { 9 9 peer peer 10 10 currentOffset int 11 11 } 12 12 13 - func (s *Subscriber) SendMessage(msg []byte) error { 13 + func (s *subscriber) sendMessage(msg []byte) error { 14 14 dataLen := uint64(len(msg)) 15 15 16 16 err := binary.Write(&s.peer, binary.BigEndian, dataLen)
+3 -3
server/topic.go
··· 11 11 12 12 type topic struct { 13 13 name string 14 - subscriptions map[net.Addr]Subscriber 14 + subscriptions map[net.Addr]subscriber 15 15 mu sync.Mutex 16 16 } 17 17 18 18 func newTopic(name string) topic { 19 19 return topic{ 20 20 name: name, 21 - subscriptions: make(map[net.Addr]Subscriber), 21 + subscriptions: make(map[net.Addr]subscriber), 22 22 } 23 23 } 24 24 ··· 41 41 } 42 42 43 43 for addr, subscriber := range subscribers { 44 - err := subscriber.SendMessage(msgData) 44 + err := subscriber.sendMessage(msgData) 45 45 if err != nil { 46 46 slog.Error("failed to send to message", "error", err, "peer", addr) 47 47 continue
+12 -4
subscriber/subscriber.go pubsub/subscriber.go
··· 1 - package subscriber 1 + package pubsub 2 2 3 3 import ( 4 4 "context" ··· 13 13 "github.com/willdot/messagebroker/server" 14 14 ) 15 15 16 + // Subscriber allows subscriptions to a server and the consumption of messages 16 17 type Subscriber struct { 17 18 conn net.Conn 18 19 } 19 20 20 - func New(addr string) (*Subscriber, error) { 21 + // NewSubscriber will connect to the server at the given address 22 + func NewSubscriber(addr string) (*Subscriber, error) { 21 23 conn, err := net.Dial("tcp", addr) 22 24 if err != nil { 23 25 return nil, fmt.Errorf("failed to dial: %w", err) ··· 28 30 }, nil 29 31 } 30 32 33 + // Close cleanly shuts down the subscriber 31 34 func (s *Subscriber) Close() error { 32 35 return s.conn.Close() 33 36 } 34 37 38 + // SubscribeToTopics will subscribe to the provided topics 35 39 func (s *Subscriber) SubscribeToTopics(topicNames []string) error { 36 40 err := binary.Write(s.conn, binary.BigEndian, server.Subscribe) 37 41 if err != nil { ··· 66 70 return nil 67 71 } 68 72 73 + // Consumer allows the consumption of messages. It is thread safe to range over the Msgs channel to consume. If during the consumer 74 + // receiving messages from the server an error occurs, it will be stored in Err 69 75 type Consumer struct { 70 76 Msgs chan messagebroker.Message 71 - Err error 77 + // TODO: better error handling? Maybe a channel of errors? 78 + Err error 72 79 } 73 80 74 - // TODO: maybe buffer the message channel up? 81 + // Consume will create a consumer and start it running in a go routine. You can then use the Msgs channel of the consumer 82 + // to read the messages 75 83 func (s *Subscriber) Consume(ctx context.Context) *Consumer { 76 84 consumer := &Consumer{ 77 85 Msgs: make(chan messagebroker.Message),
+32 -23
subscriber/subscriber_test.go pubsub/subscriber_test.go
··· 1 - package subscriber_test 1 + package pubsub 2 2 3 3 import ( 4 4 "context" 5 - "encoding/binary" 6 - "encoding/json" 7 5 "fmt" 8 - "net" 9 6 "testing" 10 7 "time" 11 8 12 9 "github.com/stretchr/testify/assert" 13 10 "github.com/stretchr/testify/require" 14 11 "github.com/willdot/messagebroker" 12 + 15 13 "github.com/willdot/messagebroker/server" 16 - "github.com/willdot/messagebroker/subscriber" 17 14 ) 18 15 19 16 const ( ··· 29 26 }) 30 27 } 31 28 32 - func TestNew(t *testing.T) { 29 + func TestNewSubscriber(t *testing.T) { 33 30 createServer(t) 34 31 35 - sub, err := subscriber.New(serverAddr) 32 + sub, err := NewSubscriber(serverAddr) 33 + require.NoError(t, err) 34 + 35 + t.Cleanup(func() { 36 + sub.Close() 37 + }) 38 + } 39 + 40 + func TestNewSubscriberInvalidServerAddr(t *testing.T) { 41 + createServer(t) 42 + 43 + _, err := NewSubscriber(":123456") 44 + require.Error(t, err) 45 + } 46 + 47 + func TestNewPublisher(t *testing.T) { 48 + createServer(t) 49 + 50 + sub, err := NewPublisher(serverAddr) 36 51 require.NoError(t, err) 37 52 38 53 t.Cleanup(func() { ··· 40 55 }) 41 56 } 42 57 43 - func TestNewInvalidServerAddr(t *testing.T) { 58 + func TestNewPublisherInvalidServerAddr(t *testing.T) { 44 59 createServer(t) 45 60 46 - _, err := subscriber.New(":123456") 61 + _, err := NewPublisher(":123456") 47 62 require.Error(t, err) 48 63 } 49 64 50 65 func TestSubscribeToTopics(t *testing.T) { 51 66 createServer(t) 52 67 53 - sub, err := subscriber.New(serverAddr) 68 + sub, err := NewSubscriber(serverAddr) 54 69 require.NoError(t, err) 55 70 56 71 t.Cleanup(func() { ··· 63 78 require.NoError(t, err) 64 79 } 65 80 66 - func TestSubscribeConsumeFromSubscription(t *testing.T) { 81 + func TestPublishAndSubscribe(t *testing.T) { 67 82 createServer(t) 68 83 69 - sub, err := subscriber.New(serverAddr) 84 + sub, err := NewSubscriber(serverAddr) 70 85 require.NoError(t, err) 71 86 72 87 t.Cleanup(func() { ··· 98 113 consumerFinCh <- struct{}{} 99 114 }() 100 115 101 - publisherConn, err := net.Dial("tcp", "localhost:3000") 116 + publisher, err := NewPublisher("localhost:3000") 102 117 require.NoError(t, err) 103 - 104 - err = binary.Write(publisherConn, binary.BigEndian, server.Publish) 105 - require.NoError(t, err) 118 + t.Cleanup(func() { 119 + publisher.Close() 120 + }) 106 121 107 122 // send some messages 108 123 sentMessages := make([]messagebroker.Message, 0, 10) ··· 114 129 115 130 sentMessages = append(sentMessages, msg) 116 131 117 - b, err := json.Marshal(msg) 118 - require.NoError(t, err) 119 - 120 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(b))) 121 - require.NoError(t, err) 122 - n, err := publisherConn.Write(b) 132 + err = publisher.PublishMessage(msg) 123 133 require.NoError(t, err) 124 - require.Equal(t, len(b), n) 125 134 } 126 135 127 136 // give the consumer some time to read the messages -- TODO: make better!