Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview/pulls: update handlers for new pull model

Signed-off-by: oppiliappan <me@oppi.li>

+689 -418
+8 -15
appview/middleware/middleware.go
··· 266 266 return 267 267 } 268 268 269 - pr, err := db.GetPull(mw.db, f.RepoAt(), prIdInt) 269 + pr, err := db.GetPull(mw.db, orm.FilterEq("repo_at", f.RepoAt()), orm.FilterEq("pull_id", prIdInt)) 270 270 if err != nil { 271 271 l.Error("failed to get pull and comments", "err", err) 272 272 mw.pages.Error404(w) ··· 275 275 276 276 ctx := context.WithValue(r.Context(), "pull", pr) 277 277 278 - if pr.IsStacked() { 279 - stack, err := db.GetStack(mw.db, pr.StackId) 280 - if err != nil { 281 - l.Error("failed to get stack", "err", err) 282 - return 283 - } 284 - abandonedPulls, err := db.GetAbandonedPulls(mw.db, pr.StackId) 285 - if err != nil { 286 - l.Error("failed to get abandoned pulls", "err", err) 287 - return 288 - } 278 + stack, err := db.GetStack(mw.db, pr.AtUri()) 279 + if err != nil { 280 + l.Error("failed to get stack", "err", err) 281 + mw.pages.Error404(w) 282 + return 283 + } 289 284 290 - ctx = context.WithValue(ctx, "stack", stack) 291 - ctx = context.WithValue(ctx, "abandonedPulls", abandonedPulls) 292 - } 285 + ctx = context.WithValue(ctx, "stack", stack) 293 286 294 287 next.ServeHTTP(w, r.WithContext(ctx)) 295 288 })
+4 -2
appview/models/pull.go
··· 162 162 var targetRepoAt syntax.ATURI 163 163 var targetBranch string 164 164 if record.Target != nil { 165 - if uri, err := syntax.ParseATURI(record.Target.Repo); err == nil { 166 - targetRepoAt = uri 165 + if record.Target.Repo != nil { 166 + if uri, err := syntax.ParseATURI(*record.Target.Repo); err == nil { 167 + targetRepoAt = uri 168 + } 167 169 } 168 170 targetBranch = record.Target.Branch 169 171 }
+2 -2
appview/notify/db/db.go
··· 285 285 l := log.FromContext(ctx) 286 286 287 287 pull, err := db.GetPull(n.db, 288 - syntax.ATURI(comment.RepoAt), 289 - comment.PullId, 288 + orm.FilterEq("repo_at", syntax.ATURI(comment.RepoAt)), 289 + orm.FilterEq("pull_id", comment.PullId), 290 290 ) 291 291 if err != nil { 292 292 l.Error("failed to get pulls", "err", err)
+514 -350
appview/pulls/pulls.go
··· 47 47 lexutil "github.com/bluesky-social/indigo/lex/util" 48 48 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 49 49 "github.com/go-chi/chi/v5" 50 - "github.com/google/uuid" 51 50 ) 52 51 53 52 const ApplicationGzip = "application/gzip" ··· 101 100 102 101 // htmx fragment 103 102 func (s *Pulls) PullActions(w http.ResponseWriter, r *http.Request) { 103 + l := s.logger.With("handler", "PullActions") 104 + 104 105 switch r.Method { 105 106 case http.MethodGet: 106 107 user := s.oauth.GetMultiAccountUser(r) 108 + if user != nil && user.Active != nil { 109 + l = l.With("user", user.Active.Did) 110 + } 111 + 107 112 f, err := s.repoResolver.Resolve(r) 108 113 if err != nil { 109 - s.logger.Error("failed to get repo and knot", "err", err) 114 + l.Error("failed to get repo and knot", "err", err) 110 115 return 111 116 } 112 117 113 118 pull, ok := r.Context().Value("pull").(*models.Pull) 114 119 if !ok { 115 - s.logger.Error("failed to get pull") 120 + l.Error("failed to get pull") 116 121 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 117 122 return 118 123 } 124 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid) 119 125 120 126 // can be nil if this pull is not stacked 121 127 stack, _ := r.Context().Value("stack").(models.Stack) ··· 127 133 } 128 134 if roundNumber >= len(pull.Submissions) { 129 135 http.Error(w, "bad round id", http.StatusBadRequest) 130 - s.logger.Error("failed to parse round id", "err", err) 136 + l.Error("failed to parse round id", "err", err, "round_number", roundNumber) 131 137 return 132 138 } 133 139 ··· 153 159 } 154 160 155 161 func (s *Pulls) repoPullHelper(w http.ResponseWriter, r *http.Request, interdiff bool) { 162 + l := s.logger.With("handler", "repoPullHelper", "interdiff", interdiff) 163 + 156 164 user := s.oauth.GetMultiAccountUser(r) 165 + if user != nil && user.Active != nil { 166 + l = l.With("user", user.Active.Did) 167 + } 168 + 157 169 f, err := s.repoResolver.Resolve(r) 158 170 if err != nil { 159 - s.logger.Error("failed to get repo and knot", "err", err) 171 + l.Error("failed to get repo and knot", "err", err) 160 172 return 161 173 } 162 174 163 175 pull, ok := r.Context().Value("pull").(*models.Pull) 164 176 if !ok { 165 - s.logger.Error("failed to get pull") 177 + l.Error("failed to get pull") 166 178 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 167 179 return 168 180 } 181 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid) 169 182 170 183 backlinks, err := db.GetBacklinks(s.db, pull.AtUri()) 171 184 if err != nil { 172 - s.logger.Error("failed to get pull backlinks", "err", err) 185 + l.Error("failed to get pull backlinks", "err", err) 173 186 s.pages.Notice(w, "pull-error", "Failed to get pull. Try again later.") 174 187 return 175 188 } ··· 181 194 } 182 195 if roundIdInt >= len(pull.Submissions) { 183 196 http.Error(w, "bad round id", http.StatusBadRequest) 184 - s.logger.Error("failed to parse round id", "err", err) 197 + l.Error("failed to parse round id", "err", err, "round_number", roundIdInt) 185 198 return 186 199 } 187 200 ··· 192 205 193 206 // can be nil if this pull is not stacked 194 207 stack, _ := r.Context().Value("stack").(models.Stack) 195 - abandonedPulls, _ := r.Context().Value("abandonedPulls").([]*models.Pull) 196 208 197 209 mergeCheckResponse := s.mergeCheck(r, f, pull, stack) 198 210 branchDeleteStatus := s.branchDeleteStatus(r, f, pull) ··· 210 222 for _, p := range stack { 211 223 shas = append(shas, p.LatestSha()) 212 224 } 213 - for _, p := range abandonedPulls { 214 - shas = append(shas, p.LatestSha()) 215 - } 216 225 217 226 ps, err := db.GetPipelineStatuses( 218 227 s.db, ··· 223 232 orm.FilterIn("p.sha", shas), 224 233 ) 225 234 if err != nil { 226 - s.logger.Error("failed to fetch pipeline statuses", "err", err) 235 + l.Error("failed to fetch pipeline statuses", "err", err) 227 236 // non-fatal 228 237 } 229 238 ··· 233 242 234 243 reactionMap, err := db.GetReactionMap(s.db, 20, pull.AtUri()) 235 244 if err != nil { 236 - s.logger.Error("failed to get pull reactions", "err", err) 245 + l.Error("failed to get pull reactions", "err", err) 237 246 } 238 247 239 248 userReactions := map[models.ReactionKind]bool{} ··· 247 256 orm.FilterContains("scope", tangled.RepoPullNSID), 248 257 ) 249 258 if err != nil { 250 - s.logger.Error("failed to fetch labels", "err", err) 259 + l.Error("failed to fetch labels", "err", err) 251 260 s.pages.Error503(w) 252 261 return 253 262 } ··· 264 273 if interdiff { 265 274 currentPatch, err := patchutil.AsDiff(pull.Submissions[roundIdInt].CombinedPatch()) 266 275 if err != nil { 267 - s.logger.Error("failed to interdiff; current patch malformed", "err", err) 276 + l.Error("failed to interdiff; current patch malformed", "err", err, "round_number", roundIdInt) 268 277 s.pages.Notice(w, fmt.Sprintf("interdiff-error-%d", roundIdInt), "Failed to calculate interdiff; current patch is invalid.") 269 278 return 270 279 } 271 280 272 281 previousPatch, err := patchutil.AsDiff(pull.Submissions[roundIdInt-1].CombinedPatch()) 273 282 if err != nil { 274 - s.logger.Error("failed to interdiff; previous patch malformed", "err", err) 283 + l.Error("failed to interdiff; previous patch malformed", "err", err, "round_number", roundIdInt) 275 284 s.pages.Notice(w, fmt.Sprintf("interdiff-error-%d", roundIdInt), "Failed to calculate interdiff; previous patch is invalid.") 276 285 return 277 286 } ··· 284 293 RepoInfo: s.repoResolver.GetRepoInfo(r, user), 285 294 Pull: pull, 286 295 Stack: stack, 287 - AbandonedPulls: abandonedPulls, 288 296 Backlinks: backlinks, 289 297 BranchDeleteStatus: branchDeleteStatus, 290 298 MergeCheck: mergeCheckResponse, ··· 303 311 } 304 312 305 313 func (s *Pulls) RepoSinglePull(w http.ResponseWriter, r *http.Request) { 314 + l := s.logger.With("handler", "RepoSinglePull") 315 + 306 316 pull, ok := r.Context().Value("pull").(*models.Pull) 307 317 if !ok { 308 - s.logger.Error("failed to get pull") 318 + l.Error("failed to get pull") 309 319 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 310 320 return 311 321 } ··· 328 338 Host: host, 329 339 } 330 340 331 - patch := pull.LatestPatch() 332 - if pull.IsStacked() { 333 - // combine patches of substack 334 - subStack := stack.Below(pull) 335 - // collect the portion of the stack that is mergeable 336 - mergeable := subStack.Mergeable() 337 - // combine each patch 338 - patch = mergeable.CombinedPatch() 339 - } 341 + // combine patches of substack 342 + subStack := stack.Below(pull) 343 + // collect the portion of the stack that is mergeable 344 + mergeable := subStack.Mergeable() 345 + // combine each patch 346 + patch := mergeable.CombinedPatch() 340 347 341 348 resp, xe := tangled.RepoMergeCheck( 342 349 r.Context(), ··· 349 356 }, 350 357 ) 351 358 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 352 - s.logger.Error("failed to check for mergeability", "err", err) 359 + s.logger.Error("failed to check for mergeability", "err", err, "pull_id", pull.PullId, "target_branch", pull.TargetBranch) 353 360 return types.MergeCheckResponse{ 354 361 Error: fmt.Sprintf("failed to check merge status: %s", err.Error()), 355 362 } ··· 426 433 } 427 434 428 435 func (s *Pulls) resubmitCheck(r *http.Request, repo *models.Repo, pull *models.Pull, stack models.Stack) pages.ResubmitResult { 429 - if pull.State == models.PullMerged || pull.State == models.PullDeleted || pull.PullSource == nil { 436 + if pull.State == models.PullMerged || pull.State == models.PullAbandoned || pull.PullSource == nil { 430 437 return pages.Unknown 431 438 } 432 439 ··· 441 448 branchResp, err := tangled.GitTempGetBranch(r.Context(), xrpcc, pull.PullSource.Branch, sourceRepo.String()) 442 449 if err != nil { 443 450 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 444 - s.logger.Error("failed to call XRPC repo.branches", "err", xrpcerr) 451 + s.logger.Error("failed to call XRPC repo.branches", "err", xrpcerr, "pull_id", pull.PullId, "branch", pull.PullSource.Branch) 445 452 return pages.Unknown 446 453 } 447 - s.logger.Error("failed to reach knotserver", "err", err) 454 + s.logger.Error("failed to reach knotserver", "err", err, "pull_id", pull.PullId) 448 455 return pages.Unknown 449 456 } 450 457 451 458 targetBranch := branchResp 452 459 453 - latestSourceRev := pull.LatestSha() 454 - 455 - if pull.IsStacked() && stack != nil { 456 - top := stack[0] 457 - latestSourceRev = top.LatestSha() 458 - } 460 + top := stack[0] 461 + latestSourceRev := top.LatestSha() 459 462 460 463 if latestSourceRev != targetBranch.Hash { 461 464 return pages.ShouldResubmit ··· 473 476 } 474 477 475 478 func (s *Pulls) RepoPullPatchRaw(w http.ResponseWriter, r *http.Request) { 479 + l := s.logger.With("handler", "RepoPullPatchRaw") 480 + 476 481 pull, ok := r.Context().Value("pull").(*models.Pull) 477 482 if !ok { 478 - s.logger.Error("failed to get pull") 483 + l.Error("failed to get pull") 479 484 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 480 485 return 481 486 } 487 + l = l.With("pull_id", pull.PullId) 482 488 483 489 roundId := chi.URLParam(r, "round") 484 490 roundIdInt, err := strconv.Atoi(roundId) 485 491 if err != nil || roundIdInt >= len(pull.Submissions) { 486 492 http.Error(w, "bad round id", http.StatusBadRequest) 487 - s.logger.Error("failed to parse round id", "err", err) 493 + l.Error("failed to parse round id", "err", err, "round_id_str", roundId) 488 494 return 489 495 } 490 496 ··· 496 502 l := s.logger.With("handler", "RepoPulls") 497 503 498 504 user := s.oauth.GetMultiAccountUser(r) 505 + if user != nil && user.Active != nil { 506 + l = l.With("user", user.Active.Did) 507 + } 508 + 499 509 params := r.URL.Query() 500 510 page := pagination.FromContext(r.Context()) 501 511 502 512 f, err := s.repoResolver.Resolve(r) 503 513 if err != nil { 504 - s.logger.Error("failed to get repo and knot", "err", err) 514 + l.Error("failed to get repo and knot", "err", err) 505 515 return 506 516 } 517 + l = l.With("repo_at", f.RepoAt().String()) 507 518 508 519 query := searchquery.Parse(params.Get("q")) 509 520 ··· 618 629 countOpts := searchOpts 619 630 countOpts.Page = pagination.Page{Limit: 1} 620 631 for _, ps := range []models.PullState{models.PullOpen, models.PullMerged, models.PullClosed} { 621 - ps := ps 622 632 countOpts.State = &ps 623 633 countRes, err := s.indexer.Search(r.Context(), countOpts) 624 634 if err != nil { ··· 670 680 if p.PullSource.RepoAt != nil { 671 681 pullSourceRepo, err = db.GetRepoByAtUri(s.db, p.PullSource.RepoAt.String()) 672 682 if err != nil { 673 - s.logger.Error("failed to get repo by at uri", "err", err) 683 + l.Error("failed to get repo by at uri", "err", err, "repo_at", p.PullSource.RepoAt.String()) 674 684 continue 675 685 } else { 676 686 p.PullSource.Repo = pullSourceRepo ··· 679 689 } 680 690 } 681 691 682 - // we want to group all stacked PRs into just one list 683 - stacks := make(map[string]models.Stack) 692 + var stacks []models.Stack 684 693 var shas []string 685 - n := 0 694 + 695 + pullMap := make(map[string]*models.Pull) 686 696 for _, p := range pulls { 687 - // store the sha for later 688 697 shas = append(shas, p.LatestSha()) 689 - // this PR is stacked 690 - if p.StackId != "" { 691 - // we have already seen this PR stack 692 - if _, seen := stacks[p.StackId]; seen { 693 - stacks[p.StackId] = append(stacks[p.StackId], p) 694 - // skip this PR 698 + pullMap[p.AtUri().String()] = p 699 + } 700 + 701 + // track which PRs have been added to stacks 702 + visited := make(map[string]bool) 703 + 704 + // group stacked PRs together using dependent_on relationships 705 + for _, p := range pulls { 706 + if visited[p.AtUri().String()] { 707 + continue 708 + } 709 + 710 + root := p 711 + for root.DependentOn != nil { 712 + if parent, ok := pullMap[root.DependentOn.String()]; ok { 713 + root = parent 695 714 } else { 696 - stacks[p.StackId] = nil 697 - pulls[n] = p 698 - n++ 715 + break // parent not in current page 699 716 } 700 - } else { 701 - pulls[n] = p 702 - n++ 703 717 } 718 + 719 + var stack models.Stack 720 + current := root 721 + for { 722 + if visited[current.AtUri().String()] { 723 + break 724 + } 725 + stack = append(stack, current) 726 + visited[current.AtUri().String()] = true 727 + 728 + found := false 729 + for _, candidate := range pulls { 730 + if candidate.DependentOn != nil && 731 + candidate.DependentOn.String() == current.AtUri().String() { 732 + current = candidate 733 + found = true 734 + break 735 + } 736 + } 737 + if !found { 738 + break 739 + } 740 + } 741 + 742 + slices.Reverse(stack) 743 + stacks = append(stacks, stack) 704 744 } 705 - pulls = pulls[:n] 706 745 707 746 ps, err := db.GetPipelineStatuses( 708 747 s.db, ··· 713 752 orm.FilterIn("p.sha", shas), 714 753 ) 715 754 if err != nil { 716 - s.logger.Error("failed to fetch pipeline statuses", "err", err) 755 + l.Warn("failed to fetch pipeline statuses", "err", err) 717 756 // non-fatal 718 757 } 719 758 m := make(map[string]models.Pipeline) ··· 757 796 } 758 797 759 798 func (s *Pulls) PullComment(w http.ResponseWriter, r *http.Request) { 799 + l := s.logger.With("handler", "PullComment") 800 + 760 801 user := s.oauth.GetMultiAccountUser(r) 802 + if user != nil && user.Active != nil { 803 + l = l.With("user", user.Active.Did) 804 + } 805 + 761 806 f, err := s.repoResolver.Resolve(r) 762 807 if err != nil { 763 - s.logger.Error("failed to get repo and knot", "err", err) 808 + l.Error("failed to get repo and knot", "err", err) 764 809 return 765 810 } 766 811 767 812 pull, ok := r.Context().Value("pull").(*models.Pull) 768 813 if !ok { 769 - s.logger.Error("failed to get pull") 814 + l.Error("failed to get pull") 770 815 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 771 816 return 772 817 } 818 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid) 773 819 774 820 roundNumberStr := chi.URLParam(r, "round") 775 821 roundNumber, err := strconv.Atoi(roundNumberStr) 776 822 if err != nil || roundNumber >= len(pull.Submissions) { 777 823 http.Error(w, "bad round id", http.StatusBadRequest) 778 - s.logger.Error("failed to parse round id", "err", err) 824 + l.Error("failed to parse round id", "err", err, "round_number_str", roundNumberStr) 779 825 return 780 826 } 781 827 ··· 800 846 // Start a transaction 801 847 tx, err := s.db.BeginTx(r.Context(), nil) 802 848 if err != nil { 803 - s.logger.Error("failed to start transaction", "err", err) 849 + l.Error("failed to start transaction", "err", err) 804 850 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 805 851 return 806 852 } ··· 810 856 811 857 client, err := s.oauth.AuthorizedClient(r) 812 858 if err != nil { 813 - s.logger.Error("failed to get authorized client", "err", err) 859 + l.Error("failed to get authorized client", "err", err) 814 860 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 815 861 return 816 862 } ··· 827 873 }, 828 874 }) 829 875 if err != nil { 830 - s.logger.Error("failed to create pull comment", "err", err) 876 + l.Error("failed to create pull comment", "err", err) 831 877 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 832 878 return 833 879 } ··· 846 892 // Create the pull comment in the database with the commentAt field 847 893 commentId, err := db.NewPullComment(tx, comment) 848 894 if err != nil { 849 - s.logger.Error("failed to create pull comment", "err", err) 895 + l.Error("failed to create pull comment in database", "err", err) 850 896 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 851 897 return 852 898 } 853 899 854 900 // Commit the transaction 855 901 if err = tx.Commit(); err != nil { 856 - s.logger.Error("failed to commit transaction", "err", err) 902 + l.Error("failed to commit transaction", "err", err) 857 903 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 858 904 return 859 905 } ··· 867 913 } 868 914 869 915 func (s *Pulls) NewPull(w http.ResponseWriter, r *http.Request) { 916 + l := s.logger.With("handler", "NewPull") 917 + 870 918 user := s.oauth.GetMultiAccountUser(r) 919 + if user != nil && user.Active != nil { 920 + l = l.With("user", user.Active.Did) 921 + } 922 + 871 923 f, err := s.repoResolver.Resolve(r) 872 924 if err != nil { 873 - s.logger.Error("failed to get repo and knot", "err", err) 925 + l.Error("failed to get repo and knot", "err", err) 874 926 return 875 927 } 928 + l = l.With("repo_at", f.RepoAt().String()) 876 929 877 930 switch r.Method { 878 931 case http.MethodGet: ··· 881 934 xrpcBytes, err := tangled.GitTempListBranches(r.Context(), xrpcc, "", 0, f.RepoAt().String()) 882 935 if err != nil { 883 936 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 884 - s.logger.Error("failed to call XRPC repo.branches", "err", xrpcerr) 937 + l.Error("failed to call XRPC repo.branches", "err", xrpcerr) 885 938 s.pages.Error503(w) 886 939 return 887 940 } 888 - s.logger.Error("failed to fetch branches", "err", err) 941 + l.Error("failed to fetch branches", "err", err) 889 942 return 890 943 } 891 944 892 945 var result types.RepoBranchesResponse 893 946 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 894 - s.logger.Error("failed to decode XRPC response", "err", err) 947 + l.Error("failed to decode XRPC response", "err", err) 895 948 s.pages.Error503(w) 896 949 return 897 950 } ··· 1034 1087 sourceBranch string, 1035 1088 isStacked bool, 1036 1089 ) { 1090 + l := s.logger.With("handler", "handleBranchBasedPull", "user", user.Active.Did, "target_branch", targetBranch, "source_branch", sourceBranch, "is_stacked", isStacked) 1091 + 1037 1092 scheme := "http" 1038 1093 if !s.config.Core.Dev { 1039 1094 scheme = "https" ··· 1046 1101 xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, repo.RepoIdentifier(), targetBranch, sourceBranch) 1047 1102 if err != nil { 1048 1103 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1049 - s.logger.Error("failed to call XRPC repo.compare", "err", xrpcerr) 1104 + l.Error("failed to call XRPC repo.compare", "err", xrpcerr) 1050 1105 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1051 1106 return 1052 1107 } 1053 - s.logger.Error("failed to compare", "err", err) 1108 + l.Error("failed to compare", "err", err) 1054 1109 s.pages.Notice(w, "pull", err.Error()) 1055 1110 return 1056 1111 } 1057 1112 1058 1113 var comparison types.RepoFormatPatchResponse 1059 1114 if err := json.Unmarshal(xrpcBytes, &comparison); err != nil { 1060 - s.logger.Error("failed to decode XRPC compare response", "err", err) 1115 + l.Error("failed to decode XRPC compare response", "err", err) 1061 1116 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1062 1117 return 1063 1118 } ··· 1077 1132 } 1078 1133 recordPullSource := &tangled.RepoPull_Source{ 1079 1134 Branch: sourceBranch, 1080 - Sha: comparison.Rev2, 1081 1135 } 1082 1136 1083 1137 s.createPullRequest(w, r, repo, user, title, body, targetBranch, patch, combined, sourceRev, pullSource, recordPullSource, isStacked) ··· 1094 1148 } 1095 1149 1096 1150 func (s *Pulls) handleForkBasedPull(w http.ResponseWriter, r *http.Request, repo *models.Repo, user *oauth.MultiAccountUser, forkRepo string, title, body, targetBranch, sourceBranch string, isStacked bool) { 1151 + l := s.logger.With("handler", "handleForkBasedPull", "user", user.Active.Did, "fork_repo", forkRepo, "target_branch", targetBranch, "source_branch", sourceBranch, "is_stacked", isStacked) 1152 + 1097 1153 repoString := strings.SplitN(forkRepo, "/", 2) 1098 1154 forkOwnerDid := repoString[0] 1099 1155 repoName := repoString[1] ··· 1102 1158 s.pages.Notice(w, "pull", "No such fork.") 1103 1159 return 1104 1160 } else if err != nil { 1105 - s.logger.Error("failed to fetch fork:", "err", err) 1161 + l.Error("failed to fetch fork", "err", err, "fork_owner_did", forkOwnerDid, "repo_name", repoName) 1106 1162 s.pages.Notice(w, "pull", "Failed to fetch fork.") 1107 1163 return 1108 1164 } ··· 1155 1211 forkXrpcBytes, err := tangled.RepoCompare(r.Context(), forkXrpcc, fork.RepoIdentifier(), hiddenRef, sourceBranch) 1156 1212 if err != nil { 1157 1213 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1158 - s.logger.Error("failed to call XRPC repo.compare for fork", "err", xrpcerr) 1214 + l.Error("failed to call XRPC repo.compare for fork", "err", xrpcerr, "hidden_ref", hiddenRef) 1159 1215 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1160 1216 return 1161 1217 } 1162 - s.logger.Error("failed to compare across branches", "err", err) 1218 + l.Error("failed to compare across branches", "err", err, "hidden_ref", hiddenRef) 1163 1219 s.pages.Notice(w, "pull", err.Error()) 1164 1220 return 1165 1221 } 1166 1222 1167 1223 var comparison types.RepoFormatPatchResponse 1168 1224 if err := json.Unmarshal(forkXrpcBytes, &comparison); err != nil { 1169 - s.logger.Error("failed to decode XRPC compare response for fork", "err", err) 1225 + l.Error("failed to decode XRPC compare response for fork", "err", err) 1170 1226 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1171 1227 return 1172 1228 } ··· 1191 1247 recordPullSource := &tangled.RepoPull_Source{ 1192 1248 Branch: sourceBranch, 1193 1249 Repo: &forkAtUriStr, 1194 - Sha: sourceRev, 1195 1250 } 1196 1251 if fork.RepoDid != "" { 1197 1252 recordPullSource.RepoDid = &fork.RepoDid ··· 1213 1268 recordPullSource *tangled.RepoPull_Source, 1214 1269 isStacked bool, 1215 1270 ) { 1271 + l := s.logger.With("handler", "createPullRequest", "user", user.Active.Did, "target_branch", targetBranch, "is_stacked", isStacked) 1272 + 1216 1273 if isStacked { 1217 1274 // creates a series of PRs, each linking to the previous, identified by jj's change-id 1218 1275 s.createStackedPullRequest( ··· 1230 1287 1231 1288 client, err := s.oauth.AuthorizedClient(r) 1232 1289 if err != nil { 1233 - s.logger.Error("failed to get authorized client", "err", err) 1290 + l.Error("failed to get authorized client", "err", err) 1234 1291 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1235 1292 return 1236 1293 } 1237 1294 1238 1295 tx, err := s.db.BeginTx(r.Context(), nil) 1239 1296 if err != nil { 1240 - s.logger.Error("failed to start tx", "err", err) 1297 + l.Error("failed to start tx", "err", err) 1241 1298 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1242 1299 return 1243 1300 } ··· 1267 1324 mentions, references := s.mentionsResolver.Resolve(r.Context(), body) 1268 1325 1269 1326 rkey := tid.TID() 1327 + 1328 + blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(patch), ApplicationGzip) 1329 + if err != nil { 1330 + l.Error("failed to upload patch", "err", err) 1331 + s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1332 + return 1333 + } 1334 + 1335 + record := tangled.RepoPull{ 1336 + Title: title, 1337 + Body: &body, 1338 + Target: repoPullTarget(repo, targetBranch), 1339 + Source: recordPullSource, 1340 + CreatedAt: time.Now().Format(time.RFC3339), 1341 + Rounds: []*tangled.RepoPull_Round{ 1342 + { 1343 + CreatedAt: time.Now().Format(time.RFC3339), 1344 + PatchBlob: blob.Blob, 1345 + }, 1346 + }, 1347 + } 1270 1348 initialSubmission := models.PullSubmission{ 1271 1349 Patch: patch, 1272 1350 Combined: combined, 1273 1351 SourceRev: sourceRev, 1352 + Blob: *blob.Blob, 1274 1353 } 1275 1354 pull := &models.Pull{ 1276 1355 Title: title, ··· 1285 1364 &initialSubmission, 1286 1365 }, 1287 1366 PullSource: pullSource, 1367 + State: models.PullOpen, 1288 1368 } 1289 - err = db.NewPull(tx, pull) 1369 + 1370 + _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 1371 + Collection: tangled.RepoPullNSID, 1372 + Repo: user.Active.Did, 1373 + Rkey: rkey, 1374 + Record: &lexutil.LexiconTypeDecoder{ 1375 + Val: &record, 1376 + }, 1377 + }) 1290 1378 if err != nil { 1291 - s.logger.Error("failed to create pull request", "err", err) 1292 - s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1293 - return 1294 - } 1295 - pullId, err := db.NextPullId(tx, repo.RepoAt()) 1296 - if err != nil { 1297 - s.logger.Error("failed to get pull id", "err", err) 1379 + l.Error("failed to create pull request", "err", err) 1298 1380 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1299 1381 return 1300 1382 } 1301 1383 1302 - blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(patch), ApplicationGzip) 1384 + err = db.PutPull(tx, pull) 1303 1385 if err != nil { 1304 - s.logger.Error("failed to upload patch", "err", err) 1386 + l.Error("failed to create pull request in database", "err", err) 1305 1387 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1306 1388 return 1307 1389 } 1308 - 1309 - _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 1310 - Collection: tangled.RepoPullNSID, 1311 - Repo: user.Active.Did, 1312 - Rkey: rkey, 1313 - Record: &lexutil.LexiconTypeDecoder{ 1314 - Val: &tangled.RepoPull{ 1315 - Title: title, 1316 - Target: repoPullTarget(repo, targetBranch), 1317 - PatchBlob: blob.Blob, 1318 - Source: recordPullSource, 1319 - CreatedAt: time.Now().Format(time.RFC3339), 1320 - }, 1321 - }, 1322 - }) 1390 + pullId, err := db.NextPullId(tx, repo.RepoAt()) 1323 1391 if err != nil { 1324 - s.logger.Error("failed to create pull request", "err", err) 1392 + s.logger.Error("failed to get pull id", "err", err) 1325 1393 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1326 1394 return 1327 1395 } 1328 1396 1329 1397 if err = tx.Commit(); err != nil { 1330 - s.logger.Error("failed to create pull request", "err", err) 1398 + l.Error("failed to commit transaction for pull request", "err", err) 1331 1399 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1332 1400 return 1333 1401 } ··· 1348 1416 sourceRev string, 1349 1417 pullSource *models.PullSource, 1350 1418 ) { 1419 + l := s.logger.With("handler", "createStackedPullRequest", "user", user.Active.Did, "target_branch", targetBranch, "source_rev", sourceRev) 1420 + 1351 1421 // run some necessary checks for stacked-prs first 1352 1422 1353 1423 // must be branch or fork based 1354 1424 if sourceRev == "" { 1355 - s.logger.Warn("stacked PR from patch-based pull") 1425 + l.Error("stacked PR from patch-based pull") 1356 1426 s.pages.Notice(w, "pull", "Stacking is only supported on branch and fork based pull-requests.") 1357 1427 return 1358 1428 } 1359 1429 1360 1430 formatPatches, err := patchutil.ExtractPatches(patch) 1361 1431 if err != nil { 1362 - s.logger.Error("failed to extract patches", "err", err) 1432 + l.Error("failed to extract patches", "err", err) 1363 1433 s.pages.Notice(w, "pull", fmt.Sprintf("Failed to extract patches: %v", err)) 1364 1434 return 1365 1435 } 1366 1436 1367 1437 // must have atleast 1 patch to begin with 1368 1438 if len(formatPatches) == 0 { 1369 - s.logger.Error("empty patches") 1439 + l.Error("empty patches") 1370 1440 s.pages.Notice(w, "pull", "No patches found in the generated format-patch.") 1371 1441 return 1372 1442 } 1373 1443 1374 - // build a stack out of this patch 1375 - stackId := uuid.New() 1376 - stack, err := s.newStack(r.Context(), repo, user, targetBranch, patch, pullSource, stackId.String()) 1377 - if err != nil { 1378 - s.logger.Error("failed to create stack", "err", err) 1379 - s.pages.Notice(w, "pull", fmt.Sprintf("Failed to create stack: %v", err)) 1380 - return 1381 - } 1382 - 1383 1444 client, err := s.oauth.AuthorizedClient(r) 1384 1445 if err != nil { 1385 - s.logger.Error("failed to get authorized client", "err", err) 1446 + l.Error("failed to get authorized client", "err", err) 1386 1447 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1387 1448 return 1388 1449 } 1389 1450 1390 - // apply all record creations at once 1391 - var writes []*comatproto.RepoApplyWrites_Input_Writes_Elem 1392 - for _, p := range stack { 1393 - blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(p.LatestPatch()), ApplicationGzip) 1451 + // first upload all blobs 1452 + blobs := make([]*lexutil.LexBlob, len(formatPatches)) 1453 + for i, p := range formatPatches { 1454 + blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(p.Raw), ApplicationGzip) 1394 1455 if err != nil { 1395 - s.logger.Error("failed to upload patch blob", "err", err) 1456 + l.Error("failed to upload patch blob", "err", err, "patch_index", i) 1396 1457 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1397 1458 return 1398 1459 } 1460 + l.Info("uploaded blob", "idx", i+1, "total", len(formatPatches)) 1461 + blobs[i] = blob.Blob 1462 + } 1399 1463 1464 + // build a stack out of this patch 1465 + stack, err := s.newStack(r.Context(), repo, user, targetBranch, pullSource, formatPatches, blobs) 1466 + if err != nil { 1467 + l.Error("failed to create stack", "err", err) 1468 + s.pages.Notice(w, "pull", fmt.Sprintf("Failed to create stack: %v", err)) 1469 + return 1470 + } 1471 + 1472 + // apply all record creations at once 1473 + var writes []*comatproto.RepoApplyWrites_Input_Writes_Elem 1474 + for _, p := range stack { 1400 1475 record := p.AsRecord() 1401 - record.PatchBlob = blob.Blob 1402 1476 writes = append(writes, &comatproto.RepoApplyWrites_Input_Writes_Elem{ 1403 1477 RepoApplyWrites_Create: &comatproto.RepoApplyWrites_Create{ 1404 1478 Collection: tangled.RepoPullNSID, ··· 1414 1488 Writes: writes, 1415 1489 }) 1416 1490 if err != nil { 1417 - s.logger.Error("failed to create stacked pull request", "err", err) 1491 + l.Error("failed to create stacked pull request", "err", err) 1418 1492 s.pages.Notice(w, "pull", "Failed to create stacked pull request. Try again later.") 1419 1493 return 1420 1494 } ··· 1422 1496 // create all pulls at once 1423 1497 tx, err := s.db.BeginTx(r.Context(), nil) 1424 1498 if err != nil { 1425 - s.logger.Error("failed to start tx", "err", err) 1499 + l.Error("failed to start tx", "err", err) 1426 1500 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1427 1501 return 1428 1502 } 1429 1503 defer tx.Rollback() 1430 1504 1431 1505 for _, p := range stack { 1432 - err = db.NewPull(tx, p) 1506 + err = db.PutPull(tx, p) 1433 1507 if err != nil { 1434 - s.logger.Error("failed to create pull request", "err", err) 1508 + l.Error("failed to create pull request in database", "err", err, "pull_rkey", p.Rkey) 1435 1509 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1436 1510 return 1437 1511 } ··· 1439 1513 } 1440 1514 1441 1515 if err = tx.Commit(); err != nil { 1442 - s.logger.Error("failed to create pull request", "err", err) 1516 + l.Error("failed to commit transaction for pull requests", "err", err) 1443 1517 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1444 1518 return 1445 1519 } ··· 1456 1530 } 1457 1531 1458 1532 func (s *Pulls) ValidatePatch(w http.ResponseWriter, r *http.Request) { 1533 + l := s.logger.With("handler", "ValidatePatch") 1534 + 1459 1535 _, err := s.repoResolver.Resolve(r) 1460 1536 if err != nil { 1461 - s.logger.Error("failed to get repo and knot", "err", err) 1537 + l.Error("failed to get repo and knot", "err", err) 1462 1538 return 1463 1539 } 1464 1540 ··· 1469 1545 } 1470 1546 1471 1547 if err := s.validator.ValidatePatch(&patch); err != nil { 1472 - s.logger.Error("failed to validate patch", "err", err) 1548 + l.Error("failed to validate patch", "err", err) 1473 1549 s.pages.Notice(w, "patch-error", "Invalid patch format. Please provide a valid git diff or format-patch.") 1474 1550 return 1475 1551 } ··· 1490 1566 } 1491 1567 1492 1568 func (s *Pulls) CompareBranchesFragment(w http.ResponseWriter, r *http.Request) { 1569 + l := s.logger.With("handler", "CompareBranchesFragment") 1570 + 1493 1571 user := s.oauth.GetMultiAccountUser(r) 1494 1572 f, err := s.repoResolver.Resolve(r) 1495 1573 if err != nil { 1496 - s.logger.Error("failed to get repo and knot", "err", err) 1574 + l.Error("failed to get repo and knot", "err", err) 1497 1575 return 1498 1576 } 1499 1577 ··· 1501 1579 1502 1580 xrpcBytes, err := tangled.GitTempListBranches(r.Context(), xrpcc, "", 0, f.RepoAt().String()) 1503 1581 if err != nil { 1504 - s.logger.Error("failed to fetch branches", "err", err) 1582 + l.Error("failed to fetch branches", "err", err) 1505 1583 s.pages.Error503(w) 1506 1584 return 1507 1585 } 1508 1586 1509 1587 var result types.RepoBranchesResponse 1510 1588 if err := json.Unmarshal(xrpcBytes, &result); err != nil { 1511 - s.logger.Error("failed to decode XRPC response", "err", err) 1589 + l.Error("failed to decode XRPC response", "err", err) 1512 1590 s.pages.Error503(w) 1513 1591 return 1514 1592 } ··· 1533 1611 } 1534 1612 1535 1613 func (s *Pulls) CompareForksFragment(w http.ResponseWriter, r *http.Request) { 1614 + l := s.logger.With("handler", "CompareForksFragment") 1615 + 1536 1616 user := s.oauth.GetMultiAccountUser(r) 1617 + if user != nil && user.Active != nil { 1618 + l = l.With("user", user.Active.Did) 1619 + } 1537 1620 1538 1621 forks, err := db.GetForksByDid(s.db, user.Active.Did) 1539 1622 if err != nil { 1540 - s.logger.Error("failed to get forks", "err", err) 1623 + l.Error("failed to get forks", "err", err) 1541 1624 return 1542 1625 } 1543 1626 ··· 1549 1632 } 1550 1633 1551 1634 func (s *Pulls) CompareForksBranchesFragment(w http.ResponseWriter, r *http.Request) { 1635 + l := s.logger.With("handler", "CompareForksBranchesFragment") 1636 + 1552 1637 user := s.oauth.GetMultiAccountUser(r) 1638 + if user != nil && user.Active != nil { 1639 + l = l.With("user", user.Active.Did) 1640 + } 1553 1641 1554 1642 f, err := s.repoResolver.Resolve(r) 1555 1643 if err != nil { 1556 - s.logger.Error("failed to get repo and knot", "err", err) 1644 + l.Error("failed to get repo and knot", "err", err) 1557 1645 return 1558 1646 } 1559 1647 ··· 1570 1658 orm.FilterEq("name", forkName), 1571 1659 ) 1572 1660 if err != nil { 1573 - s.logger.Error("failed to get repo", "did", forkOwnerDid, "name", forkName, "err", err) 1661 + l.Error("failed to get repo", "fork_owner_did", forkOwnerDid, "fork_name", forkName, "err", err) 1574 1662 return 1575 1663 } 1576 1664 1577 1665 sourceXrpcBytes, err := tangled.GitTempListBranches(r.Context(), xrpcc, "", 0, repo.RepoAt().String()) 1578 1666 if err != nil { 1579 1667 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1580 - s.logger.Error("failed to call XRPC repo.branches for source", "err", xrpcerr) 1668 + l.Error("failed to call XRPC repo.branches for source", "err", xrpcerr) 1581 1669 s.pages.Error503(w) 1582 1670 return 1583 1671 } 1584 - s.logger.Error("failed to fetch source branches", "err", err) 1672 + l.Error("failed to fetch source branches", "err", err) 1585 1673 return 1586 1674 } 1587 1675 1588 1676 // Decode source branches 1589 1677 var sourceBranches types.RepoBranchesResponse 1590 1678 if err := json.Unmarshal(sourceXrpcBytes, &sourceBranches); err != nil { 1591 - s.logger.Error("failed to decode source branches XRPC response", "err", err) 1679 + l.Error("failed to decode source branches XRPC response", "err", err) 1592 1680 s.pages.Error503(w) 1593 1681 return 1594 1682 } ··· 1596 1684 targetXrpcBytes, err := tangled.GitTempListBranches(r.Context(), xrpcc, "", 0, f.RepoAt().String()) 1597 1685 if err != nil { 1598 1686 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1599 - s.logger.Error("failed to call XRPC repo.branches for target", "err", xrpcerr) 1687 + l.Error("failed to call XRPC repo.branches for target", "err", xrpcerr) 1600 1688 s.pages.Error503(w) 1601 1689 return 1602 1690 } 1603 - s.logger.Error("failed to fetch target branches", "err", err) 1691 + l.Error("failed to fetch target branches", "err", err) 1604 1692 return 1605 1693 } 1606 1694 1607 1695 // Decode target branches 1608 1696 var targetBranches types.RepoBranchesResponse 1609 1697 if err := json.Unmarshal(targetXrpcBytes, &targetBranches); err != nil { 1610 - s.logger.Error("failed to decode target branches XRPC response", "err", err) 1698 + l.Error("failed to decode target branches XRPC response", "err", err) 1611 1699 s.pages.Error503(w) 1612 1700 return 1613 1701 } ··· 1624 1712 } 1625 1713 1626 1714 func (s *Pulls) ResubmitPull(w http.ResponseWriter, r *http.Request) { 1715 + l := s.logger.With("handler", "ResubmitPull") 1716 + 1627 1717 user := s.oauth.GetMultiAccountUser(r) 1718 + if user != nil && user.Active != nil { 1719 + l = l.With("user", user.Active.Did) 1720 + } 1628 1721 1629 1722 pull, ok := r.Context().Value("pull").(*models.Pull) 1630 1723 if !ok { 1631 - s.logger.Error("failed to get pull") 1724 + l.Error("failed to get pull") 1632 1725 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 1633 1726 return 1634 1727 } 1728 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid) 1635 1729 1636 1730 switch r.Method { 1637 1731 case http.MethodGet: ··· 1655 1749 } 1656 1750 1657 1751 func (s *Pulls) resubmitPatch(w http.ResponseWriter, r *http.Request) { 1752 + l := s.logger.With("handler", "resubmitPatch") 1753 + 1658 1754 user := s.oauth.GetMultiAccountUser(r) 1755 + if user != nil && user.Active != nil { 1756 + l = l.With("user", user.Active.Did) 1757 + } 1659 1758 1660 1759 pull, ok := r.Context().Value("pull").(*models.Pull) 1661 1760 if !ok { 1662 - s.logger.Error("failed to get pull") 1761 + l.Error("failed to get pull") 1663 1762 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 1664 1763 return 1665 1764 } 1765 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid) 1666 1766 1667 1767 f, err := s.repoResolver.Resolve(r) 1668 1768 if err != nil { 1669 - s.logger.Error("failed to get repo and knot", "err", err) 1769 + l.Error("failed to get repo and knot", "err", err) 1670 1770 return 1671 1771 } 1672 1772 1673 1773 if user.Active.Did != pull.OwnerDid { 1674 - s.logger.Warn("unauthorized user") 1774 + l.Error("unauthorized user", "actual_user", user.Active.Did, "expected_owner", pull.OwnerDid) 1675 1775 w.WriteHeader(http.StatusUnauthorized) 1676 1776 return 1677 1777 } ··· 1682 1782 } 1683 1783 1684 1784 func (s *Pulls) resubmitBranch(w http.ResponseWriter, r *http.Request) { 1785 + l := s.logger.With("handler", "resubmitBranch") 1786 + 1685 1787 user := s.oauth.GetMultiAccountUser(r) 1788 + if user != nil && user.Active != nil { 1789 + l = l.With("user", user.Active.Did) 1790 + } 1686 1791 1687 1792 pull, ok := r.Context().Value("pull").(*models.Pull) 1688 1793 if !ok { 1689 - s.logger.Error("failed to get pull") 1794 + l.Error("failed to get pull") 1690 1795 s.pages.Notice(w, "resubmit-error", "Failed to edit patch. Try again later.") 1691 1796 return 1692 1797 } 1798 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid, "target_branch", pull.TargetBranch) 1693 1799 1694 1800 f, err := s.repoResolver.Resolve(r) 1695 1801 if err != nil { 1696 - s.logger.Error("failed to get repo and knot", "err", err) 1802 + l.Error("failed to get repo and knot", "err", err) 1697 1803 return 1698 1804 } 1699 1805 1700 1806 if user.Active.Did != pull.OwnerDid { 1701 - s.logger.Warn("unauthorized user") 1807 + l.Error("unauthorized user", "actual_user", user.Active.Did, "expected_owner", pull.OwnerDid) 1702 1808 w.WriteHeader(http.StatusUnauthorized) 1703 1809 return 1704 1810 } 1705 1811 1706 1812 roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} 1707 1813 if !roles.IsPushAllowed() { 1708 - s.logger.Warn("unauthorized user") 1814 + l.Error("unauthorized user - no push permission") 1709 1815 w.WriteHeader(http.StatusUnauthorized) 1710 1816 return 1711 1817 } ··· 1722 1828 xrpcBytes, err := tangled.RepoCompare(r.Context(), xrpcc, f.RepoIdentifier(), pull.TargetBranch, pull.PullSource.Branch) 1723 1829 if err != nil { 1724 1830 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1725 - s.logger.Error("failed to call XRPC repo.compare", "err", xrpcerr) 1831 + l.Error("failed to call XRPC repo.compare", "err", xrpcerr, "source_branch", pull.PullSource.Branch) 1726 1832 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1727 1833 return 1728 1834 } 1729 - s.logger.Error("compare request failed", "err", err) 1835 + l.Error("compare request failed", "err", err, "source_branch", pull.PullSource.Branch) 1730 1836 s.pages.Notice(w, "resubmit-error", err.Error()) 1731 1837 return 1732 1838 } 1733 1839 1734 1840 var comparison types.RepoFormatPatchResponse 1735 1841 if err := json.Unmarshal(xrpcBytes, &comparison); err != nil { 1736 - s.logger.Error("failed to decode XRPC compare response", "err", err) 1842 + l.Error("failed to decode XRPC compare response", "err", err) 1737 1843 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1738 1844 return 1739 1845 } ··· 1746 1852 } 1747 1853 1748 1854 func (s *Pulls) resubmitFork(w http.ResponseWriter, r *http.Request) { 1855 + l := s.logger.With("handler", "resubmitFork") 1856 + 1749 1857 user := s.oauth.GetMultiAccountUser(r) 1858 + if user != nil && user.Active != nil { 1859 + l = l.With("user", user.Active.Did) 1860 + } 1750 1861 1751 1862 pull, ok := r.Context().Value("pull").(*models.Pull) 1752 1863 if !ok { 1753 - s.logger.Error("failed to get pull") 1864 + l.Error("failed to get pull") 1754 1865 s.pages.Notice(w, "resubmit-error", "Failed to edit patch. Try again later.") 1755 1866 return 1756 1867 } 1868 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid, "target_branch", pull.TargetBranch) 1757 1869 1758 1870 f, err := s.repoResolver.Resolve(r) 1759 1871 if err != nil { 1760 - s.logger.Error("failed to get repo and knot", "err", err) 1872 + l.Error("failed to get repo and knot", "err", err) 1761 1873 return 1762 1874 } 1763 1875 1764 1876 if user.Active.Did != pull.OwnerDid { 1765 - s.logger.Warn("unauthorized user") 1877 + l.Error("unauthorized user", "actual_user", user.Active.Did, "expected_owner", pull.OwnerDid) 1766 1878 w.WriteHeader(http.StatusUnauthorized) 1767 1879 return 1768 1880 } 1769 1881 1770 1882 forkRepo, err := db.GetRepoByAtUri(s.db, pull.PullSource.RepoAt.String()) 1771 1883 if err != nil { 1772 - s.logger.Error("failed to get source repo", "err", err) 1884 + l.Error("failed to get source repo", "err", err, "repo_at", pull.PullSource.RepoAt.String()) 1773 1885 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1774 1886 return 1775 1887 } ··· 1782 1894 oauth.WithDev(s.config.Core.Dev), 1783 1895 ) 1784 1896 if err != nil { 1785 - s.logger.Error("failed to connect to knot server", "err", err) 1897 + l.Error("failed to connect to knot server", "err", err, "fork_knot", forkRepo.Knot) 1786 1898 return 1787 1899 } 1788 1900 ··· 1800 1912 return 1801 1913 } 1802 1914 if !resp.Success { 1803 - s.logger.Warn("failed to update tracking ref", "err", resp.Error) 1915 + l.Error("failed to update tracking ref", "err", resp.Error, "fork_ref", pull.PullSource.Branch, "remote_ref", pull.TargetBranch) 1804 1916 s.pages.Notice(w, "resubmit-error", "Failed to update tracking ref.") 1805 1917 return 1806 1918 } ··· 1815 1927 forkXrpcBytes, err := tangled.RepoCompare(r.Context(), &indigoxrpc.Client{Host: forkHost}, forkRepo.RepoIdentifier(), hiddenRef, pull.PullSource.Branch) 1816 1928 if err != nil { 1817 1929 if xrpcerr := xrpcclient.HandleXrpcErr(err); xrpcerr != nil { 1818 - s.logger.Error("failed to call XRPC repo.compare for fork", "err", xrpcerr) 1930 + l.Error("failed to call XRPC repo.compare for fork", "err", xrpcerr, "hidden_ref", hiddenRef, "source_branch", pull.PullSource.Branch) 1819 1931 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1820 1932 return 1821 1933 } 1822 - s.logger.Error("failed to compare branches", "err", err) 1934 + l.Error("failed to compare branches", "err", err, "hidden_ref", hiddenRef, "source_branch", pull.PullSource.Branch) 1823 1935 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1824 1936 return 1825 1937 } 1826 1938 1827 1939 var forkComparison types.RepoFormatPatchResponse 1828 1940 if err := json.Unmarshal(forkXrpcBytes, &forkComparison); err != nil { 1829 - s.logger.Error("failed to decode XRPC compare response for fork", "err", err) 1941 + l.Error("failed to decode XRPC compare response for fork", "err", err) 1830 1942 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1831 1943 return 1832 1944 } ··· 1851 1963 combined string, 1852 1964 sourceRev string, 1853 1965 ) { 1854 - if pull.IsStacked() { 1855 - s.logger.Info("resubmitting stacked PR") 1856 - s.resubmitStackedPullHelper(w, r, repo, user, pull, patch, pull.StackId) 1966 + l := s.logger.With("handler", "resubmitPullHelper", "user", user.Active.Did, "pull_id", pull.PullId, "target_branch", pull.TargetBranch) 1967 + 1968 + stack := r.Context().Value("stack").(models.Stack) 1969 + if stack != nil && len(stack) != 1 { 1970 + l.Info("resubmitting stacked PR", "stack_size", len(stack)) 1971 + s.resubmitStackedPullHelper(w, r, repo, user, pull, patch) 1857 1972 return 1858 1973 } 1859 1974 ··· 1875 1990 } 1876 1991 } 1877 1992 1878 - tx, err := s.db.BeginTx(r.Context(), nil) 1879 - if err != nil { 1880 - s.logger.Error("failed to start tx", "err", err) 1881 - s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1882 - return 1883 - } 1884 - defer tx.Rollback() 1885 - 1886 1993 pullAt := pull.AtUri() 1887 1994 newRoundNumber := len(pull.Submissions) 1888 1995 newPatch := patch 1889 1996 newSourceRev := sourceRev 1890 1997 combinedPatch := combined 1891 - err = db.ResubmitPull(tx, pullAt, newRoundNumber, newPatch, combinedPatch, newSourceRev) 1892 - if err != nil { 1893 - s.logger.Error("failed to create pull request", "err", err) 1894 - s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1895 - return 1896 - } 1998 + 1897 1999 client, err := s.oauth.AuthorizedClient(r) 1898 2000 if err != nil { 1899 - s.logger.Error("failed to authorize client", "err", err) 2001 + l.Error("failed to authorize client", "err", err) 1900 2002 s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1901 2003 return 1902 2004 } ··· 1904 2006 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoPullNSID, user.Active.Did, pull.Rkey) 1905 2007 if err != nil { 1906 2008 // failed to get record 2009 + l.Error("failed to get record from PDS", "err", err, "rkey", pull.Rkey) 1907 2010 s.pages.Notice(w, "resubmit-error", "Failed to update pull, no record found on PDS.") 1908 2011 return 1909 2012 } 1910 2013 1911 2014 blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(patch), ApplicationGzip) 1912 2015 if err != nil { 1913 - s.logger.Error("failed to upload patch blob", "err", err) 2016 + l.Error("failed to upload patch blob", "err", err) 1914 2017 s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.") 1915 2018 return 1916 2019 } 1917 2020 record := pull.AsRecord() 1918 - record.PatchBlob = blob.Blob 2021 + record.Rounds = append(record.Rounds, &tangled.RepoPull_Round{ 2022 + CreatedAt: time.Now().Format(time.RFC3339), 2023 + PatchBlob: blob.Blob, 2024 + }) 1919 2025 record.CreatedAt = time.Now().Format(time.RFC3339) 1920 2026 1921 - if record.Source != nil { 1922 - record.Source.Sha = newSourceRev 1923 - } 1924 - 1925 2027 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 1926 2028 Collection: tangled.RepoPullNSID, 1927 2029 Repo: user.Active.Did, ··· 1932 2034 }, 1933 2035 }) 1934 2036 if err != nil { 1935 - s.logger.Error("failed to update record", "err", err) 2037 + l.Error("failed to update record on PDS", "err", err, "rkey", pull.Rkey) 1936 2038 s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.") 1937 2039 return 1938 2040 } 1939 2041 1940 - if err = tx.Commit(); err != nil { 1941 - s.logger.Error("failed to commit transaction", "err", err) 1942 - s.pages.Notice(w, "resubmit-error", "Failed to resubmit pull.") 2042 + err = db.ResubmitPull(s.db, pullAt, newRoundNumber, newPatch, combinedPatch, newSourceRev, blob.Blob) 2043 + if err != nil { 2044 + l.Error("failed to resubmit pull request in database", "err", err, "round_number", newRoundNumber) 2045 + s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 1943 2046 return 1944 2047 } 1945 2048 ··· 1954 2057 user *oauth.MultiAccountUser, 1955 2058 pull *models.Pull, 1956 2059 patch string, 1957 - stackId string, 1958 2060 ) { 2061 + l := s.logger.With("handler", "resubmitStackedPullHelper", "user", user.Active.Did, "pull_id", pull.PullId, "target_branch", pull.TargetBranch) 2062 + 1959 2063 targetBranch := pull.TargetBranch 1960 2064 1961 2065 origStack, _ := r.Context().Value("stack").(models.Stack) 1962 - newStack, err := s.newStack(r.Context(), repo, user, targetBranch, patch, pull.PullSource, stackId) 2066 + 2067 + formatPatches, err := patchutil.ExtractPatches(patch) 1963 2068 if err != nil { 1964 - s.logger.Error("failed to create resubmitted stack", "err", err) 2069 + l.Error("failed to extract patches", "err", err) 2070 + s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Failed to parse patches.") 2071 + return 2072 + } 2073 + 2074 + // must have atleast 1 patch to begin with 2075 + if len(formatPatches) == 0 { 2076 + l.Error("no patches found in the generated format-patch") 2077 + s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request: No patches found in the generated patch.") 2078 + return 2079 + } 2080 + 2081 + client, err := s.oauth.AuthorizedClient(r) 2082 + if err != nil { 2083 + l.Error("failed to get authorized client", "err", err) 2084 + s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 2085 + return 2086 + } 2087 + 2088 + // first upload all blobs 2089 + blobs := make([]*lexutil.LexBlob, len(formatPatches)) 2090 + for i, p := range formatPatches { 2091 + blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(p.Raw), ApplicationGzip) 2092 + if err != nil { 2093 + l.Error("failed to upload patch blob", "err", err, "patch_index", i) 2094 + s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 2095 + return 2096 + } 2097 + l.Info("uploaded blob", "idx", i+1, "total", len(formatPatches)) 2098 + blobs[i] = blob.Blob 2099 + } 2100 + 2101 + newStack, err := s.newStack(r.Context(), repo, user, targetBranch, pull.PullSource, formatPatches, blobs) 2102 + if err != nil { 2103 + l.Error("failed to create resubmitted stack", "err", err) 1965 2104 s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 1966 2105 return 1967 2106 } ··· 1970 2109 origById := make(map[string]*models.Pull) 1971 2110 newById := make(map[string]*models.Pull) 1972 2111 for _, p := range origStack { 1973 - origById[p.ChangeId] = p 2112 + origById[p.LatestSubmission().ChangeId()] = p 1974 2113 } 1975 2114 for _, p := range newStack { 1976 - newById[p.ChangeId] = p 2115 + newById[p.LatestSubmission().ChangeId()] = p 1977 2116 } 1978 2117 1979 2118 // commits that got deleted: corresponding pull is closed ··· 1985 2124 1986 2125 // pulls in original stack but not in new one 1987 2126 for _, op := range origStack { 1988 - if _, ok := newById[op.ChangeId]; !ok { 1989 - deletions[op.ChangeId] = op 2127 + if _, ok := newById[op.LatestSubmission().ChangeId()]; !ok { 2128 + deletions[op.LatestSubmission().ChangeId()] = op 1990 2129 } 1991 2130 } 1992 2131 1993 2132 // pulls in new stack but not in original one 1994 2133 for _, np := range newStack { 1995 - if _, ok := origById[np.ChangeId]; !ok { 1996 - additions[np.ChangeId] = np 2134 + if _, ok := origById[np.LatestSubmission().ChangeId()]; !ok { 2135 + additions[np.LatestSubmission().ChangeId()] = np 1997 2136 } 1998 2137 } 1999 2138 2000 2139 // NOTE: this loop can be written in any of above blocks, 2001 2140 // but is written separately in the interest of simpler code 2002 2141 for _, np := range newStack { 2003 - if op, ok := origById[np.ChangeId]; ok { 2142 + if op, ok := origById[np.LatestSubmission().ChangeId()]; ok { 2004 2143 // pull exists in both stacks 2005 - updated[op.ChangeId] = struct{}{} 2144 + updated[op.LatestSubmission().ChangeId()] = struct{}{} 2006 2145 } 2007 2146 } 2008 2147 2148 + // NOTE: we can go through the newStack and update dependent relations and 2149 + // rkeys now that we know which ones have been updated 2150 + // update dependentOn relations for the entire stack 2151 + var parentAt *syntax.ATURI 2152 + for _, np := range newStack { 2153 + if op, ok := origById[np.LatestSubmission().ChangeId()]; ok { 2154 + // pull exists in both stacks 2155 + np.Rkey = op.Rkey 2156 + } 2157 + np.DependentOn = parentAt 2158 + x := np.AtUri() 2159 + parentAt = &x 2160 + } 2161 + 2162 + l = l.With("additions", len(additions), "deletions", len(deletions), "updates", len(updated)) 2163 + 2009 2164 tx, err := s.db.Begin() 2010 2165 if err != nil { 2011 - s.logger.Error("failed to start transaction", "err", err) 2166 + l.Error("failed to start transaction", "err", err) 2012 2167 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2013 2168 return 2014 2169 } 2015 2170 defer tx.Rollback() 2016 2171 2017 - client, err := s.oauth.AuthorizedClient(r) 2018 - if err != nil { 2019 - s.logger.Error("failed to authorize client", "err", err) 2020 - s.pages.Notice(w, "resubmit-error", "Failed to create pull request. Try again later.") 2021 - return 2022 - } 2023 - 2024 2172 // pds updates to make 2025 2173 var writes []*comatproto.RepoApplyWrites_Input_Writes_Elem 2026 2174 ··· 2031 2179 continue 2032 2180 } 2033 2181 2034 - err := db.DeletePull(tx, p.RepoAt, p.PullId) 2182 + err := db.AbandonPulls(tx, orm.FilterEq("repo_at", p.RepoAt), orm.FilterEq("at_uri", p.AtUri())) 2035 2183 if err != nil { 2036 - s.logger.Error("failed to delete pull", "err", err, "pull_id", p.PullId) 2184 + l.Error("failed to delete pull", "err", err, "pull_id", p.PullId) 2037 2185 s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2038 2186 return 2039 2187 } ··· 2047 2195 2048 2196 // new pulls are created 2049 2197 for _, p := range additions { 2050 - err := db.NewPull(tx, p) 2198 + blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(p.LatestPatch()), ApplicationGzip) 2051 2199 if err != nil { 2052 - s.logger.Error("failed to create pull", "err", err, "pull_id", p.PullId) 2053 - s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2200 + l.Error("failed to upload patch blob for new pull", "err", err, "change_id", p.LatestSubmission().ChangeId()) 2201 + s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.") 2054 2202 return 2055 2203 } 2204 + p.Submissions[0].Blob = *blob.Blob 2056 2205 2057 - blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(patch), ApplicationGzip) 2058 - if err != nil { 2059 - s.logger.Error("failed to upload patch blob", "err", err) 2060 - s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.") 2206 + if err = db.PutPull(tx, p); err != nil { 2207 + l.Error("failed to create pull", "err", err, "pull_id", p.PullId, "change_id", p.LatestSubmission().ChangeId()) 2208 + s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2061 2209 return 2062 2210 } 2211 + 2063 2212 record := p.AsRecord() 2064 - record.PatchBlob = blob.Blob 2213 + record.Rounds = []*tangled.RepoPull_Round{ 2214 + { 2215 + CreatedAt: time.Now().Format(time.RFC3339), 2216 + PatchBlob: blob.Blob, 2217 + }, 2218 + } 2065 2219 writes = append(writes, &comatproto.RepoApplyWrites_Input_Writes_Elem{ 2066 2220 RepoApplyWrites_Create: &comatproto.RepoApplyWrites_Create{ 2067 2221 Collection: tangled.RepoPullNSID, ··· 2084 2238 } 2085 2239 2086 2240 // resubmit the new pull 2241 + np.Rkey = op.Rkey 2087 2242 pullAt := op.AtUri() 2088 2243 newRoundNumber := len(op.Submissions) 2089 2244 newPatch := np.LatestPatch() 2090 2245 combinedPatch := np.LatestSubmission().Combined 2091 2246 newSourceRev := np.LatestSha() 2092 - err := db.ResubmitPull(tx, pullAt, newRoundNumber, newPatch, combinedPatch, newSourceRev) 2247 + 2248 + blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(newPatch), ApplicationGzip) 2093 2249 if err != nil { 2094 - s.logger.Error("failed to update pull", "err", err, "pull_id", op.PullId) 2095 - s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2250 + l.Error("failed to upload patch blob for update", "err", err, "change_id", id, "pull_id", op.PullId) 2251 + s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.") 2096 2252 return 2097 2253 } 2098 2254 2099 - blob, err := xrpc.RepoUploadBlob(r.Context(), client, gz(patch), ApplicationGzip) 2255 + // create new round 2256 + err = db.ResubmitPull(tx, pullAt, newRoundNumber, newPatch, combinedPatch, newSourceRev, blob.Blob) 2100 2257 if err != nil { 2101 - s.logger.Error("failed to upload patch blob", "err", err) 2102 - s.pages.Notice(w, "resubmit-error", "Failed to update pull request on the PDS. Try again later.") 2258 + l.Error("failed to update pull in database", "err", err, "pull_id", op.PullId, "round_number", newRoundNumber) 2259 + s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2103 2260 return 2104 2261 } 2262 + 2263 + // update dependent-on relation 2264 + if np.DependentOn != nil { 2265 + err := db.SetDependentOn(tx, *np.DependentOn, orm.FilterEq("at_uri", np.AtUri())) 2266 + if err != nil { 2267 + l.Error("failed to update pull in database", "err", err, "pull_id", op.PullId, "round_number", newRoundNumber) 2268 + s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2269 + return 2270 + } 2271 + } 2272 + 2105 2273 record := np.AsRecord() 2106 - record.PatchBlob = blob.Blob 2274 + record.Rounds = op.AsRecord().Rounds 2275 + record.Rounds = append(record.Rounds, &tangled.RepoPull_Round{ 2276 + CreatedAt: time.Now().Format(time.RFC3339), 2277 + PatchBlob: blob.Blob, 2278 + }) 2107 2279 writes = append(writes, &comatproto.RepoApplyWrites_Input_Writes_Elem{ 2108 2280 RepoApplyWrites_Update: &comatproto.RepoApplyWrites_Update{ 2109 2281 Collection: tangled.RepoPullNSID, ··· 2115 2287 }) 2116 2288 } 2117 2289 2118 - // update parent-change-id relations for the entire stack 2119 - for _, p := range newStack { 2120 - err := db.SetPullParentChangeId( 2121 - tx, 2122 - p.ParentChangeId, 2123 - // these should be enough filters to be unique per-stack 2124 - orm.FilterEq("repo_at", p.RepoAt.String()), 2125 - orm.FilterEq("owner_did", p.OwnerDid), 2126 - orm.FilterEq("change_id", p.ChangeId), 2127 - ) 2128 - 2129 - if err != nil { 2130 - s.logger.Error("failed to update pull", "err", err, "pull_id", p.PullId) 2131 - s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2132 - return 2133 - } 2134 - } 2135 - 2136 - err = tx.Commit() 2137 - if err != nil { 2138 - s.logger.Error("failed to resubmit pull", "err", err) 2139 - s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2140 - return 2141 - } 2142 - 2143 2290 _, err = comatproto.RepoApplyWrites(r.Context(), client, &comatproto.RepoApplyWrites_Input{ 2144 2291 Repo: user.Active.Did, 2145 2292 Writes: writes, 2146 2293 }) 2147 2294 if err != nil { 2148 - s.logger.Error("failed to create stacked pull request", "err", err) 2295 + l.Error("failed to apply writes for stacked pull request", "err", err, "writes_count", len(writes)) 2149 2296 s.pages.Notice(w, "pull", "Failed to create stacked pull request. Try again later.") 2150 2297 return 2151 2298 } 2152 2299 2300 + err = tx.Commit() 2301 + if err != nil { 2302 + l.Error("failed to commit resubmit transaction", "err", err) 2303 + s.pages.Notice(w, "pull-resubmit-error", "Failed to resubmit pull request. Try again later.") 2304 + return 2305 + } 2306 + 2153 2307 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, repo) 2154 2308 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d", ownerSlashRepo, pull.PullId)) 2155 2309 } 2156 2310 2157 2311 func (s *Pulls) MergePull(w http.ResponseWriter, r *http.Request) { 2312 + l := s.logger.With("handler", "MergePull") 2313 + 2158 2314 user := s.oauth.GetMultiAccountUser(r) 2315 + if user != nil && user.Active != nil { 2316 + l = l.With("user", user.Active.Did) 2317 + } 2318 + 2159 2319 f, err := s.repoResolver.Resolve(r) 2160 2320 if err != nil { 2161 - s.logger.Error("failed to resolve repo:", "err", err) 2321 + l.Error("failed to resolve repo", "err", err) 2162 2322 s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 2163 2323 return 2164 2324 } 2325 + l = l.With("repo_at", f.RepoAt().String()) 2165 2326 2166 2327 pull, ok := r.Context().Value("pull").(*models.Pull) 2167 2328 if !ok { 2168 - s.logger.Error("failed to get pull") 2329 + l.Error("failed to get pull") 2169 2330 s.pages.Notice(w, "pull-merge-error", "Failed to merge patch. Try again later.") 2170 2331 return 2171 2332 } 2333 + l = l.With("pull_id", pull.PullId, "target_branch", pull.TargetBranch) 2172 2334 2173 - var pullsToMerge models.Stack 2174 - pullsToMerge = append(pullsToMerge, pull) 2175 - if pull.IsStacked() { 2176 - stack, ok := r.Context().Value("stack").(models.Stack) 2177 - if !ok { 2178 - s.logger.Error("failed to get stack") 2179 - s.pages.Notice(w, "pull-merge-error", "Failed to merge patch. Try again later.") 2180 - return 2181 - } 2182 - 2183 - // combine patches of substack 2184 - subStack := stack.StrictlyBelow(pull) 2185 - // collect the portion of the stack that is mergeable 2186 - mergeable := subStack.Mergeable() 2187 - // add to total patch 2188 - pullsToMerge = append(pullsToMerge, mergeable...) 2335 + stack, ok := r.Context().Value("stack").(models.Stack) 2336 + if !ok { 2337 + l.Error("failed to get stack") 2338 + s.pages.Notice(w, "pull-merge-error", "Failed to merge patch. Try again later.") 2339 + return 2189 2340 } 2190 2341 2342 + // combine patches of substack 2343 + subStack := stack.Below(pull) 2344 + // collect the portion of the stack that is mergeable 2345 + pullsToMerge := subStack.Mergeable() 2346 + l = l.With("pulls_to_merge", len(pullsToMerge)) 2347 + 2191 2348 patch := pullsToMerge.CombinedPatch() 2192 2349 2193 2350 ident, err := s.idResolver.ResolveIdent(r.Context(), pull.OwnerDid) 2194 2351 if err != nil { 2195 - s.logger.Error("resolving identity", "err", err) 2352 + l.Error("failed to resolve identity", "err", err, "owner_did", pull.OwnerDid) 2196 2353 w.WriteHeader(http.StatusNotFound) 2197 2354 return 2198 2355 } 2199 2356 2200 2357 email, err := db.GetPrimaryEmail(s.db, pull.OwnerDid) 2201 2358 if err != nil { 2202 - s.logger.Error("failed to get primary email", "err", err) 2359 + l.Warn("failed to get primary email", "err", err, "owner_did", pull.OwnerDid) 2203 2360 } 2204 2361 2205 2362 authorName := ident.Handle.String() ··· 2227 2384 oauth.WithDev(s.config.Core.Dev), 2228 2385 ) 2229 2386 if err != nil { 2230 - s.logger.Error("failed to connect to knot server", "err", err) 2387 + l.Error("failed to connect to knot server", "err", err, "knot", f.Knot) 2231 2388 s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 2232 2389 return 2233 2390 } ··· 2240 2397 2241 2398 tx, err := s.db.Begin() 2242 2399 if err != nil { 2243 - s.logger.Error("failed to start transaction", "err", err) 2400 + l.Error("failed to start transaction", "err", err) 2244 2401 s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 2245 2402 return 2246 2403 } 2247 2404 defer tx.Rollback() 2248 2405 2406 + var atUris []syntax.ATURI 2249 2407 for _, p := range pullsToMerge { 2250 - err := db.MergePull(tx, f.RepoAt(), p.PullId) 2251 - if err != nil { 2252 - s.logger.Error("failed to update pull request status in database", "err", err) 2253 - s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 2254 - return 2255 - } 2408 + atUris = append(atUris, p.AtUri()) 2256 2409 p.State = models.PullMerged 2257 2410 } 2411 + err = db.MergePulls(tx, orm.FilterEq("repo_at", f.RepoAt()), orm.FilterIn("at_uri", atUris)) 2412 + if err != nil { 2413 + l.Error("failed to update pull request status in database", "err", err) 2414 + s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 2415 + return 2416 + } 2258 2417 2259 2418 err = tx.Commit() 2260 2419 if err != nil { 2261 2420 // TODO: this is unsound, we should also revert the merge from the knotserver here 2262 - s.logger.Error("failed to update pull request status in database", "err", err) 2421 + l.Error("failed to commit merge transaction", "err", err) 2263 2422 s.pages.Notice(w, "pull-merge-error", "Failed to merge pull request. Try again later.") 2264 2423 return 2265 2424 } ··· 2274 2433 } 2275 2434 2276 2435 func (s *Pulls) ClosePull(w http.ResponseWriter, r *http.Request) { 2436 + l := s.logger.With("handler", "ClosePull") 2437 + 2277 2438 user := s.oauth.GetMultiAccountUser(r) 2439 + if user != nil && user.Active != nil { 2440 + l = l.With("user", user.Active.Did) 2441 + } 2278 2442 2279 2443 f, err := s.repoResolver.Resolve(r) 2280 2444 if err != nil { 2281 - s.logger.Error("malformed middleware", "err", err) 2445 + l.Error("failed to resolve repo", "err", err) 2282 2446 return 2283 2447 } 2284 2448 2285 2449 pull, ok := r.Context().Value("pull").(*models.Pull) 2286 2450 if !ok { 2287 - s.logger.Error("failed to get pull") 2451 + l.Error("failed to get pull") 2288 2452 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 2289 2453 return 2290 2454 } 2455 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid) 2291 2456 2292 2457 // auth filter: only owner or collaborators can close 2293 2458 roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} ··· 2296 2461 isPullAuthor := user.Active.Did == pull.OwnerDid 2297 2462 isCloseAllowed := isOwner || isCollaborator || isPullAuthor 2298 2463 if !isCloseAllowed { 2299 - s.logger.Warn("failed to close pull: unauthorized") 2464 + l.Error("unauthorized to close pull", "is_owner", isOwner, "is_collaborator", isCollaborator, "is_pull_author", isPullAuthor) 2300 2465 s.pages.Notice(w, "pull-close", "You are unauthorized to close this pull.") 2301 2466 return 2302 2467 } ··· 2304 2469 // Start a transaction 2305 2470 tx, err := s.db.BeginTx(r.Context(), nil) 2306 2471 if err != nil { 2307 - s.logger.Error("failed to start transaction", "err", err) 2472 + l.Error("failed to start transaction", "err", err) 2308 2473 s.pages.Notice(w, "pull-close", "Failed to close pull.") 2309 2474 return 2310 2475 } 2311 2476 defer tx.Rollback() 2312 2477 2313 - var pullsToClose []*models.Pull 2314 - pullsToClose = append(pullsToClose, pull) 2315 - 2316 - // if this PR is stacked, then we want to close all PRs below this one on the stack 2317 - if pull.IsStacked() { 2318 - stack := r.Context().Value("stack").(models.Stack) 2319 - subStack := stack.StrictlyBelow(pull) 2320 - pullsToClose = append(pullsToClose, subStack...) 2321 - } 2322 - 2478 + // if this PR is stacked, then we want to close all PRs above this one on the stack 2479 + stack := r.Context().Value("stack").(models.Stack) 2480 + pullsToClose := stack.Above(pull) 2481 + var atUris []syntax.ATURI 2323 2482 for _, p := range pullsToClose { 2324 - // Close the pull in the database 2325 - err = db.ClosePull(tx, f.RepoAt(), p.PullId) 2326 - if err != nil { 2327 - s.logger.Error("failed to close pull", "err", err) 2328 - s.pages.Notice(w, "pull-close", "Failed to close pull.") 2329 - return 2330 - } 2483 + atUris = append(atUris, p.AtUri()) 2331 2484 p.State = models.PullClosed 2332 2485 } 2486 + err = db.ClosePulls( 2487 + tx, 2488 + orm.FilterEq("repo_at", f.RepoAt()), 2489 + orm.FilterIn("at_uri", atUris), 2490 + ) 2491 + if err != nil { 2492 + l.Error("failed to close pulls in database", "err", err, "pulls_to_close", len(pullsToClose)) 2493 + s.pages.Notice(w, "pull-close", "Failed to close pull.") 2494 + } 2333 2495 2334 2496 // Commit the transaction 2335 2497 if err = tx.Commit(); err != nil { 2336 - s.logger.Error("failed to commit transaction", "err", err) 2498 + l.Error("failed to commit transaction", "err", err) 2337 2499 s.pages.Notice(w, "pull-close", "Failed to close pull.") 2338 2500 return 2339 2501 } ··· 2347 2509 } 2348 2510 2349 2511 func (s *Pulls) ReopenPull(w http.ResponseWriter, r *http.Request) { 2512 + l := s.logger.With("handler", "ReopenPull") 2513 + 2350 2514 user := s.oauth.GetMultiAccountUser(r) 2515 + if user != nil && user.Active != nil { 2516 + l = l.With("user", user.Active.Did) 2517 + } 2351 2518 2352 2519 f, err := s.repoResolver.Resolve(r) 2353 2520 if err != nil { 2354 - s.logger.Error("failed to resolve repo", "err", err) 2521 + l.Error("failed to resolve repo", "err", err) 2355 2522 s.pages.Notice(w, "pull-reopen", "Failed to reopen pull.") 2356 2523 return 2357 2524 } 2358 2525 2359 2526 pull, ok := r.Context().Value("pull").(*models.Pull) 2360 2527 if !ok { 2361 - s.logger.Error("failed to get pull") 2528 + l.Error("failed to get pull") 2362 2529 s.pages.Notice(w, "pull-error", "Failed to edit patch. Try again later.") 2363 2530 return 2364 2531 } 2532 + l = l.With("pull_id", pull.PullId, "pull_owner", pull.OwnerDid, "state", pull.State) 2365 2533 2366 2534 // auth filter: only owner or collaborators can close 2367 2535 roles := repoinfo.RolesInRepo{Roles: s.enforcer.GetPermissionsInRepo(user.Active.Did, f.Knot, f.RepoIdentifier())} ··· 2370 2538 isPullAuthor := user.Active.Did == pull.OwnerDid 2371 2539 isCloseAllowed := isOwner || isCollaborator || isPullAuthor 2372 2540 if !isCloseAllowed { 2373 - s.logger.Warn("failed to close pull: unauthorized") 2541 + l.Error("unauthorized to reopen pull", "is_owner", isOwner, "is_collaborator", isCollaborator, "is_pull_author", isPullAuthor) 2374 2542 s.pages.Notice(w, "pull-close", "You are unauthorized to close this pull.") 2375 2543 return 2376 2544 } ··· 2378 2546 // Start a transaction 2379 2547 tx, err := s.db.BeginTx(r.Context(), nil) 2380 2548 if err != nil { 2381 - s.logger.Error("failed to start transaction", "err", err) 2549 + l.Error("failed to start transaction", "err", err) 2382 2550 s.pages.Notice(w, "pull-reopen", "Failed to reopen pull.") 2383 2551 return 2384 2552 } 2385 2553 defer tx.Rollback() 2386 2554 2387 - var pullsToReopen []*models.Pull 2388 - pullsToReopen = append(pullsToReopen, pull) 2389 - 2390 2555 // if this PR is stacked, then we want to reopen all PRs above this one on the stack 2391 - if pull.IsStacked() { 2392 - stack := r.Context().Value("stack").(models.Stack) 2393 - subStack := stack.StrictlyAbove(pull) 2394 - pullsToReopen = append(pullsToReopen, subStack...) 2395 - } 2396 - 2556 + stack := r.Context().Value("stack").(models.Stack) 2557 + pullsToReopen := stack.Below(pull) 2558 + var atUris []syntax.ATURI 2397 2559 for _, p := range pullsToReopen { 2398 - // Close the pull in the database 2399 - err = db.ReopenPull(tx, f.RepoAt(), p.PullId) 2400 - if err != nil { 2401 - s.logger.Error("failed to close pull", "err", err) 2402 - s.pages.Notice(w, "pull-close", "Failed to close pull.") 2403 - return 2404 - } 2560 + atUris = append(atUris, p.AtUri()) 2405 2561 p.State = models.PullOpen 2406 2562 } 2563 + err = db.ReopenPulls( 2564 + tx, 2565 + orm.FilterEq("repo_at", f.RepoAt()), 2566 + orm.FilterIn("at_uri", atUris), 2567 + ) 2568 + if err != nil { 2569 + l.Error("failed to reopen pulls in database", "err", err, "pulls_to_reopen", len(pullsToReopen)) 2570 + s.pages.Notice(w, "pull-close", "Failed to reopen pull.") 2571 + } 2407 2572 2408 2573 // Commit the transaction 2409 2574 if err = tx.Commit(); err != nil { 2410 - s.logger.Error("failed to commit transaction", "err", err) 2575 + l.Error("failed to commit transaction", "err", err) 2411 2576 s.pages.Notice(w, "pull-reopen", "Failed to reopen pull.") 2412 2577 return 2413 2578 } ··· 2420 2585 s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d", ownerSlashRepo, pull.PullId)) 2421 2586 } 2422 2587 2423 - func (s *Pulls) newStack(ctx context.Context, repo *models.Repo, user *oauth.MultiAccountUser, targetBranch, patch string, pullSource *models.PullSource, stackId string) (models.Stack, error) { 2424 - formatPatches, err := patchutil.ExtractPatches(patch) 2425 - if err != nil { 2426 - return nil, fmt.Errorf("Failed to extract patches: %v", err) 2427 - } 2428 - 2429 - // must have atleast 1 patch to begin with 2430 - if len(formatPatches) == 0 { 2431 - return nil, fmt.Errorf("No patches found in the generated format-patch.") 2432 - } 2433 - 2434 - // the stack is identified by a UUID 2588 + func (s *Pulls) newStack( 2589 + ctx context.Context, 2590 + repo *models.Repo, 2591 + user *oauth.MultiAccountUser, 2592 + targetBranch string, 2593 + pullSource *models.PullSource, 2594 + formatPatches []types.FormatPatch, 2595 + blobs []*lexutil.LexBlob, 2596 + ) (models.Stack, error) { 2435 2597 var stack models.Stack 2436 - parentChangeId := "" 2437 - for _, fp := range formatPatches { 2598 + var parentAtUri *syntax.ATURI 2599 + for i, fp := range formatPatches { 2438 2600 // all patches must have a jj change-id 2439 - changeId, err := fp.ChangeId() 2601 + _, err := fp.ChangeId() 2440 2602 if err != nil { 2441 2603 return nil, fmt.Errorf("Stacking is only supported if all patches contain a change-id commit header.") 2442 2604 } ··· 2451 2613 Patch: fp.Raw, 2452 2614 SourceRev: fp.SHA, 2453 2615 Combined: fp.Raw, 2616 + Blob: *blobs[i], 2454 2617 } 2455 2618 pull := models.Pull{ 2456 2619 Title: title, ··· 2466 2629 }, 2467 2630 PullSource: pullSource, 2468 2631 Created: time.Now(), 2632 + State: models.PullOpen, 2469 2633 2470 - StackId: stackId, 2471 - ChangeId: changeId, 2472 - ParentChangeId: parentChangeId, 2634 + DependentOn: parentAtUri, 2635 + Repo: repo, 2473 2636 } 2474 2637 2475 2638 stack = append(stack, &pull) 2476 2639 2477 - parentChangeId = changeId 2640 + parent := pull.AtUri() 2641 + parentAtUri = &parent 2478 2642 } 2479 2643 2480 2644 return stack, nil
+157 -44
knotserver/ingester.go
··· 13 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 15 "github.com/bluesky-social/indigo/xrpc" 16 - "github.com/bluesky-social/jetstream/pkg/models" 16 + jmodels "github.com/bluesky-social/jetstream/pkg/models" 17 17 "tangled.org/core/api/tangled" 18 + "tangled.org/core/appview/models" 18 19 "tangled.org/core/knotserver/db" 19 20 "tangled.org/core/knotserver/git" 20 21 "tangled.org/core/log" ··· 22 23 "tangled.org/core/workflow" 23 24 ) 24 25 25 - func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 26 + func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 26 27 l := log.FromContext(ctx) 27 28 raw := json.RawMessage(event.Commit.Record) 28 29 did := event.Did ··· 44 45 return nil 45 46 } 46 47 47 - func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 48 + func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 48 49 l := log.FromContext(ctx) 49 50 raw := json.RawMessage(event.Commit.Record) 50 51 did := event.Did ··· 84 85 return nil 85 86 } 86 87 87 - func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 88 - raw := json.RawMessage(event.Commit.Record) 89 - did := event.Did 90 - 91 - var record tangled.RepoPull 92 - if err := json.Unmarshal(raw, &record); err != nil { 93 - return fmt.Errorf("failed to unmarshal record: %w", err) 94 - } 88 + // returns a repo path on disk if present, and error if not 89 + type targetRepo struct { 90 + RepoPath string 91 + OwnerDid string 92 + RepoName string 93 + RepoDid string 94 + } 95 95 96 - l := log.FromContext(ctx) 97 - l = l.With("handler", "processPull") 98 - l = l.With("did", did) 99 - 96 + func (h *Knot) validatePullRecord(ctx context.Context, record *tangled.RepoPull) (*targetRepo, error) { 100 97 if record.Target == nil { 101 - return fmt.Errorf("ignoring pull record: target repo is nil") 98 + return nil, fmt.Errorf("ignoring pull record: target repo is nil") 102 99 } 103 100 104 - l = l.With("target_repo", record.Target.Repo, "target_repo_did", record.Target.RepoDid) 105 - l = l.With("target_branch", record.Target.Branch) 106 - 107 101 if record.Source == nil { 108 - return fmt.Errorf("ignoring pull record: not a branch-based pull request") 102 + return nil, fmt.Errorf("ignoring pull record: not a branch-based pull request") 109 103 } 110 104 111 105 if record.Source.Repo != nil || record.Source.RepoDid != nil { 112 - return fmt.Errorf("ignoring pull record: fork based pull") 106 + return nil, fmt.Errorf("ignoring pull record: fork based pull") 113 107 } 114 108 115 109 var repoPath, ownerDid, repoName, repoDid string ··· 119 113 var lookupErr error 120 114 repoPath, ownerDid, repoName, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 121 115 if lookupErr != nil { 122 - return fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 116 + return nil, fmt.Errorf("unknown target repo DID %s: %w", repoDid, lookupErr) 123 117 } 124 118 125 119 case record.Target.Repo != nil: 126 120 // TODO: get rid of this PDS fetch once all repos have DIDs 127 121 repoAt, parseErr := syntax.ParseATURI(*record.Target.Repo) 128 122 if parseErr != nil { 129 - return fmt.Errorf("failed to parse ATURI: %w", parseErr) 123 + return nil, fmt.Errorf("failed to parse ATURI: %w", parseErr) 130 124 } 131 125 132 126 ident, resolveErr := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 133 127 if resolveErr != nil || ident.Handle.IsInvalidHandle() { 134 - return fmt.Errorf("failed to resolve handle: %w", resolveErr) 128 + return nil, fmt.Errorf("failed to resolve handle: %w", resolveErr) 135 129 } 136 130 137 131 xrpcc := xrpc.Client{ ··· 140 134 141 135 resp, getErr := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 142 136 if getErr != nil { 143 - return fmt.Errorf("failed to resolve repo: %w", getErr) 137 + return nil, fmt.Errorf("failed to resolve repo: %w", getErr) 144 138 } 145 139 146 140 repo := resp.Value.Val.(*tangled.Repo) 147 141 148 142 if repo.Knot != h.c.Server.Hostname { 149 - return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 143 + return nil, fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 150 144 } 151 145 152 146 ownerDid = ident.DID.String() ··· 154 148 155 149 repoDid, didErr := h.db.GetRepoDid(ownerDid, repoName) 156 150 if didErr != nil { 157 - return fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 151 + return nil, fmt.Errorf("failed to resolve repo DID for %s/%s: %w", ownerDid, repoName, didErr) 158 152 } 159 153 160 154 var lookupErr error 161 155 repoPath, _, _, lookupErr = h.db.ResolveRepoDIDOnDisk(h.c.Repo.ScanPath, repoDid) 162 156 if lookupErr != nil { 163 - return fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 157 + return nil, fmt.Errorf("failed to resolve repo on disk: %w", lookupErr) 164 158 } 165 159 166 160 default: 167 - return fmt.Errorf("ignoring pull record: target has neither repo nor repoDid") 161 + return nil, fmt.Errorf("ignoring pull record: target has neither repo nor repoDid") 162 + } 163 + 164 + _, err := git.Open(repoPath, record.Source.Branch) 165 + if err != nil { 166 + return nil, fmt.Errorf("failed to open git repository: %w", err) 167 + } 168 + 169 + return &targetRepo{ 170 + RepoPath: repoPath, 171 + OwnerDid: ownerDid, 172 + RepoName: repoName, 173 + RepoDid: repoDid, 174 + }, nil 175 + } 176 + 177 + func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 178 + // resolve the PR owner's identity to fetch the blob from their PDS 179 + prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 180 + if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 181 + return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 182 + } 183 + 184 + roundNumber := len(record.Rounds) - 1 185 + round := record.Rounds[roundNumber] 186 + 187 + // fetch the blob from the PR owner's PDS 188 + prOwnerPds := prOwnerIdent.PDSEndpoint() 189 + blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 190 + if err != nil { 191 + return nil, fmt.Errorf("failed to construct blob URL: %w", err) 192 + } 193 + q := blobUrl.Query() 194 + q.Set("cid", round.PatchBlob.Ref.String()) 195 + q.Set("did", did) 196 + blobUrl.RawQuery = q.Encode() 197 + 198 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 199 + if err != nil { 200 + return nil, fmt.Errorf("failed to create blob request: %w", err) 201 + } 202 + req.Header.Set("Content-Type", "application/json") 203 + 204 + blobResp, err := http.DefaultClient.Do(req) 205 + if err != nil { 206 + return nil, fmt.Errorf("failed to fetch blob: %w", err) 207 + } 208 + defer blobResp.Body.Close() 209 + 210 + blob := io.ReadCloser(blobResp.Body) 211 + latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 212 + if err != nil { 213 + return nil, fmt.Errorf("failed to parse submission: %w", err) 168 214 } 169 215 170 - gr, err := git.Open(repoPath, record.Source.Sha) 216 + return latestSubmission, nil 217 + } 218 + 219 + func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 220 + gr, err := git.Open(repoPath, sha) 171 221 if err != nil { 172 - return fmt.Errorf("failed to open git repository: %w", err) 222 + return nil, fmt.Errorf("failed to open git repository: %w", err) 173 223 } 174 224 175 225 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 176 226 if err != nil { 177 - return fmt.Errorf("failed to open workflow directory: %w", err) 227 + return nil, fmt.Errorf("failed to open workflow directory: %w", err) 178 228 } 179 229 180 230 var pipeline workflow.RawPipeline ··· 195 245 }) 196 246 } 197 247 248 + return pipeline, nil 249 + } 250 + 251 + func (h *Knot) compilePipeline(ctx context.Context, targetRepo *targetRepo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 252 + l := log.FromContext(ctx) 253 + 198 254 trigger := tangled.Pipeline_PullRequestTriggerData{ 199 255 Action: "create", 200 - SourceBranch: record.Source.Branch, 201 - SourceSha: record.Source.Sha, 202 - TargetBranch: record.Target.Branch, 256 + SourceBranch: sourceBranch, 257 + SourceSha: sourceSha, 258 + TargetBranch: targetBranch, 203 259 } 204 260 205 261 compiler := workflow.Compiler{ ··· 207 263 Kind: string(workflow.TriggerKindPullRequest), 208 264 PullRequest: &trigger, 209 265 Repo: &tangled.Pipeline_TriggerRepo{ 210 - Did: ownerDid, 211 266 Knot: h.c.Server.Hostname, 212 - Repo: &repoName, 213 - RepoDid: &repoDid, 267 + RepoDid: &targetRepo.RepoDid, 268 + Did: targetRepo.OwnerDid, 269 + Repo: &targetRepo.RepoName, 214 270 }, 215 271 }, 216 272 } 217 273 218 - cp := compiler.Compile(compiler.Parse(pipeline)) 219 - eventJson, err := json.Marshal(cp) 274 + l.Info("raw", "raw", rawPipeline) 275 + parsed := compiler.Parse(rawPipeline) 276 + l.Info("parsed", "parsed", parsed) 277 + compiled := compiler.Compile(parsed) 278 + 279 + l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 280 + 281 + return compiled 282 + } 283 + 284 + func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 285 + raw := json.RawMessage(event.Commit.Record) 286 + rkey := event.Commit.RKey 287 + did := event.Did 288 + 289 + var record tangled.RepoPull 290 + if err := json.Unmarshal(raw, &record); err != nil { 291 + return fmt.Errorf("failed to unmarshal record: %w", err) 292 + } 293 + 294 + l := log.FromContext(ctx) 295 + l = l.With("handler", "processPull") 296 + l = l.With("did", did) 297 + 298 + l.Info("validating pull record") 299 + targetRepo, err := h.validatePullRecord(ctx, &record) 220 300 if err != nil { 221 - return fmt.Errorf("failed to marshal pipeline event: %w", err) 301 + l.Warn("pull record did not validate, skipping...") 302 + return err 303 + } 304 + 305 + l = l.With("target_repo", record.Target.Repo) 306 + l = l.With("target_branch", record.Target.Branch) 307 + 308 + l.Info("fetching latest submission") 309 + latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 310 + if err != nil { 311 + return err 222 312 } 223 313 314 + sha := latestSubmission.SourceRev 315 + if sha == "" { 316 + return fmt.Errorf("failed to extract source SHA from pull submission") 317 + } 318 + l = l.With("sha", sha) 319 + 320 + l.Info("discovering workflows", "repo_path", targetRepo.RepoPath) 321 + pipeline, err := h.discoverWorkflows(ctx, targetRepo.RepoPath, sha) 322 + if err != nil { 323 + return err 324 + } 325 + 326 + l.Info("compiling pipeline", "workflow_count", len(pipeline)) 327 + cp := h.compilePipeline(ctx, targetRepo, record.Source.Branch, sha, record.Target.Branch, pipeline) 328 + 224 329 // do not run empty pipelines 225 330 if cp.Workflows == nil { 331 + l.Info("skipping empty pipeline") 226 332 return nil 227 333 } 228 334 335 + l.Info("marshaling pipeline event") 336 + eventJson, err := json.Marshal(cp) 337 + if err != nil { 338 + return fmt.Errorf("failed to marshal pipeline event: %w", err) 339 + } 340 + 229 341 ev := db.Event{ 230 342 Rkey: TID(), 231 343 Nsid: tangled.PipelineNSID, 232 344 EventJson: string(eventJson), 233 345 } 234 346 347 + l.Info("inserting pipeline event") 235 348 return h.db.InsertEvent(ev, h.n) 236 349 } 237 350 238 351 // duplicated from add collaborator 239 - func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 352 + func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 240 353 raw := json.RawMessage(event.Commit.Record) 241 354 did := event.Did 242 355 ··· 357 470 return nil 358 471 } 359 472 360 - func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 361 - if event.Kind != models.EventKindCommit { 473 + func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 474 + if event.Kind != jmodels.EventKindCommit { 362 475 return nil 363 476 } 364 477
+4 -5
patchutil/patchutil.go
··· 17 17 func ExtractPatches(formatPatch string) ([]types.FormatPatch, error) { 18 18 patches := splitFormatPatch(formatPatch) 19 19 20 - result := []types.FormatPatch{} 21 - 22 - for _, patch := range patches { 20 + result := make([]types.FormatPatch, len(patches)) 21 + for i, patch := range patches { 23 22 files, headerStr, err := gitdiff.Parse(strings.NewReader(patch)) 24 23 if err != nil { 25 24 return nil, fmt.Errorf("failed to parse patch: %w", err) ··· 30 29 return nil, fmt.Errorf("failed to parse patch header: %w", err) 31 30 } 32 31 33 - result = append(result, types.FormatPatch{ 32 + result[i] = types.FormatPatch{ 34 33 Files: files, 35 34 PatchHeader: header, 36 35 Raw: patch, 37 - }) 36 + } 38 37 } 39 38 40 39 return result, nil