Unified Agent + reusable Go agent core.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: align telegram heartbeat task injection and task tracking

Lyric 2c289635 7faa8a0e

+84 -13
+12 -2
docs/prompt.md
··· 384 384 385 385 3. Telegram normal chat run (default path) 386 386 387 - - File: `cmd/mistermorph/telegramcmd/command.go` 387 + - File: `internal/channelruntime/telegram/runtime_task.go` 388 388 - Default payload when `job.Meta == nil`: 389 389 390 390 ```json ··· 397 397 } 398 398 ``` 399 399 400 + - Message injection behavior: 401 + - Non-heartbeat Telegram runs set `SkipTaskMessage=true` to avoid duplicating the same inbound text. 402 + - The current inbound text is already included via `llmHistory` (`historyWithCurrent`). 403 + 400 404 4. Telegram scheduled heartbeat 401 405 402 - - File: `cmd/mistermorph/telegramcmd/command.go` 406 + - Files: 407 + - `internal/channelruntime/telegram/runtime.go` (heartbeat job creation + meta) 408 + - `internal/channelruntime/telegram/runtime_task.go` (RunOptions wiring) 403 409 - Heartbeat worker payload from `buildHeartbeatMeta(...)`: 404 410 405 411 ```json ··· 421 427 } 422 428 } 423 429 ``` 430 + 431 + - Message injection behavior: 432 + - Heartbeat Telegram runs set `SkipTaskMessage=false`. 433 + - This ensures `job.Text` (heartbeat checklist task, usually from `HEARTBEAT.md`) is appended as a user task message and reaches the model. 424 434 425 435 5. MAEP inbound auto-reply run 426 436
+17 -1
internal/channelruntime/telegram/run_test.go
··· 1 1 package telegram 2 2 3 - import "testing" 3 + import ( 4 + "testing" 5 + "time" 6 + ) 4 7 5 8 func TestNormalizeAllowedChatIDs(t *testing.T) { 6 9 got := normalizeAllowedChatIDs([]int64{1, 0, 2, 1}) ··· 21 24 t.Fatalf("got = %#v, want [/ip4/1 /ip4/2]", got) 22 25 } 23 26 } 27 + 28 + func TestTelegramHeartbeatTaskID(t *testing.T) { 29 + t1 := time.Unix(0, 1700000000000000000).UTC() 30 + t2 := t1.Add(time.Second) 31 + id1 := telegramHeartbeatTaskID(0, t1) 32 + id2 := telegramHeartbeatTaskID(0, t2) 33 + if id1 == "" || id2 == "" { 34 + t.Fatalf("heartbeat task id should not be empty: %q %q", id1, id2) 35 + } 36 + if id1 == id2 { 37 + t.Fatalf("heartbeat task ids should differ across schedules: %q %q", id1, id2) 38 + } 39 + }
+40 -9
internal/channelruntime/telegram/runtime.go
··· 511 511 if !job.IsHeartbeat { 512 512 typingStop = startTypingTicker(workerCtx, api, chatID, "typing", 4*time.Second) 513 513 defer typingStop() 514 - if daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 515 - startedAt := time.Now().UTC() 516 - daemonStore.Update(job.TaskID, func(rec *daemonruntime.TaskInfo) { 517 - rec.Status = daemonruntime.TaskRunning 518 - rec.StartedAt = &startedAt 519 - }) 520 - } 514 + } 515 + if daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 516 + startedAt := time.Now().UTC() 517 + daemonStore.Update(job.TaskID, func(rec *daemonruntime.TaskInfo) { 518 + rec.Status = daemonruntime.TaskRunning 519 + rec.StartedAt = &startedAt 520 + }) 521 521 } 522 522 523 523 runCtx, cancel := context.WithTimeout(workerCtx, taskTimeout) ··· 529 529 return 530 530 } 531 531 displayErr := formatRuntimeError(runErr) 532 - if !job.IsHeartbeat && daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 532 + if daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 533 533 finishedAt := time.Now().UTC() 534 534 failedStatus := daemonruntime.TaskFailed 535 535 if isTaskContextCanceled(runErr) { ··· 576 576 577 577 outText := formatFinalOutput(final) 578 578 publishText := shouldPublishTelegramText(final) 579 - if !job.IsHeartbeat && daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 579 + if daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 580 580 finishedAt := time.Now().UTC() 581 581 summary := daemonruntime.TruncateUTF8(outText, 4000) 582 582 daemonStore.Update(job.TaskID, func(rec *daemonruntime.TaskInfo) { ··· 796 796 extra["last_success_utc"] = lastSuccess.UTC().Format(time.RFC3339) 797 797 } 798 798 meta := buildHeartbeatMeta(d, "telegram", hbInterval, hbChecklist, checklistEmpty, extra) 799 + heartbeatRunAt := time.Now().UTC() 800 + heartbeatTaskID := telegramHeartbeatTaskID(chatID, heartbeatRunAt) 799 801 job := telegramJob{ 802 + TaskID: heartbeatTaskID, 800 803 ChatID: chatID, 801 804 ChatType: chatType, 805 + SentAt: heartbeatRunAt, 802 806 FromUserID: fromUserID, 803 807 FromUsername: fromUsername, 804 808 FromFirstName: fromFirst, ··· 809 813 IsHeartbeat: true, 810 814 Meta: meta, 811 815 MentionUsers: mentionUsers, 816 + } 817 + if daemonStore != nil && strings.TrimSpace(job.TaskID) != "" { 818 + daemonStore.Upsert(daemonruntime.TaskInfo{ 819 + ID: job.TaskID, 820 + Status: daemonruntime.TaskQueued, 821 + Task: daemonruntime.TruncateUTF8(task, 2000), 822 + Model: strings.TrimSpace(model), 823 + Timeout: taskTimeout.String(), 824 + CreatedAt: heartbeatRunAt, 825 + Result: map[string]any{ 826 + "source": "telegram", 827 + "trigger": "heartbeat", 828 + "telegram_chat_id": chatID, 829 + "telegram_chat_type": chatType, 830 + "telegram_from_user_id": fromUserID, 831 + "telegram_from_username": fromUsername, 832 + "telegram_from_name": fromName, 833 + "mention_users": append([]string(nil), mentionUsers...), 834 + }, 835 + }) 812 836 } 813 837 select { 814 838 case w.Jobs <- job: ··· 1393 1417 1394 1418 func telegramTaskID(chatID int64, messageID int64) string { 1395 1419 return daemonruntime.BuildTaskID("tg", chatID, messageID) 1420 + } 1421 + 1422 + func telegramHeartbeatTaskID(chatID int64, scheduledAt time.Time) string { 1423 + if scheduledAt.IsZero() { 1424 + scheduledAt = time.Now().UTC() 1425 + } 1426 + return daemonruntime.BuildTaskID("tg_hb", chatID, scheduledAt.UnixNano()) 1396 1427 } 1397 1428 1398 1429 func isTaskContextCanceled(err error) bool {
+6 -1
internal/channelruntime/telegram/runtime_task.go
··· 184 184 Model: model, 185 185 History: llmHistory, 186 186 Meta: meta, 187 - SkipTaskMessage: true, 187 + SkipTaskMessage: shouldSkipTaskMessage(job), 188 188 }) 189 189 if err != nil { 190 190 return final, agentCtx, loadedSkills, nil, err ··· 234 234 return false 235 235 } 236 236 return strings.TrimSpace(longTermSubjectID) != "" 237 + } 238 + 239 + func shouldSkipTaskMessage(job telegramJob) bool { 240 + // Non-heartbeat runs already inject the current inbound text via llmHistory. 241 + return !job.IsHeartbeat 237 242 } 238 243 239 244 func buildTelegramRegistry(baseReg *tools.Registry, chatType string) *tools.Registry {
+9
internal/channelruntime/telegram/runtime_task_test.go
··· 67 67 t.Fatalf("disabled identity subject = %q, want empty", got) 68 68 } 69 69 } 70 + 71 + func TestShouldSkipTaskMessage(t *testing.T) { 72 + if got := shouldSkipTaskMessage(telegramJob{IsHeartbeat: true}); got { 73 + t.Fatalf("heartbeat should not skip task message") 74 + } 75 + if got := shouldSkipTaskMessage(telegramJob{IsHeartbeat: false}); !got { 76 + t.Fatalf("non-heartbeat should skip task message") 77 + } 78 + }