···11+# Tack
22+33+- Use `go doc` to find documentation for Go packages. For example:
44+ `go doc github.com/mitchellh/go-libghostty`. Full syntax is
55+ `go doc [<pkg>.][<sym>.]<methodOrField>` for full help output.
66+77+## Style Guide
88+99+- Comment heavily, but not redundantly. Explain the "why" behind decisions
1010+ clearly, don't repeat the "what."
1111+- Try to limit line length below 100 characters, but don't be afraid to break
1212+ this rule if it improves readability.
1313+ - Do not make lines too short to follow this rule either. Try
1414+ to use as much of the max characters as possible without sacrificing
1515+ readability.
1616+1717+## Go Guide
1818+1919+- Add compile-time interface checks whenever a type implements an interface
2020+ we care about. For example: `var _ io.Reader = (*MyType)(nil)`.
+53
README.md
···11+# tack - Connect Tangled to your CI
22+33+Tack is a custom [Tangled](https://tangled.org) spindle that runs
44+CI on alternate providers and reports their results back
55+to Tangled using standard ATProto records so they show up natively
66+in Tangled's UI.
77+88+## What it does
99+1010+Tack is a drop-in alternative to the stock `spindle` runner. You run
1111+`tack` and [register it using the standard UI](https://tangled.org/settings/spindles).
1212+1313+Instead of executing workflows in local containers, tack translates each
1414+Tangled pipeline trigger into a 3rd party CI build, and reports build state
1515+back to Tangled using the existing `sh.tangled.pipeline.status` wire format.
1616+1717+This makes even 3rd party CIs integrate first class into Tangled so their
1818+status, counts, etc. can show up inline in things like pull requests.
1919+2020+```
2121+ sh.tangled.pipeline
2222+ Jetstream ───────────────────────▶ tack
2323+ │
2424+ │ Create Build
2525+ ▼
2626+ Buildkite
2727+ │
2828+ │ webhooks
2929+ ▼
3030+ tack ──── /events (WebSocket) ────▶ Tangled appview
3131+ sh.tangled.pipeline.status
3232+```
3333+3434+```sh
3535+go run . -addr :8080
3636+```
3737+3838+## Endpoints (planned)
3939+4040+- `GET /events` — WebSocket stream of pipeline status events,
4141+ consumed by the Tangled appview.
4242+- `POST /webhooks/buildkite` — Buildkite webhook receiver.
4343+- `POST /xrpc/sh.tangled.pipeline.cancelPipeline` — cancel a running build.
4444+4545+## Configuration (planned)
4646+4747+| Env var | Description |
4848+| ---------------------- | ------------------------------------ |
4949+| `TACK_BUILDKITE_TOKEN` | Buildkite API token |
5050+| `TACK_BUILDKITE_ORG` | Buildkite organization slug |
5151+| `TACK_JETSTREAM_URL` | Tangled Jetstream WebSocket URL |
5252+| `TACK_DB_PATH` | Local SQLite path for the event log |
5353+| `TACK_OWNER_DID` | DID of the spindle operator |
···11+package main
22+33+// HTTP surface of the spindle.
44+//
55+// Three roles to keep in mind:
66+//
77+// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
88+// spindle registration to confirm the operator owns this instance.
99+// 2. Event stream: the appview holds a long-lived websocket against
1010+// /events to receive sh.tangled.pipeline.status frames as builds
1111+// progress. Today this is just a keep-alive; payloads land once the
1212+// Buildkite webhook receiver is wired up.
1313+// 3. Webhooks: Buildkite POSTs build/job state changes to
1414+// /webhooks/buildkite, which we'll translate into pipeline.status
1515+// events on (2).
1616+1717+import (
1818+ "context"
1919+ "encoding/json"
2020+ "errors"
2121+ "fmt"
2222+ "log/slog"
2323+ "net/http"
2424+ "time"
2525+2626+ "github.com/gorilla/websocket"
2727+ "tangled.org/core/api/tangled"
2828+)
2929+3030+// runHTTP starts the spindle's HTTP server and blocks until ctx is
3131+// cancelled or the listener returns a fatal error. On ctx cancellation it
3232+// performs a graceful shutdown with a bounded timeout.
3333+//
3434+// The logger is read from ctx via loggerFrom.
3535+func runHTTP(ctx context.Context, cfg config) error {
3636+ logger := loggerFrom(ctx)
3737+3838+ mux := http.NewServeMux()
3939+ mux.HandleFunc("GET /", rootHandler())
4040+ mux.HandleFunc("GET /events", eventsHandler(logger))
4141+ mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
4242+ mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler())
4343+4444+ srv := &http.Server{
4545+ Addr: cfg.Addr,
4646+ Handler: mux,
4747+ ReadHeaderTimeout: 5 * time.Second,
4848+ }
4949+5050+ // Run ListenAndServe on a goroutine so we can race it against ctx.Done.
5151+ errCh := make(chan error, 1)
5252+ go func() {
5353+ logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
5454+ errCh <- srv.ListenAndServe()
5555+ }()
5656+5757+ select {
5858+ case <-ctx.Done():
5959+ logger.Info("shutting down")
6060+ case err := <-errCh:
6161+ // ErrServerClosed means we shut ourselves down cleanly elsewhere
6262+ // — anything else is a real failure to report.
6363+ if err != nil && !errors.Is(err, http.ErrServerClosed) {
6464+ return fmt.Errorf("http server: %w", err)
6565+ }
6666+ }
6767+6868+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
6969+ defer cancel()
7070+ return srv.Shutdown(shutdownCtx)
7171+}
7272+7373+// rootHandler responds at "/" with a small identifier. Mainly useful as a
7474+// liveness check during deployment.
7575+func rootHandler() http.HandlerFunc {
7676+ return func(w http.ResponseWriter, r *http.Request) {
7777+ fmt.Fprintln(w, "tack: a Tangled spindle backed by Buildkite")
7878+ }
7979+}
8080+8181+// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
8282+// this spindle's owner during registration.
8383+func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
8484+ return func(w http.ResponseWriter, r *http.Request) {
8585+ w.Header().Set("Content-Type", "application/json")
8686+ if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
8787+ logger.Error("encode owner response", "err", err)
8888+ }
8989+ }
9090+}
9191+9292+// buildkiteWebhookHandler is a placeholder until we implement Buildkite ->
9393+// pipeline.status translation.
9494+func buildkiteWebhookHandler() http.HandlerFunc {
9595+ return func(w http.ResponseWriter, r *http.Request) {
9696+ http.Error(w, "not implemented", http.StatusNotImplemented)
9797+ }
9898+}
9999+100100+// eventsHandler upgrades to a WebSocket and emits no events yet. It exists
101101+// so the Tangled appview can connect; once we wire up Buildkite webhooks
102102+// this is where sh.tangled.pipeline.status frames will be sent.
103103+//
104104+// We send a periodic ping to keep intermediaries (load balancers, tunnels)
105105+// from idling the connection, and watch for client reads to detect a
106106+// disconnect.
107107+func eventsHandler(logger *slog.Logger) http.HandlerFunc {
108108+ upgrader := websocket.Upgrader{
109109+ ReadBufferSize: 1024,
110110+ WriteBufferSize: 1024,
111111+ }
112112+ return func(w http.ResponseWriter, r *http.Request) {
113113+ conn, err := upgrader.Upgrade(w, r, nil)
114114+ if err != nil {
115115+ logger.Error("websocket upgrade failed", "err", err)
116116+ return
117117+ }
118118+ defer conn.Close()
119119+ logger.Debug("events client connected", "remote", r.RemoteAddr)
120120+121121+ ctx, cancel := context.WithCancel(r.Context())
122122+ defer cancel()
123123+124124+ // Detect client disconnect by trying to read; we don't expect any
125125+ // payloads from the client, so any read result (including EOF)
126126+ // signals the connection has gone away.
127127+ go func() {
128128+ for {
129129+ if _, _, err := conn.NextReader(); err != nil {
130130+ cancel()
131131+ return
132132+ }
133133+ }
134134+ }()
135135+136136+ ticker := time.NewTicker(30 * time.Second)
137137+ defer ticker.Stop()
138138+ for {
139139+ select {
140140+ case <-ctx.Done():
141141+ logger.Debug("events client disconnected", "remote", r.RemoteAddr)
142142+ return
143143+ case <-ticker.C:
144144+ if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(time.Second)); err != nil {
145145+ logger.Debug("events ping failed", "err", err)
146146+ return
147147+ }
148148+ }
149149+ }
150150+ }
151151+}
+119
jetstream.go
···11+package main
22+33+// This file wires tack into the AT Protocol firehose via Bluesky's
44+// "jetstream" — a JSON projection of the firehose served over a websocket
55+// (see https://github.com/bluesky-social/jetstream). Tangled rides on top of
66+// AT Proto: things like "this user is a spindle member", "this repo wants
77+// this spindle", etc. are all atproto records published to users' PDSes,
88+// and jetstream is how a service like a spindle observes them in real time.
99+//
1010+// As a spindle, the records we care about are:
1111+//
1212+// - sh.tangled.spindle.member — owner authorizes a DID to use us
1313+// - sh.tangled.repo — a repo declares us as its spindle
1414+// - sh.tangled.repo.collaborator — collaborators on those repos
1515+//
1616+// (Pipeline trigger records, sh.tangled.pipeline, do *not* come over
1717+// jetstream; they're delivered by the knot servers via a separate event
1818+// stream. That is plumbed in separately.)
1919+2020+import (
2121+ "context"
2222+ "fmt"
2323+ "time"
2424+2525+ "github.com/bluesky-social/jetstream/pkg/client"
2626+ "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
2727+ jsmodels "github.com/bluesky-social/jetstream/pkg/models"
2828+ "tangled.org/core/api/tangled"
2929+)
3030+3131+// startJetstream dials the configured jetstream endpoint and spawns a
3232+// background goroutine that consumes events for the lifetime of ctx. It
3333+// returns once the client is constructed; connection errors surface in
3434+// logs, not return values, because the read loop is expected to reconnect
3535+// on its own.
3636+//
3737+// The logger is pulled from ctx (see log.go); falls back to slog.Default()
3838+// if none is attached.
3939+func startJetstream(ctx context.Context, cfg config) error {
4040+ logger := loggerFrom(ctx).With("component", "jetstream")
4141+4242+ // `wantedCollections` is a server-side filter: jetstream will only send
4343+ // us commit events whose record collection (NSID) is in this list. The
4444+ // NSIDs come from tangled-core's generated lexicon types so they stay
4545+ // in sync with whatever the appview/knots are publishing.
4646+ collections := []string{
4747+ tangled.SpindleMemberNSID,
4848+ tangled.RepoNSID,
4949+ tangled.RepoCollaboratorNSID,
5050+ }
5151+5252+ // Configure our JetStream client.
5353+ clientCfg := client.DefaultClientConfig()
5454+ clientCfg.WebsocketURL = cfg.JetstreamURL
5555+ clientCfg.WantedCollections = collections
5656+5757+ // Re-attach the component-scoped logger so handleJetstreamEvent — which
5858+ // the scheduler invokes with the ctx we pass to ConnectAndRead — can
5959+ // pull it back out via loggerFrom.
6060+ ctx = loggerInto(ctx, logger)
6161+6262+ // The sequential scheduler processes events one-at-a-time in arrival
6363+ // order. That's the right default for a spindle: ordering matters
6464+ // (e.g. a member-added event must apply before any record from that
6565+ // member is processed), and our event volume is tiny.
6666+ c, err := client.NewClient(
6767+ clientCfg,
6868+ logger,
6969+ sequential.NewScheduler(
7070+ "tack",
7171+ logger,
7272+ handleJetstreamEvent),
7373+ )
7474+ if err != nil {
7575+ return fmt.Errorf("new jetstream client: %w", err)
7676+ }
7777+7878+ // Reconnect loop. ConnectAndRead blocks on the websocket and returns
7979+ // either when the connection drops (transient network error, server
8080+ // restart, etc.) or when ctx is cancelled. On error we sleep briefly
8181+ // and reconnect; on ctx cancellation we exit cleanly.
8282+ //
8383+ // TODO: pass a *cursor here once we persist one, so we resume from the
8484+ // last seen event instead of "now" after a restart.
8585+ go func() {
8686+ for {
8787+ if err := c.ConnectAndRead(ctx, nil); err != nil {
8888+ if ctx.Err() != nil {
8989+ return
9090+ }
9191+ logger.Error("jetstream read loop", "err", err)
9292+ time.Sleep(2 * time.Second)
9393+ continue
9494+ }
9595+ if ctx.Err() != nil {
9696+ return
9797+ }
9898+ }
9999+ }()
100100+101101+ return nil
102102+}
103103+104104+// handleJetstreamEvent is the per-event callback for the JetStream.
105105+func handleJetstreamEvent(ctx context.Context, evt *jsmodels.Event) error {
106106+ // We only care about commits, which are the actual record CRUD operations
107107+ // on a user's PDS.
108108+ if evt.Kind != jsmodels.EventKindCommit || evt.Commit == nil {
109109+ return nil
110110+ }
111111+112112+ loggerFrom(ctx).Debug("event",
113113+ "did", evt.Did,
114114+ "collection", evt.Commit.Collection,
115115+ "op", evt.Commit.Operation,
116116+ "rkey", evt.Commit.RKey,
117117+ )
118118+ return nil
119119+}
+32
log.go
···11+package main
22+33+// Tiny helpers for stashing a *slog.Logger on a context.Context. The stdlib
44+// doesn't define a context key for slog, so by convention every app rolls
55+// its own — the unexported key type ensures we can't collide with anyone
66+// else's value, and FromContext falls back to slog.Default() so callers
77+// don't need to special-case a missing logger.
88+99+import (
1010+ "context"
1111+ "log/slog"
1212+)
1313+1414+// loggerCtxKey is unexported so only this package can write to the slot.
1515+type loggerCtxKey struct{}
1616+1717+// loggerInto returns a copy of ctx that carries logger.
1818+func loggerInto(ctx context.Context, logger *slog.Logger) context.Context {
1919+ return context.WithValue(ctx, loggerCtxKey{}, logger)
2020+}
2121+2222+// loggerFrom retrieves the logger attached with loggerInto, or slog.Default
2323+// if none has been set.
2424+func loggerFrom(ctx context.Context) *slog.Logger {
2525+ if ctx == nil {
2626+ return slog.Default()
2727+ }
2828+ if l, ok := ctx.Value(loggerCtxKey{}).(*slog.Logger); ok {
2929+ return l
3030+ }
3131+ return slog.Default()
3232+}
+90
main.go
···11+// Tack is a custom Tangled spindle that translates sh.tangled.pipeline
22+// trigger records into Buildkite builds, and publishes Buildkite job state
33+// back as sh.tangled.pipeline.status events on a WebSocket stream that the
44+// Tangled appview can consume.
55+package main
66+77+import (
88+ "context"
99+ "errors"
1010+ "flag"
1111+ "log/slog"
1212+ "os"
1313+ "os/signal"
1414+ "syscall"
1515+1616+ charmlog "github.com/charmbracelet/log"
1717+)
1818+1919+// config is the runtime configuration, sourced from environment variables and
2020+// flags. Env vars match the README so this can be swapped in for the stock
2121+// spindle without surprises.
2222+type config struct {
2323+ Addr string
2424+ OwnerDID string
2525+ JetstreamURL string
2626+}
2727+2828+func loadConfig() (config, error) {
2929+ cfg := config{
3030+ Addr: envOr("TACK_LISTEN_ADDR", ":8080"),
3131+ OwnerDID: os.Getenv("TACK_OWNER_DID"),
3232+ JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"),
3333+ }
3434+ addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)")
3535+ flag.Parse()
3636+ cfg.Addr = *addrFlag
3737+3838+ if cfg.OwnerDID == "" {
3939+ return cfg, errors.New("TACK_OWNER_DID is required")
4040+ }
4141+ return cfg, nil
4242+}
4343+4444+func envOr(key, def string) string {
4545+ if v := os.Getenv(key); v != "" {
4646+ return v
4747+ }
4848+ return def
4949+}
5050+5151+func main() {
5252+ // Logging setup. charmbracelet/log implements slog.Handler, so we wrap
5353+ // it in slog.New to share the same backend with libraries that expect
5454+ // a *slog.Logger (notably the jetstream client).
5555+ charmHandler := charmlog.NewWithOptions(os.Stderr, charmlog.Options{
5656+ Level: charmlog.DebugLevel,
5757+ ReportTimestamp: true,
5858+ })
5959+ logger := slog.New(charmHandler)
6060+ slog.SetDefault(logger)
6161+6262+ // Config loading
6363+ cfg, err := loadConfig()
6464+ if err != nil {
6565+ logger.Error("invalid configuration", "err", err)
6666+ os.Exit(2)
6767+ }
6868+6969+ // Root context: cancelled on SIGINT/SIGTERM, with the logger attached
7070+ // so any function we hand it to can pull it back out via loggerFrom.
7171+ ctx, stop := signal.NotifyContext(
7272+ context.Background(),
7373+ os.Interrupt, syscall.SIGTERM,
7474+ )
7575+ defer stop()
7676+ ctx = loggerInto(ctx, logger)
7777+7878+ // Start the JetStream listener in the background.
7979+ if err := startJetstream(ctx, cfg); err != nil {
8080+ logger.Error("failed to start jetstream consumer", "err", err)
8181+ os.Exit(1)
8282+ }
8383+8484+ // Run the HTTP server. This blocks until ctx is cancelled or the
8585+ // listener errors.
8686+ if err := runHTTP(ctx, cfg); err != nil {
8787+ logger.Error("http server error", "err", err)
8888+ os.Exit(1)
8989+ }
9090+}