An experimental pub/sub client and server project.
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor the message types to just 1

Will 3919cdab 1ae71b12

+15 -23
+4 -4
server/message_store.go
··· 7 7 8 8 type MemoryStore struct { 9 9 mu sync.Mutex 10 - msgs map[int]MessageToSend 10 + msgs map[int]message 11 11 offset int 12 12 } 13 13 14 14 func NewMemoryStore() *MemoryStore { 15 15 return &MemoryStore{ 16 - msgs: make(map[int]MessageToSend), 16 + msgs: make(map[int]message), 17 17 } 18 18 } 19 19 20 - func (m *MemoryStore) Write(msg MessageToSend) error { 20 + func (m *MemoryStore) Write(msg message) error { 21 21 m.mu.Lock() 22 22 defer m.mu.Unlock() 23 23 ··· 28 28 return nil 29 29 } 30 30 31 - func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg MessageToSend)) error { 31 + func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) error { 32 32 if offset < 0 || offset > m.offset { 33 33 return fmt.Errorf("invalid offset provided") 34 34 }
+5 -13
server/server.go
··· 302 302 _ = peer.RunConnOperation(op) 303 303 } 304 304 305 - type MessageToSend struct { 306 - topic string 307 - data []byte 308 - } 309 - 310 305 func (s *Server) handlePublish(peer *peer.Peer) { 311 306 slog.Info("handling publisher", "peer", peer.Addr()) 312 307 for { ··· 357 352 return nil 358 353 } 359 354 360 - message := MessageToSend{ 361 - topic: topicStr, 362 - data: dataBuf, 355 + topic := s.getTopic(topicStr) 356 + if topic == nil { 357 + topic = newTopic(topicStr) 358 + s.topics[topicStr] = topic 363 359 } 364 360 365 - topic := s.getTopic(message.topic) 366 - if topic == nil { 367 - topic = newTopic(message.topic) 368 - s.topics[message.topic] = topic 369 - } 361 + message := newMessage(dataBuf) 370 362 371 363 err = topic.sendMessageToSubscribers(message) 372 364 if err != nil {
+2 -2
server/subscriber.go
··· 44 44 offset := startAt 45 45 46 46 go func() { 47 - err := topic.messageStore.ReadFrom(offset, func(msg MessageToSend) { 48 - s.messages <- newMessage(msg.data) 47 + err := topic.messageStore.ReadFrom(offset, func(msg message) { 48 + s.messages <- msg 49 49 }) 50 50 if err != nil { 51 51 slog.Error("failed to replay messages from offset", "error", err, "offset", offset)
+4 -4
server/topic.go
··· 7 7 ) 8 8 9 9 type Store interface { 10 - Write(msg MessageToSend) error 11 - ReadFrom(offset int, handleFunc func(msg MessageToSend)) error 10 + Write(msg message) error 11 + ReadFrom(offset int, handleFunc func(msg message)) error 12 12 } 13 13 14 14 type topic struct { ··· 27 27 } 28 28 } 29 29 30 - func (t *topic) sendMessageToSubscribers(msg MessageToSend) error { 30 + func (t *topic) sendMessageToSubscribers(msg message) error { 31 31 err := t.messageStore.Write(msg) 32 32 if err != nil { 33 33 return fmt.Errorf("failed to write message to store: %w", err) ··· 38 38 t.mu.Unlock() 39 39 40 40 for _, subscriber := range subscribers { 41 - subscriber.addMessage(newMessage(msg.data), 0) 41 + subscriber.addMessage(msg, 0) 42 42 } 43 43 44 44 return nil