this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

:sparkles: subscribe and publish into topic

+62 -7
+62 -7
pubsub.go
··· 2 2 3 3 import ( 4 4 "encoding/json" 5 + "errors" 5 6 "fmt" 6 7 "net/http" 8 + "slices" 7 9 "time" 8 10 ) 9 11 12 + type SubClients struct { 13 + id string 14 + channel chan string 15 + } 16 + 10 17 type PubSub struct { 11 - channel chan string 18 + topics map[string][]*SubClients 12 19 } 13 20 14 21 func NewPubSub() *PubSub { 15 - return &PubSub{channel: make(chan string)} 22 + return &PubSub{topics: make(map[string][]*SubClients)} 23 + } 24 + 25 + func (p *PubSub) getOrCreateTopic(topicName, requestId string) *SubClients { 26 + clients, exists := p.topics[topicName] 27 + if exists == false { 28 + clients = []*SubClients{} 29 + p.topics[topicName] = clients 30 + } 31 + 32 + var currClient *SubClients 33 + for _, client := range clients { 34 + if client.id == requestId { 35 + currClient = client 36 + break 37 + } 38 + } 39 + 40 + if currClient == nil { 41 + currClient = &SubClients{id: requestId, channel: make(chan string)} 42 + p.topics[topicName] = append(p.topics[topicName], currClient) 43 + } 44 + 45 + return currClient 16 46 } 17 47 18 48 func (p *PubSub) HandleSubscribe(w http.ResponseWriter, r *http.Request) { 49 + topicName := r.URL.Query().Get("topic") 50 + clientInTopic := p.getOrCreateTopic(topicName, r.Context().Value("REQUEST_ID").(string)) 51 + 19 52 w.Header().Set("Content-Type", "text/event-stream") 20 53 w.Header().Set("Connection", "keep-alive") 21 54 w.Header().Set("Cache-Control", "no-cache") 22 55 23 - clientGone := r.Context().Done() 24 - 25 56 rc := http.NewResponseController(w) 26 57 for { 27 58 select { 28 - case <-clientGone: 59 + case <-r.Context().Done(): 29 60 fmt.Println("Client disconnected") 61 + 62 + var index int 63 + for i, client := range p.topics[topicName] { 64 + if client.id == clientInTopic.id { 65 + index = i 66 + break 67 + } 68 + } 69 + 70 + p.topics[topicName] = slices.Delete(p.topics[topicName], index, index+1) 71 + 72 + if len(p.topics[topicName]) == 0 { 73 + delete(p.topics, topicName) 74 + } 75 + 30 76 return 31 - case data := <-p.channel: 77 + case data := <-clientInTopic.channel: 32 78 _, err := fmt.Fprintf(w, "data: %s\n", data) 33 79 if err != nil { 34 80 return ··· 47 93 } 48 94 49 95 func (p *PubSub) HandlePublish(w http.ResponseWriter, r *http.Request) { 96 + topicName := r.URL.Query().Get("topic") 97 + topic, exists := p.topics[topicName] 98 + if !exists { 99 + http.Error(w, errors.New("'topic' is not registered").Error(), http.StatusBadRequest) 100 + return 101 + } 102 + 50 103 var input Input 51 104 if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 52 105 http.Error(w, err.Error(), http.StatusBadRequest) 53 106 return 54 107 } 55 108 56 - p.channel <- fmt.Sprintf("%+v", input) 109 + for _, client := range topic { 110 + client.channel <- fmt.Sprintf("%+v", input) 111 + } 57 112 58 113 w.Header().Set("Content-Type", "application/json") 59 114 w.WriteHeader(http.StatusOK)