this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add more verbose logging to search indexing operations (#333)

authored by

Jaz and committed by
GitHub
1f761b3c 0499e8a6

+47 -19
+39 -16
search/indexing.go
··· 18 18 ) 19 19 20 20 func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey string) error { 21 - s.logger.Info("deleting post from index", "repo", ident.DID, "rkey", rkey) 21 + log := s.logger.With("repo", ident.DID, "rkey", rkey, "op", "deletePost") 22 + log.Info("deleting post from index") 22 23 docID := fmt.Sprintf("%s_%s", ident.DID.String(), rkey) 23 24 req := esapi.DeleteRequest{ 24 25 Index: s.postIndex, ··· 31 32 return fmt.Errorf("failed to delete post: %w", err) 32 33 } 33 34 defer res.Body.Close() 34 - io.ReadAll(res.Body) 35 + body, err := io.ReadAll(res.Body) 36 + if err != nil { 37 + return fmt.Errorf("failed to read indexing response: %w", err) 38 + } 35 39 if res.IsError() { 36 - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) 40 + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) 37 41 return fmt.Errorf("indexing error, code=%d", res.StatusCode) 38 42 } 39 43 return nil 40 44 } 41 45 42 46 func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *appbsky.FeedPost, path string, rcid cid.Cid) error { 43 - 47 + log := s.logger.With("repo", ident.DID, "path", path, "op", "indexPost") 44 48 parts := strings.SplitN(path, "/", 3) 45 49 // TODO: replace with an atproto/syntax package type for TID 46 50 var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`) 47 51 if len(parts) != 2 || !tidRegex.MatchString(parts[1]) { 48 - s.logger.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) 52 + log.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) 49 53 return nil 50 54 } 51 55 rkey := parts[1] 52 56 57 + log = log.With("rkey", rkey) 58 + 53 59 _, err := util.ParseTimestamp(rec.CreatedAt) 54 60 if err != nil { 55 - s.logger.Warn("post had invalid timestamp", "repo", ident.DID, "rkey", rkey, "createdAt", rec.CreatedAt, "parseErr", err) 61 + log.Warn("post had invalid timestamp", "createdAt", rec.CreatedAt, "parseErr", err) 56 62 rec.CreatedAt = "" 57 63 } 58 64 ··· 62 68 return err 63 69 } 64 70 65 - s.logger.Debug("indexing post", "did", ident.DID, "rkey", rkey) 71 + log.Debug("indexing post") 66 72 req := esapi.IndexRequest{ 67 73 Index: s.postIndex, 68 74 DocumentID: doc.DocId(), ··· 71 77 72 78 res, err := req.Do(ctx, s.escli) 73 79 if err != nil { 80 + log.Warn("failed to send indexing request", "err", err) 74 81 return fmt.Errorf("failed to send indexing request: %w", err) 75 82 } 76 83 defer res.Body.Close() 77 - io.ReadAll(res.Body) 84 + body, err := io.ReadAll(res.Body) 85 + if err != nil { 86 + log.Warn("failed to read indexing response", "err", err) 87 + return fmt.Errorf("failed to read indexing response: %w", err) 88 + } 78 89 if res.IsError() { 79 - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) 90 + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) 80 91 return fmt.Errorf("indexing error, code=%d", res.StatusCode) 81 92 } 82 93 return nil 83 94 } 84 95 85 96 func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec *appbsky.ActorProfile, path string, rcid cid.Cid) error { 86 - 97 + log := s.logger.With("repo", ident.DID, "path", path, "op", "indexProfile") 87 98 parts := strings.SplitN(path, "/", 3) 88 99 if len(parts) != 2 || parts[1] != "self" { 89 - s.logger.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) 100 + log.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) 90 101 return nil 91 102 } 92 103 93 - s.logger.Info("indexing profile", "did", ident.DID, "handle", ident.Handle) 104 + log.Info("indexing profile", "handle", ident.Handle) 94 105 95 106 doc := TransformProfile(rec, ident, rcid.String()) 96 107 b, err := json.Marshal(doc) ··· 105 116 106 117 res, err := req.Do(ctx, s.escli) 107 118 if err != nil { 119 + log.Warn("failed to send indexing request", "err", err) 108 120 return fmt.Errorf("failed to send indexing request: %w", err) 109 121 } 110 122 defer res.Body.Close() 111 - io.ReadAll(res.Body) 123 + body, err := io.ReadAll(res.Body) 124 + if err != nil { 125 + log.Warn("failed to read indexing response", "err", err) 126 + return fmt.Errorf("failed to read indexing response: %w", err) 127 + } 112 128 if res.IsError() { 113 - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) 129 + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) 114 130 return fmt.Errorf("indexing error, code=%d", res.StatusCode) 115 131 } 116 132 return nil 117 133 } 118 134 119 135 func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { 136 + log := s.logger.With("repo", did, "op", "updateUserHandle", "handle", handle) 120 137 b, err := json.Marshal(map[string]any{ 121 138 "script": map[string]any{ 122 139 "source": "ctx._source.handle = params.handle", ··· 127 144 }, 128 145 }) 129 146 if err != nil { 147 + log.Warn("failed to marshal update script", "err", err) 130 148 return err 131 149 } 132 150 ··· 138 156 139 157 res, err := req.Do(ctx, s.escli) 140 158 if err != nil { 159 + log.Warn("failed to send indexing request", "err", err) 141 160 return fmt.Errorf("failed to send indexing request: %w", err) 142 161 } 143 162 defer res.Body.Close() 144 - io.ReadAll(res.Body) 163 + body, err := io.ReadAll(res.Body) 164 + if err != nil { 165 + log.Warn("failed to read indexing response", "err", err) 166 + return fmt.Errorf("failed to read indexing response: %w", err) 167 + } 145 168 if res.IsError() { 146 - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) 169 + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) 147 170 return fmt.Errorf("indexing error, code=%d", res.StatusCode) 148 171 } 149 172 return nil
+8 -3
search/transform.go
··· 31 31 DID string `json:"did"` 32 32 RecordRkey string `json:"record_rkey"` 33 33 RecordCID string `json:"record_cid"` 34 - CreatedAt string `json:"created_at"` 34 + CreatedAt *string `json:"created_at,omitempty"` 35 35 Text string `json:"text"` 36 36 LangCode []string `json:"lang_code,omitempty"` 37 37 LangCodeIso2 []string `json:"lang_code_iso2,omitempty"` ··· 157 157 } 158 158 } 159 159 160 - return PostDoc{ 160 + doc := PostDoc{ 161 161 DocIndexTs: time.Now().UTC().Format(util.ISO8601), 162 162 DID: ident.DID.String(), 163 163 RecordRkey: rkey, 164 164 RecordCID: cid, 165 - CreatedAt: post.CreatedAt, 166 165 Text: post.Text, 167 166 LangCode: post.Langs, 168 167 LangCodeIso2: langCodeIso2, ··· 177 176 Hashtag: parseHashtags(post.Text), 178 177 Emoji: parseEmojis(post.Text), 179 178 } 179 + 180 + if post.CreatedAt != "" { 181 + doc.CreatedAt = &post.CreatedAt 182 + } 183 + 184 + return doc 180 185 } 181 186 182 187 func parseHashtags(s string) []string {