forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 lognotify "tangled.org/core/appview/notify/logging"
25 phnotify "tangled.org/core/appview/notify/posthog"
26 whnotify "tangled.org/core/appview/notify/webhook"
27 "tangled.org/core/appview/oauth"
28 "tangled.org/core/appview/pages"
29 "tangled.org/core/appview/reporesolver"
30 "tangled.org/core/appview/validator"
31 xrpcclient "tangled.org/core/appview/xrpcclient"
32 "tangled.org/core/consts"
33 "tangled.org/core/eventconsumer"
34 "tangled.org/core/idresolver"
35 "tangled.org/core/jetstream"
36 "tangled.org/core/log"
37 tlog "tangled.org/core/log"
38 "tangled.org/core/orm"
39 "tangled.org/core/rbac"
40 "tangled.org/core/tid"
41
42 comatproto "github.com/bluesky-social/indigo/api/atproto"
43 "github.com/bluesky-social/indigo/atproto/atclient"
44 "github.com/bluesky-social/indigo/atproto/syntax"
45 lexutil "github.com/bluesky-social/indigo/lex/util"
46 "github.com/bluesky-social/indigo/xrpc"
47
48 "github.com/go-chi/chi/v5"
49 "github.com/posthog/posthog-go"
50)
51
52type State struct {
53 db *db.DB
54 notifier notify.Notifier
55 indexer *indexer.Indexer
56 oauth *oauth.OAuth
57 enforcer *rbac.Enforcer
58 pages *pages.Pages
59 idResolver *idresolver.Resolver
60 mentionsResolver *mentions.Resolver
61 posthog posthog.Client
62 jc *jetstream.JetstreamClient
63 config *config.Config
64 repoResolver *reporesolver.RepoResolver
65 knotstream *eventconsumer.Consumer
66 spindlestream *eventconsumer.Consumer
67 logger *slog.Logger
68 validator *validator.Validator
69 cfClient *cloudflare.Client
70}
71
72func Make(ctx context.Context, config *config.Config) (*State, error) {
73 logger := tlog.FromContext(ctx)
74
75 d, err := db.Make(ctx, config.Core.DbPath)
76 if err != nil {
77 return nil, fmt.Errorf("failed to create db: %w", err)
78 }
79
80 indexer := indexer.New(log.SubLogger(logger, "indexer"), d)
81 err = indexer.Init(ctx)
82 if err != nil {
83 return nil, fmt.Errorf("failed to create indexer: %w", err)
84 }
85
86 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
87 if err != nil {
88 return nil, fmt.Errorf("failed to create enforcer: %w", err)
89 }
90
91 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
92 if err != nil {
93 logger.Error("failed to create redis resolver", "err", err)
94 res = idresolver.DefaultResolver(config.Plc.PLCURL)
95 }
96
97 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
98 if err != nil {
99 return nil, fmt.Errorf("failed to create posthog client: %w", err)
100 }
101
102 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
103 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
104 if err != nil {
105 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
106 }
107 validator := validator.New(d, res, enforcer)
108
109 repoResolver := reporesolver.New(config, enforcer, d)
110
111 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
112
113 wrapper := db.DbWrapper{Execer: d}
114 jc, err := jetstream.NewJetstreamClient(
115 config.Jetstream.Endpoint,
116 "appview",
117 []string{
118 tangled.GraphFollowNSID,
119 tangled.FeedStarNSID,
120 tangled.PublicKeyNSID,
121 tangled.RepoArtifactNSID,
122 tangled.ActorProfileNSID,
123 tangled.KnotMemberNSID,
124 tangled.SpindleMemberNSID,
125 tangled.SpindleNSID,
126 tangled.KnotNSID,
127 tangled.StringNSID,
128 tangled.RepoIssueNSID,
129 tangled.RepoIssueCommentNSID,
130 tangled.LabelDefinitionNSID,
131 tangled.LabelOpNSID,
132 },
133 nil,
134 tlog.SubLogger(logger, "jetstream"),
135 wrapper,
136 false,
137
138 // in-memory filter is inapplicable to appview so
139 // we'll never log dids anyway.
140 false,
141 )
142 if err != nil {
143 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
144 }
145
146 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
147 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
148 }
149
150 ingester := appview.Ingester{
151 Db: wrapper,
152 Enforcer: enforcer,
153 IdResolver: res,
154 Config: config,
155 Logger: log.SubLogger(logger, "ingester"),
156 Validator: validator,
157 }
158 err = jc.StartJetstream(ctx, ingester.Ingest())
159 if err != nil {
160 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
161 }
162
163 var notifiers []notify.Notifier
164
165 // Always add the database notifier
166 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
167
168 // Add other notifiers in production only
169 if !config.Core.Dev {
170 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
171 }
172 notifiers = append(notifiers, indexer)
173
174 notifiers = append(notifiers, whnotify.NewNotifier(d))
175
176 notifier := notify.NewMergedNotifier(notifiers)
177 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
178
179 var cfClient *cloudflare.Client
180 if config.Cloudflare.ApiToken != "" {
181 cfClient, err = cloudflare.New(config)
182 if err != nil {
183 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
184 cfClient = nil
185 }
186 }
187
188 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
189 if err != nil {
190 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
191 }
192 knotstream.Start(ctx)
193
194 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
195 if err != nil {
196 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
197 }
198 spindlestream.Start(ctx)
199
200 state := &State{
201 db: d,
202 notifier: notifier,
203 indexer: indexer,
204 oauth: oauth,
205 enforcer: enforcer,
206 pages: pages,
207 idResolver: res,
208 mentionsResolver: mentionsResolver,
209 posthog: posthog,
210 jc: jc,
211 config: config,
212 repoResolver: repoResolver,
213 knotstream: knotstream,
214 spindlestream: spindlestream,
215 logger: logger,
216 validator: validator,
217 cfClient: cfClient,
218 }
219
220 // fetch initial bluesky posts if configured
221 go fetchBskyPosts(ctx, res, config, d, logger)
222
223 return state, nil
224}
225
226func (s *State) Close() error {
227 // other close up logic goes here
228 return s.db.Close()
229}
230
231func (s *State) SecurityTxt(w http.ResponseWriter, r *http.Request) {
232 w.Header().Set("Content-Type", "text/plain")
233 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
234
235 securityTxt := `Contact: mailto:security@tangled.org
236Preferred-Languages: en
237Canonical: https://tangled.org/.well-known/security.txt
238Expires: 2030-01-01T21:59:00.000Z
239`
240 w.Write([]byte(securityTxt))
241}
242
243func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
244 w.Header().Set("Content-Type", "text/plain")
245 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
246
247 robotsTxt := `# Hello, Tanglers!
248User-agent: *
249Allow: /
250Disallow: /*/*/settings
251Disallow: /settings
252Disallow: /*/*/compare
253Disallow: /*/*/fork
254
255Crawl-delay: 1
256`
257 w.Write([]byte(robotsTxt))
258}
259
260func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
261 user := s.oauth.GetMultiAccountUser(r)
262 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
263 LoggedInUser: user,
264 })
265}
266
267func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
268 user := s.oauth.GetMultiAccountUser(r)
269 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
270 LoggedInUser: user,
271 })
272}
273
274func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
275 user := s.oauth.GetMultiAccountUser(r)
276 s.pages.Brand(w, pages.BrandParams{
277 LoggedInUser: user,
278 })
279}
280
281func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
282 user := s.oauth.GetMultiAccountUser(r)
283 if user == nil {
284 return
285 }
286
287 l := s.logger.With("handler", "UpgradeBanner")
288 l = l.With("did", user.Active.Did)
289
290 regs, err := db.GetRegistrations(
291 s.db,
292 orm.FilterEq("did", user.Active.Did),
293 orm.FilterEq("needs_upgrade", 1),
294 )
295 if err != nil {
296 l.Error("non-fatal: failed to get registrations", "err", err)
297 }
298
299 spindles, err := db.GetSpindles(
300 r.Context(),
301 s.db,
302 orm.FilterEq("owner", user.Active.Did),
303 orm.FilterEq("needs_upgrade", 1),
304 )
305 if err != nil {
306 l.Error("non-fatal: failed to get spindles", "err", err)
307 }
308
309 if regs == nil && spindles == nil {
310 return
311 }
312
313 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
314 Registrations: regs,
315 Spindles: spindles,
316 })
317}
318
319func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
320 user := chi.URLParam(r, "user")
321 user = strings.TrimPrefix(user, "@")
322
323 if user == "" {
324 w.WriteHeader(http.StatusBadRequest)
325 return
326 }
327
328 id, err := s.idResolver.ResolveIdent(r.Context(), user)
329 if err != nil {
330 w.WriteHeader(http.StatusInternalServerError)
331 return
332 }
333
334 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
335 if err != nil {
336 s.logger.Error("failed to get public keys", "err", err)
337 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
338 return
339 }
340
341 if len(pubKeys) == 0 {
342 w.WriteHeader(http.StatusNoContent)
343 return
344 }
345
346 for _, k := range pubKeys {
347 key := strings.TrimRight(k.Key, "\n")
348 fmt.Fprintln(w, key)
349 }
350}
351
352func validateRepoName(name string) error {
353 // check for path traversal attempts
354 if name == "." || name == ".." ||
355 strings.Contains(name, "/") || strings.Contains(name, "\\") {
356 return fmt.Errorf("Repository name contains invalid path characters")
357 }
358
359 // check for sequences that could be used for traversal when normalized
360 if strings.Contains(name, "./") || strings.Contains(name, "../") ||
361 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
362 return fmt.Errorf("Repository name contains invalid path sequence")
363 }
364
365 // then continue with character validation
366 for _, char := range name {
367 if !((char >= 'a' && char <= 'z') ||
368 (char >= 'A' && char <= 'Z') ||
369 (char >= '0' && char <= '9') ||
370 char == '-' || char == '_' || char == '.') {
371 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
372 }
373 }
374
375 // additional check to prevent multiple sequential dots
376 if strings.Contains(name, "..") {
377 return fmt.Errorf("Repository name cannot contain sequential dots")
378 }
379
380 // if all checks pass
381 return nil
382}
383
384func stripGitExt(name string) string {
385 return strings.TrimSuffix(name, ".git")
386}
387
388func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
389 switch r.Method {
390 case http.MethodGet:
391 user := s.oauth.GetMultiAccountUser(r)
392 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
393 if err != nil {
394 s.pages.Notice(w, "repo", "Invalid user account.")
395 return
396 }
397
398 s.pages.NewRepo(w, pages.NewRepoParams{
399 LoggedInUser: user,
400 Knots: knots,
401 })
402
403 case http.MethodPost:
404 l := s.logger.With("handler", "NewRepo")
405
406 user := s.oauth.GetMultiAccountUser(r)
407 l = l.With("did", user.Active.Did)
408
409 // form validation
410 domain := r.FormValue("domain")
411 if domain == "" {
412 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
413 return
414 }
415 l = l.With("knot", domain)
416
417 repoName := r.FormValue("name")
418 if repoName == "" {
419 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
420 return
421 }
422
423 if err := validateRepoName(repoName); err != nil {
424 s.pages.Notice(w, "repo", err.Error())
425 return
426 }
427 repoName = stripGitExt(repoName)
428 l = l.With("repoName", repoName)
429
430 defaultBranch := r.FormValue("branch")
431 if defaultBranch == "" {
432 defaultBranch = "main"
433 }
434 l = l.With("defaultBranch", defaultBranch)
435
436 description := r.FormValue("description")
437 if len([]rune(description)) > 140 {
438 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
439 return
440 }
441
442 // ACL validation
443 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
444 if err != nil || !ok {
445 l.Info("unauthorized")
446 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
447 return
448 }
449
450 // Check for existing repos
451 existingRepo, err := db.GetRepo(
452 s.db,
453 orm.FilterEq("did", user.Active.Did),
454 orm.FilterEq("name", repoName),
455 )
456 if err == nil && existingRepo != nil {
457 l.Info("repo exists")
458 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
459 return
460 }
461
462 rkey := tid.TID()
463
464 client, err := s.oauth.ServiceClient(
465 r,
466 oauth.WithService(domain),
467 oauth.WithLxm(tangled.RepoCreateNSID),
468 oauth.WithDev(s.config.Core.Dev),
469 )
470 if err != nil {
471 l.Error("service auth failed", "err", err)
472 s.pages.Notice(w, "repo", "Failed to reach knot server.")
473 return
474 }
475
476 input := &tangled.RepoCreate_Input{
477 Rkey: rkey,
478 Name: repoName,
479 DefaultBranch: &defaultBranch,
480 }
481 createResp, xe := tangled.RepoCreate(
482 r.Context(),
483 client,
484 input,
485 )
486 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
487 l.Error("xrpc error", "xe", xe)
488 s.pages.Notice(w, "repo", err.Error())
489 return
490 }
491
492 var repoDid string
493 if createResp != nil && createResp.RepoDid != nil {
494 repoDid = *createResp.RepoDid
495 }
496 if repoDid == "" {
497 l.Error("knot returned empty repo DID")
498 s.pages.Notice(w, "repo", "Knot failed to mint a repo DID. The knot may need to be upgraded.")
499 return
500 }
501
502 repo := &models.Repo{
503 Did: user.Active.Did,
504 Name: repoName,
505 Knot: domain,
506 Rkey: rkey,
507 Description: description,
508 Created: time.Now(),
509 Labels: s.config.Label.DefaultLabelDefs,
510 RepoDid: repoDid,
511 }
512 record := repo.AsRecord()
513
514 cleanupKnot := func() {
515 go func() {
516 delays := []time.Duration{0, 2 * time.Second, 5 * time.Second}
517 for attempt, delay := range delays {
518 time.Sleep(delay)
519 deleteClient, dErr := s.oauth.ServiceClient(
520 r,
521 oauth.WithService(domain),
522 oauth.WithLxm(tangled.RepoDeleteNSID),
523 oauth.WithDev(s.config.Core.Dev),
524 )
525 if dErr != nil {
526 l.Error("failed to create delete client for knot cleanup", "attempt", attempt+1, "err", dErr)
527 continue
528 }
529 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
530 if dErr := tangled.RepoDelete(ctx, deleteClient, &tangled.RepoDelete_Input{
531 Did: user.Active.Did,
532 Name: repoName,
533 Rkey: rkey,
534 }); dErr != nil {
535 cancel()
536 l.Error("failed to clean up repo on knot after rollback", "attempt", attempt+1, "err", dErr)
537 continue
538 }
539 cancel()
540 l.Info("successfully cleaned up repo on knot after rollback", "attempt", attempt+1)
541 return
542 }
543 l.Error("exhausted retries for knot cleanup, repo may be orphaned",
544 "did", user.Active.Did, "repo", repoName, "knot", domain)
545 }()
546 }
547
548 atpClient, err := s.oauth.AuthorizedClient(r)
549 if err != nil {
550 l.Info("PDS write failed", "err", err)
551 cleanupKnot()
552 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
553 return
554 }
555
556 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
557 Collection: tangled.RepoNSID,
558 Repo: user.Active.Did,
559 Rkey: rkey,
560 Record: &lexutil.LexiconTypeDecoder{
561 Val: &record,
562 },
563 })
564 if err != nil {
565 l.Info("PDS write failed", "err", err)
566 cleanupKnot()
567 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
568 return
569 }
570
571 aturi := atresp.Uri
572 l = l.With("aturi", aturi)
573 l.Info("wrote to PDS")
574
575 tx, err := s.db.BeginTx(r.Context(), nil)
576 if err != nil {
577 l.Info("txn failed", "err", err)
578 s.pages.Notice(w, "repo", "Failed to save repository information.")
579 return
580 }
581
582 rollback := func() {
583 err1 := tx.Rollback()
584 err2 := s.enforcer.E.LoadPolicy()
585 err3 := rollbackRecord(context.Background(), aturi, atpClient)
586
587 if errors.Is(err1, sql.ErrTxDone) {
588 err1 = nil
589 }
590
591 if errs := errors.Join(err1, err2, err3); errs != nil {
592 l.Error("failed to rollback changes", "errs", errs)
593 }
594
595 if aturi != "" {
596 cleanupKnot()
597 }
598 }
599 defer rollback()
600
601 err = db.AddRepo(tx, repo)
602 if err != nil {
603 l.Error("db write failed", "err", err)
604 s.pages.Notice(w, "repo", "Failed to save repository information.")
605 return
606 }
607
608 rbacPath := repo.RepoIdentifier()
609 err = s.enforcer.AddRepo(user.Active.Did, domain, rbacPath)
610 if err != nil {
611 l.Error("acl setup failed", "err", err)
612 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
613 return
614 }
615
616 err = tx.Commit()
617 if err != nil {
618 l.Error("txn commit failed", "err", err)
619 http.Error(w, err.Error(), http.StatusInternalServerError)
620 return
621 }
622
623 err = s.enforcer.E.SavePolicy()
624 if err != nil {
625 l.Error("acl save failed", "err", err)
626 http.Error(w, err.Error(), http.StatusInternalServerError)
627 return
628 }
629
630 aturi = ""
631
632 s.notifier.NewRepo(r.Context(), repo)
633 if repoDid != "" {
634 s.pages.HxLocation(w, fmt.Sprintf("/%s", repoDid))
635 } else {
636 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
637 }
638 }
639}
640
641// this is used to rollback changes made to the PDS
642//
643// it is a no-op if the provided ATURI is empty
644func rollbackRecord(ctx context.Context, aturi string, client *atclient.APIClient) error {
645 if aturi == "" {
646 return nil
647 }
648
649 parsed := syntax.ATURI(aturi)
650
651 collection := parsed.Collection().String()
652 repo := parsed.Authority().String()
653 rkey := parsed.RecordKey().String()
654
655 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
656 Collection: collection,
657 Repo: repo,
658 Rkey: rkey,
659 })
660 return err
661}
662
663func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
664 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
665 if err != nil {
666 return err
667 }
668 // already present
669 if len(defaultLabels) == len(defaults) {
670 return nil
671 }
672
673 labelDefs, err := models.FetchLabelDefs(r, defaults)
674 if err != nil {
675 return err
676 }
677
678 // Insert each label definition to the database
679 for _, labelDef := range labelDefs {
680 _, err = db.AddLabelDefinition(e, &labelDef)
681 if err != nil {
682 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
683 }
684 }
685
686 return nil
687}
688
689func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
690 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
691 if err != nil {
692 logger.Error("failed to resolve tangled.org DID", "err", err)
693 return
694 }
695
696 pdsEndpoint := resolved.PDSEndpoint()
697 if pdsEndpoint == "" {
698 logger.Error("no PDS endpoint found for tangled.sh DID")
699 return
700 }
701
702 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
703 if err != nil {
704 logger.Error("failed to create appassword session... skipping fetch", "err", err)
705 return
706 }
707
708 client := xrpc.Client{
709 Auth: &xrpc.AuthInfo{
710 AccessJwt: session.AccessJwt,
711 Did: session.Did,
712 },
713 Host: session.PdsEndpoint,
714 }
715
716 l := log.SubLogger(logger, "bluesky")
717
718 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
719 defer ticker.Stop()
720
721 for {
722 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
723 if err != nil {
724 l.Error("failed to fetch bluesky posts", "err", err)
725 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
726 l.Error("failed to insert bluesky posts", "err", err)
727 } else {
728 l.Info("inserted bluesky posts", "count", len(posts))
729 }
730
731 select {
732 case <-ticker.C:
733 case <-ctx.Done():
734 l.Info("stopping bluesky updater")
735 return
736 }
737 }
738}