Unified Agent + reusable Go agent core.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: move memory drafting to projection

Lyric 75cce0dd 74a8b852

+984 -559
+1 -1
README.md
··· 434 434 `auth_profiles.*.credential.secret_ref` is also a direct environment variable name (example: `JSONBILL_API_KEY`). 435 435 436 436 Key meanings (see `assets/config/config.example.yaml` for the canonical list): 437 - - Core: `llm.provider` selects the backend. Most providers use `llm.endpoint`/`llm.api_key`/`llm.model`. Optional defaults `llm.temperature`, `llm.reasoning_effort`, and `llm.reasoning_budget_tokens` are forwarded to `uniai` only when set. Azure uses `llm.azure.deployment` for deployment name, while endpoint/key are still read from `llm.endpoint` and `llm.api_key`. Bedrock uses `llm.bedrock.*`. `llm.tools_emulation_mode` controls tool-call emulation for models without native tool calling (`off|fallback|force`). `llm.profiles` defines named profile overrides, and `llm.routes` routes semantic purposes such as `main_loop`, `addressing`, `heartbeat`, and `plan_create`. 437 + - Core: `llm.provider` selects the backend. Most providers use `llm.endpoint`/`llm.api_key`/`llm.model`. Optional defaults `llm.temperature`, `llm.reasoning_effort`, and `llm.reasoning_budget_tokens` are forwarded to `uniai` only when set. Azure uses `llm.azure.deployment` for deployment name, while endpoint/key are still read from `llm.endpoint` and `llm.api_key`. Bedrock uses `llm.bedrock.*`. `llm.tools_emulation_mode` controls tool-call emulation for models without native tool calling (`off|fallback|force`). `llm.profiles` defines named profile overrides, and `llm.routes` routes semantic purposes such as `main_loop`, `addressing`, `heartbeat`, `plan_create`, and `memory_draft`. 438 438 - LLM secret refs: secret-like fields in `llm` and `llm.profiles` accept sibling `*_ref` keys. The value of `*_ref` is the environment variable name, not the secret itself. If both plaintext and `*_ref` are set, `*_ref` wins. If `*_ref` points to a missing or empty env var, startup fails fast. Example: 439 439 ```yaml 440 440 llm:
+2 -1
assets/config/config.example.yaml
··· 59 59 # # api_key_ref: XAI_API_KEY 60 60 # reasoning_effort: "high" 61 61 # Optional route map. Supported purposes: 62 - # main_loop | addressing | heartbeat | plan_create 62 + # main_loop | addressing | heartbeat | plan_create | memory_draft 63 63 # routes: 64 64 # main_loop: default 65 65 # addressing: cheap 66 66 # heartbeat: cheap 67 67 # plan_create: reasoning 68 + # memory_draft: cheap 68 69 69 70 # Multimodal input policy (cross-channel, cross-source). 70 71 multimodal:
+3 -2
docs/arch.md
··· 193 193 Notes: 194 194 195 195 - Runtime-level memory integration is wired in Telegram, Slack, and Heartbeat. 196 - - Telegram currently uses a legacy direct memory adapter (`internal/channelruntime/telegram/runtime_task.go`). 197 - - Slack and Heartbeat use shared orchestrator wiring (`internal/memoryruntime/*`) via: 196 + - Telegram, Slack, and Heartbeat all use shared orchestrator wiring (`internal/memoryruntime/*`) via: 197 + - `internal/channelruntime/telegram/runtime.go` 198 + - `internal/channelruntime/telegram/runtime_task.go` 198 199 - `internal/channelruntime/slack/runtime.go` 199 200 - `internal/channelruntime/slack/runtime_task.go` 200 201 - `internal/channelruntime/heartbeat/run.go`
+4 -2
docs/feat/feat_20260228_memory_refactor.md
··· 1 1 # Memory Implementation Plan (WAL-first, Minimal) 2 2 3 + Note: this plan doc is historical. The current event shape and projector behavior are documented in `docs/feat/memory-project-time-draft.md` and `docs/memory.md`. 4 + 3 5 ## 1. Principles 4 6 5 7 - `memory/log/*.jsonl` is the single source of truth. ··· 42 44 - `protocol` 43 45 - `task_text` 44 46 - `final_output` 45 - - `draft_summary_items` 46 - - `draft_promote` 47 + - `source_history` 48 + - `session_context` 47 49 - [x] Define id rules: 48 50 - `event_id` unique per event. 49 51 - `task_run_id` links events to one run.
+57
docs/feat/memory-project-time-draft.md
··· 1 + # Memory Project-Time Draft 2 + 3 + ## Goal 4 + 5 + Move memory drafting from channel task execution into the memory projection stage. 6 + 7 + The intended pipeline is: 8 + 9 + 1. `channel runtime` writes a raw memory event into the journal/log. 10 + 2. `memory projector` reads raw events and resolves a draft. 11 + 3. `memory projector` writes projected short-term and long-term memory artifacts. 12 + 13 + This replaces the old shape: 14 + 15 + 1. `channel runtime` called LLM to build `SessionDraft` 16 + 2. `channel runtime` wrote drafted event into the journal 17 + 3. `projector` only merged precomputed draft 18 + 19 + ## Why 20 + 21 + - The memory journal should be a project-level raw source of truth, not a place that already contains channel-local interpretation. 22 + - LLM draft generation is part of projection, not ingestion. 23 + - Channel runtimes should only normalize source data and append raw events. 24 + - Projection can now use current projected state (`existing short-term content`) when resolving new drafts. 25 + 26 + ## Event Shape 27 + 28 + New events are written with raw fields: 29 + 30 + - `task_text` 31 + - `final_output` 32 + - `source_history` 33 + - `session_context` 34 + 35 + Projector behavior: 36 + 37 + - projector asks its `DraftResolver` to derive a draft from raw event data 38 + - old journal compatibility is intentionally dropped; replay expects the raw event shape 39 + 40 + ## Draft Resolver 41 + 42 + `memoryruntime.NewDraftResolver(...)` owns the project-time draft strategy. 43 + 44 + - If an LLM client is configured and the event has enough source history, it calls `memory.draft`. 45 + - Otherwise it falls back to a simple deterministic summary derived from `final_output`. 46 + 47 + This keeps the draft decision inside memory/projection, not inside channel task execution. 48 + 49 + ## Channel Responsibilities 50 + 51 + Channels now only append raw events: 52 + 53 + - Telegram: appends source history plus session context. 54 + - Slack: appends source history plus session context. 55 + - Heartbeat: appends raw summary output and minimal session context. 56 + 57 + No channel task path should call `BuildLLMDraft` directly anymore.
+6 -6
docs/memory.md
··· 316 316 - `participants` (array, multi-party aware, may be empty) 317 317 - `task_text` 318 318 - `final_output` 319 - - `draft_summary_items` 320 - - `draft_promote` 319 + - `source_history` 320 + - `session_context` 321 321 322 322 The schema should evolve conservatively. 323 323 ··· 338 338 339 339 ```json 340 340 { 341 - "schema_version": 1, 341 + "schema_version": 3, 342 342 "event_id": "evt_01JY7K9M7T3H2QZ6A9D5V4N8P1", 343 343 "task_run_id": "run_01JY7K9B3W8F6M2N4C1R0T9X5Q", 344 344 "ts_utc": "2026-02-28T06:15:12Z", ··· 354 354 ], 355 355 "task_text": "emmm", 356 356 "final_output": "", 357 - "draft_summary_items": [], 358 - "draft_promote": {} 357 + "source_history": [], 358 + "session_context": {} 359 359 } 360 360 ``` 361 361 ··· 430 430 Projection outputs in one pass: 431 431 432 432 - one short-memory file per touched `(day, subject_id)` bucket 433 - - optional long-term update (`memory/index.md`) when event contains `draft_promote` 433 + - optional long-term update (`memory/index.md`) when resolved draft contains `promote` 434 434 435 435 When projecting one target file, summary merge uses all existing summary items in that file. 436 436
+2 -25
internal/channelruntime/heartbeat/memory_flow.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 - "strings" 6 5 "time" 7 - 8 - "github.com/quailyquaily/mistermorph/memory" 9 6 ) 10 7 11 8 const ( 12 - heartbeatMemorySubjectID = "heartbeat" 13 - heartbeatMemorySessionID = "heartbeat" 14 - heartbeatMemorySummaryRunes = 1024 9 + heartbeatMemorySubjectID = "heartbeat" 10 + heartbeatMemorySessionID = "heartbeat" 15 11 ) 16 12 17 13 func heartbeatTaskRunID(now time.Time) string { 18 14 now = now.UTC() 19 15 return fmt.Sprintf("heartbeat:%s", now.Format("20060102T150405.000000000Z07:00")) 20 16 } 21 - 22 - func buildHeartbeatDraft(summary string) memory.SessionDraft { 23 - summary = strings.TrimSpace(summary) 24 - if summary == "" { 25 - return memory.SessionDraft{} 26 - } 27 - summary = strings.Join(strings.Fields(summary), " ") 28 - if summary == "" { 29 - return memory.SessionDraft{} 30 - } 31 - runes := []rune(summary) 32 - if len(runes) > heartbeatMemorySummaryRunes { 33 - summary = strings.TrimSpace(string(runes[:heartbeatMemorySummaryRunes])) 34 - } 35 - if summary == "" { 36 - return memory.SessionDraft{} 37 - } 38 - return memory.SessionDraft{SummaryItems: []string{summary}} 39 - }
-19
internal/channelruntime/heartbeat/memory_flow_test.go
··· 13 13 t.Fatalf("unexpected task run id: %q", id) 14 14 } 15 15 } 16 - 17 - func TestBuildHeartbeatDraft(t *testing.T) { 18 - if got := buildHeartbeatDraft(""); len(got.SummaryItems) != 0 { 19 - t.Fatalf("empty summary should produce empty draft: %#v", got) 20 - } 21 - got := buildHeartbeatDraft(" hello world ") 22 - if len(got.SummaryItems) != 1 || got.SummaryItems[0] != "hello world" { 23 - t.Fatalf("draft summary mismatch: %#v", got) 24 - } 25 - 26 - long := strings.Repeat("a", heartbeatMemorySummaryRunes+100) 27 - got = buildHeartbeatDraft(long) 28 - if len(got.SummaryItems) != 1 { 29 - t.Fatalf("draft summary count = %d, want 1", len(got.SummaryItems)) 30 - } 31 - if len([]rune(got.SummaryItems[0])) != heartbeatMemorySummaryRunes { 32 - t.Fatalf("draft summary rune len = %d, want %d", len([]rune(got.SummaryItems[0])), heartbeatMemorySummaryRunes) 33 - } 34 - }
+17 -4
internal/channelruntime/heartbeat/run.go
··· 76 76 sharedGuard := depsutil.GuardFromCommon(common, logger) 77 77 cfg := opts.AgentLimits.ToConfig() 78 78 79 - orchestrator, projectionWorker, cleanup, err := newHeartbeatOrchestrator(opts) 79 + orchestrator, projectionWorker, cleanup, err := newHeartbeatOrchestrator(common, opts) 80 80 if err != nil { 81 81 return err 82 82 } ··· 277 277 }}, 278 278 TaskText: task, 279 279 FinalOutput: summary, 280 - Draft: buildHeartbeatDraft(summary), 280 + SessionContext: memory.SessionContext{ 281 + ConversationID: heartbeatMemorySubjectID, 282 + }, 281 283 }); memErr != nil && opts.Logger != nil { 282 284 opts.Logger.Warn("memory_record_error", "source", "heartbeat", "error", memErr.Error()) 283 285 } else if opts.MemoryProjectionWorker != nil { ··· 308 310 return reg 309 311 } 310 312 311 - func newHeartbeatOrchestrator(opts runtimeLoopOptions) (*memoryruntime.Orchestrator, *memoryruntime.ProjectionWorker, func(), error) { 313 + func newHeartbeatOrchestrator(common depsutil.CommonDependencies, opts runtimeLoopOptions) (*memoryruntime.Orchestrator, *memoryruntime.ProjectionWorker, func(), error) { 312 314 if !opts.MemoryEnabled { 313 315 return nil, nil, func() {}, nil 314 316 } 315 317 mgr := memory.NewManager(statepaths.MemoryDir(), opts.MemoryShortTermDays) 316 318 journal := mgr.NewJournal(memory.JournalOptions{}) 317 - projector := memory.NewProjector(mgr, journal, memory.ProjectorOptions{}) 319 + draftResolver, err := memoryruntime.NewConfiguredDraftResolver(memoryruntime.DraftResolverFactoryOptions{ 320 + ResolveLLMRoute: common.ResolveLLMRoute, 321 + CreateLLMClient: func(route llmutil.ResolvedRoute) (llm.Client, error) { 322 + return depsutil.CreateClientFromCommon(common, route) 323 + }, 324 + }) 325 + if err != nil { 326 + return nil, nil, func() {}, err 327 + } 328 + projector := memory.NewProjector(mgr, journal, memory.ProjectorOptions{ 329 + DraftResolver: draftResolver, 330 + }) 318 331 orchestrator, err := memoryruntime.New(mgr, journal, projector, memoryruntime.OrchestratorOptions{}) 319 332 if err != nil { 320 333 return nil, nil, func() {}, err
+39 -18
internal/channelruntime/slack/memory_flow.go
··· 3 3 import ( 4 4 "fmt" 5 5 "strings" 6 + "time" 6 7 8 + "github.com/quailyquaily/mistermorph/internal/chathistory" 7 9 "github.com/quailyquaily/mistermorph/memory" 8 - ) 9 - 10 - const ( 11 - slackMemorySummaryMaxRunes = 1024 12 10 ) 13 11 14 12 func slackMemorySubjectID(job slackJob) string { ··· 65 63 return out 66 64 } 67 65 68 - func buildSlackMemoryDraft(finalOutput string) memory.SessionDraft { 69 - item := strings.TrimSpace(finalOutput) 70 - if item == "" { 71 - return memory.SessionDraft{} 66 + func buildSlackMemoryHistory(history []chathistory.ChatHistoryItem, job slackJob, output string, sentAt time.Time, maxItems int) []chathistory.ChatHistoryItem { 67 + out := append([]chathistory.ChatHistoryItem{}, history...) 68 + out = append(out, newSlackInboundHistoryItem(job)) 69 + if strings.TrimSpace(output) != "" { 70 + out = append(out, newSlackOutboundAgentHistoryItem(job, output, sentAt, "")) 71 + } 72 + if maxItems > 0 && len(out) > maxItems { 73 + out = out[len(out)-maxItems:] 72 74 } 73 - item = strings.Join(strings.Fields(item), " ") 74 - if item == "" { 75 - return memory.SessionDraft{} 75 + return out 76 + } 77 + 78 + func slackMemorySessionContext(job slackJob) memory.SessionContext { 79 + ctx := memory.SessionContext{ 80 + ConversationID: strings.TrimSpace(job.ChannelID), 81 + ConversationType: strings.ToLower(strings.TrimSpace(job.ChatType)), 82 + CounterpartyID: strings.TrimSpace(job.UserID), 83 + CounterpartyName: strings.TrimSpace(job.DisplayName), 84 + } 85 + if username := strings.TrimSpace(job.Username); username != "" { 86 + ctx.CounterpartyHandle = username 87 + } 88 + ctx.CounterpartyLabel = slackMemoryCounterpartyLabel(job) 89 + return ctx 90 + } 91 + 92 + func slackMemoryCounterpartyLabel(job slackJob) string { 93 + id := strings.TrimSpace(job.UserID) 94 + name := strings.TrimSpace(job.DisplayName) 95 + if name == "" { 96 + name = strings.TrimSpace(job.Username) 76 97 } 77 - runes := []rune(item) 78 - if len(runes) > slackMemorySummaryMaxRunes { 79 - item = strings.TrimSpace(string(runes[:slackMemorySummaryMaxRunes])) 98 + if id != "" && name != "" { 99 + return "[" + name + "](slack:" + id + ")" 80 100 } 81 - if item == "" { 82 - return memory.SessionDraft{} 101 + if name != "" { 102 + return name 83 103 } 84 - return memory.SessionDraft{ 85 - SummaryItems: []string{item}, 104 + if id != "" { 105 + return "slack:" + id 86 106 } 107 + return "" 87 108 }
+33 -19
internal/channelruntime/slack/memory_flow_test.go
··· 1 1 package slack 2 2 3 3 import ( 4 - "strings" 5 4 "testing" 5 + "time" 6 + 7 + "github.com/quailyquaily/mistermorph/internal/chathistory" 6 8 ) 7 9 8 10 func TestSlackMemorySubjectID(t *testing.T) { ··· 53 55 } 54 56 } 55 57 56 - func TestBuildSlackMemoryDraft(t *testing.T) { 57 - if got := buildSlackMemoryDraft(""); len(got.SummaryItems) != 0 { 58 - t.Fatalf("empty output should produce empty draft: %#v", got) 58 + func TestSlackMemoryRequestContext(t *testing.T) { 59 + if got := slackMemoryRequestContext("im"); got != "private" { 60 + t.Fatalf("im request context = %q, want private", got) 59 61 } 60 - got := buildSlackMemoryDraft(" hello world ") 61 - if len(got.SummaryItems) != 1 || got.SummaryItems[0] != "hello world" { 62 - t.Fatalf("draft summary mismatch: %#v", got) 62 + if got := slackMemoryRequestContext("channel"); got != "public" { 63 + t.Fatalf("channel request context = %q, want public", got) 63 64 } 65 + } 64 66 65 - long := strings.Repeat("a", slackMemorySummaryMaxRunes+50) 66 - got = buildSlackMemoryDraft(long) 67 - if len(got.SummaryItems) != 1 { 68 - t.Fatalf("long draft summary count = %d, want 1", len(got.SummaryItems)) 69 - } 70 - if len([]rune(got.SummaryItems[0])) != slackMemorySummaryMaxRunes { 71 - t.Fatalf("long draft summary rune len = %d, want %d", len([]rune(got.SummaryItems[0])), slackMemorySummaryMaxRunes) 67 + func TestSlackMemoryCounterpartyLabel(t *testing.T) { 68 + got := slackMemoryCounterpartyLabel(slackJob{ 69 + UserID: "U123", 70 + DisplayName: "Alice", 71 + }) 72 + if got != "[Alice](slack:U123)" { 73 + t.Fatalf("counterparty_label = %q, want [Alice](slack:U123)", got) 72 74 } 73 75 } 74 76 75 - func TestSlackMemoryRequestContext(t *testing.T) { 76 - if got := slackMemoryRequestContext("im"); got != "private" { 77 - t.Fatalf("im request context = %q, want private", got) 77 + func TestBuildSlackMemoryHistoryCapsLatestItems(t *testing.T) { 78 + history := []chathistory.ChatHistoryItem{ 79 + {Kind: chathistory.KindInboundUser, Text: "one"}, 80 + {Kind: chathistory.KindInboundUser, Text: "two"}, 81 + {Kind: chathistory.KindInboundUser, Text: "three"}, 78 82 } 79 - if got := slackMemoryRequestContext("channel"); got != "public" { 80 - t.Fatalf("channel request context = %q, want public", got) 83 + got := buildSlackMemoryHistory(history, slackJob{ 84 + ChannelID: "C1", 85 + ChatType: "channel", 86 + UserID: "U1", 87 + Username: "alice", 88 + Text: "four", 89 + }, "five", time.Date(2026, 3, 11, 10, 0, 0, 0, time.UTC), 4) 90 + if len(got) != 4 { 91 + t.Fatalf("len(history) = %d, want 4", len(got)) 92 + } 93 + if got[0].Text != "two" || got[1].Text != "three" || got[2].Text != "four" || got[3].Text != "five" { 94 + t.Fatalf("history texts = %#v, want [two three four five]", []string{got[0].Text, got[1].Text, got[2].Text, got[3].Text}) 81 95 } 82 96 }
+21 -1
internal/channelruntime/slack/runtime.go
··· 24 24 "github.com/quailyquaily/mistermorph/internal/llmutil" 25 25 "github.com/quailyquaily/mistermorph/internal/memoryruntime" 26 26 "github.com/quailyquaily/mistermorph/internal/statepaths" 27 + "github.com/quailyquaily/mistermorph/llm" 27 28 "github.com/quailyquaily/mistermorph/memory" 28 29 "github.com/quailyquaily/mistermorph/tools" 29 30 slacktools "github.com/quailyquaily/mistermorph/tools/slack" ··· 307 308 var memOrchestrator *memoryruntime.Orchestrator 308 309 var memProjectionWorker *memoryruntime.ProjectionWorker 309 310 if opts.MemoryEnabled { 311 + draftResolver, err := memoryruntime.NewConfiguredDraftResolver(memoryruntime.DraftResolverFactoryOptions{ 312 + ResolveLLMRoute: d.ResolveLLMRoute, 313 + CreateLLMClient: d.CreateLLMClient, 314 + DecorateClient: func(client llm.Client, route llmutil.ResolvedRoute) llm.Client { 315 + return llminspect.WrapClient(client, llminspect.ClientOptions{ 316 + PromptInspector: promptInspector, 317 + RequestInspector: requestInspector, 318 + APIBase: route.ClientConfig.Endpoint, 319 + Model: strings.TrimSpace(route.ClientConfig.Model), 320 + }) 321 + }, 322 + }) 323 + if err != nil { 324 + return err 325 + } 310 326 memManager := memory.NewManager(statepaths.MemoryDir(), opts.MemoryShortTermDays) 311 327 memJournal := memManager.NewJournal(memory.JournalOptions{}) 312 - memProjector := memory.NewProjector(memManager, memJournal, memory.ProjectorOptions{}) 328 + memProjector := memory.NewProjector(memManager, memJournal, memory.ProjectorOptions{ 329 + DraftResolver: draftResolver, 330 + }) 313 331 memOrch, err := memoryruntime.New(memManager, memJournal, memProjector, memoryruntime.OrchestratorOptions{}) 314 332 if err != nil { 315 333 return err ··· 339 357 sem := make(chan struct{}, maxConc) 340 358 341 359 groupTriggerMode := strings.ToLower(strings.TrimSpace(opts.GroupTriggerMode)) 360 + slackHistoryCap := slackHistoryCapForMode(groupTriggerMode) 342 361 addressingLLMTimeout := addressingRoute.ClientConfig.RequestTimeout 343 362 if addressingLLMTimeout <= 0 { 344 363 addressingLLMTimeout = requestTimeout ··· 482 501 model, 483 502 job, 484 503 h, 504 + slackHistoryCap, 485 505 sticky, 486 506 allowedChannels, 487 507 availableEmojiNames,
+11 -8
internal/channelruntime/slack/runtime_task.go
··· 47 47 model string, 48 48 job slackJob, 49 49 history []chathistory.ChatHistoryItem, 50 + historyCap int, 50 51 stickySkills []string, 51 52 allowedChannelIDs map[string]bool, 52 53 availableEmojiNames []string, ··· 181 182 182 183 if runtimeOpts.MemoryEnabled && runtimeOpts.MemoryOrchestrator != nil && memSubjectID != "" { 183 184 finalOutput := strings.TrimSpace(depsutil.FormatFinalOutput(final)) 185 + recordedAt := time.Now().UTC() 184 186 recordOffset, memErr := runtimeOpts.MemoryOrchestrator.Record(memoryruntime.RecordRequest{ 185 - TaskRunID: slackMemoryTaskRunID(job), 186 - SessionID: slackMemorySessionID(job), 187 - SubjectID: memSubjectID, 188 - Channel: "slack", 189 - Participants: slackMemoryParticipants(job), 190 - TaskText: task, 191 - FinalOutput: finalOutput, 192 - Draft: buildSlackMemoryDraft(finalOutput), 187 + TaskRunID: slackMemoryTaskRunID(job), 188 + SessionID: slackMemorySessionID(job), 189 + SubjectID: memSubjectID, 190 + Channel: "slack", 191 + Participants: slackMemoryParticipants(job), 192 + TaskText: task, 193 + FinalOutput: finalOutput, 194 + SourceHistory: buildSlackMemoryHistory(history, job, finalOutput, recordedAt, historyCap), 195 + SessionContext: slackMemorySessionContext(job), 193 196 }) 194 197 if memErr != nil { 195 198 if logger != nil {
+27 -226
internal/channelruntime/telegram/memory_flow.go
··· 1 1 package telegram 2 2 3 3 import ( 4 - "context" 5 4 "fmt" 6 5 "log/slog" 7 6 "strconv" ··· 11 10 "github.com/quailyquaily/mistermorph/agent" 12 11 "github.com/quailyquaily/mistermorph/internal/channelruntime/depsutil" 13 12 "github.com/quailyquaily/mistermorph/internal/chathistory" 14 - "github.com/quailyquaily/mistermorph/internal/jsonutil" 15 13 "github.com/quailyquaily/mistermorph/internal/memoryruntime" 16 - "github.com/quailyquaily/mistermorph/llm" 17 14 "github.com/quailyquaily/mistermorph/memory" 18 15 ) 19 16 20 - func recordMemoryFromJob(ctx context.Context, logger *slog.Logger, client llm.Client, model string, orchestrator *memoryruntime.Orchestrator, mgr *memory.Manager, job telegramJob, history []chathistory.ChatHistoryItem, historyCap int, final *agent.Final, requestTimeout time.Duration) error { 17 + func recordMemoryFromJob(logger *slog.Logger, orchestrator *memoryruntime.Orchestrator, job telegramJob, history []chathistory.ChatHistoryItem, historyCap int, final *agent.Final) error { 21 18 recordOffset, err := orchestrator.RecordWithAdapter(telegramMemoryRecordAdapter{ 22 - ctx: ctx, 23 - client: client, 24 - model: model, 25 - manager: mgr, 26 - job: job, 27 - history: history, 28 - historyCap: historyCap, 29 - final: final, 30 - requestTimeout: requestTimeout, 19 + job: job, 20 + history: history, 21 + historyCap: historyCap, 22 + final: final, 31 23 }) 32 24 if err != nil { 33 25 return err ··· 56 48 } 57 49 58 50 type telegramMemoryRecordAdapter struct { 59 - ctx context.Context 60 - client llm.Client 61 - model string 62 - manager *memory.Manager 63 - job telegramJob 64 - history []chathistory.ChatHistoryItem 65 - historyCap int 66 - final *agent.Final 67 - requestTimeout time.Duration 51 + job telegramJob 52 + history []chathistory.ChatHistoryItem 53 + historyCap int 54 + final *agent.Final 68 55 } 69 56 70 57 func (a telegramMemoryRecordAdapter) BuildRecordRequest() (memoryruntime.RecordRequest, error) { ··· 76 63 return memoryruntime.RecordRequest{}, fmt.Errorf("telegram task run id is required") 77 64 } 78 65 79 - ctxInfo := MemoryDraftContext{ 80 - SessionID: meta.SessionID, 81 - ChatID: a.job.ChatID, 82 - ChatType: a.job.ChatType, 83 - CounterpartyID: a.job.FromUserID, 66 + ctxInfo := memory.SessionContext{ 67 + ConversationType: strings.TrimSpace(a.job.ChatType), 84 68 CounterpartyName: strings.TrimSpace(a.job.FromDisplayName), 85 69 CounterpartyHandle: strings.TrimSpace(a.job.FromUsername), 86 - TimestampUTC: now.Format(time.RFC3339), 87 70 } 88 - if ctxInfo.CounterpartyName == "" { 89 - ctxInfo.CounterpartyName = strings.TrimSpace(strings.Join([]string{a.job.FromFirstName, a.job.FromLastName}, " ")) 90 - } 91 - 92 - _, existingContent, _, err := a.manager.LoadShortTerm(now, meta.SessionID) 93 - if err != nil { 94 - return memoryruntime.RecordRequest{}, err 71 + if a.job.ChatID != 0 { 72 + ctxInfo.ConversationID = strconv.FormatInt(a.job.ChatID, 10) 95 73 } 96 - 97 - memCtx := a.ctx 98 - if memCtx == nil { 99 - memCtx = context.Background() 74 + if a.job.FromUserID > 0 { 75 + ctxInfo.CounterpartyID = strconv.FormatInt(a.job.FromUserID, 10) 100 76 } 101 - cancel := func() {} 102 - if a.requestTimeout > 0 { 103 - memCtx, cancel = context.WithTimeout(memCtx, a.requestTimeout) 77 + if ctxInfo.CounterpartyName == "" { 78 + ctxInfo.CounterpartyName = strings.TrimSpace(strings.Join([]string{a.job.FromFirstName, a.job.FromLastName}, " ")) 104 79 } 105 - defer cancel() 106 80 ctxInfo.CounterpartyLabel = buildMemoryCounterpartyLabel(meta, ctxInfo) 107 81 108 82 draftHistory := buildMemoryDraftHistory(a.history, a.job, output, now, a.historyCap) 109 - draft, err := BuildMemoryDraft(memCtx, a.client, a.model, draftHistory, a.job.Text, output, existingContent, ctxInfo) 110 - if err != nil { 111 - return memoryruntime.RecordRequest{}, err 112 - } 113 - draft.Promote = EnforceLongTermPromotionRules(draft.Promote, nil, a.job.Text) 114 83 115 84 return memoryruntime.RecordRequest{ 116 - TaskRunID: taskRunID, 117 - SessionID: telegramMemorySessionID(a.job), 118 - SubjectID: telegramMemorySubjectID(a.job), 119 - Channel: "telegram", 120 - Participants: telegramMemoryParticipants(a.job), 121 - TaskText: strings.TrimSpace(a.job.Text), 122 - FinalOutput: output, 123 - Draft: draft, 85 + TaskRunID: taskRunID, 86 + SessionID: telegramMemorySessionID(a.job), 87 + SubjectID: telegramMemorySubjectID(a.job), 88 + Channel: "telegram", 89 + Participants: telegramMemoryParticipants(a.job), 90 + TaskText: strings.TrimSpace(a.job.Text), 91 + FinalOutput: output, 92 + SourceHistory: draftHistory, 93 + SessionContext: ctxInfo, 124 94 }, nil 125 95 } 126 96 ··· 231 201 return out 232 202 } 233 203 234 - type MemoryDraftContext struct { 235 - SessionID string `json:"session_id,omitempty"` 236 - ChatID int64 `json:"chat_id,omitempty"` 237 - ChatType string `json:"chat_type,omitempty"` 238 - CounterpartyID int64 `json:"counterparty_id,omitempty"` 239 - CounterpartyName string `json:"counterparty_name,omitempty"` 240 - CounterpartyHandle string `json:"counterparty_handle,omitempty"` 241 - CounterpartyLabel string `json:"counterparty_label,omitempty"` 242 - TimestampUTC string `json:"timestamp_utc,omitempty"` 243 - } 244 - 245 - func buildMemoryCounterpartyLabel(meta memory.WriteMeta, ctxInfo MemoryDraftContext) string { 204 + func buildMemoryCounterpartyLabel(meta memory.WriteMeta, ctxInfo memory.SessionContext) string { 246 205 contactID := firstNonEmptyString(meta.ContactIDs...) 247 206 if contactID == "" { 248 207 handle := strings.TrimPrefix(strings.TrimSpace(ctxInfo.CounterpartyHandle), "@") ··· 276 235 } 277 236 return "" 278 237 } 279 - 280 - func BuildMemoryDraft(ctx context.Context, client llm.Client, model string, history []chathistory.ChatHistoryItem, task string, output string, existing memory.ShortTermContent, ctxInfo MemoryDraftContext) (memory.SessionDraft, error) { 281 - if client == nil { 282 - return memory.SessionDraft{}, fmt.Errorf("nil llm client") 283 - } 284 - 285 - sys, user, err := renderMemoryDraftPrompts(ctxInfo, history, task, output, existing) 286 - if err != nil { 287 - return memory.SessionDraft{}, fmt.Errorf("render memory draft prompts: %w", err) 288 - } 289 - 290 - res, err := client.Chat(ctx, llm.Request{ 291 - Model: model, 292 - Scene: "memory.draft", 293 - ForceJSON: true, 294 - Messages: []llm.Message{ 295 - {Role: "system", Content: sys}, 296 - {Role: "user", Content: user}, 297 - }, 298 - Parameters: map[string]any{ 299 - "max_tokens": 10240, 300 - }, 301 - }) 302 - if err != nil { 303 - return memory.SessionDraft{}, err 304 - } 305 - 306 - raw := strings.TrimSpace(res.Text) 307 - if raw == "" { 308 - return memory.SessionDraft{}, fmt.Errorf("empty memory draft response") 309 - } 310 - 311 - var out memory.SessionDraft 312 - if err := jsonutil.DecodeWithFallback(raw, &out); err != nil { 313 - return memory.SessionDraft{}, fmt.Errorf("invalid memory draft json") 314 - } 315 - out.SummaryItems = normalizeMemorySummaryItems(out.SummaryItems) 316 - return out, nil 317 - } 318 - 319 - func EnforceLongTermPromotionRules(promote memory.PromoteDraft, history []llm.Message, task string) memory.PromoteDraft { 320 - if !hasExplicitMemoryRequest(history, task) { 321 - return memory.PromoteDraft{} 322 - } 323 - return limitPromoteToOne(promote) 324 - } 325 - 326 - func hasExplicitMemoryRequest(history []llm.Message, task string) bool { 327 - texts := make([]string, 0, len(history)+1) 328 - for _, m := range history { 329 - if strings.ToLower(strings.TrimSpace(m.Role)) != "user" { 330 - continue 331 - } 332 - content := strings.TrimSpace(m.Content) 333 - if content == "" { 334 - continue 335 - } 336 - texts = append(texts, content) 337 - } 338 - if strings.TrimSpace(task) != "" { 339 - texts = append(texts, task) 340 - } 341 - if len(texts) == 0 { 342 - return false 343 - } 344 - combined := strings.ToLower(strings.Join(texts, "\n")) 345 - return containsExplicitMemoryRequest(combined) 346 - } 347 - 348 - func containsExplicitMemoryRequest(lowerText string) bool { 349 - if strings.TrimSpace(lowerText) == "" { 350 - return false 351 - } 352 - keywords := []string{ 353 - "记住", 354 - "记下来", 355 - "别忘", 356 - "记得", 357 - "长期记忆", 358 - "写入长期记忆", 359 - "加入长期记忆", 360 - "记到长期", 361 - "remember", 362 - "don't forget", 363 - "dont forget", 364 - "long-term memory", 365 - "add to memory", 366 - "save this", 367 - "keep this", 368 - "store this", 369 - "memorize", 370 - } 371 - for _, k := range keywords { 372 - if strings.Contains(lowerText, k) { 373 - return true 374 - } 375 - } 376 - return false 377 - } 378 - 379 - func limitPromoteToOne(promote memory.PromoteDraft) memory.PromoteDraft { 380 - if item, ok := firstNonEmptyText(promote.GoalsProjects); ok { 381 - return memory.PromoteDraft{GoalsProjects: []string{item}} 382 - } 383 - if item, ok := firstKVItem(promote.KeyFacts); ok { 384 - return memory.PromoteDraft{KeyFacts: []memory.KVItem{item}} 385 - } 386 - return memory.PromoteDraft{} 387 - } 388 - 389 - func firstNonEmptyText(items []string) (string, bool) { 390 - for _, raw := range items { 391 - v := strings.TrimSpace(raw) 392 - if v == "" { 393 - continue 394 - } 395 - return v, true 396 - } 397 - return "", false 398 - } 399 - 400 - func firstKVItem(items []memory.KVItem) (memory.KVItem, bool) { 401 - for _, it := range items { 402 - title := strings.TrimSpace(it.Title) 403 - value := strings.TrimSpace(it.Value) 404 - if title == "" && value == "" { 405 - continue 406 - } 407 - it.Title = title 408 - it.Value = value 409 - return it, true 410 - } 411 - return memory.KVItem{}, false 412 - } 413 - 414 - func normalizeMemorySummaryItems(input []string) []string { 415 - if len(input) == 0 { 416 - return nil 417 - } 418 - out := make([]string, 0, len(input)) 419 - seen := make(map[string]bool, len(input)) 420 - for _, raw := range input { 421 - v := strings.TrimSpace(raw) 422 - if v == "" { 423 - continue 424 - } 425 - key := strings.ToLower(v) 426 - if seen[key] { 427 - continue 428 - } 429 - seen[key] = true 430 - out = append(out, v) 431 - } 432 - if len(out) == 0 { 433 - return nil 434 - } 435 - return out 436 - }
+3 -3
internal/channelruntime/telegram/memory_flow_test.go
··· 85 85 got := buildMemoryCounterpartyLabel(memory.WriteMeta{ 86 86 ContactIDs: []string{"tg:@alice"}, 87 87 ContactNicknames: []string{"Alice"}, 88 - }, MemoryDraftContext{}) 88 + }, memory.SessionContext{}) 89 89 if got != "[Alice](tg:@alice)" { 90 90 t.Fatalf("counterparty_label = %q, want %q", got, "[Alice](tg:@alice)") 91 91 } 92 92 }) 93 93 94 94 t.Run("falls back to handle in display ref style", func(t *testing.T) { 95 - got := buildMemoryCounterpartyLabel(memory.WriteMeta{}, MemoryDraftContext{ 95 + got := buildMemoryCounterpartyLabel(memory.WriteMeta{}, memory.SessionContext{ 96 96 CounterpartyHandle: "@alice", 97 97 }) 98 98 if got != "[alice](tg:@alice)" { ··· 101 101 }) 102 102 103 103 t.Run("keeps plain nickname when no reference id exists", func(t *testing.T) { 104 - got := buildMemoryCounterpartyLabel(memory.WriteMeta{}, MemoryDraftContext{ 104 + got := buildMemoryCounterpartyLabel(memory.WriteMeta{}, memory.SessionContext{ 105 105 CounterpartyName: "Alice", 106 106 }) 107 107 if got != "Alice" {
+6 -6
internal/channelruntime/telegram/memory_prompts.go internal/memoryruntime/draft_prompts.go
··· 1 - package telegram 1 + package memoryruntime 2 2 3 3 import ( 4 4 _ "embed" ··· 26 26 }, 27 27 } 28 28 29 - var memoryDraftSystemPromptTemplate = prompttmpl.MustParse("telegram_memory_draft_system", memoryDraftSystemPromptTemplateSource, nil) 30 - var memoryDraftUserPromptTemplate = prompttmpl.MustParse("telegram_memory_draft_user", memoryDraftUserPromptTemplateSource, memoryPromptTemplateFuncs) 29 + var memoryDraftSystemPromptTemplate = prompttmpl.MustParse("memoryruntime_memory_draft_system", memoryDraftSystemPromptTemplateSource, nil) 30 + var memoryDraftUserPromptTemplate = prompttmpl.MustParse("memoryruntime_memory_draft_user", memoryDraftUserPromptTemplateSource, memoryPromptTemplateFuncs) 31 31 32 32 type memoryDraftUserPromptData struct { 33 - SessionContext MemoryDraftContext 33 + SessionContext memory.SessionContext 34 34 ChatHistoryMessages []chathistory.ChatHistoryItem 35 35 CurrentTask string 36 36 CurrentOutput string ··· 40 40 const memoryDraftExistingSummaryItemsLimit = 5 41 41 42 42 func renderMemoryDraftPrompts( 43 - ctxInfo MemoryDraftContext, 43 + ctxInfo memory.SessionContext, 44 44 history []chathistory.ChatHistoryItem, 45 45 task string, 46 46 output string, ··· 52 52 } 53 53 userPrompt, err := prompttmpl.Render(memoryDraftUserPromptTemplate, memoryDraftUserPromptData{ 54 54 SessionContext: ctxInfo, 55 - ChatHistoryMessages: chathistory.BuildMessages(chathistory.ChannelTelegram, history), 55 + ChatHistoryMessages: chathistory.BuildMessages("", history), 56 56 CurrentTask: task, 57 57 CurrentOutput: output, 58 58 ExistingSummaryItems: recentSummaryItems(existing.SummaryItems, memoryDraftExistingSummaryItemsLimit),
+34 -2
internal/channelruntime/telegram/memory_prompts_test.go internal/memoryruntime/draft_test.go
··· 1 - package telegram 1 + package memoryruntime 2 2 3 3 import ( 4 4 "encoding/json" ··· 61 61 } 62 62 63 63 _, userPrompt, err := renderMemoryDraftPrompts( 64 - MemoryDraftContext{}, 64 + memory.SessionContext{}, 65 65 nil, 66 66 "say hi", 67 67 "hi", ··· 86 86 } 87 87 } 88 88 } 89 + 90 + func TestEnforceLongTermPromotionRules(t *testing.T) { 91 + t.Parallel() 92 + 93 + promote := memory.PromoteDraft{ 94 + GoalsProjects: []string{"remember goal", "extra"}, 95 + KeyFacts: []memory.KVItem{ 96 + {Title: "Project", Value: "mistermorph"}, 97 + }, 98 + } 99 + 100 + t.Run("drops promotion without explicit memory request", func(t *testing.T) { 101 + t.Parallel() 102 + 103 + got := EnforceLongTermPromotionRules(promote, nil, "just replying") 104 + if len(got.GoalsProjects) != 0 || len(got.KeyFacts) != 0 { 105 + t.Fatalf("got = %#v, want empty promote", got) 106 + } 107 + }) 108 + 109 + t.Run("keeps only one promoted item with explicit request", func(t *testing.T) { 110 + t.Parallel() 111 + 112 + got := EnforceLongTermPromotionRules(promote, nil, "remember this for later") 113 + if len(got.GoalsProjects) != 1 || got.GoalsProjects[0] != "remember goal" { 114 + t.Fatalf("goals = %#v, want [\"remember goal\"]", got.GoalsProjects) 115 + } 116 + if len(got.KeyFacts) != 0 { 117 + t.Fatalf("key facts = %#v, want empty", got.KeyFacts) 118 + } 119 + }) 120 + }
internal/channelruntime/telegram/prompts/memory_draft_system.md internal/memoryruntime/prompts/memory_draft_system.md
internal/channelruntime/telegram/prompts/memory_draft_user.md internal/memoryruntime/prompts/memory_draft_user.md
+19 -2
internal/channelruntime/telegram/runtime.go
··· 29 29 "github.com/quailyquaily/mistermorph/internal/memoryruntime" 30 30 "github.com/quailyquaily/mistermorph/internal/statepaths" 31 31 "github.com/quailyquaily/mistermorph/internal/telegramutil" 32 + "github.com/quailyquaily/mistermorph/llm" 32 33 "github.com/quailyquaily/mistermorph/memory" 33 34 "github.com/quailyquaily/mistermorph/tools" 34 35 telegramtools "github.com/quailyquaily/mistermorph/tools/telegram" ··· 295 296 var memManager *memory.Manager 296 297 var memProjectionWorker *memoryruntime.ProjectionWorker 297 298 if opts.MemoryEnabled { 299 + draftResolver, err := memoryruntime.NewConfiguredDraftResolver(memoryruntime.DraftResolverFactoryOptions{ 300 + ResolveLLMRoute: d.ResolveLLMRoute, 301 + CreateLLMClient: d.CreateLLMClient, 302 + DecorateClient: func(client llm.Client, route llmutil.ResolvedRoute) llm.Client { 303 + return llminspect.WrapClient(client, llminspect.ClientOptions{ 304 + PromptInspector: promptInspector, 305 + RequestInspector: requestInspector, 306 + APIBase: route.ClientConfig.Endpoint, 307 + Model: strings.TrimSpace(route.ClientConfig.Model), 308 + }) 309 + }, 310 + }) 311 + if err != nil { 312 + return err 313 + } 298 314 memManager = memory.NewManager(statepaths.MemoryDir(), opts.MemoryShortTermDays) 299 315 memJournal := memManager.NewJournal(memory.JournalOptions{}) 300 - memProjector := memory.NewProjector(memManager, memJournal, memory.ProjectorOptions{}) 316 + memProjector := memory.NewProjector(memManager, memJournal, memory.ProjectorOptions{ 317 + DraftResolver: draftResolver, 318 + }) 301 319 memOrch, err := memoryruntime.New(memManager, memJournal, memProjector, memoryruntime.OrchestratorOptions{}) 302 320 if err != nil { 303 321 return err ··· 319 337 ImageRecognitionEnabled: opts.ImageRecognitionEnabled, 320 338 PlanCreateClient: planClient, 321 339 PlanCreateModel: planModel, 322 - MemoryManager: memManager, 323 340 MemoryOrchestrator: memOrchestrator, 324 341 MemoryProjectionWorker: memProjectionWorker, 325 342 }
+1 -10
internal/channelruntime/telegram/runtime_task.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/base64" 7 - "errors" 8 7 "fmt" 9 8 "image" 10 9 _ "image/gif" ··· 26 25 "github.com/quailyquaily/mistermorph/internal/llmstats" 27 26 "github.com/quailyquaily/mistermorph/internal/memoryruntime" 28 27 "github.com/quailyquaily/mistermorph/internal/promptprofile" 29 - "github.com/quailyquaily/mistermorph/internal/retryutil" 30 28 "github.com/quailyquaily/mistermorph/internal/todo" 31 29 "github.com/quailyquaily/mistermorph/internal/toolsutil" 32 30 "github.com/quailyquaily/mistermorph/llm" 33 - "github.com/quailyquaily/mistermorph/memory" 34 31 "github.com/quailyquaily/mistermorph/tools" 35 32 telegramtools "github.com/quailyquaily/mistermorph/tools/telegram" 36 33 ) ··· 41 38 ImageRecognitionEnabled bool 42 39 PlanCreateClient llm.Client 43 40 PlanCreateModel string 44 - MemoryManager *memory.Manager 45 41 MemoryOrchestrator *memoryruntime.Orchestrator 46 42 MemoryProjectionWorker *memoryruntime.ProjectionWorker 47 43 } ··· 203 199 204 200 publishText := shouldPublishTelegramText(final) 205 201 if shouldWriteMemory(publishText, runtimeOpts.MemoryOrchestrator, memSubjectID) { 206 - if err := recordMemoryFromJob(ctx, logger, client, model, runtimeOpts.MemoryOrchestrator, runtimeOpts.MemoryManager, job, history, historyCap, final, requestTimeout); err != nil { 207 - if errors.Is(err, context.DeadlineExceeded) { 208 - retryutil.AsyncRetry(logger, "memory_update", 2*time.Second, requestTimeout, func(retryCtx context.Context) error { 209 - return recordMemoryFromJob(retryCtx, logger, client, model, runtimeOpts.MemoryOrchestrator, runtimeOpts.MemoryManager, job, history, historyCap, final, requestTimeout) 210 - }) 211 - } 202 + if err := recordMemoryFromJob(logger, runtimeOpts.MemoryOrchestrator, job, history, historyCap, final); err != nil { 212 203 logger.Warn("memory_update_error", "error", err.Error()) 213 204 } else if runtimeOpts.MemoryProjectionWorker != nil { 214 205 runtimeOpts.MemoryProjectionWorker.NotifyRecordAppended()
+37 -3
internal/llmutil/llmutil_test.go
··· 199 199 } 200 200 } 201 201 202 + func TestResolveRoute_MemoryDraftPurpose(t *testing.T) { 203 + values := RuntimeValues{ 204 + Provider: "openai", 205 + Endpoint: "https://api.openai.com", 206 + APIKey: "base-key", 207 + Model: "gpt-5.2", 208 + RequestTimeoutRaw: "90s", 209 + Profiles: map[string]ProfileConfig{ 210 + "memory": { 211 + Model: "gpt-4.1-mini", 212 + }, 213 + }, 214 + Routes: RoutesConfig{ 215 + PurposeRoutes: PurposeRoutes{ 216 + MemoryDraft: "memory", 217 + }, 218 + }, 219 + } 220 + resolved, err := ResolveRoute(values, RoutePurposeMemoryDraft) 221 + if err != nil { 222 + t.Fatalf("ResolveRoute() error = %v", err) 223 + } 224 + if resolved.Profile != "memory" { 225 + t.Fatalf("profile = %q, want memory", resolved.Profile) 226 + } 227 + if resolved.ClientConfig.Model != "gpt-4.1-mini" { 228 + t.Fatalf("model = %q, want gpt-4.1-mini", resolved.ClientConfig.Model) 229 + } 230 + } 231 + 202 232 func TestResolveRoute_TopLevelAPIKeyRef(t *testing.T) { 203 233 t.Setenv("OPENAI_API_KEY", "env-openai-key") 204 234 values := RuntimeValues{ ··· 350 380 }, 351 381 }) 352 382 v.Set("llm.routes", map[string]any{ 353 - "main_loop": "default", 354 - "addressing": "cheap", 355 - "plan_create": "reasoning", 383 + "main_loop": "default", 384 + "addressing": "cheap", 385 + "plan_create": "reasoning", 386 + "memory_draft": "cheap", 356 387 }) 357 388 358 389 values := RuntimeValuesFromReader(v) ··· 373 404 } 374 405 if values.Routes.MainLoop != "default" { 375 406 t.Fatalf("main loop route = %q, want default", values.Routes.MainLoop) 407 + } 408 + if values.Routes.MemoryDraft != "cheap" { 409 + t.Fatalf("memory draft route = %q, want cheap", values.Routes.MemoryDraft) 376 410 } 377 411 }
+15 -10
internal/llmutil/routes.go
··· 9 9 ) 10 10 11 11 const ( 12 - RoutePurposeMainLoop = "main_loop" 13 - RoutePurposeAddressing = "addressing" 14 - RoutePurposeHeartbeat = "heartbeat" 15 - RoutePurposePlanCreate = "plan_create" 16 - RouteProfileDefault = "default" 12 + RoutePurposeMainLoop = "main_loop" 13 + RoutePurposeAddressing = "addressing" 14 + RoutePurposeHeartbeat = "heartbeat" 15 + RoutePurposePlanCreate = "plan_create" 16 + RoutePurposeMemoryDraft = "memory_draft" 17 + RouteProfileDefault = "default" 17 18 ) 18 19 19 20 type ProfileConfig struct { ··· 46 47 } 47 48 48 49 type PurposeRoutes struct { 49 - MainLoop string `mapstructure:"main_loop"` 50 - Addressing string `mapstructure:"addressing"` 51 - Heartbeat string `mapstructure:"heartbeat"` 52 - PlanCreate string `mapstructure:"plan_create"` 50 + MainLoop string `mapstructure:"main_loop"` 51 + Addressing string `mapstructure:"addressing"` 52 + Heartbeat string `mapstructure:"heartbeat"` 53 + PlanCreate string `mapstructure:"plan_create"` 54 + MemoryDraft string `mapstructure:"memory_draft"` 53 55 } 54 56 55 57 type RoutesConfig struct { ··· 185 187 cfg.Addressing = strings.TrimSpace(cfg.Addressing) 186 188 cfg.Heartbeat = strings.TrimSpace(cfg.Heartbeat) 187 189 cfg.PlanCreate = strings.TrimSpace(cfg.PlanCreate) 190 + cfg.MemoryDraft = strings.TrimSpace(cfg.MemoryDraft) 188 191 return cfg 189 192 } 190 193 ··· 202 205 return strings.TrimSpace(routes.Heartbeat) 203 206 case RoutePurposePlanCreate: 204 207 return strings.TrimSpace(routes.PlanCreate) 208 + case RoutePurposeMemoryDraft: 209 + return strings.TrimSpace(routes.MemoryDraft) 205 210 default: 206 211 return "" 207 212 } ··· 213 218 214 219 func isSupportedRoutePurpose(purpose string) bool { 215 220 switch purpose { 216 - case RoutePurposeMainLoop, RoutePurposeAddressing, RoutePurposeHeartbeat, RoutePurposePlanCreate: 221 + case RoutePurposeMainLoop, RoutePurposeAddressing, RoutePurposeHeartbeat, RoutePurposePlanCreate, RoutePurposeMemoryDraft: 217 222 return true 218 223 default: 219 224 return false
+180
internal/memoryruntime/draft.go
··· 1 + package memoryruntime 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + 8 + "github.com/quailyquaily/mistermorph/internal/chathistory" 9 + "github.com/quailyquaily/mistermorph/internal/jsonutil" 10 + "github.com/quailyquaily/mistermorph/llm" 11 + "github.com/quailyquaily/mistermorph/memory" 12 + ) 13 + 14 + type DraftRequest struct { 15 + Client llm.Client 16 + Model string 17 + History []chathistory.ChatHistoryItem 18 + Task string 19 + Output string 20 + Existing memory.ShortTermContent 21 + SessionContext memory.SessionContext 22 + } 23 + 24 + func BuildLLMDraft(ctx context.Context, req DraftRequest) (memory.SessionDraft, error) { 25 + if req.Client == nil { 26 + return memory.SessionDraft{}, fmt.Errorf("nil llm client") 27 + } 28 + 29 + sys, user, err := renderMemoryDraftPrompts(req.SessionContext, req.History, req.Task, req.Output, req.Existing) 30 + if err != nil { 31 + return memory.SessionDraft{}, fmt.Errorf("render memory draft prompts: %w", err) 32 + } 33 + 34 + res, err := req.Client.Chat(ctx, llm.Request{ 35 + Model: strings.TrimSpace(req.Model), 36 + Scene: "memory.draft", 37 + ForceJSON: true, 38 + Messages: []llm.Message{ 39 + {Role: "system", Content: sys}, 40 + {Role: "user", Content: user}, 41 + }, 42 + Parameters: map[string]any{ 43 + "max_tokens": 10240, 44 + }, 45 + }) 46 + if err != nil { 47 + return memory.SessionDraft{}, err 48 + } 49 + 50 + raw := strings.TrimSpace(res.Text) 51 + if raw == "" { 52 + return memory.SessionDraft{}, fmt.Errorf("empty memory draft response") 53 + } 54 + 55 + var out memory.SessionDraft 56 + if err := jsonutil.DecodeWithFallback(raw, &out); err != nil { 57 + return memory.SessionDraft{}, fmt.Errorf("invalid memory draft json") 58 + } 59 + out.SummaryItems = normalizeSummaryItems(out.SummaryItems) 60 + return out, nil 61 + } 62 + 63 + func normalizeSummaryItems(items []string) []string { 64 + if len(items) == 0 { 65 + return nil 66 + } 67 + out := make([]string, 0, len(items)) 68 + seen := make(map[string]bool, len(items)) 69 + for _, raw := range items { 70 + item := strings.TrimSpace(raw) 71 + if item == "" { 72 + continue 73 + } 74 + key := strings.ToLower(item) 75 + if seen[key] { 76 + continue 77 + } 78 + seen[key] = true 79 + out = append(out, item) 80 + } 81 + if len(out) == 0 { 82 + return nil 83 + } 84 + return out 85 + } 86 + 87 + func EnforceLongTermPromotionRules(promote memory.PromoteDraft, history []llm.Message, task string) memory.PromoteDraft { 88 + if !hasExplicitMemoryRequest(history, task) { 89 + return memory.PromoteDraft{} 90 + } 91 + return limitPromoteToOne(promote) 92 + } 93 + 94 + func hasExplicitMemoryRequest(history []llm.Message, task string) bool { 95 + texts := make([]string, 0, len(history)+1) 96 + for _, m := range history { 97 + if strings.ToLower(strings.TrimSpace(m.Role)) != "user" { 98 + continue 99 + } 100 + content := strings.TrimSpace(m.Content) 101 + if content == "" { 102 + continue 103 + } 104 + texts = append(texts, content) 105 + } 106 + if strings.TrimSpace(task) != "" { 107 + texts = append(texts, task) 108 + } 109 + if len(texts) == 0 { 110 + return false 111 + } 112 + combined := strings.ToLower(strings.Join(texts, "\n")) 113 + return containsExplicitMemoryRequest(combined) 114 + } 115 + 116 + func containsExplicitMemoryRequest(lowerText string) bool { 117 + if strings.TrimSpace(lowerText) == "" { 118 + return false 119 + } 120 + keywords := []string{ 121 + "记住", 122 + "记下来", 123 + "别忘", 124 + "记得", 125 + "长期记忆", 126 + "写入长期记忆", 127 + "加入长期记忆", 128 + "记到长期", 129 + "remember", 130 + "don't forget", 131 + "dont forget", 132 + "long-term memory", 133 + "add to memory", 134 + "save this", 135 + "keep this", 136 + "store this", 137 + "memorize", 138 + } 139 + for _, k := range keywords { 140 + if strings.Contains(lowerText, k) { 141 + return true 142 + } 143 + } 144 + return false 145 + } 146 + 147 + func limitPromoteToOne(promote memory.PromoteDraft) memory.PromoteDraft { 148 + if item, ok := firstNonEmptyText(promote.GoalsProjects); ok { 149 + return memory.PromoteDraft{GoalsProjects: []string{item}} 150 + } 151 + if item, ok := firstKVItem(promote.KeyFacts); ok { 152 + return memory.PromoteDraft{KeyFacts: []memory.KVItem{item}} 153 + } 154 + return memory.PromoteDraft{} 155 + } 156 + 157 + func firstNonEmptyText(items []string) (string, bool) { 158 + for _, raw := range items { 159 + v := strings.TrimSpace(raw) 160 + if v == "" { 161 + continue 162 + } 163 + return v, true 164 + } 165 + return "", false 166 + } 167 + 168 + func firstKVItem(items []memory.KVItem) (memory.KVItem, bool) { 169 + for _, it := range items { 170 + title := strings.TrimSpace(it.Title) 171 + value := strings.TrimSpace(it.Value) 172 + if title == "" && value == "" { 173 + continue 174 + } 175 + it.Title = title 176 + it.Value = value 177 + return it, true 178 + } 179 + return memory.KVItem{}, false 180 + }
+29 -80
internal/memoryruntime/orchestrator.go
··· 6 6 "time" 7 7 8 8 "github.com/google/uuid" 9 + "github.com/quailyquaily/mistermorph/internal/chathistory" 9 10 "github.com/quailyquaily/mistermorph/memory" 10 11 ) 11 12 ··· 22 23 newEvent func() string 23 24 } 24 25 26 + const journalSourceHistoryCap = 3 27 + 25 28 type PrepareInjectionRequest struct { 26 29 SubjectID string 27 30 RequestContext memory.RequestContext ··· 29 32 } 30 33 31 34 type RecordRequest struct { 32 - TaskRunID string 33 - TSUTC string 34 - SessionID string 35 - SubjectID string 36 - Channel string 37 - Participants []memory.MemoryParticipant 38 - TaskText string 39 - FinalOutput string 40 - Draft memory.SessionDraft 35 + TaskRunID string 36 + TSUTC string 37 + SessionID string 38 + SubjectID string 39 + Channel string 40 + Participants []memory.MemoryParticipant 41 + TaskText string 42 + FinalOutput string 43 + SourceHistory []chathistory.ChatHistoryItem 44 + SessionContext memory.SessionContext 41 45 } 42 46 43 47 func New(manager *memory.Manager, journal *memory.Journal, projector *memory.Projector, opts OrchestratorOptions) (*Orchestrator, error) { ··· 75 79 tsUTC = o.now().UTC().Format(time.RFC3339) 76 80 } 77 81 event := memory.MemoryEvent{ 78 - SchemaVersion: memory.CurrentMemoryEventSchemaVersion, 79 - EventID: strings.TrimSpace(o.newEvent()), 80 - TaskRunID: strings.TrimSpace(req.TaskRunID), 81 - TSUTC: tsUTC, 82 - SessionID: strings.TrimSpace(req.SessionID), 83 - SubjectID: strings.TrimSpace(req.SubjectID), 84 - Channel: strings.TrimSpace(req.Channel), 85 - Participants: append([]memory.MemoryParticipant(nil), req.Participants...), 86 - TaskText: strings.TrimSpace(req.TaskText), 87 - FinalOutput: strings.TrimSpace(req.FinalOutput), 88 - DraftSummaryItems: normalizeDraftSummaryItems(req.Draft.SummaryItems), 89 - DraftPromote: normalizePromoteDraft(req.Draft.Promote), 82 + SchemaVersion: memory.CurrentMemoryEventSchemaVersion, 83 + EventID: strings.TrimSpace(o.newEvent()), 84 + TaskRunID: strings.TrimSpace(req.TaskRunID), 85 + TSUTC: tsUTC, 86 + SessionID: strings.TrimSpace(req.SessionID), 87 + SubjectID: strings.TrimSpace(req.SubjectID), 88 + Channel: strings.TrimSpace(req.Channel), 89 + Participants: append([]memory.MemoryParticipant(nil), req.Participants...), 90 + TaskText: strings.TrimSpace(req.TaskText), 91 + FinalOutput: strings.TrimSpace(req.FinalOutput), 92 + SourceHistory: capChatHistoryItems(req.SourceHistory, journalSourceHistoryCap), 93 + SessionContext: req.SessionContext, 90 94 } 91 95 return o.journal.Append(event) 92 96 } 93 97 94 - func normalizeDraftSummaryItems(items []string) []string { 98 + func capChatHistoryItems(items []chathistory.ChatHistoryItem, limit int) []chathistory.ChatHistoryItem { 95 99 if len(items) == 0 { 96 100 return nil 97 101 } 98 - out := make([]string, 0, len(items)) 99 - seen := make(map[string]bool, len(items)) 100 - for _, raw := range items { 101 - item := strings.TrimSpace(raw) 102 - if item == "" { 103 - continue 104 - } 105 - key := strings.ToLower(item) 106 - if seen[key] { 107 - continue 108 - } 109 - seen[key] = true 110 - out = append(out, item) 102 + if limit > 0 && len(items) > limit { 103 + items = items[len(items)-limit:] 111 104 } 112 - if len(out) == 0 { 113 - return nil 114 - } 115 - return out 116 - } 117 - 118 - func normalizePromoteDraft(promote memory.PromoteDraft) memory.PromoteDraft { 119 - goals := make([]string, 0, len(promote.GoalsProjects)) 120 - goalSeen := make(map[string]bool, len(promote.GoalsProjects)) 121 - for _, raw := range promote.GoalsProjects { 122 - goal := strings.TrimSpace(raw) 123 - if goal == "" { 124 - continue 125 - } 126 - key := strings.ToLower(goal) 127 - if goalSeen[key] { 128 - continue 129 - } 130 - goalSeen[key] = true 131 - goals = append(goals, goal) 132 - } 133 - 134 - facts := make([]memory.KVItem, 0, len(promote.KeyFacts)) 135 - factSeen := make(map[string]bool, len(promote.KeyFacts)) 136 - for _, kv := range promote.KeyFacts { 137 - title := strings.TrimSpace(kv.Title) 138 - value := strings.TrimSpace(kv.Value) 139 - if title == "" && value == "" { 140 - continue 141 - } 142 - key := strings.ToLower(title + "|" + value) 143 - if factSeen[key] { 144 - continue 145 - } 146 - factSeen[key] = true 147 - facts = append(facts, memory.KVItem{Title: title, Value: value}) 148 - } 149 - 150 - if len(goals) == 0 && len(facts) == 0 { 151 - return memory.PromoteDraft{} 152 - } 153 - return memory.PromoteDraft{ 154 - GoalsProjects: goals, 155 - KeyFacts: facts, 156 - } 105 + return append([]chathistory.ChatHistoryItem(nil), items...) 157 106 }
+78 -13
internal/memoryruntime/orchestrator_test.go
··· 7 7 "testing" 8 8 "time" 9 9 10 + "github.com/quailyquaily/mistermorph/internal/chathistory" 10 11 "github.com/quailyquaily/mistermorph/memory" 11 12 ) 12 13 ··· 14 15 root := t.TempDir() 15 16 mgr := memory.NewManager(root, 7) 16 17 j := mgr.NewJournal(memory.JournalOptions{MaxFileBytes: 1 << 20}) 17 - p := memory.NewProjector(mgr, j, memory.ProjectorOptions{CheckpointBatch: 10}) 18 + p := memory.NewProjector(mgr, j, memory.ProjectorOptions{ 19 + CheckpointBatch: 10, 20 + DraftResolver: stubDraftResolver{ 21 + draft: memory.SessionDraft{ 22 + SummaryItems: []string{"one", "Two"}, 23 + }, 24 + }, 25 + }) 18 26 19 27 now := mustRFC3339(t, "2026-03-01T09:10:00Z") 20 28 o, err := New(mgr, j, p, OrchestratorOptions{ ··· 31 39 SubjectID: "tg--1001", 32 40 Channel: "telegram", 33 41 TaskText: "hello", 34 - Draft: memory.SessionDraft{ 35 - SummaryItems: []string{" one ", "one", "", "Two"}, 36 - Promote: memory.PromoteDraft{ 37 - GoalsProjects: []string{" keep sync ", "", "keep sync"}, 38 - }, 42 + SourceHistory: []chathistory.ChatHistoryItem{{ 43 + Channel: chathistory.ChannelTelegram, 44 + Kind: chathistory.KindInboundUser, 45 + Text: "ping", 46 + }}, 47 + SessionContext: memory.SessionContext{ 48 + ConversationID: "1001", 39 49 }, 40 50 }) 41 51 if err != nil { ··· 52 62 if rec.Event.TSUTC != now.UTC().Format(time.RFC3339) { 53 63 t.Fatalf("ts_utc = %q, want %q", rec.Event.TSUTC, now.UTC().Format(time.RFC3339)) 54 64 } 55 - if got := strings.Join(rec.Event.DraftSummaryItems, "|"); got != "one|Two" { 56 - t.Fatalf("draft_summary_items = %q, want one|Two", got) 65 + if len(rec.Event.SourceHistory) != 1 || rec.Event.SourceHistory[0].Text != "ping" { 66 + t.Fatalf("source_history = %#v, want one ping message", rec.Event.SourceHistory) 57 67 } 58 - if len(rec.Event.DraftPromote.GoalsProjects) != 1 || rec.Event.DraftPromote.GoalsProjects[0] != "keep sync" { 59 - t.Fatalf("draft_promote.goals = %#v, want [keep sync]", rec.Event.DraftPromote.GoalsProjects) 68 + if rec.Event.SessionContext.ConversationID != "1001" { 69 + t.Fatalf("session_context.conversation_id = %q, want 1001", rec.Event.SessionContext.ConversationID) 60 70 } 61 71 return nil 62 72 }) ··· 152 162 SubjectID: "heartbeat", 153 163 Channel: "heartbeat", 154 164 TaskText: "tick", 155 - Draft: memory.SessionDraft{ 156 - SummaryItems: []string{"heartbeat ok"}, 157 - }, 158 165 }, 159 166 }) 160 167 if err != nil { ··· 174 181 } 175 182 } 176 183 184 + func TestRecordCapsJournalSourceHistoryToLatestThree(t *testing.T) { 185 + root := t.TempDir() 186 + mgr := memory.NewManager(root, 7) 187 + j := mgr.NewJournal(memory.JournalOptions{MaxFileBytes: 1 << 20}) 188 + p := memory.NewProjector(mgr, j, memory.ProjectorOptions{CheckpointBatch: 10}) 189 + o, err := New(mgr, j, p, OrchestratorOptions{}) 190 + if err != nil { 191 + t.Fatalf("New() error = %v", err) 192 + } 193 + 194 + _, err = o.Record(RecordRequest{ 195 + TaskRunID: "run_cap", 196 + SessionID: "tg--cap", 197 + SubjectID: "tg--cap", 198 + Channel: "telegram", 199 + TaskText: "cap", 200 + SourceHistory: []chathistory.ChatHistoryItem{ 201 + {Kind: chathistory.KindInboundUser, Text: "one"}, 202 + {Kind: chathistory.KindInboundUser, Text: "two"}, 203 + {Kind: chathistory.KindInboundUser, Text: "three"}, 204 + {Kind: chathistory.KindInboundUser, Text: "four"}, 205 + {Kind: chathistory.KindOutboundAgent, Text: "five"}, 206 + }, 207 + }) 208 + if err != nil { 209 + t.Fatalf("Record() error = %v", err) 210 + } 211 + 212 + _, _, err = j.ReplayFrom(memory.JournalOffset{}, 10, func(rec memory.JournalRecord) error { 213 + if len(rec.Event.SourceHistory) != 3 { 214 + t.Fatalf("len(source_history) = %d, want 3", len(rec.Event.SourceHistory)) 215 + } 216 + if rec.Event.SourceHistory[0].Text != "three" || rec.Event.SourceHistory[1].Text != "four" || rec.Event.SourceHistory[2].Text != "five" { 217 + t.Fatalf("source_history texts = %#v, want [three four five]", []string{ 218 + rec.Event.SourceHistory[0].Text, 219 + rec.Event.SourceHistory[1].Text, 220 + rec.Event.SourceHistory[2].Text, 221 + }) 222 + } 223 + return nil 224 + }) 225 + if err != nil { 226 + t.Fatalf("ReplayFrom() error = %v", err) 227 + } 228 + } 229 + 177 230 type fakeInjectionAdapter struct { 178 231 subjectID string 179 232 reqCtx memory.RequestContext ··· 197 250 return RecordRequest{}, f.err 198 251 } 199 252 return f.req, nil 253 + } 254 + 255 + type stubDraftResolver struct { 256 + draft memory.SessionDraft 257 + err error 258 + } 259 + 260 + func (s stubDraftResolver) ResolveDraft(ctx context.Context, event memory.MemoryEvent, existing memory.ShortTermContent) (memory.SessionDraft, error) { 261 + if s.err != nil { 262 + return memory.SessionDraft{}, s.err 263 + } 264 + return s.draft, nil 200 265 } 201 266 202 267 func mustRFC3339(t *testing.T, value string) time.Time {
+91
internal/memoryruntime/resolver.go
··· 1 + package memoryruntime 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + 8 + "github.com/quailyquaily/mistermorph/internal/llmutil" 9 + "github.com/quailyquaily/mistermorph/llm" 10 + "github.com/quailyquaily/mistermorph/memory" 11 + ) 12 + 13 + const defaultFallbackSummaryMaxRunes = 1024 14 + 15 + type draftResolver struct { 16 + client llm.Client 17 + model string 18 + } 19 + 20 + type DraftResolverFactoryOptions struct { 21 + ResolveLLMRoute func(purpose string) (llmutil.ResolvedRoute, error) 22 + CreateLLMClient func(route llmutil.ResolvedRoute) (llm.Client, error) 23 + DecorateClient func(client llm.Client, route llmutil.ResolvedRoute) llm.Client 24 + } 25 + 26 + func NewConfiguredDraftResolver(opts DraftResolverFactoryOptions) (memory.DraftResolver, error) { 27 + if opts.ResolveLLMRoute == nil { 28 + return nil, fmt.Errorf("ResolveLLMRoute dependency missing") 29 + } 30 + if opts.CreateLLMClient == nil { 31 + return nil, fmt.Errorf("CreateLLMClient dependency missing") 32 + } 33 + route, err := opts.ResolveLLMRoute(llmutil.RoutePurposeMemoryDraft) 34 + if err != nil { 35 + return nil, err 36 + } 37 + client, err := opts.CreateLLMClient(route) 38 + if err != nil { 39 + return nil, err 40 + } 41 + if opts.DecorateClient != nil { 42 + client = opts.DecorateClient(client, route) 43 + } 44 + return NewDraftResolver(client, strings.TrimSpace(route.ClientConfig.Model)), nil 45 + } 46 + 47 + func NewDraftResolver(client llm.Client, model string) memory.DraftResolver { 48 + return draftResolver{ 49 + client: client, 50 + model: strings.TrimSpace(model), 51 + } 52 + } 53 + 54 + func (r draftResolver) ResolveDraft(ctx context.Context, event memory.MemoryEvent, existing memory.ShortTermContent) (memory.SessionDraft, error) { 55 + if r.client != nil && len(event.SourceHistory) > 0 { 56 + draft, err := BuildLLMDraft(ctx, DraftRequest{ 57 + Client: r.client, 58 + Model: r.model, 59 + History: event.SourceHistory, 60 + Task: event.TaskText, 61 + Output: event.FinalOutput, 62 + Existing: existing, 63 + SessionContext: event.SessionContext, 64 + }) 65 + if err != nil { 66 + return memory.SessionDraft{}, err 67 + } 68 + draft.Promote = EnforceLongTermPromotionRules(draft.Promote, nil, event.TaskText) 69 + return draft, nil 70 + } 71 + return buildFallbackDraft(event.FinalOutput), nil 72 + } 73 + 74 + func buildFallbackDraft(finalOutput string) memory.SessionDraft { 75 + item := strings.TrimSpace(finalOutput) 76 + if item == "" { 77 + return memory.SessionDraft{} 78 + } 79 + item = strings.Join(strings.Fields(item), " ") 80 + if item == "" { 81 + return memory.SessionDraft{} 82 + } 83 + runes := []rune(item) 84 + if len(runes) > defaultFallbackSummaryMaxRunes { 85 + item = strings.TrimSpace(string(runes[:defaultFallbackSummaryMaxRunes])) 86 + } 87 + if item == "" { 88 + return memory.SessionDraft{} 89 + } 90 + return memory.SessionDraft{SummaryItems: []string{item}} 91 + }
+28
internal/memoryruntime/resolver_test.go
··· 1 + package memoryruntime 2 + 3 + import ( 4 + "strings" 5 + "testing" 6 + ) 7 + 8 + func TestBuildFallbackDraft(t *testing.T) { 9 + t.Parallel() 10 + 11 + if got := buildFallbackDraft(""); len(got.SummaryItems) != 0 { 12 + t.Fatalf("empty fallback draft = %#v, want empty", got) 13 + } 14 + 15 + got := buildFallbackDraft(" hello world ") 16 + if len(got.SummaryItems) != 1 || got.SummaryItems[0] != "hello world" { 17 + t.Fatalf("summary_items = %#v, want [\"hello world\"]", got.SummaryItems) 18 + } 19 + 20 + long := strings.Repeat("a", defaultFallbackSummaryMaxRunes+8) 21 + got = buildFallbackDraft(long) 22 + if len(got.SummaryItems) != 1 { 23 + t.Fatalf("len(summary_items) = %d, want 1", len(got.SummaryItems)) 24 + } 25 + if len([]rune(got.SummaryItems[0])) != defaultFallbackSummaryMaxRunes { 26 + t.Fatalf("rune length = %d, want %d", len([]rune(got.SummaryItems[0])), defaultFallbackSummaryMaxRunes) 27 + } 28 + }
+9 -10
internal/memoryruntime/worker_test.go
··· 268 268 269 269 func baseWorkerEvent(eventID, runID, tsUTC, subjectID string) memory.MemoryEvent { 270 270 return memory.MemoryEvent{ 271 - SchemaVersion: memory.CurrentMemoryEventSchemaVersion, 272 - EventID: eventID, 273 - TaskRunID: runID, 274 - TSUTC: tsUTC, 275 - SessionID: subjectID, 276 - SubjectID: subjectID, 277 - Channel: "telegram", 278 - TaskText: "task", 279 - FinalOutput: "output", 280 - DraftSummaryItems: []string{"one"}, 271 + SchemaVersion: memory.CurrentMemoryEventSchemaVersion, 272 + EventID: eventID, 273 + TaskRunID: runID, 274 + TSUTC: tsUTC, 275 + SessionID: subjectID, 276 + SubjectID: subjectID, 277 + Channel: "telegram", 278 + TaskText: "task", 279 + FinalOutput: "output", 281 280 } 282 281 }
+24 -18
memory/event.go
··· 7 7 "strconv" 8 8 "strings" 9 9 "time" 10 + 11 + "github.com/quailyquaily/mistermorph/internal/chathistory" 10 12 ) 11 13 12 14 const ( 13 - CurrentMemoryEventSchemaVersion = 1 15 + CurrentMemoryEventSchemaVersion = 3 14 16 ) 15 17 18 + type SessionContext struct { 19 + ConversationID string `json:"conversation_id,omitempty"` 20 + ConversationType string `json:"conversation_type,omitempty"` 21 + CounterpartyID string `json:"counterparty_id,omitempty"` 22 + CounterpartyName string `json:"counterparty_name,omitempty"` 23 + CounterpartyHandle string `json:"counterparty_handle,omitempty"` 24 + CounterpartyLabel string `json:"counterparty_label,omitempty"` 25 + } 26 + 16 27 type MemoryEvent struct { 17 - SchemaVersion int `json:"schema_version"` 18 - EventID string `json:"event_id"` 19 - TaskRunID string `json:"task_run_id"` 20 - TSUTC string `json:"ts_utc"` 21 - SessionID string `json:"session_id"` 22 - SubjectID string `json:"subject_id"` 23 - Channel string `json:"channel"` 24 - Participants []MemoryParticipant `json:"participants"` 25 - TaskText string `json:"task_text"` 26 - FinalOutput string `json:"final_output"` 27 - DraftSummaryItems []string `json:"draft_summary_items"` 28 - DraftPromote PromoteDraft `json:"draft_promote"` 28 + SchemaVersion int `json:"schema_version"` 29 + EventID string `json:"event_id"` 30 + TaskRunID string `json:"task_run_id"` 31 + TSUTC string `json:"ts_utc"` 32 + SessionID string `json:"session_id"` 33 + SubjectID string `json:"subject_id"` 34 + Channel string `json:"channel"` 35 + Participants []MemoryParticipant `json:"participants"` 36 + TaskText string `json:"task_text"` 37 + FinalOutput string `json:"final_output"` 38 + SourceHistory []chathistory.ChatHistoryItem `json:"source_history,omitempty"` 39 + SessionContext SessionContext `json:"session_context,omitempty"` 29 40 } 30 41 31 42 type MemoryParticipant struct { ··· 65 76 } 66 77 for i, p := range e.Participants { 67 78 if err := validateMemoryParticipant(i, p); err != nil { 68 - return err 69 - } 70 - } 71 - for i, item := range e.DraftSummaryItems { 72 - if err := validateRequiredCanonicalString(fmt.Sprintf("draft_summary_items[%d]", i), item); err != nil { 73 79 return err 74 80 } 75 81 }
+10 -20
memory/event_test.go
··· 96 96 } 97 97 }) 98 98 99 - t.Run("invalid draft summary item empty", func(t *testing.T) { 100 - ev := baseMemoryEvent() 101 - ev.DraftSummaryItems = []string{"ok", ""} 102 - err := ev.ValidateForAppend() 103 - if err == nil || !strings.Contains(err.Error(), "draft_summary_items[1] is required") { 104 - t.Fatalf("ValidateForAppend() error = %v, want draft_summary_items[1] is required", err) 105 - } 106 - }) 107 99 } 108 100 109 101 func baseMemoryEvent() MemoryEvent { 110 102 return MemoryEvent{ 111 - SchemaVersion: CurrentMemoryEventSchemaVersion, 112 - EventID: "evt_01", 113 - TaskRunID: "run_01", 114 - TSUTC: "2026-02-28T06:15:12Z", 115 - SessionID: "tg:-1003824466118", 116 - SubjectID: "ext:telegram:28036192", 117 - Channel: "telegram", 118 - Participants: nil, 119 - TaskText: "啧啧啧", 120 - FinalOutput: "", 121 - DraftSummaryItems: []string{"short summary"}, 122 - DraftPromote: PromoteDraft{}, 103 + SchemaVersion: CurrentMemoryEventSchemaVersion, 104 + EventID: "evt_01", 105 + TaskRunID: "run_01", 106 + TSUTC: "2026-02-28T06:15:12Z", 107 + SessionID: "tg:-1003824466118", 108 + SubjectID: "ext:telegram:28036192", 109 + Channel: "telegram", 110 + Participants: nil, 111 + TaskText: "啧啧啧", 112 + FinalOutput: "", 123 113 } 124 114 }
+2 -4
memory/journal_test.go
··· 449 449 Protocol: "tg", 450 450 }, 451 451 }, 452 - TaskText: "hello", 453 - FinalOutput: "world", 454 - DraftSummaryItems: []string{"short"}, 455 - DraftPromote: PromoteDraft{}, 452 + TaskText: "hello", 453 + FinalOutput: "world", 456 454 } 457 455 }
+54 -19
memory/projector.go
··· 19 19 type ProjectorOptions struct { 20 20 CheckpointBatch int 21 21 SemanticResolver entryutil.SemanticResolver 22 + DraftResolver DraftResolver 22 23 } 23 24 24 25 type Projector struct { ··· 43 44 type shortTermBucket struct { 44 45 Target shortTermTarget 45 46 Records []JournalRecord 47 + } 48 + 49 + type DraftResolver interface { 50 + ResolveDraft(ctx context.Context, event MemoryEvent, existing ShortTermContent) (SessionDraft, error) 51 + } 52 + 53 + type projectedPromote struct { 54 + SubjectID string 55 + Promote PromoteDraft 56 + Offset JournalOffset 46 57 } 47 58 48 59 func NewProjector(manager *Manager, journal *Journal, opts ProjectorOptions) *Projector { ··· 93 104 } 94 105 result.Processed = len(records) 95 106 96 - errs := make([]error, 0, 4) 97 - errs = append(errs, p.projectShortTermBuckets(ctx, buckets, order)...) 107 + promotes, errs := p.projectShortTermBuckets(ctx, buckets, order) 98 108 99 - for _, rec := range records { 100 - if !hasDraftPromote(rec.Event.DraftPromote) { 109 + for _, promote := range promotes { 110 + if !hasDraftPromote(promote.Promote) { 101 111 continue 102 112 } 103 - if _, err := p.manager.UpdateLongTerm(rec.Event.SubjectID, rec.Event.DraftPromote); err != nil { 104 - errs = append(errs, fmt.Errorf("long-term projection failed at %s:%d: %w", rec.Offset.File, rec.Offset.Line, err)) 113 + if _, err := p.manager.UpdateLongTerm(promote.SubjectID, promote.Promote); err != nil { 114 + errs = append(errs, fmt.Errorf("long-term projection failed at %s:%d: %w", promote.Offset.File, promote.Offset.Line, err)) 105 115 } 106 116 } 107 117 ··· 126 136 }, nil 127 137 } 128 138 129 - func (p *Projector) projectShortTermBucket(ctx context.Context, bucket shortTermBucket) error { 139 + func (p *Projector) projectShortTermBucket(ctx context.Context, bucket shortTermBucket) ([]projectedPromote, error) { 130 140 _, existing, _, err := p.manager.LoadShortTerm(bucket.Target.DayUTC, bucket.Target.SessionID) 131 141 if err != nil { 132 - return fmt.Errorf("load short-term %s: %w", bucket.Target.Key, err) 142 + return nil, fmt.Errorf("load short-term %s: %w", bucket.Target.Key, err) 133 143 } 134 144 135 145 merged := existing 136 146 hasIncomingSummary := false 147 + promotes := make([]projectedPromote, 0, len(bucket.Records)) 137 148 138 149 for _, rec := range bucket.Records { 139 150 createdAt, err := memorySummaryCreatedAt(rec.Event.TSUTC) 140 151 if err != nil { 141 - return fmt.Errorf("invalid ts_utc for %s:%d: %w", rec.Offset.File, rec.Offset.Line, err) 152 + return nil, fmt.Errorf("invalid ts_utc for %s:%d: %w", rec.Offset.File, rec.Offset.Line, err) 153 + } 154 + draft, err := p.resolveDraft(ctx, rec.Event, merged) 155 + if err != nil { 156 + return nil, fmt.Errorf("resolve draft failed for %s at %s:%d: %w", bucket.Target.Key, rec.Offset.File, rec.Offset.Line, err) 142 157 } 143 - draft := SessionDraft{ 144 - SummaryItems: rec.Event.DraftSummaryItems, 158 + if hasDraftPromote(draft.Promote) { 159 + promotes = append(promotes, projectedPromote{ 160 + SubjectID: rec.Event.SubjectID, 161 + Promote: draft.Promote, 162 + Offset: rec.Offset, 163 + }) 145 164 } 146 165 beforeCount := len(merged.SummaryItems) 147 166 merged = MergeShortTerm(merged, draft, createdAt) ··· 151 170 } 152 171 153 172 if !hasIncomingSummary { 154 - return nil 173 + return promotes, nil 155 174 } 156 175 157 176 if len(existing.SummaryItems) > 0 && p.opts.SemanticResolver != nil { 158 177 deduped, err := SemanticDedupeSummaryItems(ctx, merged.SummaryItems, p.opts.SemanticResolver) 159 178 if err != nil { 160 179 last := bucket.Records[len(bucket.Records)-1] 161 - return fmt.Errorf("semantic dedupe failed for %s at %s:%d: %w", bucket.Target.Key, last.Offset.File, last.Offset.Line, err) 180 + return nil, fmt.Errorf("semantic dedupe failed for %s at %s:%d: %w", bucket.Target.Key, last.Offset.File, last.Offset.Line, err) 162 181 } 163 182 merged = NormalizeShortTermContent(ShortTermContent{SummaryItems: deduped}) 164 183 } 165 184 166 185 _, err = p.manager.WriteShortTerm(bucket.Target.DayUTC, merged, WriteMeta{SessionID: bucket.Target.SessionID}) 167 186 if err != nil { 168 - return fmt.Errorf("write short-term %s: %w", bucket.Target.Key, err) 187 + return nil, fmt.Errorf("write short-term %s: %w", bucket.Target.Key, err) 169 188 } 170 - return nil 189 + return promotes, nil 171 190 } 172 191 173 192 func (p *Projector) saveCheckpointInBatches(records []JournalRecord) error { ··· 191 210 return nil 192 211 } 193 212 194 - func (p *Projector) projectShortTermBuckets(ctx context.Context, buckets map[string]shortTermBucket, order []string) []error { 213 + func (p *Projector) projectShortTermBuckets(ctx context.Context, buckets map[string]shortTermBucket, order []string) ([]projectedPromote, []error) { 195 214 if len(order) == 0 { 196 - return nil 215 + return nil, nil 197 216 } 198 217 workers := p.currentDayBucketWorkers() 199 218 if workers > len(order) { ··· 210 229 close(jobs) 211 230 212 231 errs := make([]error, 0, 4) 232 + promotes := make([]projectedPromote, 0, len(order)) 213 233 var errsMu sync.Mutex 234 + var promotesMu sync.Mutex 214 235 var wg sync.WaitGroup 215 236 worker := func() { 216 237 defer wg.Done() 217 238 for bucket := range jobs { 218 - if err := p.projectShortTermBucket(ctx, bucket); err != nil { 239 + bucketPromotes, err := p.projectShortTermBucket(ctx, bucket) 240 + if err != nil { 219 241 errsMu.Lock() 220 242 errs = append(errs, err) 221 243 errsMu.Unlock() 244 + continue 245 + } 246 + if len(bucketPromotes) > 0 { 247 + promotesMu.Lock() 248 + promotes = append(promotes, bucketPromotes...) 249 + promotesMu.Unlock() 222 250 } 223 251 } 224 252 } ··· 228 256 go worker() 229 257 } 230 258 wg.Wait() 231 - return errs 259 + return promotes, errs 260 + } 261 + 262 + func (p *Projector) resolveDraft(ctx context.Context, event MemoryEvent, existing ShortTermContent) (SessionDraft, error) { 263 + if p.opts.DraftResolver == nil { 264 + return SessionDraft{}, nil 265 + } 266 + return p.opts.DraftResolver.ResolveDraft(ctx, event, existing) 232 267 } 233 268 234 269 func (p *Projector) currentDayBucketWorkers() int {
+117 -13
memory/projector_test.go
··· 9 9 "testing" 10 10 "time" 11 11 12 + "github.com/quailyquaily/mistermorph/internal/chathistory" 12 13 "github.com/quailyquaily/mistermorph/internal/entryutil" 13 14 ) 14 15 ··· 21 22 day := mustTimeRFC3339(t, "2026-02-28T06:00:00Z") 22 23 e1 := baseProjectorEvent("evt_1", "run_1", "2026-02-28T06:01:00Z", "tg-1001", []string{"first item"}) 23 24 e2 := baseProjectorEvent("evt_2", "run_1", "2026-02-28T06:02:00Z", "tg-1001", []string{"second item"}) 24 - e2.DraftPromote = PromoteDraft{GoalsProjects: []string{"Keep archive synced"}} 25 25 e3 := baseProjectorEvent("evt_3", "run_2", "2026-02-28T06:03:00Z", "tg-1002", []string{"another room item"}) 26 26 27 27 if _, err := j.Append(e1); err != nil { ··· 36 36 37 37 p := NewProjector(mgr, j, ProjectorOptions{ 38 38 CheckpointBatch: 2, 39 + DraftResolver: fakeDraftResolver{ 40 + byEventID: map[string]SessionDraft{ 41 + "evt_1": {SummaryItems: []string{"first item"}}, 42 + "evt_2": { 43 + SummaryItems: []string{"second item"}, 44 + Promote: PromoteDraft{GoalsProjects: []string{"Keep archive synced"}}, 45 + }, 46 + "evt_3": {SummaryItems: []string{"another room item"}}, 47 + }, 48 + }, 39 49 }) 40 50 41 51 got, err := p.ProjectOnce(context.Background(), 10) ··· 116 126 117 127 p := NewProjector(mgr, j, ProjectorOptions{ 118 128 CheckpointBatch: 2, 129 + DraftResolver: fakeDraftResolver{ 130 + byEventID: map[string]SessionDraft{ 131 + "evt_1": {SummaryItems: []string{"one"}}, 132 + "evt_2": {SummaryItems: []string{"two"}}, 133 + "evt_3": {SummaryItems: []string{"three"}}, 134 + }, 135 + }, 119 136 }) 120 137 121 138 first, err := p.ProjectOnce(context.Background(), 2) ··· 183 200 p := NewProjector(mgr, j, ProjectorOptions{ 184 201 CheckpointBatch: 10, 185 202 SemanticResolver: alwaysFailResolver{}, 203 + DraftResolver: fakeDraftResolver{ 204 + byEventID: map[string]SessionDraft{ 205 + "evt_1": {SummaryItems: []string{"alpha"}}, 206 + "evt_2": {SummaryItems: []string{"beta"}}, 207 + }, 208 + }, 186 209 }) 187 210 188 211 _, err = p.ProjectOnce(context.Background(), 10) ··· 237 260 } 238 261 239 262 p := NewProjector(mgr, j, ProjectorOptions{CheckpointBatch: 10}) 263 + p.opts.DraftResolver = fakeDraftResolver{ 264 + byEventID: map[string]SessionDraft{ 265 + "evt_1": {SummaryItems: []string{"same item"}}, 266 + "evt_2": {SummaryItems: []string{"another item"}}, 267 + }, 268 + } 240 269 if _, err := p.ProjectOnce(context.Background(), 10); err != nil { 241 270 t.Fatalf("ProjectOnce(first) error = %v", err) 242 271 } ··· 274 303 } 275 304 } 276 305 306 + func TestProjectorProjectOnce_UsesDraftResolverForRawEvents(t *testing.T) { 307 + root := t.TempDir() 308 + mgr := NewManager(root, 7) 309 + j := mgr.NewJournal(JournalOptions{MaxFileBytes: 1 << 20}) 310 + j.now = func() time.Time { return mustTimeRFC3339(t, "2026-03-01T06:00:00Z") } 311 + 312 + ev := baseProjectorEvent("evt_raw_1", "run_raw_1", "2026-03-01T06:01:00Z", "tg-raw-1", nil) 313 + ev.SessionID = "tg-raw-1" 314 + ev.SourceHistory = []chathistory.ChatHistoryItem{{ 315 + Channel: chathistory.ChannelTelegram, 316 + Kind: chathistory.KindInboundUser, 317 + Text: "hello", 318 + }} 319 + ev.SessionContext = SessionContext{ 320 + ConversationID: "123", 321 + } 322 + if _, err := j.Append(ev); err != nil { 323 + t.Fatalf("Append(raw event) error = %v", err) 324 + } 325 + 326 + p := NewProjector(mgr, j, ProjectorOptions{ 327 + CheckpointBatch: 10, 328 + DraftResolver: fakeDraftResolver{ 329 + draft: SessionDraft{ 330 + SummaryItems: []string{"resolved in projector"}, 331 + Promote: PromoteDraft{ 332 + GoalsProjects: []string{"projected goal"}, 333 + }, 334 + }, 335 + }, 336 + }) 337 + 338 + if _, err := p.ProjectOnce(context.Background(), 10); err != nil { 339 + t.Fatalf("ProjectOnce() error = %v", err) 340 + } 341 + 342 + day := mustTimeRFC3339(t, "2026-03-01T00:00:00Z") 343 + _, content, ok, err := mgr.LoadShortTerm(day, "tg-raw-1") 344 + if err != nil { 345 + t.Fatalf("LoadShortTerm() error = %v", err) 346 + } 347 + if !ok { 348 + t.Fatalf("LoadShortTerm() ok = false, want true") 349 + } 350 + if len(content.SummaryItems) != 1 || content.SummaryItems[0].Content != "resolved in projector" { 351 + t.Fatalf("short-term content = %#v, want resolved draft item", content.SummaryItems) 352 + } 353 + 354 + longPath, _ := mgr.LongTermPath("ignored") 355 + data, err := os.ReadFile(longPath) 356 + if err != nil { 357 + t.Fatalf("ReadFile(long-term) error = %v", err) 358 + } 359 + _, body, _ := ParseFrontmatter(string(data)) 360 + longContent := ParseLongTermContent(body) 361 + if len(longContent.Goals) != 1 || longContent.Goals[0].Content != "projected goal" { 362 + t.Fatalf("long-term goals = %#v, want projected goal", longContent.Goals) 363 + } 364 + } 365 + 277 366 type alwaysFailResolver struct{} 278 367 279 368 func (alwaysFailResolver) SelectDedupKeepIndices(ctx context.Context, items []entryutil.SemanticItem) ([]int, error) { 280 369 return nil, fmt.Errorf("resolver failed") 281 370 } 282 371 372 + type fakeDraftResolver struct { 373 + draft SessionDraft 374 + byEventID map[string]SessionDraft 375 + err error 376 + } 377 + 378 + func (f fakeDraftResolver) ResolveDraft(ctx context.Context, event MemoryEvent, existing ShortTermContent) (SessionDraft, error) { 379 + if f.err != nil { 380 + return SessionDraft{}, f.err 381 + } 382 + if draft, ok := f.byEventID[event.EventID]; ok { 383 + return draft, nil 384 + } 385 + return f.draft, nil 386 + } 387 + 283 388 func baseProjectorEvent(eventID, runID, tsUTC, subjectID string, summaryItems []string) MemoryEvent { 389 + finalOutput := strings.Join(summaryItems, "\n") 284 390 return MemoryEvent{ 285 - SchemaVersion: CurrentMemoryEventSchemaVersion, 286 - EventID: eventID, 287 - TaskRunID: runID, 288 - TSUTC: tsUTC, 289 - SessionID: "tg:-1000", 290 - SubjectID: subjectID, 291 - Channel: "telegram", 292 - Participants: nil, 293 - TaskText: "task", 294 - FinalOutput: "output", 295 - DraftSummaryItems: summaryItems, 296 - DraftPromote: PromoteDraft{}, 391 + SchemaVersion: CurrentMemoryEventSchemaVersion, 392 + EventID: eventID, 393 + TaskRunID: runID, 394 + TSUTC: tsUTC, 395 + SessionID: "tg:-1000", 396 + SubjectID: subjectID, 397 + Channel: "telegram", 398 + Participants: nil, 399 + TaskText: "task", 400 + FinalOutput: finalOutput, 297 401 } 298 402 } 299 403
+24 -14
memory/recovery_test.go
··· 15 15 jA := mgrA.NewJournal(JournalOptions{MaxFileBytes: 1 << 20}) 16 16 jA.now = func() time.Time { return mustTimeRFC3339(t, "2026-03-01T12:00:00Z") } 17 17 ev := MemoryEvent{ 18 - SchemaVersion: CurrentMemoryEventSchemaVersion, 19 - EventID: "evt_recovery_1", 20 - TaskRunID: "run_recovery_1", 21 - TSUTC: "2026-03-01T12:00:30Z", 22 - SessionID: "tg:-9001", 23 - SubjectID: "tg:-9001", 24 - Channel: "telegram", 25 - TaskText: "remember this", 26 - FinalOutput: "done", 27 - DraftSummaryItems: []string{"Recovery summary item"}, 28 - DraftPromote: PromoteDraft{ 29 - GoalsProjects: []string{"Recovery goal"}, 30 - }, 18 + SchemaVersion: CurrentMemoryEventSchemaVersion, 19 + EventID: "evt_recovery_1", 20 + TaskRunID: "run_recovery_1", 21 + TSUTC: "2026-03-01T12:00:30Z", 22 + SessionID: "tg:-9001", 23 + SubjectID: "tg:-9001", 24 + Channel: "telegram", 25 + TaskText: "remember this", 26 + FinalOutput: "done", 31 27 } 32 28 if _, err := jA.Append(ev); err != nil { 33 29 t.Fatalf("Append() error = %v", err) ··· 39 35 // Process B: restart and replay projection from checkpoint. 40 36 mgrB := NewManager(root, 7) 41 37 jB := mgrB.NewJournal(JournalOptions{MaxFileBytes: 1 << 20}) 42 - pB := NewProjector(mgrB, jB, ProjectorOptions{CheckpointBatch: 10}) 38 + pB := NewProjector(mgrB, jB, ProjectorOptions{ 39 + CheckpointBatch: 10, 40 + DraftResolver: recoveryDraftResolver{}, 41 + }) 43 42 defer func() { _ = jB.Close() }() 44 43 45 44 cpBefore, okBefore, err := jB.LoadCheckpoint() ··· 95 94 t.Fatalf("checkpoint line after replay = %d, want 1", cpAfter.Line) 96 95 } 97 96 } 97 + 98 + type recoveryDraftResolver struct{} 99 + 100 + func (recoveryDraftResolver) ResolveDraft(ctx context.Context, event MemoryEvent, existing ShortTermContent) (SessionDraft, error) { 101 + return SessionDraft{ 102 + SummaryItems: []string{"Recovery summary item"}, 103 + Promote: PromoteDraft{ 104 + GoalsProjects: []string{"Recovery goal"}, 105 + }, 106 + }, nil 107 + }