personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

pairing: add shared bearer auth helper

+282
+108
convey/auth.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Shared pairing auth helpers.""" 5 + 6 + from __future__ import annotations 7 + 8 + import logging 9 + from functools import wraps 10 + from typing import Any, Callable, TypeVar, cast 11 + 12 + from flask import g, jsonify, request, session 13 + from werkzeug.security import check_password_hash 14 + 15 + from think.pairing.devices import Device, find_device_by_session_key_hash 16 + from think.pairing.keys import hash_session_key, mask_session_key 17 + from think.utils import get_config 18 + 19 + logger = logging.getLogger(__name__) 20 + 21 + F = TypeVar("F", bound=Callable[..., Any]) 22 + 23 + 24 + def extract_bearer_token() -> str | None: 25 + header = request.headers.get("Authorization") 26 + if not isinstance(header, str): 27 + return None 28 + scheme, _, value = header.partition(" ") 29 + if scheme.lower() != "bearer": 30 + return None 31 + token = value.strip() 32 + return token or None 33 + 34 + 35 + def resolve_paired_device() -> Device | None: 36 + token = extract_bearer_token() 37 + if token is None: 38 + return None 39 + masked = mask_session_key(token) 40 + session_key_hash = hash_session_key(token) 41 + device = find_device_by_session_key_hash(session_key_hash) 42 + if device is None: 43 + logger.debug("paired device not found session_key=%s", masked) 44 + return None 45 + logger.debug("paired device resolved id=%s session_key=%s", device["id"], masked) 46 + return device 47 + 48 + 49 + def _check_basic_auth() -> bool: 50 + auth = request.authorization 51 + if not auth or auth.type != "basic": 52 + return False 53 + password_hash = str(get_config().get("convey", {}).get("password_hash", "") or "") 54 + if not password_hash: 55 + return False 56 + return check_password_hash(password_hash, auth.password or "") 57 + 58 + 59 + def _is_setup_complete() -> bool: 60 + return bool(get_config().get("setup", {}).get("completed_at")) 61 + 62 + 63 + def is_owner_authed() -> bool: 64 + if session.get("logged_in"): 65 + return True 66 + if _check_basic_auth(): 67 + return True 68 + if not _is_setup_complete(): 69 + return False 70 + config = get_config() 71 + if not config.get("convey", {}).get("trust_localhost", False): 72 + return False 73 + remote_addr = request.remote_addr 74 + is_localhost = remote_addr in ("127.0.0.1", "::1", "localhost") 75 + proxy_headers = ( 76 + request.headers.get("X-Forwarded-For") 77 + or request.headers.get("X-Real-IP") 78 + or request.headers.get("X-Forwarded-Host") 79 + ) 80 + return bool(is_localhost and not proxy_headers) 81 + 82 + 83 + def require_paired_device(func: F) -> F: 84 + @wraps(func) 85 + def wrapped(*args: Any, **kwargs: Any) -> Any: 86 + device = resolve_paired_device() 87 + if device is None: 88 + return ( 89 + jsonify( 90 + { 91 + "error": "paired device required", 92 + "reason": "auth_required", 93 + } 94 + ), 95 + 401, 96 + ) 97 + g.paired_device = device 98 + return func(*args, **kwargs) 99 + 100 + return cast(F, wrapped) 101 + 102 + 103 + __all__ = [ 104 + "extract_bearer_token", 105 + "is_owner_authed", 106 + "require_paired_device", 107 + "resolve_paired_device", 108 + ]
+174
tests/test_pairing_auth.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + from __future__ import annotations 5 + 6 + import base64 7 + import json 8 + 9 + from flask import Flask, g, jsonify, session 10 + from werkzeug.security import generate_password_hash 11 + 12 + from convey.auth import ( 13 + extract_bearer_token, 14 + is_owner_authed, 15 + require_paired_device, 16 + resolve_paired_device, 17 + ) 18 + from think.pairing.devices import register_device 19 + from think.pairing.keys import hash_session_key 20 + 21 + 22 + def _write_config(journal_copy, payload: dict) -> None: 23 + (journal_copy / "config" / "journal.json").write_text( 24 + json.dumps(payload), encoding="utf-8" 25 + ) 26 + 27 + 28 + def _read_config(journal_copy) -> dict: 29 + return json.loads((journal_copy / "config" / "journal.json").read_text("utf-8")) 30 + 31 + 32 + def _create_app() -> Flask: 33 + app = Flask(__name__) 34 + app.secret_key = "test-secret" 35 + app.config["TESTING"] = True 36 + 37 + @app.get("/bearer") 38 + def bearer_view(): 39 + return jsonify( 40 + { 41 + "token": extract_bearer_token(), 42 + "device_id": (resolve_paired_device() or {}).get("id"), 43 + } 44 + ) 45 + 46 + @app.get("/owner") 47 + def owner_view(): 48 + return jsonify({"owner_authed": is_owner_authed()}) 49 + 50 + @app.get("/paired") 51 + @require_paired_device 52 + def paired_view(): 53 + return jsonify({"device_id": g.paired_device["id"]}) 54 + 55 + @app.get("/session-login") 56 + def session_login(): 57 + session["logged_in"] = True 58 + session.permanent = True 59 + return jsonify({"ok": True}) 60 + 61 + return app 62 + 63 + 64 + def test_extract_bearer_token_handles_missing_and_malformed_headers(journal_copy): 65 + app = _create_app() 66 + client = app.test_client() 67 + 68 + assert client.get("/bearer").get_json() == {"token": None, "device_id": None} 69 + assert client.get("/bearer", headers={"Authorization": "Bearer"}).get_json() == { 70 + "token": None, 71 + "device_id": None, 72 + } 73 + assert client.get("/bearer", headers={"Authorization": "Basic abc"}).get_json() == { 74 + "token": None, 75 + "device_id": None, 76 + } 77 + 78 + 79 + def test_resolve_paired_device_requires_matching_hash(journal_copy): 80 + app = _create_app() 81 + client = app.test_client() 82 + register_device( 83 + name="Phone", 84 + platform="ios", 85 + public_key="ssh-ed25519 AAAAauth", 86 + session_key_hash=hash_session_key("dsk_real"), 87 + bundle_id="org.solpbc.solstone-swift", 88 + app_version="0.1.0", 89 + paired_at="2026-04-20T15:31:02Z", 90 + ) 91 + 92 + missing = client.get("/bearer", headers={"Authorization": "Bearer dsk_wrong"}) 93 + found = client.get("/bearer", headers={"Authorization": "Bearer dsk_real"}) 94 + 95 + assert missing.get_json() == {"token": "dsk_wrong", "device_id": None} 96 + assert found.get_json()["token"] == "dsk_real" 97 + assert found.get_json()["device_id"].startswith("dev_") 98 + 99 + 100 + def test_is_owner_authed_via_basic_auth(journal_copy): 101 + payload = _read_config(journal_copy) 102 + payload["convey"]["password_hash"] = generate_password_hash("test123") 103 + payload["setup"] = {"completed_at": 1700000000000} 104 + _write_config(journal_copy, payload) 105 + 106 + app = _create_app() 107 + client = app.test_client() 108 + creds = base64.b64encode(b":test123").decode("ascii") 109 + 110 + response = client.get( 111 + "/owner", 112 + headers={ 113 + "Authorization": f"Basic {creds}", 114 + "X-Forwarded-For": "1.2.3.4", 115 + }, 116 + ) 117 + 118 + assert response.get_json() == {"owner_authed": True} 119 + 120 + 121 + def test_is_owner_authed_via_session_cookie(journal_copy): 122 + app = _create_app() 123 + client = app.test_client() 124 + 125 + client.get("/session-login") 126 + response = client.get("/owner") 127 + 128 + assert response.get_json() == {"owner_authed": True} 129 + 130 + 131 + def test_is_owner_authed_via_trust_localhost(journal_copy): 132 + payload = _read_config(journal_copy) 133 + payload["convey"]["trust_localhost"] = True 134 + payload["setup"] = {"completed_at": 1700000000000} 135 + payload["convey"].pop("password_hash", None) 136 + _write_config(journal_copy, payload) 137 + 138 + app = _create_app() 139 + client = app.test_client() 140 + response = client.get("/owner") 141 + 142 + assert response.get_json() == {"owner_authed": True} 143 + 144 + 145 + def test_require_paired_device_returns_401_json_without_bearer(journal_copy): 146 + app = _create_app() 147 + client = app.test_client() 148 + 149 + response = client.get("/paired") 150 + 151 + assert response.status_code == 401 152 + assert response.get_json() == { 153 + "error": "paired device required", 154 + "reason": "auth_required", 155 + } 156 + 157 + 158 + def test_require_paired_device_sets_g_paired_device(journal_copy): 159 + device = register_device( 160 + name="Phone", 161 + platform="ios", 162 + public_key="ssh-ed25519 AAAApaired", 163 + session_key_hash=hash_session_key("dsk_paired"), 164 + bundle_id="org.solpbc.solstone-swift", 165 + app_version="0.1.0", 166 + paired_at="2026-04-20T15:31:02Z", 167 + ) 168 + app = _create_app() 169 + client = app.test_client() 170 + 171 + response = client.get("/paired", headers={"Authorization": "Bearer dsk_paired"}) 172 + 173 + assert response.status_code == 200 174 + assert response.get_json() == {"device_id": device["id"]}