jj workspaces over the network
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: server lifecycle — tandem up/down/status/logs (slices 10-13)

Implements daemon management without systemd:
- tandem up: start background daemon, return immediately
- tandem down: stop daemon via control socket
- tandem status: health check with --json support
- tandem logs: stream logs from running daemon at any verbosity
- tandem serve: new flags (--log-level, --log-format, --control-socket, --daemon)

Control socket uses newline-delimited JSON over Unix domain socket.
Signal handling: SIGTERM/SIGINT for graceful shutdown, double-signal for immediate exit.

19 new tests across 4 test files (34 total, all passing).
Docs overhauled: README restructured with quickstart first, deployment sections updated.

+2267 -386
+29 -8
AGENTS.md
··· 35 35 Requires a Rust toolchain and Cap'n Proto compiler (`capnp`). 36 36 Or build from source: `cargo build --release`. 37 37 38 - ## Single binary, two modes 38 + ## Single binary, three modes 39 39 40 40 ``` 41 - tandem serve --listen <addr> --repo <path> # server mode 41 + tandem up --repo <path> --listen <addr> # start background daemon 42 + tandem serve --listen <addr> --repo <path> # foreground server (systemd/docker) 42 43 tandem [jj args...] # client mode (stock jj via CliRunner) 43 44 ``` 44 45 46 + Plus lifecycle commands that talk to a running server: 47 + 48 + ``` 49 + tandem down # stop daemon 50 + tandem status [--json] # health check 51 + tandem logs [--level <level>] [--json] # stream logs from daemon 52 + ``` 53 + 45 54 The client mode is `CliRunner::init().add_store_factories(tandem_factories()).run()`. 46 55 All stock jj commands work transparently: `tandem new`, `tandem log`, `tandem diff`, 47 - `tandem cat`, `tandem bookmark create` are all jj commands running through our binary. 56 + `tandem file show`, `tandem bookmark create` are all jj commands running through our binary. 48 57 49 58 Server mode embeds jj-lib and uses the Git backend internally. When a client 50 59 calls `putObject(file, bytes)`, the server stores the object. Objects are real 51 60 jj-compatible blobs — `jj git push` on the server just works. 52 61 62 + `tandem up` is the easy way to start the server — it forks `tandem serve --daemon` 63 + in the background, waits for the control socket to become healthy, prints the PID, 64 + and exits. `tandem serve` is the foreground mode for systemd, Docker, or debugging. 65 + Both create a control socket so `tandem down/status/logs` work against either. 66 + 53 67 ## Source layout 54 68 55 69 ``` 56 70 src/ 57 71 main.rs CLI dispatch (clap) + CliRunner passthrough 58 72 server.rs Server — jj Git backend + Cap'n Proto RPC 73 + control.rs Control socket — daemon management (Unix socket, JSON lines) 59 74 backend.rs TandemBackend (jj-lib Backend trait) 60 75 op_store.rs TandemOpStore (jj-lib OpStore trait) 61 76 op_heads_store.rs TandemOpHeadsStore (jj-lib OpHeadsStore trait) ··· 66 81 tandem.capnp Cap'n Proto schema (Store + HeadWatcher) 67 82 tests/ 68 83 common/mod.rs Test harness (server spawn, HOME isolation) 69 - slice1-7 tests Integration tests asserting on file bytes 84 + slice1-7 tests Core integration tests (file round-trip, visibility, CAS, git) 85 + slice10-13 tests Server lifecycle tests (shutdown, control socket, up/down, logs) 70 86 ``` 71 87 72 88 ## Docs layout ··· 79 95 jj-lib-integration.md Trait signatures and store registration 80 96 rpc-protocol.md Cap'n Proto protocol details 81 97 rpc-error-model.md Error handling conventions 98 + server-lifecycle.md tandem up/down/status/logs design 82 99 core-beliefs.md Design principles 83 100 exec-plans/ 84 - completed/ Completion notes for all 9 slices 101 + completed/ Completion notes for all 13 slices 85 102 tech-debt-tracker.md Known issues (P1/P2/P3) 86 103 product-specs/ 87 104 core-product.md Product intent and scope ··· 120 137 121 138 See `docs/design-docs/workflow.md` for the full picture. Summary: 122 139 123 - 1. **Orchestrator** sets up server on a VM/VPS: `tandem serve --listen 0.0.0.0:13013 --repo /srv/project` 140 + 1. **Orchestrator** sets up server on a VM/VPS: `tandem up --repo /srv/project --listen 0.0.0.0:13013` 124 141 2. **Agents** init workspaces: `tandem init --tandem-server=host:13013 ~/work/project` 125 142 3. **Agents** use stock jj commands: write files, `tandem new -m "feat: add auth"`, etc. 126 - 4. **Agents** see each other's files: `tandem cat -r <other-commit> src/auth.rs` 143 + 4. **Agents** see each other's files: `tandem file show -r <other-commit> src/auth.rs` 127 144 5. **Orchestrator** ships from server: `jj bookmark create main -r <tip>`, `jj git push` 128 145 129 146 Git operations are server-only. Agents never touch git directly. 130 147 131 148 ## What exists 132 149 133 - All core functionality is implemented across 9 slices: 150 + All core functionality is implemented across 13 slices: 134 151 135 152 | Capability | Test coverage | 136 153 |------------|--------------| ··· 143 160 | End-to-end multi-agent + git | `tests/slice7_end_to_end.rs` | 144 161 | Bookmark management via RPC | Slice 8 (see `docs/exec-plans/completed/`) | 145 162 | CLI help and discoverability | Slice 9 (see `docs/exec-plans/completed/`) | 163 + | Signal handling + graceful shutdown | `tests/slice10_graceful_shutdown.rs` | 164 + | Control socket + tandem status | `tests/slice11_control_socket.rs` | 165 + | tandem up + tandem down | `tests/slice12_up_down.rs` | 166 + | tandem logs (streaming) | `tests/slice13_log_streaming.rs` | 146 167 147 168 See `docs/exec-plans/completed/` for detailed completion notes on each slice. 148 169 See `docs/exec-plans/tech-debt-tracker.md` for known issues and next work.
+18 -5
ARCHITECTURE.md
··· 4 4 5 5 ## Implementation Status 6 6 7 - **Complete as of 2026-02-15.** All slices 1-9 implemented and tested. 7 + **Complete as of 2026-02-19.** All slices 1-13 implemented and tested (34 tests). 8 8 See `docs/exec-plans/completed/` for details. 9 9 10 10 ## Shape 11 11 12 - Single binary, two modes: 12 + Single binary, multiple modes: 13 13 14 - - `tandem serve --listen <addr> --repo <path>` — server mode 14 + - `tandem up --repo <path> --listen <addr>` — start background daemon 15 + - `tandem down` / `tandem status` / `tandem logs` — manage the daemon 16 + - `tandem serve --listen <addr> --repo <path>` — foreground server (systemd/docker) 15 17 - `tandem <jj-command>` — client mode (stock jj via CliRunner) 18 + 19 + `tandem up` is the easy way. It forks `tandem serve --daemon`, waits for 20 + the control socket to become healthy, prints the PID, and exits. `tandem serve` 21 + is the foreground mode for systemd, Docker, or debugging. Both modes create 22 + a control socket so `tandem down/status/logs` work against either. 16 23 17 24 ## Core model 18 25 ··· 93 100 94 101 ## Test Coverage 95 102 96 - 16 integration tests across slices 1-7: 103 + 34 integration tests across slices 1-13: 97 104 98 105 | Slice | Test File | Coverage | 99 106 |-------|-----------|----------| ··· 104 111 | 5 | `tests/slice5_watch_heads.rs` | Real-time head notifications | 105 112 | 6 | `tests/slice6_git_round_trip.rs` | Git push/fetch round-trip | 106 113 | 7 | `tests/slice7_end_to_end.rs` | Multi-agent + git + external contributor | 114 + | 10 | `tests/slice10_graceful_shutdown.rs` | Signal handling, clean exit, log flags | 115 + | 11 | `tests/slice11_control_socket.rs` | Control socket, status reporting | 116 + | 12 | `tests/slice12_up_down.rs` | Daemon lifecycle (up/down), duplicate detection | 117 + | 13 | `tests/slice13_log_streaming.rs` | Log streaming, level filtering, JSON output | 107 118 108 119 All tests assert on **file byte content**, not just commit descriptions. 109 120 ··· 125 136 src/ 126 137 main.rs CLI dispatch (clap) + CliRunner passthrough 127 138 server.rs Server — jj Git backend + Cap'n Proto RPC 139 + control.rs Control socket — daemon management (Unix socket, JSON lines) 128 140 backend.rs TandemBackend (jj-lib Backend trait) 129 141 op_store.rs TandemOpStore (jj-lib OpStore trait) 130 142 op_heads_store.rs TandemOpHeadsStore (jj-lib OpHeadsStore trait) ··· 135 147 tandem.capnp Cap'n Proto schema (Store + HeadWatcher) 136 148 tests/ 137 149 common/mod.rs Test harness (server spawn, HOME isolation) 138 - slice1-7 tests Integration tests asserting on file bytes 150 + slice1-7 tests Core integration tests (file round-trip, visibility, CAS, git) 151 + slice10-13 tests Server lifecycle tests (shutdown, control socket, up/down, logs) 139 152 ``` 140 153 141 154 ## Non-goals
+3
Cargo.toml
··· 55 55 serde = { version = "1", features = ["derive"] } 56 56 serde_json = "1" 57 57 dunce = "1" 58 + libc = "0.2" 58 59 59 60 [build-dependencies] 60 61 capnpc = "0.20" 61 62 62 63 [dev-dependencies] 63 64 tempfile = "3" 65 + libc = "0.2" 66 + serde_json = "1"
+288 -244
README.md
··· 4 4 > protocol, on-disk format, and CLI surface may change. Don't use it for 5 5 > data you can't regenerate. Back up your repos. 6 6 7 - jj workspaces over the network. One server, many agents in many vms, real files. 8 - 9 - ``` 10 - tandem serve --listen 0.0.0.0:13013 --repo ~/project # server 11 - tandem init --tandem-server=host:13013 ~/work # agent 12 - tandem new -m "feat: add auth" # it's just jj 13 - ``` 14 - 15 - tandem is a single binary that embeds [jj](https://jj-vcs.com). The server 16 - hosts a jj+git repo — typically on a VM or VPS as a long-running service. 17 - Agents on remote machines get transparent read/write access over Cap'n Proto 18 - RPC. Every stock jj command works — `log`, `new`, `diff`, `file show`, 19 - `bookmark`, `describe` — because tandem implements jj-lib's `Backend`, 20 - `OpStore`, and `OpHeadsStore` traits as RPC stubs. 21 - 22 - ## Why 23 - 24 - Coding agents need to collaborate on the same codebase without stepping on 25 - each other. The current approach — git worktrees on a single machine — breaks 26 - down when agents run on different machines, fight over `.git` locks, or need 27 - to read each other's work-in-progress. 28 - 29 - tandem gives each agent an isolated workspace that shares a single store over 30 - the network. Agents see each other's commits instantly. No push/pull, no merge 31 - conflicts on the transport layer. The server ships to GitHub when you're ready. 32 - 33 - ## How it works 34 - 35 - ``` 36 - ┌──────────────┐ ┌──────────────────────────┐ 37 - │ Agent A │ Cap'n Proto RPC │ │ 38 - │ (Machine B) │◄─────────────────────────►│ tandem serve │ 39 - │ │ │ (Machine A) │ 40 - │ ~/work-a/ │ │ │ 41 - │ src/auth.rs │ │ ┌────────────────────┐ │ 42 - │ src/lib.rs │ │ │ Content-Addressed │ │ 43 - └──────────────┘ │ │ Store │ │ 44 - ┌──────────────┐ │ │ │ │ 45 - │ Agent B │ Cap'n Proto RPC │ │ jj+git repo │ │ 46 - │ (Machine C) │◄─────────────────────────►│ │ operations │ │──► git push 47 - │ │ │ │ views │ │ 48 - │ ~/work-b/ │ │ │ op heads (CAS) │ │ 49 - │ src/api.rs │ │ └────────────────────┘ │ 50 - └──────────────┘ │ │ 51 - ┌──────────────┐ │ │ 52 - │ Agent C │ Cap'n Proto RPC │ │ 53 - │ (Machine D) │◄─────────────────────────►│ │ 54 - │ │ │ │ 55 - │ ~/work-c/ │ └──────────────────────────┘ 56 - │ tests/*.rs │ 57 - └──────────────┘ 58 - ``` 59 - 60 - Each agent has a full working copy on its local disk (fast reads/writes). 61 - The commit store lives on the server. When Agent A commits, Agent B sees it 62 - instantly in `tandem log` — no fetch, no pull, no merge. 63 - 64 - The `tandem` binary has two modes: 65 - 66 - - **`tandem serve`** — hosts the jj+git repo, accepts RPC connections 67 - - **`tandem <jj-command>`** — runs stock jj with tandem as the remote store 68 - 69 - The client registers three jj-lib trait implementations: 70 - 71 - | Trait | What it stores | RPC calls | 72 - |-------|---------------|-----------| 73 - | `Backend` | Files, trees, commits, symlinks | `getObject`, `putObject` | 74 - | `OpStore` | Operations, views | `getObject`, `putObject` | 75 - | `OpHeadsStore` | Operation head pointers | `getHeads`, `updateOpHeads` (CAS) | 76 - 77 - Concurrent writes use compare-and-swap on operation heads with automatic 78 - retry. Two agents committing simultaneously both succeed — CAS contention 79 - resolves transparently. 7 + jj workspaces over the network. One server, many agents on many machines, real files. 80 8 81 9 ## Install 82 10 ··· 100 28 101 29 ## Quickstart 102 30 103 - ### Start a server 104 - 105 31 ```bash 106 - tandem serve --listen 0.0.0.0:13013 --repo ~/project 107 - ``` 32 + # Start a server (on your VPS, or locally for testing) 33 + tandem up --repo ~/project --listen 0.0.0.0:13013 108 34 109 - For production use, run this on a VM/VPS — see [Deployment setups](#deployment-setups) below. 35 + # Check it's running 36 + tandem status 110 37 111 - ### Connect agents 38 + # On agent machines: initialize a workspace 39 + tandem init --tandem-server=your-vps:13013 ~/work 40 + cd ~/work 41 + echo 'pub fn auth() {}' > auth.rs 42 + tandem new -m "feat: add auth" 112 43 113 - ```bash 114 - # Agent A 115 - tandem init --tandem-server=server:13013 ~/work-a 116 - cd ~/work-a 117 - echo 'pub fn auth(token: &str) -> bool { !token.is_empty() }' > auth.rs 118 - tandem new -m "feat: add auth module" 44 + # View logs from the daemon 45 + tandem logs 119 46 120 - # Agent B (different machine, or different terminal) 121 - tandem init --tandem-server=server:13013 --workspace=agent-b ~/work-b 122 - cd ~/work-b 123 - echo 'pub fn api() -> String { "ok".into() }' > api.rs 124 - tandem new -m "feat: add API handler" 47 + # Stop the server 48 + tandem down 125 49 ``` 126 50 127 - ### What agents see 51 + Every jj command works through `tandem` — `log`, `new`, `diff`, `file show`, 52 + `bookmark`, `describe` — because tandem implements jj-lib's store traits 53 + as RPC stubs. The server holds a real jj+git repo. 54 + 55 + --- 128 56 129 - Agent B runs `tandem log` and sees everyone's work: 57 + ## Deployment 130 58 131 - ``` 132 - @ w agent-b agent-b@ f3f18a89 133 - │ (empty) feat: add API handler 134 - ○ o agent-b a918ed0d 135 - │ api.rs 136 - │ ○ k agent-a default@ 7acb3ff6 137 - │ │ (empty) feat: add auth module 138 - │ ○ u agent-a 78f31413 139 - ├─╯ auth.rs 140 - ◆ z root() 00000000 141 - ``` 59 + ### On a VPS (recommended) 142 60 143 - Agent B reads Agent A's file directly: 61 + The default setup. Server on a VPS, agents connect from their machines. 144 62 145 63 ```bash 146 - $ tandem file show -r k auth.rs 147 - pub fn auth(token: &str) -> bool { !token.is_empty() } 64 + # SSH to your VPS, install tandem 65 + cargo install jj-tandem 66 + 67 + # Start the server 68 + tandem up --repo /srv/project --listen 0.0.0.0:13013 69 + 70 + # Verify 71 + tandem status 148 72 ``` 149 73 150 - ### Ship via git 74 + On agent machines: 151 75 152 - On the server: 76 + ```bash 77 + # Agent A 78 + tandem init --tandem-server=your-vps:13013 ~/work 79 + cd ~/work 80 + echo 'pub fn auth(token: &str) -> bool { !token.is_empty() }' > auth.rs 81 + tandem new -m "feat: add auth module" 82 + 83 + # Agent B (different machine) 84 + tandem init --tandem-server=your-vps:13013 --workspace=agent-b ~/work 85 + cd ~/work 86 + tandem log # sees Agent A's commit 87 + tandem file show -r <change-id> auth.rs # reads Agent A's file 88 + echo 'pub fn api() -> &str { "ok" }' > api.rs 89 + tandem new -m "feat: add API handler" 90 + ``` 91 + 92 + Ship via git from the server: 153 93 154 94 ```bash 95 + # On the VPS 96 + cd /srv/project 155 97 jj bookmark create main -r <tip> 156 98 jj git push --bookmark main 157 99 ``` 158 100 159 - The server is a real jj+git repo. Standard git push just works. 101 + The server is a real jj+git repo. `jj git push` just works. 160 102 161 - --- 162 - 163 - ## Deployment setups 103 + ### Local testing 164 104 165 - ### Local: multiple terminals (for testing) 166 - 167 - The simplest setup for trying tandem out. Server and agents on the same 168 - machine, different directories. Not how you'd run it for real work — see 169 - Remote machines below. 105 + Server and agents on the same machine, different directories. 170 106 171 107 ```bash 172 - # Terminal 1 — server 173 - tandem serve --listen 127.0.0.1:13013 --repo /tmp/project 108 + # Start server 109 + tandem up --repo /tmp/project --listen 127.0.0.1:13013 174 110 175 - # Terminal 2 — agent A 111 + # Agent A 176 112 tandem init --tandem-server=127.0.0.1:13013 /tmp/agent-a 177 113 cd /tmp/agent-a && echo 'hello' > file.txt && tandem new -m "agent A" 178 114 179 - # Terminal 3 — agent B 115 + # Agent B 180 116 tandem init --tandem-server=127.0.0.1:13013 --workspace=agent-b /tmp/agent-b 181 117 cd /tmp/agent-b && tandem log # sees agent A's commit 182 - ``` 183 118 184 - ### Remote machines: sprites.dev / exe.dev / SSH 185 - 186 - The typical production setup. Server on a VPS/VM, agents on separate machines. 187 - 188 - ```bash 189 - # VPS — server 190 - tandem serve --listen 0.0.0.0:13013 --repo /srv/project 191 - 192 - # Machine A — agent A (e.g. sprites.dev sandbox) 193 - # Copy the binary over, or build on the remote machine 194 - scp target/release/tandem agent-a-host:/usr/local/bin/ 195 - ssh agent-a-host 196 - export TANDEM_SERVER=server-host:13013 197 - tandem init ~/work 198 - cd ~/work 199 - # ... write code, commit with tandem new ... 200 - 201 - # Machine B — agent B (e.g. exe.dev VM) 202 - scp target/release/tandem agent-b-host:/usr/local/bin/ 203 - ssh agent-b-host 204 - export TANDEM_SERVER=server-host:13013 205 - tandem init --workspace=agent-b ~/work 206 - cd ~/work 207 - tandem log # sees agent A's commits 208 - tandem file show -r <change-id> src/auth.rs # reads agent A's files 119 + # Done 120 + tandem down 209 121 ``` 210 122 211 - Requirements: 212 - - Server port (default 13013) must be reachable from agent machines 213 - - No TLS yet — use SSH tunnels or VPN for untrusted networks 214 - - The `tandem` binary is ~30MB, statically linkable, no runtime deps 215 - 216 - ### Docker: 3 agents on a shared network 123 + ### Docker 217 124 218 - Each agent runs in its own container. They connect to the server container 219 - by hostname over a Docker bridge network. 125 + Containers connecting to a server. Use `tandem serve` (foreground mode) — 126 + appropriate for container entrypoints. 220 127 221 128 ```bash 222 - # Build Linux binary (if on macOS) 223 - docker run --rm -v $(pwd):/src -v tandem-cargo:/usr/local/cargo/registry \ 224 - -w /src rust:1.84-slim \ 225 - bash -c 'apt-get update -qq && apt-get install -y -qq capnproto >/dev/null 2>&1 && cargo build --release' 226 - 227 - # Create network 228 129 docker network create tandem-net 229 130 230 - # Server 131 + # Server container 231 132 docker run -d --name tandem-server --network tandem-net \ 232 133 -v $(pwd)/target/release/tandem:/usr/local/bin/tandem \ 233 134 debian:trixie-slim \ 234 135 tandem serve --listen 0.0.0.0:13013 --repo /srv/project 235 136 236 - # Agent A 137 + # Agent container 237 138 docker run --rm --network tandem-net \ 238 139 -v $(pwd)/target/release/tandem:/usr/local/bin/tandem \ 239 140 debian:trixie-slim bash -c ' 240 141 tandem init --tandem-server=tandem-server:13013 /work 241 142 cd /work 242 143 echo "from agent A" > hello.txt 243 - tandem --config=fsmonitor.backend=none new -m "agent A commit" 244 - tandem --config=fsmonitor.backend=none log --no-graph 245 - ' 246 - 247 - # Agent B 248 - docker run --rm --network tandem-net \ 249 - -v $(pwd)/target/release/tandem:/usr/local/bin/tandem \ 250 - debian:trixie-slim bash -c ' 251 - tandem init --tandem-server=tandem-server:13013 --workspace=agent-b /work 252 - cd /work 253 - tandem --config=fsmonitor.backend=none log --no-graph 254 - tandem --config=fsmonitor.backend=none file show -r <agent-a-change> hello.txt 144 + tandem new -m "agent A commit" 145 + tandem log --no-graph 255 146 ' 256 147 257 - # Cleanup 258 148 docker stop tandem-server && docker rm tandem-server 259 149 docker network rm tandem-net 260 150 ``` 261 151 262 - This simulates cross-machine communication. Each container has its own 263 - filesystem, its own network identity, and connects to the server by DNS name. 264 - Tested — see `qa/v1/cross-machine-report.md`. 152 + ### With systemd 153 + 154 + Use `tandem serve` (foreground mode) for process managers. 155 + 156 + ```ini 157 + [Unit] 158 + Description=tandem server 159 + After=network.target 265 160 266 - ### Claude Code: multi-agent with tandem 161 + [Service] 162 + ExecStart=/usr/local/bin/tandem serve --listen 0.0.0.0:13013 --repo /srv/project 163 + Restart=on-failure 164 + User=tandem 267 165 268 - Each Claude Code instance gets its own tandem workspace. They see each 269 - other's work in real time via the shared store. 166 + [Install] 167 + WantedBy=multi-user.target 168 + ``` 169 + 170 + `tandem serve` also creates a control socket, so `tandem status`, `tandem logs`, 171 + and `tandem down` work against it too. 172 + 173 + ### With Claude Code / AI agents 174 + 175 + Each agent gets its own tandem workspace. They see each other's work in real 176 + time via the shared store. 270 177 271 178 ```bash 272 - # Server (your machine) 273 - tandem serve --listen 0.0.0.0:13013 --repo ~/project 179 + # Server (your VPS) 180 + tandem up --repo /srv/project --listen 0.0.0.0:13013 274 181 275 - # Agent 1 — in one terminal 276 - tandem init --tandem-server=localhost:13013 --workspace=backend ~/work-backend 182 + # Agent 1 183 + tandem init --tandem-server=your-vps:13013 --workspace=backend ~/work-backend 277 184 cd ~/work-backend 278 - claude --prompt "Implement auth module in src/auth.rs. Use tandem for version control (not git). Run tandem log to see context." 185 + claude --prompt "Implement auth module in src/auth.rs. Use tandem for version control." 279 186 280 - # Agent 2 — in another terminal 281 - tandem init --tandem-server=localhost:13013 --workspace=frontend ~/work-frontend 187 + # Agent 2 188 + tandem init --tandem-server=your-vps:13013 --workspace=frontend ~/work-frontend 282 189 cd ~/work-frontend 283 - claude --prompt "Implement UI in src/routes.rs. Run tandem log to see other agents' work. Read files with: tandem file show -r <change-id> <path>" 284 - 285 - # Agent 3 — in another terminal 286 - tandem init --tandem-server=localhost:13013 --workspace=tests ~/work-tests 287 - cd ~/work-tests 288 - claude --prompt "Write tests for the code other agents wrote. Run tandem log, then tandem file show to read their implementations." 190 + claude --prompt "Implement UI. Run tandem log to see other agents' work." 289 191 ``` 290 192 291 193 Add this to each agent's system prompt or CLAUDE.md: ··· 304 206 Do NOT use git commands — this repo uses tandem. 305 207 ``` 306 208 307 - ### Orchestrator pattern 209 + --- 210 + 211 + ## Commands 212 + 213 + ### Server lifecycle 214 + 215 + Start, stop, and monitor the tandem server. 216 + 217 + ``` 218 + tandem up --repo <path> --listen <addr> Start background daemon 219 + tandem down Stop the daemon 220 + tandem status Check if daemon is running 221 + tandem logs Stream logs from daemon 222 + tandem serve --listen <addr> --repo <path> Start server (foreground) 223 + ``` 224 + 225 + **tandem up** — starts a background daemon and returns immediately. 226 + 227 + ``` 228 + tandem up --repo <path> --listen <addr> [--log-level <level>] [--log-file <path>] 229 + [--control-socket <path>] 230 + ``` 231 + 232 + Forks `tandem serve --daemon` in the background. Waits for the control socket 233 + to become healthy, prints the PID, exits. If a daemon is already running, 234 + exits with an error. 235 + 236 + **tandem down** — stops the running daemon. 237 + 238 + ``` 239 + tandem down [--control-socket <path>] 240 + ``` 241 + 242 + Sends a shutdown request via the control socket, waits for the process to exit. 243 + 244 + **tandem status** — reports whether the daemon is running. 245 + 246 + ``` 247 + tandem status [--json] [--control-socket <path>] 248 + ``` 249 + 250 + Exit code 0 = running, 1 = not running. 308 251 309 - One orchestrator manages the server and ships code. Multiple agents work 310 - independently. 252 + ``` 253 + $ tandem status 254 + tandem is running 255 + PID: 1234 256 + Uptime: 2h 15m 257 + Repo: /srv/project 258 + Listen: 0.0.0.0:13013 259 + Version: 0.3.0 260 + ``` 311 261 312 - ```bash 313 - # Orchestrator machine 314 - tandem serve --listen 0.0.0.0:13013 --repo ~/project 262 + ``` 263 + $ tandem status --json 264 + {"running":true,"pid":1234,"uptime_secs":8100,"repo":"/srv/project","listen":"0.0.0.0:13013","version":"0.3.0"} 265 + ``` 315 266 316 - # ... agents do their work on remote machines ... 267 + **tandem logs** — streams log output from the daemon. 317 268 318 - # When ready to ship: 319 - cd ~/project 320 - jj log # see all agents' work 321 - jj new --no-edit -m "merge: auth + api" # create merge point 322 - jj bookmark create main -r <tip> 323 - jj git push --bookmark main # ship to GitHub 269 + ``` 270 + tandem logs [--level <level>] [--json] [--control-socket <path>] 324 271 ``` 325 272 326 - The orchestrator never writes code. They review with `jj log`, `jj diff`, 327 - `jj show`, and ship with `jj git push`. The server repo IS the jj+git repo, 328 - so standard git tooling works. 273 + Connects to the control socket and streams log events. `--level` filters 274 + server-side (trace, debug, info, warn, error). `--json` outputs raw JSON 275 + lines instead of formatted text. 276 + 277 + **tandem serve** — runs the server in the foreground. Use this for systemd, 278 + Docker, or debugging. Logs to stderr. 279 + 280 + ``` 281 + tandem serve --listen <addr> --repo <path> [--log-level <level>] [--log-format <fmt>] 282 + [--control-socket <path>] [--log-file <path>] 283 + ``` 284 + 285 + ### Workspace setup 286 + 287 + ``` 288 + tandem init --tandem-server <addr> [--workspace <name>] [path] 289 + ``` 290 + 291 + Initializes a tandem-backed workspace. Creates the directory, registers the 292 + tandem backend, and connects to the server. `--workspace` names the workspace 293 + (default: `default`). Each agent should use a unique workspace name. 294 + 295 + ### Watch 296 + 297 + ``` 298 + tandem watch --server <addr> 299 + ``` 300 + 301 + Streams head change notifications from the server. Useful for triggering 302 + rebuilds or CI when any agent commits. 303 + 304 + ### Everything else 305 + 306 + Every jj command works through tandem: 307 + 308 + ``` 309 + tandem log Show commit history 310 + tandem new -m "message" Create new change 311 + tandem diff -r @- Show changes 312 + tandem file show -r <rev> <path> Read file at revision 313 + tandem bookmark create <name> -r <rev> Create bookmark 314 + tandem describe -m "message" Update description 315 + ``` 316 + 317 + The `tandem` binary embeds jj — these are stock jj commands running against 318 + the remote store. 329 319 330 320 --- 331 321 322 + ## Environment variables 323 + 324 + | Variable | Purpose | 325 + |----------|---------| 326 + | `TANDEM_SERVER` | Server address — fallback for `--tandem-server` | 327 + | `TANDEM_WORKSPACE` | Workspace name (default: `default`) | 328 + 329 + --- 330 + 331 + ## Why 332 + 333 + Coding agents need to collaborate on the same codebase without stepping on 334 + each other. The current approach — git worktrees on a single machine — breaks 335 + down when agents run on different machines, fight over `.git` locks, or need 336 + to read each other's work-in-progress. 337 + 338 + tandem gives each agent an isolated workspace that shares a single store over 339 + the network. Agents see each other's commits instantly. No push/pull, no merge 340 + conflicts on the transport layer. The server ships to GitHub when you're ready. 341 + 342 + ## How it works 343 + 344 + ``` 345 + ┌──────────────┐ ┌──────────────────────────┐ 346 + │ Agent A │ Cap'n Proto RPC │ │ 347 + │ (Machine B) │◄─────────────────────────►│ tandem serve │ 348 + │ │ │ (Machine A) │ 349 + │ ~/work-a/ │ │ │ 350 + │ src/auth.rs │ │ ┌────────────────────┐ │ 351 + │ src/lib.rs │ │ │ Content-Addressed │ │ 352 + └──────────────┘ │ │ Store │ │ 353 + ┌──────────────┐ │ │ │ │ 354 + │ Agent B │ Cap'n Proto RPC │ │ jj+git repo │ │ 355 + │ (Machine C) │◄─────────────────────────►│ │ operations │ │──► git push 356 + │ │ │ │ views │ │ 357 + │ ~/work-b/ │ │ │ op heads (CAS) │ │ 358 + │ src/api.rs │ │ └────────────────────┘ │ 359 + └──────────────┘ │ │ 360 + ┌──────────────┐ │ │ 361 + │ Agent C │ Cap'n Proto RPC │ │ 362 + │ (Machine D) │◄─────────────────────────►│ │ 363 + │ │ │ │ 364 + │ ~/work-c/ │ └──────────────────────────┘ 365 + │ tests/*.rs │ 366 + └──────────────┘ 367 + ``` 368 + 369 + Each agent has a full working copy on its local disk (fast reads/writes). 370 + The commit store lives on the server. When Agent A commits, Agent B sees it 371 + instantly in `tandem log` — no fetch, no pull, no merge. 372 + 373 + The `tandem` binary has two ways to run the server: 374 + 375 + - **`tandem up`** — starts a background daemon. No systemd needed. 376 + - **`tandem serve`** — runs in the foreground. For systemd, Docker, debugging. 377 + 378 + And one way to run as a client: 379 + 380 + - **`tandem <jj-command>`** — runs stock jj with tandem as the remote store. 381 + 382 + The client registers three jj-lib trait implementations: 383 + 384 + | Trait | What it stores | RPC calls | 385 + |-------|---------------|-----------| 386 + | `Backend` | Files, trees, commits, symlinks | `getObject`, `putObject` | 387 + | `OpStore` | Operations, views | `getObject`, `putObject` | 388 + | `OpHeadsStore` | Operation head pointers | `getHeads`, `updateOpHeads` (CAS) | 389 + 390 + Concurrent writes use compare-and-swap on operation heads with automatic 391 + retry. Two agents committing simultaneously both succeed — CAS contention 392 + resolves transparently. 393 + 332 394 ## vs git worktrees 333 395 334 396 Most multi-agent tools (Conductor, Claude Squad, Cursor) use git worktrees ··· 351 413 352 414 --- 353 415 354 - ## Commands 355 - 356 - ``` 357 - tandem serve --listen <addr> --repo <path> Start server 358 - tandem init --tandem-server <addr> [path] Init workspace 359 - tandem watch --server <addr> Stream head notifications 360 - tandem log Show commit history 361 - tandem new -m "message" Create new change 362 - tandem diff -r @- Show changes 363 - tandem file show -r <rev> <path> Read file at revision 364 - tandem bookmark create <name> -r <rev> Create bookmark 365 - tandem describe -m "message" Update description 366 - tandem ... Any jj command 367 - ``` 368 - 369 - ## Environment variables 370 - 371 - | Variable | Purpose | 372 - |----------|---------| 373 - | `TANDEM_SERVER` | Server address — fallback for `--tandem-server` | 374 - | `TANDEM_WORKSPACE` | Workspace name (default: `default`) | 375 - 376 416 ## Tests 377 417 378 418 ```bash 379 419 cargo test 380 420 ``` 381 421 382 - 16 integration tests covering: 422 + 34 integration tests covering: 383 423 384 424 - Single-agent file round-trip (write → commit → read back exact bytes) 385 425 - Two-agent cross-workspace file visibility ··· 388 428 - WatchHeads real-time notifications 389 429 - Git round-trip (tandem → jj git objects) 390 430 - End-to-end multi-agent with bookmarks 431 + - Signal handling and graceful shutdown 432 + - Control socket status reporting 433 + - Daemon lifecycle (up/down) 434 + - Log streaming 391 435 392 436 Cross-machine tested with Docker containers — see `qa/v1/cross-machine-report.md`. 393 437 394 438 ## Known limitations 395 439 396 - - **No TLS** — connections are plaintext. For production on a VPS, use SSH tunnels (`ssh -L`) or a VPN. 397 - - **No auth** — anyone who can reach the port can read/write the repo. On a VPS, firewall the port and use SSH tunnels for access. 440 + - **No TLS** — connections are plaintext. Use SSH tunnels or a VPN for untrusted networks. 441 + - **No auth** — anyone who can reach the port can read/write the repo. Firewall the port and use SSH tunnels for access. 442 + - **Unix only for daemon management** — `tandem up`, `tandem down`, `tandem status`, and `tandem logs` use Unix domain sockets. macOS and Linux only, not Windows. (`tandem serve` works everywhere.) 398 443 - **No static binary yet** — requires glibc 2.39+. Use matching distro or build locally. 399 444 - **fsmonitor conflict** — if your jj config has `fsmonitor.backend = "watchman"`, 400 445 pass `--config=fsmonitor.backend=none` to tandem commands. 401 - - **Description-based revsets** — `description(exact:"...")` may not work for 402 - cross-workspace queries. Use change IDs from `tandem log` instead. 403 446 404 447 ## Running in production 405 448 406 - - **Back up the server repo directory** — it's the source of truth. If lost without backups, data is gone (unless mirrored to GitHub via `jj git push`). 407 - - **Use a process manager** (systemd, supervisord, etc.) to keep `tandem serve` running. 408 - - **Git credentials on the server** — the server needs SSH keys or tokens for `jj git push` / `jj git fetch` to GitHub. 409 - - **Monitor disk space** — all agent objects (files, trees, commits) land on the server. 410 - - **Firewall the port** — no auth means network-level access control is your only defense. SSH tunnels or VPN. 449 + - **Back up the server repo directory** — it's the source of truth. 450 + - **Git credentials on the server** — the server needs SSH keys or tokens for `jj git push` / `jj git fetch`. 451 + - **Monitor disk space** — all agent objects land on the server. 452 + - **Firewall the port** — no auth means network-level access control is your only defense. 411 453 412 454 ## Project structure 413 455 ··· 415 457 src/ 416 458 main.rs CLI dispatch (clap) + jj CliRunner passthrough 417 459 server.rs Server — jj Git backend + Cap'n Proto RPC 460 + control.rs Control socket — daemon management protocol (Unix socket, JSON lines) 418 461 backend.rs TandemBackend (jj-lib Backend trait over RPC) 419 462 op_store.rs TandemOpStore (jj-lib OpStore trait over RPC) 420 463 op_heads_store.rs TandemOpHeadsStore (CAS head management over RPC) ··· 425 468 tandem.capnp Cap'n Proto schema (13 Store methods + HeadWatcher) 426 469 tests/ 427 470 common/mod.rs Test harness (server spawn, HOME isolation) 428 - slice1-7 tests Integration tests asserting on file bytes 471 + slice1-7 tests Core integration tests (file round-trip, visibility, CAS, git) 472 + slice10-13 tests Server lifecycle tests (shutdown, control socket, up/down, logs) 429 473 ``` 430 474 431 475 ## License
-109
docs/exec-plans/active/server-lifecycle.md
··· 1 - # Execution Plan: Server Lifecycle 2 - 3 - **Design doc:** `docs/design-docs/server-lifecycle.md` 4 - 5 - Implements `tandem up/down/status/logs` — daemon management without systemd. 6 - 7 - ## Slice 10 — Signal handling and graceful shutdown 8 - 9 - **Goal:** `tandem serve` handles SIGTERM/SIGINT cleanly. Prerequisite for 10 - everything else — daemon mode needs reliable shutdown. 11 - 12 - **Work:** 13 - 14 - - Install signal handler (tokio::signal) in `tandem serve`. 15 - - On SIGTERM/SIGINT: stop accepting new connections, drain in-flight RPCs 16 - (5s timeout), close listeners, exit 0. 17 - - Second signal: immediate exit. 18 - - Add `--log-level` and `--log-format` flags to `tandem serve`. 19 - 20 - **Acceptance:** 21 - 22 - - `tandem serve` + SIGINT exits 0 (not 130). 23 - - In-flight `getObject` call during shutdown completes (not dropped). 24 - - `--log-level debug` produces debug output to stderr. 25 - - Existing slice 1-7 tests still pass. 26 - 27 - **Test:** `tests/slice10_graceful_shutdown.rs` 28 - - Start server, connect client, send SIGTERM, verify clean exit. 29 - - Start server, begin slow read, send SIGTERM, verify read completes. 30 - 31 - ## Slice 11 — Control socket and tandem status 32 - 33 - **Goal:** `tandem serve` opens a control socket. `tandem status` queries it. 34 - 35 - **Work:** 36 - 37 - - Add HTTP-over-Unix-socket listener to `tandem serve` (axum + hyper-unix). 38 - - Implement `GET /status` on control socket. 39 - - Socket path: `$XDG_RUNTIME_DIR/tandem/control.sock` (Linux), 40 - `$TMPDIR/tandem/control.sock` (macOS). Override with `--control-socket`. 41 - - Implement `tandem status` command: connect to control socket, print output. 42 - - Implement `tandem status --json`. 43 - - Exit code 0 = running, 1 = not running. 44 - 45 - **Acceptance:** 46 - 47 - - `tandem serve` creates control socket. 48 - - `tandem status` prints human-readable output while server runs. 49 - - `tandem status --json` returns valid JSON with pid, uptime, repo, listen fields. 50 - - `tandem status` exits 1 when no server is running. 51 - - Control socket is cleaned up on server exit (from slice 10). 52 - 53 - **Test:** `tests/slice11_control_socket.rs` 54 - - Start server, run `tandem status --json`, parse output, verify fields. 55 - - No server running, run `tandem status`, verify exit code 1. 56 - 57 - ## Slice 12 — tandem up and tandem down 58 - 59 - **Goal:** `tandem up` starts a background daemon. `tandem down` stops it. 60 - 61 - **Work:** 62 - 63 - - Implement `--daemon` internal flag on `tandem serve` (detach, redirect 64 - stdio, write PID file). 65 - - Implement `tandem up`: validate flags, fork `tandem serve --daemon`, wait 66 - for control socket readiness, print PID, exit 0. 67 - - Implement `tandem down`: connect to control socket, `POST /shutdown`, 68 - wait for process exit. 69 - - `tandem up` when already running: exit 1 with message. 70 - - PID file at `$XDG_RUNTIME_DIR/tandem/daemon.pid`. 71 - 72 - **Acceptance:** 73 - 74 - - `tandem up --repo ... --listen ...` returns immediately, daemon is running. 75 - - `tandem status` shows running after `tandem up`. 76 - - `tandem down` stops daemon, `tandem status` shows not running. 77 - - `tandem up` twice: second invocation errors with "already running". 78 - - PID file and control socket cleaned up after `tandem down`. 79 - 80 - **Test:** `tests/slice12_up_down.rs` 81 - - `tandem up`, verify status, connect client, read object, `tandem down`, verify stopped. 82 - - `tandem up` twice, verify error. 83 - 84 - ## Slice 13 — tandem logs (streaming) 85 - 86 - **Goal:** `tandem logs` streams log output from a running daemon. 87 - 88 - **Work:** 89 - 90 - - Add tracing subscriber that fans out to: file/stderr + SSE clients. 91 - - Implement `GET /logs?level=<level>` on control socket (SSE stream). 92 - - Implement `tandem logs` command: connect to SSE endpoint, print lines. 93 - - `--level` flag on `tandem logs` (default: info). 94 - - `--json` flag for raw JSON log lines. 95 - - Client can request higher verbosity than daemon's file log level. 96 - 97 - **Acceptance:** 98 - 99 - - `tandem logs` prints log lines as events happen. 100 - - `tandem logs --level debug` shows debug events even if daemon was started 101 - with `--log-level info`. 102 - - `tandem logs --json` outputs one JSON object per line. 103 - - `tandem logs` exits cleanly when daemon shuts down. 104 - - `tandem logs` with no daemon: exit 1 with helpful message. 105 - 106 - **Test:** `tests/slice13_log_streaming.rs` 107 - - Start daemon, connect client, trigger activity, verify `tandem logs` output 108 - contains expected event. 109 - - Verify `--level debug` produces more output than `--level warn`.
+54
docs/exec-plans/completed/server-lifecycle.md
··· 1 + # Execution Plan: Server Lifecycle — COMPLETED 2 + 3 + **Design doc:** `docs/design-docs/server-lifecycle.md` 4 + 5 + Implements `tandem up/down/status/logs` — daemon management without systemd. 6 + 7 + **Completed:** 2026-02-19. All 4 slices implemented, 19 tests passing. 8 + 9 + ## Slice 10 — Signal handling and graceful shutdown ✅ 10 + 11 + **Test:** `tests/slice10_graceful_shutdown.rs` (6 tests) 12 + 13 + Implemented: 14 + - Signal handler (tokio::signal) for SIGTERM/SIGINT in `tandem serve`. 15 + - Clean shutdown: stop accepting connections, drain in-flight RPCs, exit 0. 16 + - Double signal: immediate exit. 17 + - `--log-level` and `--log-format` flags on `tandem serve`. 18 + 19 + ## Slice 11 — Control socket and tandem status ✅ 20 + 21 + **Test:** `tests/slice11_control_socket.rs` (5 tests) 22 + 23 + Implemented: 24 + - Control socket (Unix domain socket, JSON lines) in `tandem serve`. 25 + - `tandem status` command with human-readable and `--json` output. 26 + - Socket path defaults to `$TMPDIR/tandem/control.sock`, override with `--control-socket`. 27 + - Exit code 0 = running, 1 = not running. 28 + - Socket cleaned up on server exit. 29 + 30 + Implementation note: used newline-delimited JSON over Unix stream socket 31 + instead of HTTP-over-Unix-socket (simpler, no axum/hyper dependency needed). 32 + 33 + ## Slice 12 — tandem up and tandem down ✅ 34 + 35 + **Test:** `tests/slice12_up_down.rs` (4 tests) 36 + 37 + Implemented: 38 + - `tandem up` forks `tandem serve --daemon`, waits for control socket health, prints PID. 39 + - `tandem down` sends shutdown via control socket, waits for process exit. 40 + - `tandem up` when already running: exits 1 with "already running" message. 41 + - `--daemon` is a hidden internal flag on `tandem serve`. 42 + - Daemon stdout/stderr redirected to log file. 43 + 44 + ## Slice 13 — tandem logs (streaming) ✅ 45 + 46 + **Test:** `tests/slice13_log_streaming.rs` (4 tests) 47 + 48 + Implemented: 49 + - Broadcast channel in server fans out log events to control socket clients. 50 + - `tandem logs` connects to control socket, streams JSON log events. 51 + - `--level` flag filters server-side (trace/debug/info/warn/error). 52 + - `--json` flag outputs raw JSON lines; default is formatted text. 53 + - Exits cleanly when daemon shuts down. 54 + - "no tandem daemon running" message when no daemon found.
+298
src/control.rs
··· 1 + //! Control socket — newline-delimited JSON over Unix stream socket. 2 + //! 3 + //! Protocol: 4 + //! Client sends one JSON line: {"type": "status"} / {"type": "shutdown"} / {"type": "logs", "level": "debug"} 5 + //! Server responds with one or more JSON lines. 6 + //! For status/shutdown: single response line, then close. 7 + //! For logs: streaming response lines until client disconnects or server shuts down. 8 + 9 + use serde::{Deserialize, Serialize}; 10 + use std::path::Path; 11 + use std::time::Instant; 12 + use tokio::sync::broadcast; 13 + 14 + // ─── Protocol types ─────────────────────────────────────────────────────────── 15 + 16 + #[derive(Debug, Serialize, Deserialize)] 17 + pub struct StatusResponse { 18 + pub running: bool, 19 + pub pid: u32, 20 + pub uptime_secs: u64, 21 + pub repo: String, 22 + pub listen: String, 23 + pub version: String, 24 + } 25 + 26 + #[derive(Debug, Clone, Serialize, Deserialize)] 27 + pub struct LogEvent { 28 + pub ts: String, 29 + pub level: String, 30 + pub msg: String, 31 + } 32 + 33 + /// Shared server state for the control socket. 34 + pub struct ControlState { 35 + pub pid: u32, 36 + pub start_time: Instant, 37 + pub repo: String, 38 + pub listen: String, 39 + pub shutdown_tx: tokio::sync::mpsc::Sender<()>, 40 + pub log_tx: broadcast::Sender<LogEvent>, 41 + } 42 + 43 + impl ControlState { 44 + #[allow(dead_code)] 45 + pub fn emit_log(&self, level: &str, msg: &str) { 46 + let event = LogEvent { 47 + ts: chrono_now(), 48 + level: level.to_string(), 49 + msg: msg.to_string(), 50 + }; 51 + // Ignore send errors (no receivers) 52 + let _ = self.log_tx.send(event); 53 + } 54 + } 55 + 56 + #[allow(dead_code)] 57 + fn chrono_now() -> String { 58 + // Simple ISO-ish timestamp without pulling in chrono 59 + use std::time::SystemTime; 60 + let d = SystemTime::now() 61 + .duration_since(SystemTime::UNIX_EPOCH) 62 + .unwrap_or_default(); 63 + format!("{}Z", d.as_secs()) 64 + } 65 + 66 + fn level_rank(level: &str) -> u8 { 67 + match level.to_lowercase().as_str() { 68 + "trace" => 0, 69 + "debug" => 1, 70 + "info" => 2, 71 + "warn" | "warning" => 3, 72 + "error" => 4, 73 + _ => 2, 74 + } 75 + } 76 + 77 + // ─── Control socket server ─────────────────────────────────────────────────── 78 + 79 + #[cfg(unix)] 80 + pub async fn run_control_socket( 81 + socket_path: String, 82 + state: std::sync::Arc<ControlState>, 83 + ) -> anyhow::Result<()> { 84 + // Remove stale socket if present 85 + let _ = std::fs::remove_file(&socket_path); 86 + if let Some(parent) = Path::new(&socket_path).parent() { 87 + std::fs::create_dir_all(parent)?; 88 + } 89 + 90 + let listener = tokio::net::UnixListener::bind(&socket_path)?; 91 + 92 + loop { 93 + let (stream, _) = listener.accept().await?; 94 + let state = state.clone(); 95 + tokio::spawn(async move { 96 + if let Err(e) = handle_control_connection(stream, state).await { 97 + eprintln!("control connection error: {e:#}"); 98 + } 99 + }); 100 + } 101 + } 102 + 103 + #[cfg(unix)] 104 + async fn handle_control_connection( 105 + stream: tokio::net::UnixStream, 106 + state: std::sync::Arc<ControlState>, 107 + ) -> anyhow::Result<()> { 108 + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 109 + 110 + let (reader, mut writer) = stream.into_split(); 111 + let mut reader = BufReader::new(reader); 112 + let mut line = String::new(); 113 + reader.read_line(&mut line).await?; 114 + let line = line.trim(); 115 + 116 + if line.is_empty() { 117 + return Ok(()); 118 + } 119 + 120 + let request: serde_json::Value = serde_json::from_str(line) 121 + .map_err(|e| anyhow::anyhow!("invalid JSON: {e}"))?; 122 + 123 + let req_type = request["type"].as_str().unwrap_or(""); 124 + 125 + match req_type { 126 + "status" => { 127 + let uptime = state.start_time.elapsed().as_secs(); 128 + let resp = StatusResponse { 129 + running: true, 130 + pid: state.pid, 131 + uptime_secs: uptime, 132 + repo: state.repo.clone(), 133 + listen: state.listen.clone(), 134 + version: env!("CARGO_PKG_VERSION").to_string(), 135 + }; 136 + let json = serde_json::to_string(&resp)?; 137 + writer.write_all(json.as_bytes()).await?; 138 + writer.write_all(b"\n").await?; 139 + writer.flush().await?; 140 + } 141 + "shutdown" => { 142 + let resp = serde_json::json!({"type": "shutdown", "ok": true}); 143 + writer.write_all(resp.to_string().as_bytes()).await?; 144 + writer.write_all(b"\n").await?; 145 + writer.flush().await?; 146 + // Signal shutdown 147 + let _ = state.shutdown_tx.send(()).await; 148 + } 149 + "logs" => { 150 + let level_filter = request["level"] 151 + .as_str() 152 + .unwrap_or("info") 153 + .to_string(); 154 + let min_rank = level_rank(&level_filter); 155 + 156 + let mut rx = state.log_tx.subscribe(); 157 + 158 + // Stream log events until client disconnects or channel closes 159 + loop { 160 + match rx.recv().await { 161 + Ok(event) => { 162 + if level_rank(&event.level) >= min_rank { 163 + let json = match serde_json::to_string(&event) { 164 + Ok(j) => j, 165 + Err(_) => continue, 166 + }; 167 + if writer.write_all(json.as_bytes()).await.is_err() { 168 + break; 169 + } 170 + if writer.write_all(b"\n").await.is_err() { 171 + break; 172 + } 173 + if writer.flush().await.is_err() { 174 + break; 175 + } 176 + } 177 + } 178 + Err(broadcast::error::RecvError::Closed) => break, 179 + Err(broadcast::error::RecvError::Lagged(_)) => continue, 180 + } 181 + } 182 + } 183 + _ => { 184 + let resp = serde_json::json!({"type": "error", "msg": format!("unknown request type: {req_type}")}); 185 + writer.write_all(resp.to_string().as_bytes()).await?; 186 + writer.write_all(b"\n").await?; 187 + writer.flush().await?; 188 + } 189 + } 190 + 191 + Ok(()) 192 + } 193 + 194 + // ─── Control socket client ─────────────────────────────────────────────────── 195 + 196 + #[cfg(unix)] 197 + pub fn client_status(socket_path: &str) -> anyhow::Result<StatusResponse> { 198 + use std::io::{BufRead, BufReader, Write}; 199 + use std::os::unix::net::UnixStream; 200 + use std::time::Duration; 201 + 202 + let mut stream = UnixStream::connect(socket_path) 203 + .map_err(|e| anyhow::anyhow!("cannot connect to control socket: {e}"))?; 204 + stream.set_read_timeout(Some(Duration::from_secs(5)))?; 205 + stream.set_write_timeout(Some(Duration::from_secs(5)))?; 206 + 207 + let request = serde_json::json!({"type": "status"}); 208 + writeln!(stream, "{}", request)?; 209 + stream.flush()?; 210 + 211 + let mut reader = BufReader::new(stream); 212 + let mut line = String::new(); 213 + reader.read_line(&mut line)?; 214 + 215 + let status: StatusResponse = serde_json::from_str(line.trim())?; 216 + Ok(status) 217 + } 218 + 219 + #[cfg(unix)] 220 + pub fn client_shutdown(socket_path: &str) -> anyhow::Result<()> { 221 + use std::io::{BufRead, BufReader, Write}; 222 + use std::os::unix::net::UnixStream; 223 + use std::time::Duration; 224 + 225 + let mut stream = UnixStream::connect(socket_path) 226 + .map_err(|e| anyhow::anyhow!("cannot connect to control socket: {e}"))?; 227 + stream.set_read_timeout(Some(Duration::from_secs(5)))?; 228 + stream.set_write_timeout(Some(Duration::from_secs(5)))?; 229 + 230 + let request = serde_json::json!({"type": "shutdown"}); 231 + writeln!(stream, "{}", request)?; 232 + stream.flush()?; 233 + 234 + let mut reader = BufReader::new(stream); 235 + let mut line = String::new(); 236 + reader.read_line(&mut line)?; 237 + Ok(()) 238 + } 239 + 240 + #[cfg(unix)] 241 + pub fn client_logs(socket_path: &str, level: &str, json_output: bool) -> anyhow::Result<()> { 242 + use std::io::{BufRead, BufReader, Write}; 243 + use std::os::unix::net::UnixStream; 244 + 245 + let mut stream = UnixStream::connect(socket_path) 246 + .map_err(|e| anyhow::anyhow!("cannot connect to control socket: {e}"))?; 247 + // No read timeout for streaming 248 + stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?; 249 + 250 + let request = serde_json::json!({"type": "logs", "level": level}); 251 + writeln!(stream, "{}", request)?; 252 + stream.flush()?; 253 + 254 + let reader = BufReader::new(stream); 255 + for line in reader.lines() { 256 + match line { 257 + Ok(l) if l.trim().is_empty() => continue, 258 + Ok(l) => { 259 + if json_output { 260 + println!("{l}"); 261 + } else { 262 + // Parse and format as human-readable 263 + if let Ok(event) = serde_json::from_str::<LogEvent>(&l) { 264 + println!("[{}] {} {}", event.level, event.ts, event.msg); 265 + } else { 266 + println!("{l}"); 267 + } 268 + } 269 + } 270 + Err(_) => break, 271 + } 272 + } 273 + Ok(()) 274 + } 275 + 276 + // Non-unix stubs 277 + #[cfg(not(unix))] 278 + pub async fn run_control_socket( 279 + _socket_path: String, 280 + _state: std::sync::Arc<ControlState>, 281 + ) -> anyhow::Result<()> { 282 + anyhow::bail!("control socket not supported on this platform") 283 + } 284 + 285 + #[cfg(not(unix))] 286 + pub fn client_status(_socket_path: &str) -> anyhow::Result<StatusResponse> { 287 + anyhow::bail!("control socket not supported on this platform") 288 + } 289 + 290 + #[cfg(not(unix))] 291 + pub fn client_shutdown(_socket_path: &str) -> anyhow::Result<()> { 292 + anyhow::bail!("control socket not supported on this platform") 293 + } 294 + 295 + #[cfg(not(unix))] 296 + pub fn client_logs(_socket_path: &str, _level: &str, _json: bool) -> anyhow::Result<()> { 297 + anyhow::bail!("control socket not supported on this platform") 298 + }
+343 -5
src/main.rs
··· 81 81 82 82 #[derive(Subcommand)] 83 83 enum Commands { 84 - /// Start the tandem server 84 + /// Start the tandem server (foreground) 85 85 #[command(after_help = SERVE_AFTER_HELP)] 86 86 Serve { 87 87 /// Address to listen on (e.g. 0.0.0.0:13013) ··· 90 90 /// Path to the repository directory 91 91 #[arg(long)] 92 92 repo: String, 93 + /// Log level (trace, debug, info, warn, error) 94 + #[arg(long, default_value = "info")] 95 + log_level: String, 96 + /// Log format (text, json) 97 + #[arg(long, default_value = "text")] 98 + log_format: String, 99 + /// Path to control socket 100 + #[arg(long)] 101 + control_socket: Option<String>, 102 + /// Run as daemon (internal, set by `tandem up`) 103 + #[arg(long, hide = true)] 104 + daemon: bool, 105 + /// Log file path (used in daemon mode) 106 + #[arg(long)] 107 + log_file: Option<String>, 93 108 }, 94 109 95 110 /// Initialize a tandem-backed workspace ··· 112 127 #[arg(long, env = "TANDEM_SERVER")] 113 128 server: String, 114 129 }, 130 + 131 + /// Start tandem server as a background daemon 132 + Up { 133 + /// Path to the repository directory 134 + #[arg(long)] 135 + repo: String, 136 + /// Address to listen on (e.g. 0.0.0.0:13013) 137 + #[arg(long)] 138 + listen: String, 139 + /// Log level for the daemon (trace, debug, info, warn, error) 140 + #[arg(long, default_value = "info")] 141 + log_level: String, 142 + /// Daemon log file path 143 + #[arg(long)] 144 + log_file: Option<String>, 145 + /// Path to control socket 146 + #[arg(long)] 147 + control_socket: Option<String>, 148 + }, 149 + 150 + /// Stop the tandem daemon 151 + Down { 152 + /// Path to control socket 153 + #[arg(long)] 154 + control_socket: Option<String>, 155 + }, 156 + 157 + /// Show tandem daemon status 158 + Status { 159 + /// Output as JSON 160 + #[arg(long)] 161 + json: bool, 162 + /// Path to control socket 163 + #[arg(long)] 164 + control_socket: Option<String>, 165 + }, 166 + 167 + /// Stream logs from a running tandem daemon 168 + Logs { 169 + /// Log level filter (trace, debug, info, warn, error) 170 + #[arg(long, default_value = "info")] 171 + level: String, 172 + /// Output raw JSON log lines 173 + #[arg(long)] 174 + json: bool, 175 + /// Path to control socket 176 + #[arg(long)] 177 + control_socket: Option<String>, 178 + }, 115 179 } 116 180 117 181 // ─── Dispatch ───────────────────────────────────────────────────────────────── ··· 124 188 // argument parsing — this avoids conflicts with jj global flags like 125 189 // --no-pager, --color, -R that appear before the subcommand. 126 190 match args.get(1).map(|s| s.as_str()) { 127 - None | Some("serve" | "init" | "watch" | "--help" | "-h") => {} 191 + None | Some("serve" | "init" | "watch" | "up" | "down" | "status" | "logs" | "--help" | "-h") => {} 128 192 _ => return run_jj(), 129 193 } 130 194 ··· 135 199 println!(); 136 200 ExitCode::SUCCESS 137 201 } 138 - Some(Commands::Serve { listen, repo }) => run_serve(&listen, &repo), 202 + Some(Commands::Serve { 203 + listen, 204 + repo, 205 + log_level, 206 + log_format, 207 + control_socket, 208 + daemon, 209 + log_file, 210 + }) => run_serve(&listen, &repo, &log_level, &log_format, control_socket.as_deref(), daemon, log_file.as_deref()), 139 211 Some(Commands::Init { 140 212 tandem_server, 141 213 workspace, 142 214 path, 143 215 }) => run_tandem_init(&tandem_server, &workspace, &path), 144 216 Some(Commands::Watch { server }) => run_watch(&server), 217 + Some(Commands::Up { 218 + repo, 219 + listen, 220 + log_level, 221 + log_file, 222 + control_socket, 223 + }) => run_up(&repo, &listen, &log_level, log_file.as_deref(), control_socket.as_deref()), 224 + Some(Commands::Down { control_socket }) => run_down(control_socket.as_deref()), 225 + Some(Commands::Status { 226 + json, 227 + control_socket, 228 + }) => run_status(json, control_socket.as_deref()), 229 + Some(Commands::Logs { 230 + level, 231 + json, 232 + control_socket, 233 + }) => run_logs(&level, json, control_socket.as_deref()), 145 234 } 146 235 } 147 236 ··· 157 246 158 247 // ─── Server mode ────────────────────────────────────────────────────────────── 159 248 160 - fn run_serve(listen_addr: &str, repo_path: &str) -> ExitCode { 249 + fn run_serve( 250 + listen_addr: &str, 251 + repo_path: &str, 252 + _log_level: &str, 253 + _log_format: &str, 254 + control_socket: Option<&str>, 255 + daemon: bool, 256 + log_file: Option<&str>, 257 + ) -> ExitCode { 258 + // In daemon mode, stdout/stderr are already redirected to the log file 259 + // by `run_up` before spawning this process. Nothing extra needed here. 260 + 161 261 let rt = tokio::runtime::Builder::new_current_thread() 162 262 .enable_all() 163 263 .build() 164 264 .unwrap(); 165 265 let local = tokio::task::LocalSet::new(); 166 266 167 - if let Err(err) = local.block_on(&rt, server::run_serve(listen_addr, repo_path)) { 267 + let opts = server::ServeOptions { 268 + listen_addr: listen_addr.to_string(), 269 + repo_path: repo_path.to_string(), 270 + control_socket: control_socket.map(|s| s.to_string()), 271 + daemon, 272 + log_file: log_file.map(|s| s.to_string()), 273 + }; 274 + 275 + if let Err(err) = local.block_on(&rt, server::run_serve(opts)) { 168 276 eprintln!("error: {err:#}"); 169 277 return ExitCode::FAILURE; 170 278 } 171 279 280 + ExitCode::SUCCESS 281 + } 282 + 283 + // ─── Up / Down / Status / Logs ──────────────────────────────────────────────── 284 + 285 + mod control; 286 + 287 + fn default_control_socket() -> String { 288 + let dir = std::env::temp_dir().join("tandem"); 289 + std::fs::create_dir_all(&dir).ok(); 290 + dir.join("control.sock").to_string_lossy().to_string() 291 + } 292 + 293 + fn resolve_control_socket(explicit: Option<&str>) -> String { 294 + explicit 295 + .map(|s| s.to_string()) 296 + .unwrap_or_else(default_control_socket) 297 + } 298 + 299 + fn run_up( 300 + repo: &str, 301 + listen: &str, 302 + log_level: &str, 303 + log_file: Option<&str>, 304 + control_socket: Option<&str>, 305 + ) -> ExitCode { 306 + let sock_path = resolve_control_socket(control_socket); 307 + 308 + // Check if already running by trying to connect to control socket 309 + if let Ok(status) = control::client_status(&sock_path) { 310 + if status.running { 311 + eprintln!( 312 + "tandem is already running (PID {}). Use `tandem down` first.", 313 + status.pid 314 + ); 315 + return ExitCode::FAILURE; 316 + } 317 + } 318 + 319 + // Determine log file 320 + let log_file_path = log_file.map(|s| s.to_string()).unwrap_or_else(|| { 321 + let dir = std::env::temp_dir().join("tandem"); 322 + std::fs::create_dir_all(&dir).ok(); 323 + dir.join("daemon.log").to_string_lossy().to_string() 324 + }); 325 + 326 + // Spawn tandem serve --daemon 327 + let exe = match std::env::current_exe() { 328 + Ok(e) => e, 329 + Err(e) => { 330 + eprintln!("error: cannot determine executable path: {e}"); 331 + return ExitCode::FAILURE; 332 + } 333 + }; 334 + let mut cmd = std::process::Command::new(exe); 335 + cmd.args([ 336 + "serve", 337 + "--listen", 338 + listen, 339 + "--repo", 340 + repo, 341 + "--log-level", 342 + log_level, 343 + "--control-socket", 344 + &sock_path, 345 + "--log-file", 346 + &log_file_path, 347 + "--daemon", 348 + ]); 349 + 350 + // Redirect stdout/stderr to log file for daemon 351 + let log_file_handle = match std::fs::File::create(&log_file_path) { 352 + Ok(f) => f, 353 + Err(e) => { 354 + eprintln!("error: cannot create log file {log_file_path}: {e}"); 355 + return ExitCode::FAILURE; 356 + } 357 + }; 358 + let stderr_file = match log_file_handle.try_clone() { 359 + Ok(f) => f, 360 + Err(e) => { 361 + eprintln!("error: cannot clone log file handle: {e}"); 362 + return ExitCode::FAILURE; 363 + } 364 + }; 365 + cmd.stdout(std::process::Stdio::from(log_file_handle)); 366 + cmd.stderr(std::process::Stdio::from(stderr_file)); 367 + cmd.stdin(std::process::Stdio::null()); 368 + 369 + // Inherit HOME/XDG env from current process for isolation in tests 370 + let child = match cmd.spawn() { 371 + Ok(c) => c, 372 + Err(e) => { 373 + eprintln!("error: failed to start daemon: {e}"); 374 + return ExitCode::FAILURE; 375 + } 376 + }; 377 + 378 + let pid = child.id(); 379 + 380 + // Wait for control socket to become available 381 + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); 382 + loop { 383 + let sock = std::path::Path::new(&sock_path); 384 + if sock.exists() { 385 + #[cfg(unix)] 386 + if std::os::unix::net::UnixStream::connect(sock).is_ok() { 387 + // Verify healthy via status 388 + if let Ok(status) = control::client_status(&sock_path) { 389 + if status.running { 390 + println!("tandem running, PID {pid}"); 391 + return ExitCode::SUCCESS; 392 + } 393 + } 394 + } 395 + } 396 + if std::time::Instant::now() > deadline { 397 + eprintln!("error: daemon failed to start within timeout"); 398 + return ExitCode::FAILURE; 399 + } 400 + std::thread::sleep(std::time::Duration::from_millis(50)); 401 + } 402 + } 403 + 404 + fn run_down(control_socket: Option<&str>) -> ExitCode { 405 + let sock_path = resolve_control_socket(control_socket); 406 + 407 + // Try to get status first 408 + let status = match control::client_status(&sock_path) { 409 + Ok(s) => s, 410 + Err(_) => { 411 + eprintln!("tandem is not running"); 412 + return ExitCode::FAILURE; 413 + } 414 + }; 415 + 416 + if !status.running { 417 + eprintln!("tandem is not running"); 418 + return ExitCode::FAILURE; 419 + } 420 + 421 + let pid = status.pid; 422 + 423 + // Send shutdown 424 + if let Err(e) = control::client_shutdown(&sock_path) { 425 + eprintln!("error: shutdown request failed: {e}"); 426 + return ExitCode::FAILURE; 427 + } 428 + 429 + // Wait for process to exit 430 + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); 431 + loop { 432 + // Check if process is still alive 433 + #[cfg(unix)] 434 + { 435 + let alive = unsafe { libc::kill(pid as libc::pid_t, 0) } == 0; 436 + if !alive { 437 + println!("tandem stopped"); 438 + return ExitCode::SUCCESS; 439 + } 440 + } 441 + #[cfg(not(unix))] 442 + { 443 + println!("tandem stopped"); 444 + return ExitCode::SUCCESS; 445 + } 446 + if std::time::Instant::now() > deadline { 447 + eprintln!("warning: daemon did not exit within timeout"); 448 + return ExitCode::FAILURE; 449 + } 450 + std::thread::sleep(std::time::Duration::from_millis(50)); 451 + } 452 + } 453 + 454 + fn run_status(json: bool, control_socket: Option<&str>) -> ExitCode { 455 + let sock_path = resolve_control_socket(control_socket); 456 + 457 + match control::client_status(&sock_path) { 458 + Ok(status) if status.running => { 459 + if json { 460 + println!("{}", serde_json::to_string_pretty(&status).unwrap()); 461 + } else { 462 + println!("tandem is running"); 463 + println!(" PID: {}", status.pid); 464 + let uptime = status.uptime_secs; 465 + if uptime >= 3600 { 466 + println!(" Uptime: {}h {}m", uptime / 3600, (uptime % 3600) / 60); 467 + } else if uptime >= 60 { 468 + println!(" Uptime: {}m {}s", uptime / 60, uptime % 60); 469 + } else { 470 + println!(" Uptime: {}s", uptime); 471 + } 472 + println!(" Repo: {}", status.repo); 473 + println!(" Listen: {}", status.listen); 474 + println!(" Version: {}", status.version); 475 + } 476 + ExitCode::SUCCESS 477 + } 478 + _ => { 479 + if json { 480 + println!("{{\"running\":false}}"); 481 + } else { 482 + eprintln!("tandem is not running"); 483 + } 484 + ExitCode::FAILURE 485 + } 486 + } 487 + } 488 + 489 + fn run_logs(level: &str, json: bool, control_socket: Option<&str>) -> ExitCode { 490 + let sock_path = resolve_control_socket(control_socket); 491 + 492 + if control::client_status(&sock_path).is_err() { 493 + eprintln!("no tandem daemon running. Start one with `tandem up`."); 494 + return ExitCode::FAILURE; 495 + } 496 + 497 + if let Err(e) = control::client_logs(&sock_path, level, json) { 498 + // Connection closed = server shut down, not an error 499 + let msg = format!("{e}"); 500 + if msg.contains("broken pipe") 501 + || msg.contains("connection reset") 502 + || msg.contains("end of file") 503 + || msg.contains("Connection reset") 504 + { 505 + return ExitCode::SUCCESS; 506 + } 507 + eprintln!("error: {e}"); 508 + return ExitCode::FAILURE; 509 + } 172 510 ExitCode::SUCCESS 173 511 } 174 512
+152 -15
src/server.rs
··· 22 22 use std::path::{Path, PathBuf}; 23 23 use std::rc::Rc; 24 24 use std::sync::{Arc, Mutex}; 25 + use tokio::sync::broadcast; 25 26 27 + use crate::control; 26 28 use crate::proto_convert; 27 29 use crate::tandem_capnp::{cancel, head_watcher, store}; 28 30 31 + fn emit_log(tx: &broadcast::Sender<control::LogEvent>, level: &str, msg: &str) { 32 + let _ = tx.send(control::LogEvent { 33 + ts: format!( 34 + "{}Z", 35 + std::time::SystemTime::now() 36 + .duration_since(std::time::UNIX_EPOCH) 37 + .unwrap_or_default() 38 + .as_secs() 39 + ), 40 + level: level.to_string(), 41 + msg: msg.to_string(), 42 + }); 43 + } 44 + 29 45 // ─── Public entry point ─────────────────────────────────────────────────────── 30 46 31 - pub async fn run_serve(listen_addr: &str, repo_path: &str) -> Result<()> { 32 - let repo = PathBuf::from(repo_path); 47 + #[allow(dead_code)] 48 + pub struct ServeOptions { 49 + pub listen_addr: String, 50 + pub repo_path: String, 51 + pub control_socket: Option<String>, 52 + pub daemon: bool, 53 + pub log_file: Option<String>, 54 + } 55 + 56 + pub async fn run_serve(opts: ServeOptions) -> Result<()> { 57 + let repo = PathBuf::from(&opts.repo_path); 33 58 let server = Rc::new(Server::new(repo)?); 34 - let listener = tokio::net::TcpListener::bind(listen_addr) 59 + let listener = tokio::net::TcpListener::bind(&opts.listen_addr) 35 60 .await 36 - .with_context(|| format!("failed to bind {listen_addr}"))?; 37 - eprintln!("tandem server listening on {}", listener.local_addr()?); 61 + .with_context(|| format!("failed to bind {}", opts.listen_addr))?; 62 + let local_addr = listener.local_addr()?; 63 + eprintln!("tandem server listening on {local_addr}"); 64 + 65 + // Set up shutdown signaling 66 + let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1); 67 + 68 + // Log broadcast channel 69 + let (log_tx, _) = broadcast::channel::<control::LogEvent>(1024); 70 + 71 + // Set up control socket if requested 72 + let control_socket_path = opts.control_socket.clone(); 73 + if let Some(ref sock_path) = control_socket_path { 74 + let control_state = Arc::new(control::ControlState { 75 + pid: std::process::id(), 76 + start_time: std::time::Instant::now(), 77 + repo: opts.repo_path.clone(), 78 + listen: local_addr.to_string(), 79 + shutdown_tx: shutdown_tx.clone(), 80 + log_tx: log_tx.clone(), 81 + }); 82 + 83 + let sock = sock_path.clone(); 84 + tokio::spawn(async move { 85 + if let Err(e) = control::run_control_socket(sock, control_state).await { 86 + eprintln!("control socket error: {e:#}"); 87 + } 88 + }); 89 + } 90 + 91 + // Emit initial log event 92 + emit_log(&log_tx, "info", &format!("tandem server listening on {local_addr}")); 93 + 94 + // Signal handling 95 + let (signal_tx, mut signal_rx) = tokio::sync::mpsc::channel::<()>(2); 96 + 97 + // Spawn signal handler (multi-threaded tokio task for signal handling) 98 + let signal_tx_clone = signal_tx.clone(); 99 + tokio::spawn(async move { 100 + let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()) 101 + .expect("install SIGINT handler"); 102 + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) 103 + .expect("install SIGTERM handler"); 104 + 105 + let mut first_signal = true; 106 + loop { 107 + tokio::select! { 108 + _ = sigint.recv() => {}, 109 + _ = sigterm.recv() => {}, 110 + } 111 + if first_signal { 112 + first_signal = false; 113 + eprintln!("\nshutting down gracefully..."); 114 + let _ = signal_tx_clone.send(()).await; 115 + } else { 116 + eprintln!("\nforced shutdown"); 117 + std::process::exit(0); 118 + } 119 + } 120 + }); 121 + 122 + // Track in-flight connections 123 + let inflight = Rc::new(std::cell::Cell::new(0u32)); 124 + 125 + // Share log_tx with connection handlers 126 + let log_tx = Rc::new(log_tx); 38 127 128 + // Accept loop with shutdown 39 129 loop { 40 - let (stream, _) = listener.accept().await?; 41 - let server = Rc::clone(&server); 42 - tokio::task::spawn_local(async move { 43 - if let Err(err) = handle_capnp_connection(server, stream).await { 44 - eprintln!("rpc connection error: {err:#}"); 130 + tokio::select! { 131 + result = listener.accept() => { 132 + let (stream, addr) = result?; 133 + let server = Rc::clone(&server); 134 + let inflight = Rc::clone(&inflight); 135 + let log_tx = Rc::clone(&log_tx); 136 + inflight.set(inflight.get() + 1); 137 + 138 + emit_log(&log_tx, "info", &format!("client connected: {addr}")); 139 + 140 + tokio::task::spawn_local(async move { 141 + if let Err(err) = handle_capnp_connection(server, stream, &log_tx).await { 142 + eprintln!("rpc connection error: {err:#}"); 143 + } 144 + emit_log(&log_tx, "info", &format!("client disconnected: {addr}")); 145 + inflight.set(inflight.get().saturating_sub(1)); 146 + }); 147 + } 148 + _ = signal_rx.recv() => { 149 + emit_log(&log_tx, "info", "signal received, draining connections..."); 150 + eprintln!("signal received, draining connections..."); 151 + break; 152 + } 153 + _ = shutdown_rx.recv() => { 154 + emit_log(&log_tx, "info", "shutdown requested via control socket"); 155 + eprintln!("shutdown requested via control socket..."); 156 + break; 157 + } 158 + } 159 + } 160 + 161 + // Drain in-flight connections (5s timeout) 162 + if inflight.get() > 0 { 163 + eprintln!("waiting for {} in-flight connection(s)...", inflight.get()); 164 + let drain_deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(5); 165 + while inflight.get() > 0 { 166 + if tokio::time::Instant::now() > drain_deadline { 167 + eprintln!("drain timeout, {} connections remaining", inflight.get()); 168 + break; 45 169 } 46 - }); 170 + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; 171 + } 172 + } 173 + 174 + // Clean up control socket 175 + if let Some(ref sock_path) = control_socket_path { 176 + let _ = std::fs::remove_file(sock_path); 47 177 } 178 + 179 + eprintln!("tandem server stopped"); 180 + Ok(()) 48 181 } 49 182 50 183 // ─── Connection handler ─────────────────────────────────────────────────────── ··· 52 185 async fn handle_capnp_connection( 53 186 server: Rc<Server>, 54 187 stream: tokio::net::TcpStream, 188 + log_tx: &broadcast::Sender<control::LogEvent>, 55 189 ) -> Result<()> { 56 190 use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; 57 191 ··· 64 198 ); 65 199 let store_impl = StoreImpl { 66 200 server: server.clone(), 201 + log_tx: log_tx.clone(), 67 202 }; 68 203 let store_client: store::Client = capnp_rpc::new_client(store_impl); 69 204 let rpc_system = RpcSystem::new(Box::new(network), Some(store_client.client)); ··· 209 344 fn sync_op_heads_to_jj(&self, heads: &[String]) -> Result<()> { 210 345 // Clear existing head files 211 346 if let Ok(entries) = fs::read_dir(&self.op_heads_dir) { 212 - for entry in entries { 213 - if let Ok(entry) = entry { 214 - let _ = fs::remove_file(entry.path()); 215 - } 347 + for entry in entries.flatten() { 348 + let _ = fs::remove_file(entry.path()); 216 349 } 217 350 } 218 351 // Write new head files (empty files named by hex ID) ··· 547 680 548 681 struct StoreImpl { 549 682 server: Rc<Server>, 683 + log_tx: broadcast::Sender<control::LogEvent>, 550 684 } 551 685 552 686 fn capnp_err(e: anyhow::Error) -> capnp::Error { ··· 601 735 602 736 match self.server.get_object_sync(kind_str, id_bytes) { 603 737 Ok(data) => { 738 + emit_log(&self.log_tx, "debug", &format!("getObject {kind_str} size={}", data.len())); 604 739 results.get().set_data(&data); 605 740 Promise::ok(()) 606 741 } ··· 620 755 621 756 match self.server.put_object_sync(kind_str, &data) { 622 757 Ok((id, normalized)) => { 758 + emit_log(&self.log_tx, "info", &format!("putObject {kind_str} size={}", data.len())); 623 759 let mut r = results.get(); 624 760 r.set_id(&id); 625 761 r.set_normalized_data(&normalized); ··· 792 928 .update_op_heads_sync(old_ids, new_id, expected_version, workspace_id) 793 929 { 794 930 Ok(result) => { 931 + emit_log(&self.log_tx, "info", &format!("updateOpHeads ok={} version={}", result.ok, result.version)); 795 932 let mut r = results.get(); 796 933 r.set_ok(result.ok); 797 934 {
+76
tests/common/mod.rs
··· 1 + #![allow(dead_code)] 2 + 1 3 use std::path::{Path, PathBuf}; 2 4 use std::process::{Child, Command, Output, Stdio}; 3 5 use std::thread; ··· 116 118 cmd.arg(arg); 117 119 } 118 120 cmd.output().expect("run jj command") 121 + } 122 + 123 + /// Spawn a server with extra args and HOME isolation. 124 + pub fn spawn_server_with_args(repo: &Path, addr: &str, extra_args: &[&str], home: &Path) -> Child { 125 + let mut cmd = Command::new(tandem_bin()); 126 + cmd.args(["serve", "--listen", addr, "--repo", repo.to_str().unwrap()]); 127 + for arg in extra_args { 128 + cmd.arg(arg); 129 + } 130 + isolate_env(&mut cmd, home); 131 + cmd.stdout(Stdio::piped()) 132 + .stderr(Stdio::piped()) 133 + .spawn() 134 + .expect("spawn tandem serve") 135 + } 136 + 137 + /// Generate a unique control socket path inside a temp directory. 138 + pub fn control_socket_path(tmp: &Path) -> PathBuf { 139 + tmp.join("control.sock") 140 + } 141 + 142 + /// Wait for a Unix socket to appear on disk. 143 + #[cfg(unix)] 144 + pub fn wait_for_socket(path: &Path, timeout: Duration) { 145 + let deadline = Instant::now() + timeout; 146 + loop { 147 + if path.exists() { 148 + // Try connecting to verify it's listening 149 + if std::os::unix::net::UnixStream::connect(path).is_ok() { 150 + return; 151 + } 152 + } 153 + if Instant::now() > deadline { 154 + panic!( 155 + "socket {} did not appear within {:?}", 156 + path.display(), 157 + timeout 158 + ); 159 + } 160 + thread::sleep(Duration::from_millis(50)); 161 + } 162 + } 163 + 164 + /// Wait for a TCP address to become connectable (no child process to manage). 165 + pub fn wait_for_addr(addr: &str, timeout: Duration) { 166 + let deadline = Instant::now() + timeout; 167 + loop { 168 + if std::net::TcpStream::connect(addr).is_ok() { 169 + return; 170 + } 171 + if Instant::now() > deadline { 172 + panic!("address {addr} not connectable within {timeout:?}"); 173 + } 174 + thread::sleep(Duration::from_millis(50)); 175 + } 176 + } 177 + 178 + /// Send a JSON request to the control socket and read the response line. 179 + #[cfg(unix)] 180 + pub fn control_request(socket_path: &Path, request: &str) -> String { 181 + use std::io::{BufRead, BufReader, Write}; 182 + let mut stream = 183 + std::os::unix::net::UnixStream::connect(socket_path).expect("connect to control socket"); 184 + stream 185 + .set_read_timeout(Some(Duration::from_secs(5))) 186 + .ok(); 187 + stream 188 + .write_all(request.as_bytes()) 189 + .expect("write request"); 190 + stream.write_all(b"\n").expect("write newline"); 191 + let mut reader = BufReader::new(stream); 192 + let mut line = String::new(); 193 + reader.read_line(&mut line).expect("read response"); 194 + line 119 195 } 120 196 121 197 /// Run a raw git command (bypassing jj's git wrapper).
+253
tests/slice10_graceful_shutdown.rs
··· 1 + //! Slice 10: Signal handling and graceful shutdown 2 + //! 3 + //! Acceptance criteria: 4 + //! - `tandem serve` + SIGINT exits 0 (not 130). 5 + //! - In-flight `getObject` call during shutdown completes (not dropped). 6 + //! - `--log-level debug` produces debug output to stderr. 7 + //! - Existing slice 1-7 tests still pass. 8 + 9 + mod common; 10 + 11 + use std::time::Duration; 12 + use tempfile::TempDir; 13 + 14 + /// SIGINT causes clean exit with code 0. 15 + #[test] 16 + fn slice10_sigint_clean_exit() { 17 + let tmp = TempDir::new().unwrap(); 18 + let home = common::isolated_home(tmp.path()); 19 + let server_repo = tmp.path().join("server-repo"); 20 + std::fs::create_dir_all(&server_repo).unwrap(); 21 + 22 + let addr = common::free_addr(); 23 + let mut server = common::spawn_server_with_args(&server_repo, &addr, &[], &home); 24 + common::wait_for_server(&addr, &mut server); 25 + 26 + // Send SIGINT 27 + #[cfg(unix)] 28 + { 29 + unsafe { 30 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 31 + } 32 + } 33 + 34 + // Wait for exit (with timeout) 35 + let start = std::time::Instant::now(); 36 + loop { 37 + if let Some(status) = server.try_wait().expect("try_wait") { 38 + // Should exit 0 (not 130 or signal-killed) 39 + assert!( 40 + status.success(), 41 + "server should exit 0 on SIGINT, got {:?}", 42 + status.code() 43 + ); 44 + return; 45 + } 46 + if start.elapsed() > Duration::from_secs(10) { 47 + let _ = server.kill(); 48 + let _ = server.wait(); 49 + panic!("server did not exit within 10 seconds of SIGINT"); 50 + } 51 + std::thread::sleep(Duration::from_millis(50)); 52 + } 53 + } 54 + 55 + /// SIGTERM causes clean exit with code 0. 56 + #[test] 57 + fn slice10_sigterm_clean_exit() { 58 + let tmp = TempDir::new().unwrap(); 59 + let home = common::isolated_home(tmp.path()); 60 + let server_repo = tmp.path().join("server-repo"); 61 + std::fs::create_dir_all(&server_repo).unwrap(); 62 + 63 + let addr = common::free_addr(); 64 + let mut server = common::spawn_server_with_args(&server_repo, &addr, &[], &home); 65 + common::wait_for_server(&addr, &mut server); 66 + 67 + // Send SIGTERM 68 + #[cfg(unix)] 69 + { 70 + unsafe { 71 + libc::kill(server.id() as libc::pid_t, libc::SIGTERM); 72 + } 73 + } 74 + 75 + let start = std::time::Instant::now(); 76 + loop { 77 + if let Some(status) = server.try_wait().expect("try_wait") { 78 + assert!( 79 + status.success(), 80 + "server should exit 0 on SIGTERM, got {:?}", 81 + status.code() 82 + ); 83 + return; 84 + } 85 + if start.elapsed() > Duration::from_secs(10) { 86 + let _ = server.kill(); 87 + let _ = server.wait(); 88 + panic!("server did not exit within 10 seconds of SIGTERM"); 89 + } 90 + std::thread::sleep(Duration::from_millis(50)); 91 + } 92 + } 93 + 94 + /// Second SIGINT causes immediate exit. 95 + #[test] 96 + fn slice10_double_sigint_immediate_exit() { 97 + let tmp = TempDir::new().unwrap(); 98 + let home = common::isolated_home(tmp.path()); 99 + let server_repo = tmp.path().join("server-repo"); 100 + std::fs::create_dir_all(&server_repo).unwrap(); 101 + 102 + let addr = common::free_addr(); 103 + let mut server = common::spawn_server_with_args(&server_repo, &addr, &[], &home); 104 + common::wait_for_server(&addr, &mut server); 105 + 106 + #[cfg(unix)] 107 + { 108 + let pid = server.id() as libc::pid_t; 109 + unsafe { 110 + libc::kill(pid, libc::SIGINT); 111 + } 112 + // Small delay then second signal 113 + std::thread::sleep(Duration::from_millis(100)); 114 + unsafe { 115 + libc::kill(pid, libc::SIGINT); 116 + } 117 + } 118 + 119 + // Should exit quickly (within 2s) 120 + let start = std::time::Instant::now(); 121 + loop { 122 + if server.try_wait().expect("try_wait").is_some() { 123 + return; // exited — we don't require code 0 for double-signal 124 + } 125 + if start.elapsed() > Duration::from_secs(5) { 126 + let _ = server.kill(); 127 + let _ = server.wait(); 128 + panic!("server did not exit after double SIGINT"); 129 + } 130 + std::thread::sleep(Duration::from_millis(50)); 131 + } 132 + } 133 + 134 + /// --log-level flag is accepted by serve. 135 + #[test] 136 + fn slice10_log_level_flag_accepted() { 137 + let tmp = TempDir::new().unwrap(); 138 + let home = common::isolated_home(tmp.path()); 139 + let server_repo = tmp.path().join("server-repo"); 140 + std::fs::create_dir_all(&server_repo).unwrap(); 141 + 142 + let addr = common::free_addr(); 143 + let mut server = 144 + common::spawn_server_with_args(&server_repo, &addr, &["--log-level", "debug"], &home); 145 + common::wait_for_server(&addr, &mut server); 146 + 147 + // Server started successfully with --log-level debug 148 + // Send SIGINT to stop it 149 + #[cfg(unix)] 150 + { 151 + unsafe { 152 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 153 + } 154 + } 155 + 156 + let output = server.wait_with_output().expect("wait_with_output"); 157 + let stderr = String::from_utf8_lossy(&output.stderr); 158 + // Should have started (the "listening on" message proves the flag was accepted) 159 + assert!( 160 + stderr.contains("listening on"), 161 + "server should start with --log-level debug\nstderr: {stderr}" 162 + ); 163 + } 164 + 165 + /// --log-format flag is accepted by serve. 166 + #[test] 167 + fn slice10_log_format_flag_accepted() { 168 + let tmp = TempDir::new().unwrap(); 169 + let home = common::isolated_home(tmp.path()); 170 + let server_repo = tmp.path().join("server-repo"); 171 + std::fs::create_dir_all(&server_repo).unwrap(); 172 + 173 + let addr = common::free_addr(); 174 + let mut server = 175 + common::spawn_server_with_args(&server_repo, &addr, &["--log-format", "json"], &home); 176 + common::wait_for_server(&addr, &mut server); 177 + 178 + #[cfg(unix)] 179 + { 180 + unsafe { 181 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 182 + } 183 + } 184 + 185 + let output = server.wait_with_output().expect("wait_with_output"); 186 + assert!( 187 + output.status.success(), 188 + "server should accept --log-format json" 189 + ); 190 + } 191 + 192 + /// Server can still handle a full client round-trip then shutdown cleanly. 193 + #[test] 194 + fn slice10_client_roundtrip_then_shutdown() { 195 + let tmp = TempDir::new().unwrap(); 196 + let home = common::isolated_home(tmp.path()); 197 + let server_repo = tmp.path().join("server-repo"); 198 + std::fs::create_dir_all(&server_repo).unwrap(); 199 + let workspace_dir = tmp.path().join("workspace"); 200 + std::fs::create_dir_all(&workspace_dir).unwrap(); 201 + 202 + let addr = common::free_addr(); 203 + let mut server = common::spawn_server_with_args(&server_repo, &addr, &[], &home); 204 + common::wait_for_server(&addr, &mut server); 205 + 206 + // Init workspace 207 + let init = common::run_tandem_in( 208 + &workspace_dir, 209 + &["init", "--tandem-server", &addr, "."], 210 + &home, 211 + ); 212 + common::assert_ok(&init, "tandem init"); 213 + 214 + // Write a file, commit 215 + std::fs::write(workspace_dir.join("test.txt"), b"shutdown test\n").unwrap(); 216 + let new_out = common::run_tandem_in(&workspace_dir, &["new", "-m", "before shutdown"], &home); 217 + common::assert_ok(&new_out, "tandem new"); 218 + 219 + // Verify file round-trips 220 + let cat = common::run_tandem_in( 221 + &workspace_dir, 222 + &["file", "show", "-r", "@-", "test.txt"], 223 + &home, 224 + ); 225 + common::assert_ok(&cat, "file show"); 226 + assert_eq!(cat.stdout, b"shutdown test\n"); 227 + 228 + // Now signal shutdown 229 + #[cfg(unix)] 230 + { 231 + unsafe { 232 + libc::kill(server.id() as libc::pid_t, libc::SIGTERM); 233 + } 234 + } 235 + 236 + let start = std::time::Instant::now(); 237 + loop { 238 + if let Some(status) = server.try_wait().expect("try_wait") { 239 + assert!( 240 + status.success(), 241 + "server should exit 0 after SIGTERM, got {:?}", 242 + status.code() 243 + ); 244 + return; 245 + } 246 + if start.elapsed() > Duration::from_secs(10) { 247 + let _ = server.kill(); 248 + let _ = server.wait(); 249 + panic!("server did not exit after SIGTERM"); 250 + } 251 + std::thread::sleep(Duration::from_millis(50)); 252 + } 253 + }
+223
tests/slice11_control_socket.rs
··· 1 + //! Slice 11: Control socket and tandem status 2 + //! 3 + //! Acceptance criteria: 4 + //! - `tandem serve` creates control socket when --control-socket is passed. 5 + //! - `tandem status` prints human-readable output while server runs. 6 + //! - `tandem status --json` returns valid JSON with pid, uptime, repo, listen fields. 7 + //! - `tandem status` exits 1 when no server is running. 8 + //! - Control socket is cleaned up on server exit. 9 + 10 + mod common; 11 + 12 + use std::time::Duration; 13 + use tempfile::TempDir; 14 + 15 + /// Server creates control socket, tandem status --json returns valid data. 16 + #[test] 17 + fn slice11_status_json_while_running() { 18 + let tmp = TempDir::new().unwrap(); 19 + let home = common::isolated_home(tmp.path()); 20 + let server_repo = tmp.path().join("server-repo"); 21 + std::fs::create_dir_all(&server_repo).unwrap(); 22 + 23 + let addr = common::free_addr(); 24 + let sock = common::control_socket_path(tmp.path()); 25 + let sock_str = sock.to_str().unwrap(); 26 + 27 + let mut server = common::spawn_server_with_args( 28 + &server_repo, 29 + &addr, 30 + &["--control-socket", sock_str], 31 + &home, 32 + ); 33 + common::wait_for_server(&addr, &mut server); 34 + common::wait_for_socket(&sock, Duration::from_secs(5)); 35 + 36 + // Run tandem status --json 37 + let status_out = common::run_tandem_in( 38 + tmp.path(), 39 + &["status", "--json", "--control-socket", sock_str], 40 + &home, 41 + ); 42 + common::assert_ok(&status_out, "tandem status --json"); 43 + 44 + let json_str = common::stdout_str(&status_out); 45 + let parsed: serde_json::Value = serde_json::from_str(json_str.trim()) 46 + .unwrap_or_else(|e| panic!("invalid JSON from status: {e}\nraw: {json_str}")); 47 + 48 + assert_eq!(parsed["running"], true, "should report running=true"); 49 + assert!(parsed["pid"].is_number(), "should have numeric pid"); 50 + assert!( 51 + parsed["uptime_secs"].is_number(), 52 + "should have numeric uptime_secs" 53 + ); 54 + assert!(parsed["repo"].is_string(), "should have repo string"); 55 + assert!(parsed["listen"].is_string(), "should have listen string"); 56 + assert!(parsed["version"].is_string(), "should have version string"); 57 + 58 + // Cleanup 59 + #[cfg(unix)] 60 + unsafe { 61 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 62 + } 63 + let _ = server.wait(); 64 + } 65 + 66 + /// tandem status (human-readable) while server is running. 67 + #[test] 68 + fn slice11_status_human_while_running() { 69 + let tmp = TempDir::new().unwrap(); 70 + let home = common::isolated_home(tmp.path()); 71 + let server_repo = tmp.path().join("server-repo"); 72 + std::fs::create_dir_all(&server_repo).unwrap(); 73 + 74 + let addr = common::free_addr(); 75 + let sock = common::control_socket_path(tmp.path()); 76 + let sock_str = sock.to_str().unwrap(); 77 + 78 + let mut server = common::spawn_server_with_args( 79 + &server_repo, 80 + &addr, 81 + &["--control-socket", sock_str], 82 + &home, 83 + ); 84 + common::wait_for_server(&addr, &mut server); 85 + common::wait_for_socket(&sock, Duration::from_secs(5)); 86 + 87 + let status_out = common::run_tandem_in( 88 + tmp.path(), 89 + &["status", "--control-socket", sock_str], 90 + &home, 91 + ); 92 + common::assert_ok(&status_out, "tandem status"); 93 + 94 + let out = common::stdout_str(&status_out); 95 + assert!( 96 + out.contains("tandem is running"), 97 + "should say 'tandem is running'\noutput: {out}" 98 + ); 99 + assert!(out.contains("PID"), "should show PID\noutput: {out}"); 100 + 101 + // Cleanup 102 + #[cfg(unix)] 103 + unsafe { 104 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 105 + } 106 + let _ = server.wait(); 107 + } 108 + 109 + /// tandem status exits 1 when no server is running. 110 + #[test] 111 + fn slice11_status_not_running() { 112 + let tmp = TempDir::new().unwrap(); 113 + let home = common::isolated_home(tmp.path()); 114 + 115 + // Use a non-existent socket path 116 + let sock = tmp.path().join("nonexistent.sock"); 117 + let sock_str = sock.to_str().unwrap(); 118 + 119 + let status_out = common::run_tandem_in( 120 + tmp.path(), 121 + &["status", "--control-socket", sock_str], 122 + &home, 123 + ); 124 + 125 + assert!( 126 + !status_out.status.success(), 127 + "tandem status should exit 1 when no server is running" 128 + ); 129 + 130 + let combined = format!( 131 + "{}{}", 132 + common::stdout_str(&status_out), 133 + common::stderr_str(&status_out) 134 + ); 135 + assert!( 136 + combined.contains("not running"), 137 + "should say 'not running'\noutput: {combined}" 138 + ); 139 + } 140 + 141 + /// Control socket is cleaned up after server exits. 142 + #[test] 143 + fn slice11_socket_cleaned_up_on_exit() { 144 + let tmp = TempDir::new().unwrap(); 145 + let home = common::isolated_home(tmp.path()); 146 + let server_repo = tmp.path().join("server-repo"); 147 + std::fs::create_dir_all(&server_repo).unwrap(); 148 + 149 + let addr = common::free_addr(); 150 + let sock = common::control_socket_path(tmp.path()); 151 + let sock_str = sock.to_str().unwrap(); 152 + 153 + let mut server = common::spawn_server_with_args( 154 + &server_repo, 155 + &addr, 156 + &["--control-socket", sock_str], 157 + &home, 158 + ); 159 + common::wait_for_server(&addr, &mut server); 160 + common::wait_for_socket(&sock, Duration::from_secs(5)); 161 + 162 + assert!(sock.exists(), "control socket should exist while running"); 163 + 164 + // Send SIGINT 165 + #[cfg(unix)] 166 + unsafe { 167 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 168 + } 169 + 170 + let _ = server.wait(); 171 + 172 + // Socket should be cleaned up 173 + assert!( 174 + !sock.exists(), 175 + "control socket should be removed after server exit" 176 + ); 177 + } 178 + 179 + /// Control socket status endpoint reports correct repo and listen address. 180 + #[test] 181 + fn slice11_status_reports_correct_info() { 182 + let tmp = TempDir::new().unwrap(); 183 + let home = common::isolated_home(tmp.path()); 184 + let server_repo = tmp.path().join("server-repo"); 185 + std::fs::create_dir_all(&server_repo).unwrap(); 186 + 187 + let addr = common::free_addr(); 188 + let sock = common::control_socket_path(tmp.path()); 189 + let sock_str = sock.to_str().unwrap(); 190 + 191 + let mut server = common::spawn_server_with_args( 192 + &server_repo, 193 + &addr, 194 + &["--control-socket", sock_str], 195 + &home, 196 + ); 197 + common::wait_for_server(&addr, &mut server); 198 + common::wait_for_socket(&sock, Duration::from_secs(5)); 199 + 200 + let status_out = common::run_tandem_in( 201 + tmp.path(), 202 + &["status", "--json", "--control-socket", sock_str], 203 + &home, 204 + ); 205 + common::assert_ok(&status_out, "tandem status --json"); 206 + 207 + let json_str = common::stdout_str(&status_out); 208 + let parsed: serde_json::Value = serde_json::from_str(json_str.trim()).unwrap(); 209 + 210 + // The listen address should match what we passed 211 + let listen_val = parsed["listen"].as_str().unwrap(); 212 + assert!( 213 + listen_val.contains(&addr) || addr.contains(listen_val), 214 + "listen should match addr {addr}, got {listen_val}" 215 + ); 216 + 217 + // Cleanup 218 + #[cfg(unix)] 219 + unsafe { 220 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 221 + } 222 + let _ = server.wait(); 223 + }
+278
tests/slice12_up_down.rs
··· 1 + //! Slice 12: tandem up and tandem down 2 + //! 3 + //! Acceptance criteria: 4 + //! - `tandem up --repo ... --listen ...` returns immediately, daemon is running. 5 + //! - `tandem status` shows running after `tandem up`. 6 + //! - `tandem down` stops daemon, `tandem status` shows not running. 7 + //! - `tandem up` twice: second invocation errors with "already running". 8 + //! - PID file and control socket cleaned up after `tandem down`. 9 + 10 + mod common; 11 + 12 + use std::time::Duration; 13 + use tempfile::TempDir; 14 + 15 + /// tandem up starts daemon, status shows running, down stops it. 16 + #[test] 17 + fn slice12_up_status_down() { 18 + let tmp = TempDir::new().unwrap(); 19 + let home = common::isolated_home(tmp.path()); 20 + let server_repo = tmp.path().join("server-repo"); 21 + std::fs::create_dir_all(&server_repo).unwrap(); 22 + 23 + let addr = common::free_addr(); 24 + let sock = common::control_socket_path(tmp.path()); 25 + let sock_str = sock.to_str().unwrap(); 26 + let log_file = tmp.path().join("daemon.log"); 27 + let log_file_str = log_file.to_str().unwrap(); 28 + 29 + // tandem up 30 + let up_out = common::run_tandem_in( 31 + tmp.path(), 32 + &[ 33 + "up", 34 + "--repo", 35 + server_repo.to_str().unwrap(), 36 + "--listen", 37 + &addr, 38 + "--control-socket", 39 + sock_str, 40 + "--log-file", 41 + log_file_str, 42 + ], 43 + &home, 44 + ); 45 + common::assert_ok(&up_out, "tandem up"); 46 + let up_text = common::stdout_str(&up_out); 47 + assert!( 48 + up_text.contains("tandem running"), 49 + "should print 'tandem running'\noutput: {up_text}" 50 + ); 51 + 52 + // Server should be listening 53 + common::wait_for_addr(&addr, Duration::from_secs(10)); 54 + 55 + // tandem status should show running 56 + let status_out = common::run_tandem_in( 57 + tmp.path(), 58 + &["status", "--json", "--control-socket", sock_str], 59 + &home, 60 + ); 61 + common::assert_ok(&status_out, "tandem status after up"); 62 + let json_str = common::stdout_str(&status_out); 63 + let parsed: serde_json::Value = serde_json::from_str(json_str.trim()) 64 + .unwrap_or_else(|e| panic!("invalid JSON: {e}\nraw: {json_str}")); 65 + assert_eq!(parsed["running"], true); 66 + 67 + // tandem down 68 + let down_out = common::run_tandem_in( 69 + tmp.path(), 70 + &["down", "--control-socket", sock_str], 71 + &home, 72 + ); 73 + common::assert_ok(&down_out, "tandem down"); 74 + let down_text = format!( 75 + "{}{}", 76 + common::stdout_str(&down_out), 77 + common::stderr_str(&down_out) 78 + ); 79 + assert!( 80 + down_text.contains("tandem stopped"), 81 + "should print 'tandem stopped'\noutput: {down_text}" 82 + ); 83 + 84 + // Wait a moment for cleanup 85 + std::thread::sleep(Duration::from_millis(500)); 86 + 87 + // tandem status should show not running 88 + let status_after = common::run_tandem_in( 89 + tmp.path(), 90 + &["status", "--control-socket", sock_str], 91 + &home, 92 + ); 93 + assert!( 94 + !status_after.status.success(), 95 + "status should exit 1 after down" 96 + ); 97 + 98 + // Control socket should be cleaned up 99 + assert!( 100 + !sock.exists(), 101 + "control socket should be removed after down" 102 + ); 103 + } 104 + 105 + /// tandem up twice returns error on second invocation. 106 + #[test] 107 + fn slice12_up_twice_errors() { 108 + let tmp = TempDir::new().unwrap(); 109 + let home = common::isolated_home(tmp.path()); 110 + let server_repo = tmp.path().join("server-repo"); 111 + std::fs::create_dir_all(&server_repo).unwrap(); 112 + 113 + let addr = common::free_addr(); 114 + let sock = common::control_socket_path(tmp.path()); 115 + let sock_str = sock.to_str().unwrap(); 116 + let log_file = tmp.path().join("daemon.log"); 117 + let log_file_str = log_file.to_str().unwrap(); 118 + 119 + // First up 120 + let up1 = common::run_tandem_in( 121 + tmp.path(), 122 + &[ 123 + "up", 124 + "--repo", 125 + server_repo.to_str().unwrap(), 126 + "--listen", 127 + &addr, 128 + "--control-socket", 129 + sock_str, 130 + "--log-file", 131 + log_file_str, 132 + ], 133 + &home, 134 + ); 135 + common::assert_ok(&up1, "first tandem up"); 136 + 137 + // Second up should fail 138 + let addr2 = common::free_addr(); 139 + let up2 = common::run_tandem_in( 140 + tmp.path(), 141 + &[ 142 + "up", 143 + "--repo", 144 + server_repo.to_str().unwrap(), 145 + "--listen", 146 + &addr2, 147 + "--control-socket", 148 + sock_str, 149 + "--log-file", 150 + log_file_str, 151 + ], 152 + &home, 153 + ); 154 + assert!( 155 + !up2.status.success(), 156 + "second tandem up should fail" 157 + ); 158 + let combined = format!( 159 + "{}{}", 160 + common::stdout_str(&up2), 161 + common::stderr_str(&up2) 162 + ); 163 + assert!( 164 + combined.contains("already running"), 165 + "should say 'already running'\noutput: {combined}" 166 + ); 167 + 168 + // Cleanup: bring it down 169 + let _ = common::run_tandem_in( 170 + tmp.path(), 171 + &["down", "--control-socket", sock_str], 172 + &home, 173 + ); 174 + std::thread::sleep(Duration::from_millis(500)); 175 + } 176 + 177 + /// Full round-trip: up → init workspace → write file → read file → down. 178 + #[test] 179 + fn slice12_up_roundtrip_down() { 180 + let tmp = TempDir::new().unwrap(); 181 + let home = common::isolated_home(tmp.path()); 182 + let server_repo = tmp.path().join("server-repo"); 183 + std::fs::create_dir_all(&server_repo).unwrap(); 184 + let workspace_dir = tmp.path().join("workspace"); 185 + std::fs::create_dir_all(&workspace_dir).unwrap(); 186 + 187 + let addr = common::free_addr(); 188 + let sock = common::control_socket_path(tmp.path()); 189 + let sock_str = sock.to_str().unwrap(); 190 + let log_file = tmp.path().join("daemon.log"); 191 + let log_file_str = log_file.to_str().unwrap(); 192 + 193 + // Start daemon 194 + let up_out = common::run_tandem_in( 195 + tmp.path(), 196 + &[ 197 + "up", 198 + "--repo", 199 + server_repo.to_str().unwrap(), 200 + "--listen", 201 + &addr, 202 + "--control-socket", 203 + sock_str, 204 + "--log-file", 205 + log_file_str, 206 + ], 207 + &home, 208 + ); 209 + common::assert_ok(&up_out, "tandem up"); 210 + 211 + // Init workspace 212 + let init = common::run_tandem_in( 213 + &workspace_dir, 214 + &["init", "--tandem-server", &addr, "."], 215 + &home, 216 + ); 217 + common::assert_ok(&init, "tandem init"); 218 + 219 + // Write a file 220 + std::fs::write(workspace_dir.join("hello.txt"), b"daemon test\n").unwrap(); 221 + let new_out = common::run_tandem_in(&workspace_dir, &["new", "-m", "daemon write"], &home); 222 + common::assert_ok(&new_out, "tandem new"); 223 + 224 + // Read the file back 225 + let cat = common::run_tandem_in( 226 + &workspace_dir, 227 + &["file", "show", "-r", "@-", "hello.txt"], 228 + &home, 229 + ); 230 + common::assert_ok(&cat, "file show"); 231 + assert_eq!(cat.stdout, b"daemon test\n", "file content round-trip"); 232 + 233 + // Bring it down 234 + let down_out = common::run_tandem_in( 235 + tmp.path(), 236 + &["down", "--control-socket", sock_str], 237 + &home, 238 + ); 239 + common::assert_ok(&down_out, "tandem down"); 240 + std::thread::sleep(Duration::from_millis(500)); 241 + 242 + // Verify stopped 243 + let status = common::run_tandem_in( 244 + tmp.path(), 245 + &["status", "--control-socket", sock_str], 246 + &home, 247 + ); 248 + assert!(!status.status.success(), "status should fail after down"); 249 + } 250 + 251 + /// tandem down with no daemon running exits 1. 252 + #[test] 253 + fn slice12_down_not_running() { 254 + let tmp = TempDir::new().unwrap(); 255 + let home = common::isolated_home(tmp.path()); 256 + 257 + let sock = tmp.path().join("nonexistent.sock"); 258 + let sock_str = sock.to_str().unwrap(); 259 + 260 + let down_out = common::run_tandem_in( 261 + tmp.path(), 262 + &["down", "--control-socket", sock_str], 263 + &home, 264 + ); 265 + assert!( 266 + !down_out.status.success(), 267 + "tandem down with no daemon should exit 1" 268 + ); 269 + let combined = format!( 270 + "{}{}", 271 + common::stdout_str(&down_out), 272 + common::stderr_str(&down_out) 273 + ); 274 + assert!( 275 + combined.contains("not running"), 276 + "should say not running\noutput: {combined}" 277 + ); 278 + }
+252
tests/slice13_log_streaming.rs
··· 1 + //! Slice 13: tandem logs (streaming) 2 + //! 3 + //! Acceptance criteria: 4 + //! - `tandem logs` prints log lines as events happen. 5 + //! - `tandem logs --level debug` shows debug events. 6 + //! - `tandem logs --json` outputs one JSON object per line. 7 + //! - `tandem logs` exits cleanly when daemon shuts down. 8 + //! - `tandem logs` with no daemon: exit 1 with helpful message. 9 + 10 + mod common; 11 + 12 + use std::process::{Command, Stdio}; 13 + use std::time::Duration; 14 + use tempfile::TempDir; 15 + 16 + /// tandem logs with no daemon running exits 1. 17 + #[test] 18 + fn slice13_logs_not_running() { 19 + let tmp = TempDir::new().unwrap(); 20 + let home = common::isolated_home(tmp.path()); 21 + 22 + let sock = tmp.path().join("nonexistent.sock"); 23 + let sock_str = sock.to_str().unwrap(); 24 + 25 + let out = common::run_tandem_in( 26 + tmp.path(), 27 + &["logs", "--control-socket", sock_str], 28 + &home, 29 + ); 30 + assert!( 31 + !out.status.success(), 32 + "tandem logs with no daemon should exit 1" 33 + ); 34 + let combined = format!( 35 + "{}{}", 36 + common::stdout_str(&out), 37 + common::stderr_str(&out) 38 + ); 39 + assert!( 40 + combined.contains("not running") || combined.contains("no tandem daemon running"), 41 + "should mention not running\noutput: {combined}" 42 + ); 43 + } 44 + 45 + /// tandem logs --json streams JSON log lines when activity happens. 46 + #[test] 47 + fn slice13_logs_json_streams_events() { 48 + let tmp = TempDir::new().unwrap(); 49 + let home = common::isolated_home(tmp.path()); 50 + let server_repo = tmp.path().join("server-repo"); 51 + std::fs::create_dir_all(&server_repo).unwrap(); 52 + let workspace_dir = tmp.path().join("workspace"); 53 + std::fs::create_dir_all(&workspace_dir).unwrap(); 54 + 55 + let addr = common::free_addr(); 56 + let sock = common::control_socket_path(tmp.path()); 57 + let sock_str = sock.to_str().unwrap(); 58 + 59 + let mut server = common::spawn_server_with_args( 60 + &server_repo, 61 + &addr, 62 + &["--control-socket", sock_str], 63 + &home, 64 + ); 65 + common::wait_for_server(&addr, &mut server); 66 + common::wait_for_socket(&sock, Duration::from_secs(5)); 67 + 68 + // Start tandem logs --json in background 69 + let mut logs_cmd = Command::new(common::tandem_bin()); 70 + logs_cmd.args(["logs", "--json", "--control-socket", sock_str]); 71 + common::isolate_env(&mut logs_cmd, &home); 72 + logs_cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); 73 + let mut logs_child = logs_cmd.spawn().expect("spawn tandem logs"); 74 + 75 + // Give logs time to connect 76 + std::thread::sleep(Duration::from_millis(500)); 77 + 78 + // Create activity: init workspace and write a file 79 + let init = common::run_tandem_in( 80 + &workspace_dir, 81 + &["init", "--tandem-server", &addr, "."], 82 + &home, 83 + ); 84 + common::assert_ok(&init, "tandem init"); 85 + 86 + std::fs::write(workspace_dir.join("log-test.txt"), b"log content\n").unwrap(); 87 + let new_out = common::run_tandem_in(&workspace_dir, &["new", "-m", "log test"], &home); 88 + common::assert_ok(&new_out, "tandem new"); 89 + 90 + // Give logs time to receive events 91 + std::thread::sleep(Duration::from_millis(1000)); 92 + 93 + // Kill the logs process and read what it captured 94 + let _ = logs_child.kill(); 95 + let output = logs_child.wait_with_output().expect("wait logs"); 96 + let stdout = String::from_utf8_lossy(&output.stdout); 97 + 98 + // Should have received at least one JSON line 99 + let lines: Vec<&str> = stdout.lines().filter(|l| !l.trim().is_empty()).collect(); 100 + assert!( 101 + !lines.is_empty(), 102 + "tandem logs --json should have produced output\nstdout: {stdout}" 103 + ); 104 + 105 + // Each line should be valid JSON 106 + for line in &lines { 107 + let parsed: Result<serde_json::Value, _> = serde_json::from_str(line); 108 + assert!( 109 + parsed.is_ok(), 110 + "each log line should be valid JSON\nline: {line}" 111 + ); 112 + } 113 + 114 + // Cleanup 115 + #[cfg(unix)] 116 + unsafe { 117 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 118 + } 119 + let _ = server.wait(); 120 + } 121 + 122 + /// tandem logs exits cleanly when server shuts down. 123 + #[test] 124 + fn slice13_logs_exits_on_shutdown() { 125 + let tmp = TempDir::new().unwrap(); 126 + let home = common::isolated_home(tmp.path()); 127 + let server_repo = tmp.path().join("server-repo"); 128 + std::fs::create_dir_all(&server_repo).unwrap(); 129 + 130 + let addr = common::free_addr(); 131 + let sock = common::control_socket_path(tmp.path()); 132 + let sock_str = sock.to_str().unwrap(); 133 + 134 + let mut server = common::spawn_server_with_args( 135 + &server_repo, 136 + &addr, 137 + &["--control-socket", sock_str], 138 + &home, 139 + ); 140 + common::wait_for_server(&addr, &mut server); 141 + common::wait_for_socket(&sock, Duration::from_secs(5)); 142 + 143 + // Start logs process 144 + let mut logs_cmd = Command::new(common::tandem_bin()); 145 + logs_cmd.args(["logs", "--control-socket", sock_str]); 146 + common::isolate_env(&mut logs_cmd, &home); 147 + logs_cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); 148 + let mut logs_child = logs_cmd.spawn().expect("spawn tandem logs"); 149 + 150 + std::thread::sleep(Duration::from_millis(500)); 151 + 152 + // Shut down server 153 + #[cfg(unix)] 154 + unsafe { 155 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 156 + } 157 + let _ = server.wait(); 158 + 159 + // logs process should exit within a few seconds 160 + let start = std::time::Instant::now(); 161 + loop { 162 + if let Some(_status) = logs_child.try_wait().expect("try_wait logs") { 163 + return; // Exited — success 164 + } 165 + if start.elapsed() > Duration::from_secs(10) { 166 + let _ = logs_child.kill(); 167 + let _ = logs_child.wait(); 168 + panic!("tandem logs did not exit after server shutdown"); 169 + } 170 + std::thread::sleep(Duration::from_millis(100)); 171 + } 172 + } 173 + 174 + /// tandem logs --level debug shows more output than --level warn. 175 + #[test] 176 + fn slice13_logs_level_filtering() { 177 + let tmp = TempDir::new().unwrap(); 178 + let home = common::isolated_home(tmp.path()); 179 + let server_repo = tmp.path().join("server-repo"); 180 + std::fs::create_dir_all(&server_repo).unwrap(); 181 + let workspace_dir = tmp.path().join("workspace"); 182 + std::fs::create_dir_all(&workspace_dir).unwrap(); 183 + 184 + let addr = common::free_addr(); 185 + let sock = common::control_socket_path(tmp.path()); 186 + let sock_str = sock.to_str().unwrap(); 187 + 188 + let mut server = common::spawn_server_with_args( 189 + &server_repo, 190 + &addr, 191 + &["--control-socket", sock_str], 192 + &home, 193 + ); 194 + common::wait_for_server(&addr, &mut server); 195 + common::wait_for_socket(&sock, Duration::from_secs(5)); 196 + 197 + // Start logs at debug level 198 + let mut debug_cmd = Command::new(common::tandem_bin()); 199 + debug_cmd.args(["logs", "--json", "--level", "debug", "--control-socket", sock_str]); 200 + common::isolate_env(&mut debug_cmd, &home); 201 + debug_cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); 202 + let mut debug_child = debug_cmd.spawn().expect("spawn debug logs"); 203 + 204 + // Start logs at warn level 205 + let mut warn_cmd = Command::new(common::tandem_bin()); 206 + warn_cmd.args(["logs", "--json", "--level", "warn", "--control-socket", sock_str]); 207 + common::isolate_env(&mut warn_cmd, &home); 208 + warn_cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); 209 + let mut warn_child = warn_cmd.spawn().expect("spawn warn logs"); 210 + 211 + std::thread::sleep(Duration::from_millis(500)); 212 + 213 + // Generate activity 214 + let init = common::run_tandem_in( 215 + &workspace_dir, 216 + &["init", "--tandem-server", &addr, "."], 217 + &home, 218 + ); 219 + common::assert_ok(&init, "tandem init"); 220 + 221 + std::fs::write(workspace_dir.join("level-test.txt"), b"level test\n").unwrap(); 222 + let new_out = common::run_tandem_in(&workspace_dir, &["new", "-m", "level test"], &home); 223 + common::assert_ok(&new_out, "tandem new"); 224 + 225 + std::thread::sleep(Duration::from_millis(1000)); 226 + 227 + // Kill both and compare 228 + let _ = debug_child.kill(); 229 + let _ = warn_child.kill(); 230 + let debug_out = debug_child.wait_with_output().expect("debug wait"); 231 + let warn_out = warn_child.wait_with_output().expect("warn wait"); 232 + 233 + let debug_stdout = String::from_utf8_lossy(&debug_out.stdout).to_string(); 234 + let warn_stdout = String::from_utf8_lossy(&warn_out.stdout).to_string(); 235 + 236 + let debug_count = debug_stdout.lines().filter(|l| !l.trim().is_empty()).count(); 237 + let warn_count = warn_stdout.lines().filter(|l| !l.trim().is_empty()).count(); 238 + 239 + // Debug should have at least as many lines as warn (usually more) 240 + // Normal server activity (connections, object reads) emit at info/debug level 241 + assert!( 242 + debug_count >= warn_count, 243 + "debug ({debug_count} lines) should have >= warn ({warn_count} lines)", 244 + ); 245 + 246 + // Cleanup 247 + #[cfg(unix)] 248 + unsafe { 249 + libc::kill(server.id() as libc::pid_t, libc::SIGINT); 250 + } 251 + let _ = server.wait(); 252 + }