this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cartool diff

+112 -78
+1
pyproject.toml
··· 22 22 "dag-cbor", 23 23 "multiformats", 24 24 "more-itertools", 25 + "lru-dict", 25 26 ] 26 27 27 28 [project.optional-dependencies]
+63 -62
src/atmst/cartool.py
··· 2 2 import os 3 3 import base64 4 4 import json 5 + from typing import Tuple, Any 5 6 6 7 import dag_cbor 7 8 from multiformats import CID, varint 8 9 9 10 from .blockstore.car_file import ReadOnlyCARBlockStore 11 + from .blockstore import OverlayBlockStore 10 12 from .mst.node_store import NodeStore 11 13 from .mst.node_walker import NodeWalker 14 + from .mst.diff import mst_diff, record_diff 12 15 13 16 14 17 class ATJsonEncoder(json.JSONEncoder): ··· 22 25 def prettify_record(record) -> str: 23 26 return json.dumps(record, indent=" ", cls=ATJsonEncoder) 24 27 28 + def open_car(car_path: str) -> Tuple[ReadOnlyCARBlockStore, Any, NodeWalker]: 29 + carfile = open(car_path, "rb") 30 + bs = ReadOnlyCARBlockStore(carfile) 31 + commit = dag_cbor.decode(bs.get_block(bytes(bs.car_root))) 32 + nw = NodeWalker(NodeStore(bs), commit["data"]) 33 + return bs, commit, nw 34 + 25 35 def print_info(car_path: str) -> None: 26 36 print(f"Reading {car_path!r}") 27 37 print(f"Size on disk: {os.stat(car_path).st_size} bytes") 28 - with open(car_path, "rb") as carfile: 29 - bs = ReadOnlyCARBlockStore(carfile) 30 - print("Total CAR blocks:", len(bs.block_offsets)) 31 - print("Root CID:", bs.car_root.encode("base32")) 32 - commit = dag_cbor.decode(bs.get_block(bytes(bs.car_root))) 33 - print() 34 - print("ATProto commit info:") 35 - print("Version:", commit["version"]) 36 - if commit["version"] != 3: 37 - print(f"Error: only v3 repo format is supported. Got:", commit["version"]) 38 - return 39 - print("Repo:", commit["did"]) 40 - print("Rev:", commit["rev"]) 41 - print("Sig:", base64.urlsafe_b64encode(commit["sig"]).decode()) 42 - print("MST root:", commit["data"].encode("base32")) 38 + bs, commit, _ = open_car(car_path) 39 + print("Total CAR blocks:", len(bs.block_offsets)) 40 + print("Root CID:", bs.car_root.encode("base32")) 41 + print() 42 + print("ATProto commit info:") 43 + print("Version:", commit["version"]) 44 + if commit["version"] != 3: 45 + print(f"Error: only v3 repo format is supported. Got:", commit["version"]) 46 + return 47 + print("Repo:", commit["did"]) 48 + print("Rev:", commit["rev"]) 49 + print("Sig:", base64.urlsafe_b64encode(commit["sig"]).decode()) 50 + print("MST root:", commit["data"].encode("base32")) 43 51 44 52 def print_all_records(car_path: str, to_json: bool) -> None: 45 - with open(car_path, "rb") as carfile: 46 - bs = ReadOnlyCARBlockStore(carfile) 47 - ns = NodeStore(bs) 48 - commit = dag_cbor.decode(bs.get_block(bytes(bs.car_root))) 49 - nw = NodeWalker(ns, commit["data"]) 50 - for k, v in nw.iter_kv(): 51 - if to_json: 52 - record = dag_cbor.decode(bs.get_block(bytes(v))) 53 - print(f"{k} -> {prettify_record(record)}") 54 - else: 55 - print(f"{k} -> {v.encode('base32')}") 53 + bs, _, nw = open_car(car_path) 54 + for k, v in nw.iter_kv(): 55 + if to_json: 56 + record = dag_cbor.decode(bs.get_block(bytes(v))) 57 + print(f"{json.dumps(k)} -> {prettify_record(record)}") 58 + else: 59 + print(f"{json.dumps(k)} -> {v.encode('base32')}") 56 60 57 61 def list_all(car_path: str): 58 62 print_all_records(car_path, to_json=False) ··· 62 66 print_all_records(car_path, to_json=True) 63 67 64 68 def dump_record(car_path: str, key: str): 65 - with open(car_path, "rb") as carfile: 66 - bs = ReadOnlyCARBlockStore(carfile) 67 - ns = NodeStore(bs) 68 - commit = dag_cbor.decode(bs.get_block(bytes(bs.car_root))) 69 - nw = NodeWalker(ns, commit["data"]) 70 - val = nw.find_value(key) 71 - if val is None: 72 - print("Record not found!", file=sys.stderr) 73 - sys.exit(-1) 74 - record = dag_cbor.decode(bs.get_block(bytes(val))) 75 - print(prettify_record(record)) 69 + bs, _, nw = open_car(car_path) 70 + val = nw.find_value(key) 71 + if val is None: 72 + print("Record not found!", file=sys.stderr) 73 + sys.exit(-1) 74 + record = dag_cbor.decode(bs.get_block(bytes(val))) 75 + print(prettify_record(record)) 76 76 77 77 def write_block(file, data): 78 78 file.write(varint.encode(len(data))) 79 79 file.write(data) 80 80 81 81 def compact(car_in: str, car_out: str): 82 - with open(car_in, "rb") as carfile_in: 83 - with open(car_out, "wb") as carfile_out: 84 - bs = ReadOnlyCARBlockStore(carfile_in) 85 - 86 - new_header = dag_cbor.encode({ 87 - "version": 1, 88 - "roots": [bs.car_root] 89 - }) 90 - write_block(carfile_out, new_header) 91 - 92 - commit_blob = bs.get_block(bytes(bs.car_root)) 93 - commit = dag_cbor.decode(commit_blob) 94 - 95 - write_block(carfile_out, bytes(bs.car_root) + commit_blob) 96 - dedup = {bs.car_root} 82 + bs, commit, nw = open_car(car_in) 83 + with open(car_out, "wb") as carfile_out: 84 + new_header = dag_cbor.encode({ 85 + "version": 1, 86 + "roots": [bs.car_root] 87 + }) 88 + write_block(carfile_out, new_header) 89 + write_block(carfile_out, bytes(bs.car_root) + dag_cbor.encode(commit)) 90 + dedup = {bs.car_root} 97 91 98 - ns = NodeStore(bs) 99 - nw = NodeWalker(ns, commit["data"]) 92 + for node in nw.iter_nodes(): 93 + if node.cid not in dedup: 94 + write_block(carfile_out, bytes(node.cid) + node.serialised) 95 + dedup.add(node.cid) 96 + for v in node.vals: 97 + if v not in dedup: 98 + write_block(carfile_out, bytes(v) + bs.get_block(bytes(v))) 99 + dedup.add(v) 100 100 101 - for node in nw.iter_nodes(): 102 - if node.cid not in dedup: 103 - write_block(carfile_out, bytes(node.cid) + node.serialised) 104 - dedup.add(node.cid) 105 - for v in node.vals: 106 - if v not in dedup: 107 - write_block(carfile_out, bytes(v) + bs.get_block(bytes(v))) 108 - dedup.add(v) 101 + def print_record_diff(car_a: str, car_b: str): 102 + bs_a, commit_a, _ = open_car(car_a) 103 + bs_b, commit_b, _ = open_car(car_b) 104 + bs = OverlayBlockStore(bs_a, bs_b) 105 + ns = NodeStore(bs) 106 + mst_created, mst_deleted = mst_diff(ns, commit_a["data"], commit_b["data"]) 107 + for delta in record_diff(ns, mst_created, mst_deleted): 108 + print(delta) 109 109 110 110 COMMANDS = { 111 111 "info": (print_info, "print CAR header and repo info"), ··· 113 113 "dump": (dump_all, "dump all records in the CAR (values as JSON)"), 114 114 "dump_record": (dump_record, "dump a single record, keyed on ('collection/rkey')"), 115 115 "compact": (compact, "rewrite the whole CAR, dropping any duplicated or unreferenced blocks"), 116 + "diff": (print_record_diff, "list the record diff between two CAR files") 116 117 } 117 118 118 119 def print_help():
+42 -13
src/atmst/mst/diff.py
··· 1 1 import operator 2 - from typing import Tuple, Set, Iterable 2 + from typing import Tuple, Set, Dict, Iterable, Optional 3 + from enum import Enum 4 + from dataclasses import dataclass 3 5 from functools import reduce 6 + import json 4 7 5 8 from multiformats import CID 6 9 ··· 8 11 from .node_store import NodeStore 9 12 from .node_walker import NodeWalker 10 13 14 + class DeltaType(Enum): 15 + CREATED = 1 16 + UPDATED = 2 17 + DELETED = 3 11 18 12 - def record_diff(ns: NodeStore, created: set[CID], deleted: set[CID]) -> Iterable[tuple]: 19 + @dataclass 20 + class RecordDelta: 21 + delta_type: DeltaType 22 + key: str 23 + prior_value: Optional[CID] 24 + later_value: Optional[CID] 25 + 26 + def __repr__(self) -> str: 27 + prior = "NULL" if self.prior_value is None else self.prior_value.encode('base32') 28 + later = "NULL" if self.later_value is None else self.later_value.encode('base32') 29 + return f"{self.delta_type.name} {json.dumps(self.key)}: {prior} -> {later}" 30 + 31 + def record_diff(ns: NodeStore, created: set[CID], deleted: set[CID]) -> Iterable[RecordDelta]: 13 32 """ 14 33 Given two sets of MST nodes (for example, the result of :meth:`mst_diff`), this 15 - returns an iterator of record changes, in one of 3 formats: :: 16 - 17 - ("created", key, value) 18 - ("updated", key, old_value, new_value) 19 - ("deleted", key, value) 20 - 34 + returns an iterator of record changes. 21 35 """ 22 - created_kv = reduce(operator.__or__, ({ k: v for k, v in zip(node.keys, node.vals)} for node in map(ns.get_node, created)), {}) 23 - deleted_kv = reduce(operator.__or__, ({ k: v for k, v in zip(node.keys, node.vals)} for node in map(ns.get_node, deleted)), {}) 36 + created_kv: Dict[str, CID] = dict(sum((list(zip(node.keys, node.vals)) for node in map(ns.get_node, created)), [])) 37 + deleted_kv: Dict[str, CID] = dict(sum((list(zip(node.keys, node.vals)) for node in map(ns.get_node, deleted)), [])) 24 38 for created_key in created_kv.keys() - deleted_kv.keys(): 25 - yield ("created", created_key, created_kv[created_key].encode("base32")) 39 + yield RecordDelta( 40 + delta_type=DeltaType.CREATED, 41 + key=created_key, 42 + prior_value=None, 43 + later_value=created_kv[created_key] 44 + ) 26 45 for updated_key in created_kv.keys() & deleted_kv.keys(): 27 46 v1 = created_kv[updated_key] 28 47 v2 = deleted_kv[updated_key] 29 48 if v1 != v2: 30 - yield ("updated", updated_key, v1.encode("base32"), v2.encode("base32")) 49 + yield RecordDelta( 50 + delta_type=DeltaType.UPDATED, 51 + key=updated_key, 52 + prior_value=v1, 53 + later_value=v2 54 + ) 31 55 for deleted_key in deleted_kv.keys() - created_kv.keys(): 32 - yield ("deleted", deleted_key, deleted_kv[deleted_key].encode("base32")) #XXX: encode() is just for debugging 56 + yield RecordDelta( 57 + delta_type=DeltaType.DELETED, 58 + key=deleted_key, 59 + prior_value=None, 60 + later_value=deleted_kv[deleted_key] 61 + ) 33 62 34 63 def very_slow_mst_diff(ns: NodeStore, root_a: CID, root_b: CID): 35 64 """
+2 -2
src/atmst/mst/node_walker.py
··· 36 36 ns: NodeStore 37 37 stack: List[StackFrame] 38 38 39 - def __init__(self, ns: NodeStore, root_cid: CID, lkey: Optional[str]=KEY_MIN, rkey: Optional[str]=KEY_MAX) -> None: 39 + def __init__(self, ns: NodeStore, root_cid: Optional[CID], lkey: Optional[str]=KEY_MIN, rkey: Optional[str]=KEY_MAX) -> None: 40 40 self.ns = ns 41 41 self.stack = [self.StackFrame( 42 - node=self.ns.get_node(root_cid), 42 + node=MSTNode.empty_root() if root_cid is None else self.ns.get_node(root_cid), 43 43 lkey=lkey, 44 44 rkey=rkey, 45 45 idx=0
+4 -1
src/atmst/util.py
··· 6 6 ISTR = " " 7 7 return ISTR + msg.replace("\n", "\n"+ISTR) 8 8 9 - @lru_cache(maxsize=1024) # unreasonably effective, lol 9 + @lru_cache(maxsize=64) # unreasonably effective, lol 10 10 def hash_to_cid(data: bytes, codec="dag-cbor") -> CID: 11 + """ 12 + NB: don't use this function with large blobs! They'll take up too much space in the LRU cache. 13 + """ 11 14 #digest = b"\x12\x20" + hashlib.sha256(data).digest() 12 15 digest = multihash.digest(data, "sha2-256") 13 16 return CID("base32", 1, codec, digest)