package main

import (
	"encoding/json"
	"log"
	"slices"

	"github.com/avast/retry-go/v4"
	maelstrom "github.com/jepsen-io/maelstrom/demo/go"
)

// broadcastMessage is the JSON body of a "broadcast" request: a single
// integer value to be stored and forwarded.
type broadcastMessage struct {
	Type    string `json:"type"`
	Message int    `json:"message"`
}

// handleBroadcast stores the value carried by msg if it has not been seen
// before, relays it to the neighbours, and acknowledges the request with
// "broadcast_ok".
//
// It returns the JSON decoding error if the body is malformed, otherwise the
// error (if any) from sending the reply.
func (s *server) handleBroadcast(msg maelstrom.Message) error {
	var body broadcastMessage
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}

	// Hold the lock only for the check-and-append. Relaying performs network
	// sends with retries and must not block readers or other broadcasts.
	s.mu.Lock()
	seen := slices.Contains(s.ids, body.Message)
	if !seen {
		s.ids = append(s.ids, body.Message)
	}
	s.mu.Unlock()

	// Only first-time values are forwarded; this is what stops a value from
	// bouncing between nodes forever.
	if !seen {
		s.relayMessage(body, msg.Src)
	}

	return s.node.Reply(msg, map[string]any{
		"type": "broadcast_ok",
	})
}

// readMessage is the JSON body of a "read_ok" reply.
type readMessage struct {
	Type     string `json:"type"`
	Messages []int  `json:"messages"`
}

// handleRead replies with "read_ok" and every value stored so far.
func (s *server) handleRead(msg maelstrom.Message) error {
	// Snapshot under the lock so the reply can be encoded without holding it.
	// Appending to a non-nil empty slice guarantees the field is encoded as
	// `[]` rather than `null` when nothing has been stored yet.
	s.mu.Lock()
	ids := append([]int{}, s.ids...)
	s.mu.Unlock()

	return s.node.Reply(msg, readMessage{
		Type:     "read_ok",
		Messages: ids,
	})
}

// relayMessage forwards msg as a "broadcast" request to every neighbour
// except src (the node the value came from) and this node itself.
//
// Delivery is best-effort: a neighbour that still fails after the retries is
// logged and skipped so the remaining neighbours are still attempted.
func (s *server) relayMessage(msg broadcastMessage, src string) {
	req := map[string]any{
		"type":    "broadcast",
		"message": msg.Message,
	}

	self := s.node.ID()
	for _, node := range s.neighbours.getNeighbours() {
		if node == src || node == self {
			continue
		}

		// NOTE(review): RPC appears to return as soon as the request is
		// written and to deliver the response through the callback, so this
		// retry presumably covers send failures only, not lost messages or
		// missing acknowledgements — confirm against the maelstrom client.
		err := retry.Do(func() error {
			return s.node.RPC(node, req, func(maelstrom.Message) error {
				return nil
			})
		})
		if err != nil {
			// The log package writes to stderr, keeping stdout free for
			// protocol messages.
			log.Printf("relaying message %d to %s: %v", msg.Message, node, err)
		}
	}
}

// topology is the JSON body of a "topology" request: a map from node ID to
// the IDs of that node's neighbours.
type topology struct {
	Topology map[string][]string `json:"topology"`
}

// handleTopology registers the neighbours listed for this node in the
// received topology and acknowledges with "topology_ok".
//
// It returns the JSON decoding error if the body is malformed, otherwise the
// error (if any) from sending the reply.
func (s *server) handleTopology(msg maelstrom.Message) error {
	var body topology
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		return err
	}

	// A node ID missing from the map yields a nil slice here.
	s.neighbours.addNeighbours(body.Topology[s.node.ID()])

	return s.node.Reply(msg, map[string]any{
		"type": "topology_ok",
	})
}