perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix SQLite binary blob handling and repair

alice b28a3fc0 f2ecb2ba

+610 -107
+369 -107
lib/ATProto/PDS/Store/SQLite.pm
··· 5 5 use feature 'signatures'; 6 6 no warnings 'experimental::signatures'; 7 7 8 - use DBI; 8 + use DBI qw(:sql_types); 9 9 use Exporter 'import'; 10 10 use File::Basename qw(dirname); 11 11 use File::Path qw(make_path); 12 12 use JSON::PP qw(decode_json encode_json); 13 + use ATProto::PDS::Repo::CAR qw(read_car); 14 + use ATProto::PDS::Repo::CID qw(CID_CODEC_DAG_CBOR CID_CODEC_RAW); 15 + use ATProto::PDS::Repo::DagCbor qw(decode_dag_cbor); 13 16 use ATProto::PDS::Metrics::Store qw(observe_store_operation); 14 17 15 18 our @EXPORT_OK = qw(default_migrations); ··· 105 108 my $handle = $args{handle} // die 'handle is required'; 106 109 my $now = $args{created_at} // time; 107 110 108 - $self->dbh->do( 111 + _execute_sql( 112 + $self->dbh, 109 113 q{ 110 114 INSERT INTO accounts ( 111 115 id, account_id, did, handle, email, password_hash, password_salt, ··· 114 118 repo_commit_cid, repo_root_cid, repo_rev, invites_disabled, invite_note 115 119 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 116 120 }, 117 - undef, 118 - $account_id, 119 - $account_id, 120 - $did, 121 - $handle, 122 - $args{email}, 123 - $args{password_hash}, 124 - $args{password_salt}, 125 - $now, 126 - $now, 127 - $args{deactivated_at}, 128 - $args{deleted_at}, 129 - $args{email_confirmed_at}, 130 - _maybe_json($args{did_doc}), 131 - $args{private_key}, 132 - $args{public_key}, 133 - $args{public_key_multibase}, 134 - $args{signing_key_did}, 135 - $args{repo_commit_cid}, 136 - $args{repo_root_cid}, 137 - $args{repo_rev}, 138 - $args{invites_disabled} ? 1 : 0, 139 - $args{invite_note}, 121 + [ 122 + $account_id, 123 + $account_id, 124 + $did, 125 + $handle, 126 + $args{email}, 127 + $args{password_hash}, 128 + $args{password_salt}, 129 + $now, 130 + $now, 131 + $args{deactivated_at}, 132 + $args{deleted_at}, 133 + $args{email_confirmed_at}, 134 + _maybe_json($args{did_doc}), 135 + $args{private_key}, 136 + $args{public_key}, 137 + $args{public_key_multibase}, 138 + $args{signing_key_did}, 139 + $args{repo_commit_cid}, 140 + $args{repo_root_cid}, 141 + $args{repo_rev}, 142 + $args{invites_disabled} ? 1 : 0, 143 + $args{invite_note}, 144 + ], 145 + { 7 => 1, 14 => 1, 15 => 1 }, 140 146 ); 141 147 142 148 return $self->get_account_by_did($did); ··· 160 166 161 167 push @sets, 'updated_at = ?'; 162 168 push @bind, ($changes{updated_at} // time), $did; 163 - $self->dbh->do( 169 + my %blob_positions = map { 170 + my $column = $sets[$_]; 171 + (($column =~ /^(?:password_salt|private_key|public_key) = \?$/) ? ($_ + 1 => 1) : ()) 172 + } 0 .. $#sets; 173 + _execute_sql( 174 + $self->dbh, 164 175 'UPDATE accounts SET ' . join(', ', @sets) . ' WHERE did = ?', 165 - undef, 166 - @bind, 176 + \@bind, 177 + \%blob_positions, 167 178 ); 168 179 return $self->get_account_by_did($did); 169 180 } ··· 455 466 my $cid = $args{cid} // die 'cid is required'; 456 467 my $now = $args{updated_at} // time; 457 468 458 - $self->dbh->do( 469 + _execute_sql( 470 + $self->dbh, 459 471 q{ 460 472 INSERT INTO records ( 461 473 did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at ··· 466 478 record_bytes = excluded.record_bytes, 467 479 updated_at = excluded.updated_at 468 480 }, 469 - undef, 470 - $did, 471 - $collection, 472 - $rkey, 473 - $cid, 474 - encode_json($args{value}), 475 - $args{record_bytes}, 476 - $args{created_at} // $now, 477 - $now, 481 + [ 482 + $did, 483 + $collection, 484 + $rkey, 485 + $cid, 486 + encode_json($args{value}), 487 + $args{record_bytes}, 488 + $args{created_at} // $now, 489 + $now, 490 + ], 491 + { 6 => 1 }, 478 492 ); 479 493 480 494 return $self->get_record($did, $collection, $rkey); ··· 484 498 my $dbh = $self->dbh; 485 499 $dbh->do(q{DELETE FROM records WHERE did = ?}, undef, $did); 486 500 for my $record (@$records) { 487 - $dbh->do( 501 + _execute_sql( 502 + $dbh, 488 503 q{ 489 504 INSERT INTO records ( 490 505 did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at 491 506 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 492 507 }, 493 - undef, 494 - $did, 495 - $record->{collection}, 496 - $record->{rkey}, 497 - $record->{cid}, 498 - encode_json($record->{value}), 499 - $record->{record_bytes}, 500 - $record->{created_at} // time, 501 - $record->{updated_at} // time, 508 + [ 509 + $did, 510 + $record->{collection}, 511 + $record->{rkey}, 512 + $record->{cid}, 513 + encode_json($record->{value}), 514 + $record->{record_bytes}, 515 + $record->{created_at} // time, 516 + $record->{updated_at} // time, 517 + ], 518 + { 6 => 1 }, 502 519 ); 503 520 } 504 521 return 1; ··· 592 609 sub put_block ($self, %args) { 593 610 my $cid = $args{cid} // die 'cid is required'; 594 611 my $now = $args{created_at} // time; 595 - $self->dbh->do( 612 + _execute_sql( 613 + $self->dbh, 596 614 q{ 597 615 INSERT INTO blocks (cid, codec, bytes, created_at) 598 616 VALUES (?, ?, ?, ?) ··· 600 618 codec = excluded.codec, 601 619 bytes = excluded.bytes 602 620 }, 603 - undef, 604 - $cid, 605 - $args{codec}, 606 - $args{bytes}, 607 - $now, 621 + [ 622 + $cid, 623 + $args{codec}, 624 + $args{bytes}, 625 + $now, 626 + ], 627 + { 3 => 1 }, 608 628 ); 609 629 return $self->get_block($cid); 610 630 } 611 631 612 632 sub get_block ($self, $cid) { 613 - return $self->dbh->selectrow_hashref( 633 + return _row_from_blob_columns($self->dbh->selectrow_hashref( 614 634 q{SELECT * FROM blocks WHERE cid = ?}, 615 635 undef, 616 636 $cid, 617 - ); 637 + ), qw(bytes)); 618 638 } 619 639 620 640 sub get_blocks ($self, $cids) { ··· 625 645 { Slice => {} }, 626 646 @$cids, 627 647 ); 628 - return $rows; 648 + return [ map { _row_from_blob_columns($_, qw(bytes)) } @$rows ]; 629 649 } 630 650 631 651 sub put_commit ($self, %args) { 632 652 my $did = $args{did} // die 'did is required'; 633 653 my $now = $args{created_at} // time; 634 - $self->dbh->do( 654 + _execute_sql( 655 + $self->dbh, 635 656 q{ 636 657 INSERT INTO commits ( 637 658 did, rev, cid, root_cid, prev_cid, commit_bytes, car_bytes, created_at 638 659 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 639 660 }, 640 - undef, 641 - $did, 642 - $args{rev}, 643 - $args{cid}, 644 - $args{root_cid}, 645 - $args{prev_cid}, 646 - $args{commit_bytes}, 647 - $args{car_bytes}, 648 - $now, 661 + [ 662 + $did, 663 + $args{rev}, 664 + $args{cid}, 665 + $args{root_cid}, 666 + $args{prev_cid}, 667 + $args{commit_bytes}, 668 + $args{car_bytes}, 669 + $now, 670 + ], 671 + { 6 => 1, 7 => 1 }, 649 672 ); 650 673 $self->set_repo_head( 651 674 did => $did, ··· 659 682 } 660 683 661 684 sub get_commit_by_rev ($self, $did, $rev) { 662 - return $self->dbh->selectrow_hashref( 685 + return _row_from_blob_columns($self->dbh->selectrow_hashref( 663 686 q{SELECT * FROM commits WHERE did = ? AND rev = ?}, 664 687 undef, 665 688 $did, 666 689 $rev, 667 - ); 690 + ), qw(commit_bytes car_bytes)); 668 691 } 669 692 670 693 sub get_latest_commit ($self, $did) { 671 - return $self->dbh->selectrow_hashref( 694 + return _row_from_blob_columns($self->dbh->selectrow_hashref( 672 695 q{SELECT * FROM commits WHERE did = ? ORDER BY created_at DESC, rev DESC LIMIT 1}, 673 696 undef, 674 697 $did, 675 - ); 698 + ), qw(commit_bytes car_bytes)); 676 699 } 677 700 678 701 sub repo_car ($self, $did) { ··· 761 784 sub append_event ($self, %args) { 762 785 return observe_store_operation($self->{metrics}, 'append_event', sub { 763 786 my $now = $args{created_at} // time; 764 - $self->dbh->do( 787 + _execute_sql( 788 + $self->dbh, 765 789 q{ 766 790 INSERT INTO events ( 767 791 did, type, rev, commit_cid, payload_json, car_bytes, created_at 768 792 ) VALUES (?, ?, ?, ?, ?, ?, ?) 769 793 }, 770 - undef, 771 - $args{did}, 772 - $args{type}, 773 - $args{rev}, 774 - $args{commit_cid}, 775 - _maybe_json($args{payload}), 776 - $args{car_bytes}, 777 - $now, 794 + [ 795 + $args{did}, 796 + $args{type}, 797 + $args{rev}, 798 + $args{commit_cid}, 799 + _maybe_json($args{payload}), 800 + $args{car_bytes}, 801 + $now, 802 + ], 803 + { 6 => 1 }, 778 804 ); 779 805 return $self->dbh->sqlite_last_insert_rowid; 780 806 }); ··· 784 810 my $limit = $args{limit} // 100; 785 811 my $sql = q{SELECT * FROM events WHERE seq > ? ORDER BY seq LIMIT ?}; 786 812 my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, $cursor // 0, $limit); 787 - return [ map { _row_from_json_columns($_, qw(payload_json)) } @$rows ]; 813 + return [ map { _row_from_blob_columns(_row_from_json_columns($_, qw(payload_json)), qw(car_bytes)) } @$rows ]; 788 814 } 789 815 790 816 sub list_events_from ($self, $cursor, %args) { ··· 792 818 my $limit = $args{limit} // 100; 793 819 my $sql = q{SELECT * FROM events WHERE seq >= ? ORDER BY seq LIMIT ?}; 794 820 my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, $cursor // 0, $limit); 795 - return [ map { _row_from_json_columns($_, qw(payload_json)) } @$rows ]; 821 + return [ map { _row_from_blob_columns(_row_from_json_columns($_, qw(payload_json)), qw(car_bytes)) } @$rows ]; 796 822 }); 797 823 } 798 824 ··· 1129 1155 my $uri = $args{uri} // die 'uri is required'; 1130 1156 my $val = $args{val} // die 'val is required'; 1131 1157 my $now = $args{created_at} // time; 1132 - $self->dbh->do( 1158 + _execute_sql( 1159 + $self->dbh, 1133 1160 q{ 1134 1161 INSERT INTO labels ( 1135 1162 subject_key, src, uri, cid, val, exp, sig, created_at, updated_at ··· 1141 1168 sig = excluded.sig, 1142 1169 updated_at = excluded.updated_at 1143 1170 }, 1144 - undef, 1145 - $subject_key, 1146 - $src, 1147 - $uri, 1148 - $args{cid}, 1149 - $val, 1150 - $args{exp}, 1151 - $args{sig}, 1152 - $now, 1153 - $args{updated_at} // $now, 1171 + [ 1172 + $subject_key, 1173 + $src, 1174 + $uri, 1175 + $args{cid}, 1176 + $val, 1177 + $args{exp}, 1178 + $args{sig}, 1179 + $now, 1180 + $args{updated_at} // $now, 1181 + ], 1182 + { 7 => 1 }, 1154 1183 ); 1155 1184 return $self->get_label( 1156 1185 subject_key => $subject_key, ··· 1161 1190 } 1162 1191 1163 1192 sub get_label ($self, %args) { 1164 - return $self->dbh->selectrow_hashref( 1193 + return _row_from_blob_columns($self->dbh->selectrow_hashref( 1165 1194 q{ 1166 1195 SELECT * FROM labels 1167 1196 WHERE subject_key = ? AND src = ? AND val = ? ··· 1170 1199 $args{subject_key}, 1171 1200 $args{src}, 1172 1201 $args{val}, 1173 - ); 1202 + ), qw(sig)); 1174 1203 } 1175 1204 1176 1205 sub delete_label ($self, %args) { ··· 1217 1246 $next_cursor = $items[-1]{id}; 1218 1247 } 1219 1248 return { 1220 - items => \@items, 1249 + items => [ map { _row_from_blob_columns($_, qw(sig)) } @items ], 1221 1250 cursor => $next_cursor, 1222 1251 }; 1223 1252 }); ··· 1226 1255 sub reserve_signing_key ($self, %args) { 1227 1256 my $did = $args{did} // die 'did is required'; 1228 1257 my $now = $args{created_at} // time; 1229 - $self->dbh->do( 1258 + _execute_sql( 1259 + $self->dbh, 1230 1260 q{ 1231 1261 INSERT INTO reserved_signing_keys ( 1232 1262 did, private_key, public_key, public_key_multibase, signing_key_did, created_at, claimed_at ··· 1239 1269 created_at = excluded.created_at, 1240 1270 claimed_at = excluded.claimed_at 1241 1271 }, 1242 - undef, 1243 - $did, 1244 - $args{private_key}, 1245 - $args{public_key}, 1246 - $args{public_key_multibase}, 1247 - $args{signing_key_did}, 1248 - $now, 1249 - $args{claimed_at}, 1272 + [ 1273 + $did, 1274 + $args{private_key}, 1275 + $args{public_key}, 1276 + $args{public_key_multibase}, 1277 + $args{signing_key_did}, 1278 + $now, 1279 + $args{claimed_at}, 1280 + ], 1281 + { 2 => 1, 3 => 1 }, 1250 1282 ); 1251 1283 return $self->get_reserved_signing_key($did); 1252 1284 } 1253 1285 1254 1286 sub get_reserved_signing_key ($self, $did) { 1255 - return $self->dbh->selectrow_hashref( 1287 + return _row_from_blob_columns($self->dbh->selectrow_hashref( 1256 1288 q{SELECT * FROM reserved_signing_keys WHERE did = ?}, 1257 1289 undef, 1258 1290 $did, 1259 - ); 1291 + ), qw(private_key public_key)); 1260 1292 } 1261 1293 1262 1294 sub claim_reserved_signing_key ($self, $did, %args) { ··· 1267 1299 $did, 1268 1300 ); 1269 1301 return $self->get_reserved_signing_key($did); 1302 + } 1303 + 1304 + sub repair_binary_columns ($self) { 1305 + my %counts = ( 1306 + accounts => 0, 1307 + reserved_signing_keys => 0, 1308 + blocks => 0, 1309 + records => 0, 1310 + commits => 0, 1311 + events => 0, 1312 + labels => 0, 1313 + ); 1314 + 1315 + $self->txn(sub ($dbh) { 1316 + $counts{accounts} = _repair_blob_rows( 1317 + $dbh, 1318 + select_sql => q{SELECT did, password_salt, private_key, public_key FROM accounts}, 1319 + update_sql => q{UPDATE accounts SET password_salt = ?, private_key = ?, public_key = ? WHERE did = ?}, 1320 + columns => [qw(password_salt private_key public_key)], 1321 + validate => sub ($row, $candidate, $column) { 1322 + return 1 unless defined $candidate; 1323 + return length($candidate) == 16 if $column eq 'password_salt'; 1324 + return length($candidate) == 32 if $column eq 'private_key'; 1325 + return length($candidate) == 65 if $column eq 'public_key'; 1326 + return 0; 1327 + }, 1328 + params_for => sub ($row, $fixed) { 1329 + return @$fixed{qw(password_salt private_key public_key)}, $row->{did}; 1330 + }, 1331 + ); 1332 + 1333 + $counts{reserved_signing_keys} = _repair_blob_rows( 1334 + $dbh, 1335 + select_sql => q{SELECT did, private_key, public_key FROM reserved_signing_keys}, 1336 + update_sql => q{UPDATE reserved_signing_keys SET private_key = ?, public_key = ? WHERE did = ?}, 1337 + columns => [qw(private_key public_key)], 1338 + validate => sub ($row, $candidate, $column) { 1339 + return 1 unless defined $candidate; 1340 + return length($candidate) == 32 if $column eq 'private_key'; 1341 + return length($candidate) == 65 if $column eq 'public_key'; 1342 + return 0; 1343 + }, 1344 + params_for => sub ($row, $fixed) { 1345 + return @$fixed{qw(private_key public_key)}, $row->{did}; 1346 + }, 1347 + ); 1348 + 1349 + $counts{blocks} = _repair_blob_rows( 1350 + $dbh, 1351 + select_sql => q{SELECT cid, codec, bytes FROM blocks}, 1352 + update_sql => q{UPDATE blocks SET bytes = ? WHERE cid = ?}, 1353 + columns => [qw(bytes)], 1354 + validate => sub ($row, $candidate, $column = undef) { 1355 + return _valid_block_bytes($row->{cid}, $row->{codec}, $candidate); 1356 + }, 1357 + params_for => sub ($row, $fixed) { 1358 + return $fixed->{bytes}, $row->{cid}; 1359 + }, 1360 + ); 1361 + 1362 + $counts{records} = _repair_blob_rows( 1363 + $dbh, 1364 + select_sql => q{SELECT did, collection, rkey, cid, record_bytes FROM records}, 1365 + update_sql => q{UPDATE records SET record_bytes = ? WHERE did = ? AND collection = ? AND rkey = ?}, 1366 + columns => [qw(record_bytes)], 1367 + validate => sub ($row, $candidate, $column = undef) { 1368 + return _valid_dag_cbor_cid($row->{cid}, $candidate); 1369 + }, 1370 + params_for => sub ($row, $fixed) { 1371 + return $fixed->{record_bytes}, @$row{qw(did collection rkey)}; 1372 + }, 1373 + ); 1374 + 1375 + $counts{commits} = _repair_blob_rows( 1376 + $dbh, 1377 + select_sql => q{SELECT did, rev, cid, commit_bytes, car_bytes FROM commits}, 1378 + update_sql => q{UPDATE commits SET commit_bytes = ?, car_bytes = ? WHERE did = ? AND rev = ?}, 1379 + columns => [qw(commit_bytes car_bytes)], 1380 + validate => sub ($row, $candidate, $column) { 1381 + return _valid_dag_cbor_cid($row->{cid}, $candidate) if $column eq 'commit_bytes'; 1382 + return _valid_car_for_commit($row->{cid}, $candidate); 1383 + }, 1384 + params_for => sub ($row, $fixed) { 1385 + return @$fixed{qw(commit_bytes car_bytes)}, @$row{qw(did rev)}; 1386 + }, 1387 + ); 1388 + 1389 + $counts{events} = _repair_blob_rows( 1390 + $dbh, 1391 + select_sql => q{SELECT seq, type, commit_cid, car_bytes FROM events WHERE car_bytes IS NOT NULL}, 1392 + update_sql => q{UPDATE events SET car_bytes = ? WHERE seq = ?}, 1393 + columns => [qw(car_bytes)], 1394 + validate => sub ($row, $candidate, $column = undef) { 1395 + return _valid_event_car($row, $candidate); 1396 + }, 1397 + params_for => sub ($row, $fixed) { 1398 + return $fixed->{car_bytes}, $row->{seq}; 1399 + }, 1400 + ); 1401 + 1402 + $counts{labels} = _repair_blob_rows( 1403 + $dbh, 1404 + select_sql => q{SELECT id, sig FROM labels WHERE sig IS NOT NULL}, 1405 + update_sql => q{UPDATE labels SET sig = ? WHERE id = ?}, 1406 + columns => [qw(sig)], 1407 + validate => sub ($row, $candidate, $column = undef) { 1408 + return 1 unless defined $candidate; 1409 + return length($candidate) == 64; 1410 + }, 1411 + params_for => sub ($row, $fixed) { 1412 + return $fixed->{sig}, $row->{id}; 1413 + }, 1414 + ); 1415 + }); 1416 + 1417 + return \%counts; 1270 1418 } 1271 1419 1272 1420 sub reserve_handle ($self, $handle, %args) { ··· 1726 1874 1727 1875 sub _row_to_account ($self, $row) { 1728 1876 return undef unless $row; 1877 + _row_from_blob_columns($row, qw(password_salt private_key public_key)); 1729 1878 if (defined $row->{did_doc_json} && length $row->{did_doc_json}) { 1730 1879 $row->{did_doc} = decode_json($row->{did_doc_json}); 1731 1880 } ··· 1747 1896 1748 1897 sub _row_to_record ($row) { 1749 1898 return undef unless $row; 1899 + _row_from_blob_columns($row, qw(record_bytes)); 1750 1900 $row->{value} = decode_json($row->{value_json}) if defined $row->{value_json}; 1751 1901 delete $row->{value_json}; 1752 1902 return $row; 1903 + } 1904 + 1905 + sub _execute_sql ($dbh, $sql, $params = undef, $blob_positions = undef) { 1906 + my $sth = $dbh->prepare($sql); 1907 + my %blob = map { $_ => 1 } keys %{ $blob_positions // {} }; 1908 + my $values = $params // []; 1909 + for my $index (0 .. $#$values) { 1910 + my $position = $index + 1; 1911 + if ($blob{$position}) { 1912 + $sth->bind_param($position, _normalize_blob_scalar($values->[$index]), SQL_BLOB); 1913 + next; 1914 + } 1915 + $sth->bind_param($position, $values->[$index]); 1916 + } 1917 + $sth->execute; 1918 + return $sth; 1919 + } 1920 + 1921 + sub _row_from_blob_columns ($row, @columns) { 1922 + return undef unless $row; 1923 + for my $column (@columns) { 1924 + next unless exists $row->{$column}; 1925 + $row->{$column} = _normalize_blob_scalar($row->{$column}); 1926 + } 1927 + return $row; 1928 + } 1929 + 1930 + sub _normalize_blob_scalar ($value) { 1931 + return undef unless defined $value; 1932 + my $copy = $value; 1933 + utf8::downgrade($copy, 1); 1934 + return $copy; 1935 + } 1936 + 1937 + sub _demangle_utf8_blob ($value) { 1938 + return undef unless defined $value; 1939 + my $copy = _normalize_blob_scalar($value); 1940 + return $copy unless length $copy; 1941 + my $decoded = $copy; 1942 + return undef unless utf8::decode($decoded); 1943 + utf8::downgrade($decoded, 1); 1944 + return $decoded; 1945 + } 1946 + 1947 + sub _repair_blob_rows ($dbh, %args) { 1948 + my $rows = $dbh->selectall_arrayref($args{select_sql}, { Slice => {} }); 1949 + my %blob_positions = map { $_ + 1 => 1 } 0 .. @{ $args{columns} } - 1; 1950 + my $count = 0; 1951 + 1952 + ROW: 1953 + for my $row (@$rows) { 1954 + my %fixed = %$row; 1955 + my $changed = 0; 1956 + for my $column (@{ $args{columns} }) { 1957 + my $current = _normalize_blob_scalar($row->{$column}); 1958 + $fixed{$column} = $current; 1959 + next if $args{validate}->($row, $current, $column); 1960 + my $candidate = _demangle_utf8_blob($row->{$column}); 1961 + next ROW unless defined $candidate && $args{validate}->($row, $candidate, $column); 1962 + $fixed{$column} = $candidate; 1963 + $changed = 1; 1964 + } 1965 + next unless $changed; 1966 + _execute_sql( 1967 + $dbh, 1968 + $args{update_sql}, 1969 + [ $args{params_for}->($row, \%fixed) ], 1970 + \%blob_positions, 1971 + ); 1972 + $count++; 1973 + } 1974 + 1975 + return $count; 1976 + } 1977 + 1978 + sub _valid_block_bytes ($cid, $codec, $bytes) { 1979 + return 0 unless defined $cid && defined $bytes; 1980 + my $actual = eval { 1981 + local $SIG{__WARN__} = sub { }; 1982 + if (($codec // 0) == CID_CODEC_RAW) { 1983 + return ATProto::PDS::Repo::CID->for_raw($bytes)->to_string; 1984 + } 1985 + decode_dag_cbor($bytes) if ($codec // 0) == CID_CODEC_DAG_CBOR; 1986 + return ATProto::PDS::Repo::CID->for_dag_cbor($bytes)->to_string; 1987 + }; 1988 + return 0 if $@; 1989 + return ($actual // q()) eq $cid; 1990 + } 1991 + 1992 + sub _valid_dag_cbor_cid ($cid, $bytes) { 1993 + return 0 unless defined $cid && defined $bytes; 1994 + my $actual = eval { 1995 + local $SIG{__WARN__} = sub { }; 1996 + decode_dag_cbor($bytes); 1997 + return ATProto::PDS::Repo::CID->for_dag_cbor($bytes)->to_string; 1998 + }; 1999 + return 0 if $@; 2000 + return ($actual // q()) eq $cid; 2001 + } 2002 + 2003 + sub _valid_car_for_commit ($commit_cid, $bytes) { 2004 + return 0 unless defined $bytes; 2005 + my $car = eval { read_car($bytes) }; 2006 + return 0 if $@ || !$car; 2007 + return 1 unless defined $commit_cid && length $commit_cid; 2008 + return 0 unless @{ $car->{roots} || [] }; 2009 + return ($car->{roots}[0]->to_string // q()) eq $commit_cid; 2010 + } 2011 + 2012 + sub _valid_event_car ($row, $bytes) { 2013 + return 0 unless defined $bytes; 2014 + return _valid_car_for_commit($row->{commit_cid}, $bytes); 1753 2015 } 1754 2016 1755 2017 sub _paginate ($rows, $limit, $cursor_key) {
+13
script/perlsky-admin
··· 66 66 exit 0; 67 67 } 68 68 69 + if ($command eq 'repair-binary-columns') { 70 + my $store = ATProto::PDS::Store::SQLite->new( 71 + path => $config->{db_path} || File::Spec->catfile($root, 'data', 'runtime', 'perlsky.sqlite'), 72 + )->bootstrap; 73 + 74 + my $result = $store->repair_binary_columns; 75 + for my $key (sort keys %$result) { 76 + print "$key=$result->{$key}\n"; 77 + } 78 + exit 0; 79 + } 80 + 69 81 die usage(); 70 82 71 83 sub _new_invite_code { ··· 76 88 return <<'EOF'; 77 89 usage: 78 90 perlsky-admin create-invite [--use-count N] [--for-account DID] [--created-by DID] 91 + perlsky-admin repair-binary-columns 79 92 EOF 80 93 }
+228
t/sqlite-binary.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempfile); 7 + use FindBin qw($Bin); 8 + use Test2::V0; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use ATProto::PDS::Repo::CAR qw(read_car write_car); 21 + use ATProto::PDS::Repo::CID; 22 + use ATProto::PDS::Repo::DagCbor qw(encode_dag_cbor); 23 + use ATProto::PDS::Store::SQLite; 24 + 25 + sub _upgrade_copy { 26 + my ($bytes) = @_; 27 + my $copy = $bytes; 28 + utf8::upgrade($copy); 29 + return $copy; 30 + } 31 + 32 + sub _mangle_copy { 33 + my ($bytes) = @_; 34 + my $copy = _upgrade_copy($bytes); 35 + utf8::encode($copy); 36 + return $copy; 37 + } 38 + 39 + my ($fh, $path) = tempfile(); 40 + close $fh; 41 + 42 + my $store = ATProto::PDS::Store::SQLite->new(path => $path)->bootstrap; 43 + 44 + my $record = { 45 + '$type' => 'app.bsky.feed.post', 46 + text => 'sqlite binary roundtrip', 47 + createdAt => '2026-03-11T02:00:00Z', 48 + }; 49 + my $record_bytes = encode_dag_cbor($record); 50 + my $record_cid = ATProto::PDS::Repo::CID->for_dag_cbor($record_bytes); 51 + my $car_bytes = write_car($record_cid, [ 52 + { cid => $record_cid, bytes => $record_bytes }, 53 + ]); 54 + 55 + my $salt = pack('C*', map { 0x80 + $_ } 0 .. 15); 56 + my $private_key = pack('C*', map { 0x80 + $_ } 0 .. 31); 57 + my $public_key = pack('C*', map { 0x80 + $_ } 0 .. 64); 58 + my $label_sig = pack('C*', map { 0x80 + $_ } 0 .. 63); 59 + 60 + my $account = $store->create_account( 61 + did => 'did:plc:sqlitebinary', 62 + handle => 'sqlitebinary.test', 63 + password_salt => _upgrade_copy($salt), 64 + private_key => _upgrade_copy($private_key), 65 + public_key => _upgrade_copy($public_key), 66 + public_key_multibase => 'ztest', 67 + signing_key_did => 'did:key:ztest', 68 + ); 69 + 70 + ok(!utf8::is_utf8($account->{password_salt}), 'account salt is returned as raw bytes'); 71 + ok(!utf8::is_utf8($account->{private_key}), 'account private key is returned as raw bytes'); 72 + ok(!utf8::is_utf8($account->{public_key}), 'account public key is returned as raw bytes'); 73 + is($account->{password_salt}, $salt, 'account salt roundtrips'); 74 + is($account->{private_key}, $private_key, 'account private key roundtrips'); 75 + is($account->{public_key}, $public_key, 'account public key roundtrips'); 76 + 77 + $store->put_block( 78 + cid => $record_cid->to_string, 79 + codec => $record_cid->codec, 80 + bytes => _upgrade_copy($record_bytes), 81 + ); 82 + my $block = $store->get_block($record_cid->to_string); 83 + ok(!utf8::is_utf8($block->{bytes}), 'block bytes are returned as raw bytes'); 84 + is($block->{bytes}, $record_bytes, 'block bytes roundtrip'); 85 + 86 + $store->put_record( 87 + did => $account->{did}, 88 + collection => 'app.bsky.feed.post', 89 + rkey => 'abc', 90 + cid => $record_cid->to_string, 91 + value => $record, 92 + record_bytes => _upgrade_copy($record_bytes), 93 + ); 94 + my $stored_record = $store->get_record($account->{did}, 'app.bsky.feed.post', 'abc'); 95 + ok(!utf8::is_utf8($stored_record->{record_bytes}), 'record bytes are returned as raw bytes'); 96 + is($stored_record->{record_bytes}, $record_bytes, 'record bytes roundtrip'); 97 + 98 + $store->put_commit( 99 + did => $account->{did}, 100 + rev => 'rev1', 101 + cid => $record_cid->to_string, 102 + root_cid => $record_cid->to_string, 103 + commit_bytes => _upgrade_copy($record_bytes), 104 + car_bytes => _upgrade_copy($car_bytes), 105 + ); 106 + my $commit = $store->get_latest_commit($account->{did}); 107 + ok(!utf8::is_utf8($commit->{commit_bytes}), 'commit bytes are returned as raw bytes'); 108 + ok(!utf8::is_utf8($commit->{car_bytes}), 'commit CAR is returned as raw bytes'); 109 + is($commit->{commit_bytes}, $record_bytes, 'commit bytes roundtrip'); 110 + is($commit->{car_bytes}, $car_bytes, 'commit CAR roundtrip'); 111 + 112 + $store->append_event( 113 + did => $account->{did}, 114 + type => 'commit', 115 + rev => 'rev1', 116 + commit_cid => $record_cid->to_string, 117 + payload => { ops => [] }, 118 + car_bytes => _upgrade_copy($car_bytes), 119 + ); 120 + my $event = $store->list_events_after(0)->[0]; 121 + ok(!utf8::is_utf8($event->{car_bytes}), 'event CAR is returned as raw bytes'); 122 + is($event->{car_bytes}, $car_bytes, 'event CAR roundtrip'); 123 + is(read_car($event->{car_bytes})->{roots}[0]->to_string, $record_cid->to_string, 'roundtripped event CAR remains parseable'); 124 + 125 + my ($repair_fh, $repair_path) = tempfile(); 126 + close $repair_fh; 127 + my $repair_store = ATProto::PDS::Store::SQLite->new(path => $repair_path)->bootstrap; 128 + my $repair_account = $repair_store->create_account( 129 + did => 'did:plc:repairbinary', 130 + handle => 'repairbinary.test', 131 + password_salt => $salt, 132 + private_key => $private_key, 133 + public_key => $public_key, 134 + public_key_multibase => 'zrepair', 135 + signing_key_did => 'did:key:zrepair', 136 + ); 137 + 138 + my $dbh = $repair_store->dbh; 139 + my $mangled_salt = _mangle_copy($salt); 140 + my $mangled_private_key = _mangle_copy($private_key); 141 + my $mangled_public_key = _mangle_copy($public_key); 142 + my $mangled_record = _mangle_copy($record_bytes); 143 + my $mangled_car = _mangle_copy($car_bytes); 144 + 145 + $dbh->do( 146 + q{UPDATE accounts SET password_salt = ?, private_key = ?, public_key = ? WHERE did = ?}, 147 + undef, 148 + $mangled_salt, 149 + $mangled_private_key, 150 + $mangled_public_key, 151 + $repair_account->{did}, 152 + ); 153 + $dbh->do( 154 + q{INSERT INTO blocks (cid, codec, bytes, created_at) VALUES (?, ?, ?, ?)}, 155 + undef, 156 + $record_cid->to_string, 157 + $record_cid->codec, 158 + $mangled_record, 159 + time, 160 + ); 161 + $dbh->do( 162 + q{INSERT INTO records (did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)}, 163 + undef, 164 + $repair_account->{did}, 165 + 'app.bsky.feed.post', 166 + 'repair', 167 + $record_cid->to_string, 168 + '{"text":"repair"}', 169 + $mangled_record, 170 + time, 171 + time, 172 + ); 173 + $dbh->do( 174 + q{INSERT INTO commits (did, rev, cid, root_cid, prev_cid, commit_bytes, car_bytes, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)}, 175 + undef, 176 + $repair_account->{did}, 177 + 'rev2', 178 + $record_cid->to_string, 179 + $record_cid->to_string, 180 + undef, 181 + $mangled_record, 182 + $mangled_car, 183 + time, 184 + ); 185 + $dbh->do( 186 + q{INSERT INTO events (did, type, rev, commit_cid, payload_json, car_bytes, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)}, 187 + undef, 188 + $repair_account->{did}, 189 + 'commit', 190 + 'rev2', 191 + $record_cid->to_string, 192 + '{"ops":[]}', 193 + $mangled_car, 194 + time, 195 + ); 196 + $dbh->do( 197 + q{INSERT INTO labels (subject_key, src, uri, cid, val, exp, sig, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)}, 198 + undef, 199 + 'at://did:plc:repairbinary/app.bsky.feed.post/repair', 200 + 'did:plc:repairbinary', 201 + 'at://did:plc:repairbinary/app.bsky.feed.post/repair', 202 + $record_cid->to_string, 203 + '!hide', 204 + undef, 205 + _mangle_copy($label_sig), 206 + time, 207 + time, 208 + ); 209 + 210 + my $counts = $repair_store->repair_binary_columns; 211 + cmp_ok($counts->{accounts}, '>=', 1, 'repair updated account blobs'); 212 + cmp_ok($counts->{blocks}, '>=', 1, 'repair updated block blobs'); 213 + cmp_ok($counts->{records}, '>=', 1, 'repair updated record blobs'); 214 + cmp_ok($counts->{commits}, '>=', 1, 'repair updated commit blobs'); 215 + cmp_ok($counts->{events}, '>=', 1, 'repair updated event blobs'); 216 + cmp_ok($counts->{labels}, '>=', 1, 'repair updated label signatures'); 217 + 218 + my $repaired_account = $repair_store->get_account_by_did($repair_account->{did}); 219 + is($repaired_account->{password_salt}, $salt, 'repair restored account salt'); 220 + is($repaired_account->{private_key}, $private_key, 'repair restored account private key'); 221 + is($repaired_account->{public_key}, $public_key, 'repair restored account public key'); 222 + is($repair_store->get_block($record_cid->to_string)->{bytes}, $record_bytes, 'repair restored block bytes'); 223 + is($repair_store->get_record($repair_account->{did}, 'app.bsky.feed.post', 'repair')->{record_bytes}, $record_bytes, 'repair restored record bytes'); 224 + is($repair_store->get_latest_commit($repair_account->{did})->{car_bytes}, $car_bytes, 'repair restored commit CAR'); 225 + is($repair_store->list_events_after(0)->[0]{car_bytes}, $car_bytes, 'repair restored event CAR'); 226 + is($repair_store->get_label(subject_key => 'at://did:plc:repairbinary/app.bsky.feed.post/repair', src => 'did:plc:repairbinary', val => '!hide')->{sig}, $label_sig, 'repair restored label signature bytes'); 227 + 228 + done_testing;