objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix sequencer not sending events from db

futurGH ac0a3af6 a0688f82

+101 -92
+99 -90
pegasus/lib/sequencer.ml
··· 33 33 type commit_evt_op = 34 34 { action: commit_op_action 35 35 ; path: string 36 - ; cid: Cid.t option 37 - ; prev: Cid.t option } 36 + ; cid: Cid.t option [@default None] 37 + ; prev: Cid.t option [@default None] } 38 38 [@@deriving yojson] 39 39 40 40 type commit_evt = 41 41 { rebase: bool (* always false *) 42 - ; too_big: bool (* always false *) 42 + ; too_big: bool [@key "tooBig"] (* always false *) 43 43 ; repo: string 44 44 ; commit: Cid.t 45 45 ; rev: string 46 - ; since: string option 46 + ; since: string option [@default None] 47 47 ; blocks: bytes 48 48 ; ops: commit_evt_op list 49 49 ; blobs: Cid.t list (* always empty *) 50 - ; prev_data: Cid.t option } 50 + ; prev_data: Cid.t option [@key "prevData"] [@default None] } 51 51 [@@deriving yojson] 52 52 53 53 type sync_evt = {did: string; blocks: bytes; rev: string} [@@deriving yojson] 54 54 55 - type identity_evt = {did: string; handle: string option} [@@deriving yojson] 55 + type identity_evt = {did: string; handle: string option [@default None]} 56 + [@@deriving yojson] 56 57 57 58 let account_status_to_string = function 58 59 | `Active -> ··· 105 106 Error "invalid account status"] ) 106 107 [@@deriving yojson] 107 108 108 - type account_evt = {did: string; active: bool; status: account_status option} 109 + type account_evt = 110 + { did: string 111 + ; active: bool 112 + ; status: account_status option [@default Some `Active] } 109 113 [@@deriving yojson] 110 114 111 115 type info_evt = {name: string; message: string} [@@deriving yojson] ··· 205 209 | Error of error_payload 206 210 207 211 type event = {seq: int; time: string; kind: event_kind} 208 - 209 - let event_of_db_event (dbe : Data_store.Types.firehose_event) = 210 - let t = header_t_of_string dbe.t in 211 - match t with 212 - | Ok t -> 213 - let yj = Dag_cbor.decode_to_yojson dbe.data in 214 - let kind = 215 - match t with 216 - | `Commit -> 217 - Message (Commit (Result.get_ok @@ commit_evt_of_yojson yj), t) 218 - | `Sync -> 219 - Message (Sync (Result.get_ok @@ sync_evt_of_yojson yj), t) 220 - | `Identity -> 221 - Message (Identity (Result.get_ok @@ identity_evt_of_yojson yj), t) 222 - | `Account -> 223 - Message (Account (Result.get_ok @@ account_evt_of_yojson yj), t) 224 - | `Info -> 225 - assert false 226 - in 227 - Ok {seq= dbe.seq; time= Util.ms_to_iso8601 dbe.time; kind} 228 - | Error _ -> 229 - Error ("invalid header type " ^ dbe.t) 230 212 end 231 213 232 214 open Types 233 215 216 + module Encode = struct 217 + let format_commit 218 + ({repo; commit; rev; since; blocks; ops; prev_data; _} : commit_evt) = 219 + `Assoc 220 + ( [ ("rebase", `Bool false) 221 + ; ("tooBig", `Bool false) 222 + ; ("repo", `String repo) 223 + ; ("commit", Cid.to_yojson commit) 224 + ; ("rev", `String rev) 225 + ; ("since", match since with Some s -> `String s | None -> `Null) 226 + ; ("blocks", Dag_cbor.to_yojson (`Bytes blocks)) 227 + ; ("ops", `List (List.map commit_evt_op_to_yojson ops)) 228 + ; ("blobs", `List []) ] 229 + @ 230 + match prev_data with 231 + | Some cid -> 232 + [("prevData", Cid.to_yojson cid)] 233 + | None -> 234 + [] ) 235 + 236 + let format_sync ({did; blocks; rev; _} : sync_evt) = 237 + `Assoc 238 + [ ("did", `String did) 239 + ; ("blocks", Dag_cbor.to_yojson (`Bytes blocks)) 240 + ; ("rev", `String rev) ] 241 + 242 + let format_identity ({did; handle; _} : identity_evt) = 243 + let fields = 244 + [("did", `String did)] 245 + @ match handle with Some h -> [("handle", `String h)] | None -> [] 246 + in 247 + `Assoc fields 248 + 249 + let format_account ({did; active; status} : account_evt) = 250 + let fields = 251 + [("did", `String did); ("active", `Bool active)] 252 + @ 253 + match status with 254 + | Some s -> 255 + [("status", `String (account_status_to_string s))] 256 + | None -> 257 + [] 258 + in 259 + `Assoc fields 260 + end 261 + 234 262 module Frame = struct 235 263 let message_header t = 236 264 let header = ··· 246 274 | Commit commit -> 247 275 ( message_header `Commit 248 276 , header_t_to_string `Commit 249 - , commit_evt_to_yojson commit ) 277 + , Encode.format_commit commit ) 250 278 | Sync sync -> 251 279 ( message_header `Sync 252 280 , header_t_to_string `Sync 253 - , sync_evt_to_yojson sync ) 281 + , Encode.format_sync sync ) 254 282 | Identity identity -> 255 283 ( message_header `Identity 256 284 , header_t_to_string `Identity 257 - , identity_evt_to_yojson identity ) 285 + , Encode.format_identity identity ) 258 286 | Account account -> 259 287 ( message_header `Account 260 288 , header_t_to_string `Account 261 - , account_evt_to_yojson account ) 289 + , Encode.format_account account ) 262 290 | Info info -> 263 291 ( message_header `Info 264 292 , header_t_to_string `Info ··· 281 309 Bytes.cat error_header payload 282 310 end 283 311 284 - module Encode = struct 285 - let format_commit 286 - ({repo; commit; rev; since; blocks; ops; prev_data; _} : commit_evt) = 287 - let json = 288 - `Assoc 289 - ( [ ("rebase", `Bool false) 290 - ; ("tooBig", `Bool false) 291 - ; ("repo", `String repo) 292 - ; ("commit", Cid.to_yojson commit) 293 - ; ("rev", `String rev) 294 - ; ("since", match since with Some s -> `String s | None -> `Null) 295 - ; ("blocks", Dag_cbor.to_yojson (`Bytes blocks)) 296 - ; ("ops", `List (List.map commit_evt_op_to_yojson ops)) 297 - ; ("blobs", `List []) ] 298 - @ 299 - match prev_data with 300 - | Some cid -> 301 - [("prevData", Cid.to_yojson cid)] 302 - | None -> 303 - [] ) 304 - in 305 - Dag_cbor.encode_yojson json 306 - 307 - let format_sync ({did; blocks; rev; _} : sync_evt) = 308 - let json = 309 - `Assoc 310 - [ ("did", `String did) 311 - ; ("blocks", Dag_cbor.to_yojson (`Bytes blocks)) 312 - ; ("rev", `String rev) ] 313 - in 314 - Dag_cbor.encode_yojson json 315 - 316 - let format_identity ({did; handle; _} : identity_evt) = 317 - let fields = 318 - [("did", `String did)] 319 - @ match handle with Some h -> [("handle", `String h)] | None -> [] 320 - in 321 - Dag_cbor.encode_yojson (`Assoc fields) 322 - 323 - let format_account ({did; active; status} : account_evt) = 324 - let fields = 325 - [("did", `String did); ("active", `Bool active)] 326 - @ 327 - match status with 328 - | Some s -> 329 - [("status", `String (account_status_to_string s))] 330 - | None -> 331 - [] 332 - in 333 - Dag_cbor.encode_yojson (`Assoc fields) 334 - end 335 - 336 312 module Parse = struct 337 313 let parse_commit (bytes : bytes) : (commit_evt, string) result = 338 314 try ··· 444 420 in 445 421 Ok {did; active; status} 446 422 with e -> Error (Printexc.to_string e) 423 + 424 + let event_of_db_event (dbe : Data_store.Types.firehose_event) = 425 + let t = header_t_of_string dbe.t in 426 + match t with 427 + | Ok t -> ( 428 + let kind_result = 429 + match t with 430 + | `Commit -> 431 + parse_commit dbe.data 432 + |> Result.map (fun evt -> Message (Commit evt, t)) 433 + | `Sync -> 434 + parse_sync dbe.data 435 + |> Result.map (fun evt -> Message (Sync evt, t)) 436 + | `Identity -> 437 + parse_identity dbe.data 438 + |> Result.map (fun evt -> Message (Identity evt, t)) 439 + | `Account -> 440 + parse_account dbe.data 441 + |> Result.map (fun evt -> Message (Account evt, t)) 442 + | `Info -> 443 + Error "Info events not supported in DB" 444 + in 445 + match kind_result with 446 + | Ok kind -> 447 + Ok {seq= dbe.seq; time= Util.ms_to_iso8601 dbe.time; kind} 448 + | Error e -> 449 + Error ("failed to parse event: " ^ e) ) 450 + | Error _ -> 451 + Error ("invalid header type " ^ dbe.t) 447 452 end 448 453 449 454 module Bus = struct ··· 561 566 | Some max when r.seq > max -> 562 567 None 563 568 | _ -> ( 564 - match event_of_db_event r with Ok e -> Some e | Error _ -> None ) ) 569 + match Parse.event_of_db_event r with 570 + | Ok e -> 571 + Some e 572 + | Error _ -> 573 + None ) ) 565 574 rows 566 575 in 567 576 Lwt.return evs ··· 685 694 ; ops 686 695 ; prev_data } 687 696 in 688 - let raw = Encode.format_commit evt in 697 + let raw = Dag_cbor.encode_yojson @@ Encode.format_commit evt in 689 698 let%lwt seq = DB.append_event conn ~t:`Commit ~time:time_ms ~data:raw in 690 699 let frame = Frame.encode_message ~seq ~time:time_iso (Commit evt) in 691 700 let%lwt () = Bus.publish {seq; bytes= frame} in ··· 696 705 let time_ms = Util.now_ms () in 697 706 let time_iso = Util.ms_to_iso8601 time_ms in 698 707 let evt : sync_evt = {did; rev; blocks} in 699 - let raw = Encode.format_sync evt in 708 + let raw = Dag_cbor.encode_yojson @@ Encode.format_sync evt in 700 709 let%lwt seq = DB.append_event conn ~t:`Sync ~time:time_ms ~data:raw in 701 710 let frame = Frame.encode_message ~seq ~time:time_iso (Sync evt) in 702 711 let%lwt () = Bus.publish {seq; bytes= frame} in ··· 707 716 let time_ms = Util.now_ms () in 708 717 let time_iso = Util.ms_to_iso8601 time_ms in 709 718 let evt : identity_evt = {did; handle} in 710 - let raw = Encode.format_identity evt in 719 + let raw = Dag_cbor.encode_yojson @@ Encode.format_identity evt in 711 720 let%lwt seq = DB.append_event conn ~t:`Identity ~time:time_ms ~data:raw in 712 721 let frame = Frame.encode_message ~seq ~time:time_iso (Identity evt) in 713 722 let%lwt () = Bus.publish {seq; bytes= frame} in ··· 718 727 let time_ms = Util.now_ms () in 719 728 let time_iso = Util.ms_to_iso8601 time_ms in 720 729 let evt : account_evt = {did; active; status} in 721 - let raw = Encode.format_account evt in 730 + let raw = Dag_cbor.encode_yojson @@ Encode.format_account evt in 722 731 let%lwt seq = DB.append_event conn ~t:`Account ~time:time_ms ~data:raw in 723 732 let frame = Frame.encode_message ~seq ~time:time_iso (Account evt) in 724 733 let%lwt () = Bus.publish {seq; bytes= frame} in
+2 -2
pegasus/test/test_sequencer.ml
··· 82 82 let time0 = Util.now_ms () in 83 83 let mk_raw did = 84 84 let evt : Sequencer.Types.identity_evt = {did; handle= None} in 85 - Sequencer.Encode.format_identity evt 85 + Dag_cbor.encode_yojson @@ Sequencer.Encode.format_identity evt 86 86 in 87 87 let%lwt _ = 88 88 Sequencer.DB.append_event conn ~t:`Identity ~time:(time0 + 1) ··· 139 139 (* add 2 identity events to db without publishing *) 140 140 let mk_raw did = 141 141 let evt : Sequencer.Types.identity_evt = {did; handle= None} in 142 - Sequencer.Encode.format_identity evt 142 + Dag_cbor.encode_yojson @@ Sequencer.Encode.format_identity evt 143 143 in 144 144 let%lwt _ = 145 145 Sequencer.DB.append_event conn ~t:`Identity ~time:(time0 + 1)