personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

link: port test_nonces to solstone fork

+84
+84
tests/link/test_nonces.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + from pathlib import Path 7 + 8 + from think.link.nonces import NONCE_TTL_SECONDS, Nonce, NonceStore 9 + 10 + 11 + def test_add_and_consume(tmp_path: Path) -> None: 12 + store = NonceStore(tmp_path / "nonces.json") 13 + 14 + added = store.add("abc123", "phone", now=1000) 15 + consumed = store.consume("abc123", now=1001) 16 + 17 + assert added == Nonce( 18 + value="abc123", 19 + device_label="phone", 20 + issued_at=1000, 21 + expires_at=1000 + NONCE_TTL_SECONDS, 22 + used=False, 23 + ) 24 + assert consumed == Nonce( 25 + value="abc123", 26 + device_label="phone", 27 + issued_at=1000, 28 + expires_at=1000 + NONCE_TTL_SECONDS, 29 + used=True, 30 + ) 31 + 32 + 33 + def test_consume_is_single_use(tmp_path: Path) -> None: 34 + store = NonceStore(tmp_path / "nonces.json") 35 + 36 + store.add("abc123", "phone", now=1000) 37 + 38 + assert store.consume("abc123", now=1001) is not None 39 + assert store.consume("abc123", now=1002) is None 40 + 41 + 42 + def test_expired_nonce_rejected(tmp_path: Path) -> None: 43 + store = NonceStore(tmp_path / "nonces.json") 44 + 45 + store.add("abc123", "phone", now=1000) 46 + 47 + assert store.consume("abc123", now=1000 + NONCE_TTL_SECONDS + 1) is None 48 + 49 + 50 + def test_unknown_nonce_returns_none(tmp_path: Path) -> None: 51 + store = NonceStore(tmp_path / "nonces.json") 52 + 53 + assert store.consume("never-added") is None 54 + 55 + 56 + def test_gc_removes_expired_and_used(tmp_path: Path) -> None: 57 + store = NonceStore(tmp_path / "nonces.json") 58 + 59 + store.add("live", "device", now=1000) 60 + store.add("used", "device", now=1000) 61 + store.consume("used", now=1001) 62 + 63 + removed = store.gc(now=1001) 64 + 65 + assert removed == 1 66 + assert [entry.value for entry in store.snapshot()] == ["live"] 67 + 68 + store.add("fresh", "device", now=2000) 69 + store.add("also_expired", "device", now=2000 - NONCE_TTL_SECONDS - 10) 70 + store.gc(now=2000) 71 + 72 + assert {entry.value for entry in store.snapshot()} == {"fresh"} 73 + 74 + 75 + def test_persistence_across_store_instances(tmp_path: Path) -> None: 76 + path = tmp_path / "nonces.json" 77 + first = NonceStore(path) 78 + first.add("shared", "device", now=1000) 79 + 80 + second = NonceStore(path) 81 + entry = second.consume("shared", now=1001) 82 + 83 + assert entry is not None 84 + assert entry.value == "shared"