Rockbox open source high quality audio player as a Music Player Daemon
mpris rockbox mpd libadwaita audio rust zig deno
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 43 lines 1.7 kB view raw
1"""Bluetooth pairing and discovery (Linux-only on the server).""" 2 3from __future__ import annotations 4 5from ..transport import HttpTransport 6from ..types import BluetoothDevice 7from ._fragments import BLUETOOTH_DEVICE_FIELDS 8 9 10class BluetoothApi: 11 def __init__(self, http: HttpTransport) -> None: 12 self._http = http 13 14 async def devices(self) -> list[BluetoothDevice]: 15 """List paired/known Bluetooth devices.""" 16 data = await self._http.execute( 17 f"{BLUETOOTH_DEVICE_FIELDS} " 18 "query BluetoothDevices { bluetoothDevices { ...BluetoothDeviceFields } }" 19 ) 20 return [BluetoothDevice.model_validate(d) for d in data.get("bluetoothDevices", [])] 21 22 async def scan(self, timeout_secs: int | None = None) -> list[BluetoothDevice]: 23 """Trigger a scan for nearby devices and return discoveries.""" 24 data = await self._http.execute( 25 f"{BLUETOOTH_DEVICE_FIELDS} " 26 "mutation BluetoothScan($timeoutSecs: Int) " 27 "{ bluetoothScan(timeoutSecs: $timeoutSecs) { ...BluetoothDeviceFields } }", 28 {"timeoutSecs": timeout_secs}, 29 ) 30 return [BluetoothDevice.model_validate(d) for d in data.get("bluetoothScan", [])] 31 32 async def connect(self, address: str) -> None: 33 await self._http.execute( 34 "mutation BluetoothConnect($address: String!) { bluetoothConnect(address: $address) }", 35 {"address": address}, 36 ) 37 38 async def disconnect(self, address: str) -> None: 39 await self._http.execute( 40 "mutation BluetoothDisconnect($address: String!) " 41 "{ bluetoothDisconnect(address: $address) }", 42 {"address": address}, 43 )