kaneo (minimalist kanban) fork to experiment adding a tangled integration github.com/usekaneo/kaneo
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(gitea): align fetch timeout, label api bodies, and webhook events

- Read response body before clearing the gitea fetch timer so slow streams
stay abortable; map timeout aborts to 408 after body handling
- Send { labels } JSON for issue label POST/PUT per Gitea API
- Return applied/before/after from updateTaskStatus for event publishing
- Publish task.status_changed after Gitea label/reopen/close status sync
- Per-integration try/catch in issue_labeled; richer reopen error context
- Share outbound echo helpers; replace issue_closed createdFrom skip with
lastOutboundStateSyncAt window

+228 -122
+32 -25
apps/api/src/plugins/gitea/utils/gitea-api.ts
··· 84 84 } 85 85 } 86 86 87 - let res: Response; 88 87 try { 89 - res = await fetch(url, { 88 + const res = await fetch(url, { 90 89 ...init, 91 90 signal: controller.signal, 92 91 headers: { ··· 94 93 ...init?.headers, 95 94 }, 96 95 }); 96 + 97 + const text = await res.text(); 98 + clearTimeout(timeoutId); 99 + 100 + if (!res.ok) { 101 + throw new GiteaApiError( 102 + `Gitea API error ${res.status}`, 103 + res.status, 104 + text, 105 + ); 106 + } 107 + 108 + if (res.status === 204 || text === "") { 109 + return undefined; 110 + } 111 + 112 + try { 113 + return JSON.parse(text) as T; 114 + } catch { 115 + throw new GiteaApiError( 116 + "Gitea API returned invalid JSON", 117 + res.status, 118 + text, 119 + ); 120 + } 97 121 } catch (error) { 122 + clearTimeout(timeoutId); 123 + if (error instanceof GiteaApiError) { 124 + throw error; 125 + } 98 126 if (error instanceof Error && error.name === "AbortError") { 99 127 if (timedOut) { 100 128 throw new GiteaApiError( ··· 105 133 throw error; 106 134 } 107 135 throw error; 108 - } finally { 109 - clearTimeout(timeoutId); 110 - } 111 - 112 - const text = await res.text(); 113 - if (!res.ok) { 114 - throw new GiteaApiError(`Gitea API error ${res.status}`, res.status, text); 115 - } 116 - 117 - if (res.status === 204 || text === "") { 118 - return undefined; 119 - } 120 - 121 - try { 122 - return JSON.parse(text) as T; 123 - } catch { 124 - throw new GiteaApiError( 125 - "Gitea API returned invalid JSON", 126 - res.status, 127 - text, 128 - ); 129 136 } 130 137 } 131 138 ··· 321 328 const chunk = labelIds.slice(i, i + MAX_LABELS_PER_REQUEST); 322 329 await giteaFetch<unknown>(baseUrl, accessToken, path, { 323 330 method: "POST", 324 - body: JSON.stringify(chunk), 331 + body: JSON.stringify({ labels: chunk }), 325 332 }); 326 333 } 327 334 }, ··· 338 345 `${owner(repositoryOwner, repositoryName)}/issues/${index}/labels`, 339 346 { 340 347 method: "PUT", 341 - body: JSON.stringify(labelIds), 348 + body: JSON.stringify({ labels: labelIds }), 342 349 }, 343 350 ); 344 351 },
+11
apps/api/src/plugins/gitea/utils/outbound-echo.ts
··· 1 + /** Skip webhook sync when it likely echoes our own outbound API update. */ 2 + export const OUTBOUND_STATE_ECHO_WINDOW_MS = 5000; 3 + 4 + export function parseIssueUpdatedAtMs(issue: { 5 + updated_at?: string; 6 + }): number | null { 7 + const raw = issue.updated_at; 8 + if (!raw || typeof raw !== "string") return null; 9 + const t = Date.parse(raw); 10 + return Number.isNaN(t) ? null : t; 11 + }
+31 -3
apps/api/src/plugins/gitea/webhooks/issue-closed.ts
··· 1 1 import { and, eq } from "drizzle-orm"; 2 2 import db from "../../../database"; 3 3 import { externalLinkTable, taskTable } from "../../../database/schema"; 4 + import { publishEvent } from "../../../events"; 4 5 import { updateExternalLink } from "../../github/services/link-manager"; 5 6 import { updateTaskStatus } from "../../github/services/task-service"; 6 7 import { 7 8 findAllIntegrationsByGiteaRepo, 8 9 repoOwnerLogin, 9 10 } from "../services/integration-lookup"; 11 + import { 12 + OUTBOUND_STATE_ECHO_WINDOW_MS, 13 + parseIssueUpdatedAtMs, 14 + } from "../utils/outbound-echo"; 10 15 import { resolveTargetStatus } from "../utils/resolve-column"; 11 16 import { baseUrlFromRepositoryHtmlUrl } from "../utils/webhook-repo"; 12 17 ··· 17 22 title: string; 18 23 html_url: string; 19 24 state: string; 25 + updated_at?: string; 20 26 }; 21 27 repository: { 22 28 owner: { login?: string; username?: string }; ··· 79 85 } 80 86 } 81 87 82 - if (existingMetadata.createdFrom === "kaneo") { 83 - continue; 88 + const lastOutbound = existingMetadata.lastOutboundStateSyncAt; 89 + if (typeof lastOutbound === "number" && Number.isFinite(lastOutbound)) { 90 + const eventMs = parseIssueUpdatedAtMs(issue); 91 + if ( 92 + eventMs !== null && 93 + Math.abs(eventMs - lastOutbound) <= OUTBOUND_STATE_ECHO_WINDOW_MS 94 + ) { 95 + continue; 96 + } 84 97 } 85 98 86 99 const targetStatus = await resolveTargetStatus( ··· 89 102 "done", 90 103 ); 91 104 92 - await updateTaskStatus(task.id, targetStatus); 105 + const statusResult = await updateTaskStatus(task.id, targetStatus); 106 + if ( 107 + statusResult.applied && 108 + statusResult.before.status !== statusResult.after.status 109 + ) { 110 + await publishEvent("task.status_changed", { 111 + taskId: statusResult.after.id, 112 + projectId: statusResult.after.projectId, 113 + userId: null, 114 + oldStatus: statusResult.before.status, 115 + newStatus: statusResult.after.status, 116 + title: statusResult.after.title, 117 + assigneeId: statusResult.after.userId, 118 + type: "status_changed", 119 + }); 120 + } 93 121 94 122 await updateExternalLink(externalLink.id, { 95 123 metadata: {
+108 -80
apps/api/src/plugins/gitea/webhooks/issue-labeled.ts
··· 1 1 import { eq, inArray } from "drizzle-orm"; 2 2 import db from "../../../database"; 3 3 import { labelTable, taskTable } from "../../../database/schema"; 4 + import { publishEvent } from "../../../events"; 4 5 import { findExternalLink } from "../../github/services/link-manager"; 5 6 import { updateTaskStatus } from "../../github/services/task-service"; 6 7 import { ··· 125 126 ); 126 127 127 128 for (const integration of integrations) { 128 - const existingLink = await findExternalLink( 129 - integration.id, 130 - "issue", 131 - issue.number.toString(), 132 - ); 129 + try { 130 + const existingLink = await findExternalLink( 131 + integration.id, 132 + "issue", 133 + issue.number.toString(), 134 + ); 133 135 134 - if (!existingLink) { 135 - continue; 136 - } 136 + if (!existingLink) { 137 + continue; 138 + } 137 139 138 - const priority = extractIssuePriority(issue.labels); 139 - const status = extractIssueStatus(issue.labels); 140 + const priority = extractIssuePriority(issue.labels); 141 + const status = extractIssueStatus(issue.labels); 142 + 143 + if (priority) { 144 + await db 145 + .update(taskTable) 146 + .set({ priority }) 147 + .where(eq(taskTable.id, existingLink.taskId)); 148 + } 140 149 141 - if (priority) { 142 - await db 143 - .update(taskTable) 144 - .set({ priority }) 145 - .where(eq(taskTable.id, existingLink.taskId)); 146 - } 150 + if (status) { 151 + const statusResult = await updateTaskStatus( 152 + existingLink.taskId, 153 + status, 154 + ); 155 + if ( 156 + statusResult.applied && 157 + statusResult.before.status !== statusResult.after.status 158 + ) { 159 + await publishEvent("task.status_changed", { 160 + taskId: statusResult.after.id, 161 + projectId: statusResult.after.projectId, 162 + userId: null, 163 + oldStatus: statusResult.before.status, 164 + newStatus: statusResult.after.status, 165 + title: statusResult.after.title, 166 + assigneeId: statusResult.after.userId, 167 + type: "status_changed", 168 + }); 169 + } 170 + } 147 171 148 - if (status) { 149 - await updateTaskStatus(existingLink.taskId, status); 150 - } 172 + if (payload.action === "label_updated") { 173 + if (issue.labels === undefined) { 174 + continue; 175 + } 151 176 152 - if (payload.action === "label_updated") { 153 - if (issue.labels === undefined) { 177 + const task = await db.query.taskTable.findFirst({ 178 + where: eq(taskTable.id, existingLink.taskId), 179 + with: { 180 + project: true, 181 + }, 182 + }); 183 + if (task?.project?.workspaceId) { 184 + await syncGiteaLabelsToTask( 185 + existingLink.taskId, 186 + task.project.workspaceId, 187 + giteaLabelsForSync(issue.labels), 188 + ); 189 + } 154 190 continue; 155 191 } 156 192 157 - const task = await db.query.taskTable.findFirst({ 158 - where: eq(taskTable.id, existingLink.taskId), 159 - with: { 160 - project: true, 161 - }, 162 - }); 163 - if (task?.project?.workspaceId) { 164 - await syncGiteaLabelsToTask( 165 - existingLink.taskId, 166 - task.project.workspaceId, 167 - giteaLabelsForSync(issue.labels), 168 - ); 193 + if (!addedLabel) { 194 + continue; 169 195 } 170 - continue; 171 - } 172 196 173 - if (!addedLabel) { 174 - continue; 175 - } 197 + if (isSystemLabelName(addedLabel.name)) { 198 + continue; 199 + } 176 200 177 - if (isSystemLabelName(addedLabel.name)) { 178 - continue; 179 - } 201 + if (payload.action === "labeled") { 202 + const task = await db.query.taskTable.findFirst({ 203 + where: eq(taskTable.id, existingLink.taskId), 204 + with: { 205 + project: true, 206 + }, 207 + }); 180 208 181 - if (payload.action === "labeled") { 182 - const task = await db.query.taskTable.findFirst({ 183 - where: eq(taskTable.id, existingLink.taskId), 184 - with: { 185 - project: true, 186 - }, 187 - }); 209 + if (task?.project?.workspaceId) { 210 + const existingLabel = await db.query.labelTable.findFirst({ 211 + where: (table, { and, eq: e }) => 212 + and( 213 + e(table.workspaceId, task.project.workspaceId), 214 + e(table.name, addedLabel.name), 215 + e(table.taskId, task.id), 216 + ), 217 + }); 188 218 189 - if (task?.project?.workspaceId) { 190 - const existingLabel = await db.query.labelTable.findFirst({ 219 + if (!existingLabel) { 220 + const color = addedLabel.color 221 + ? `#${addedLabel.color.replace(/^#/, "")}` 222 + : "#6B7280"; 223 + await db 224 + .insert(labelTable) 225 + .values({ 226 + name: addedLabel.name, 227 + color, 228 + taskId: task.id, 229 + workspaceId: task.project.workspaceId, 230 + }) 231 + .onConflictDoNothing({ 232 + target: [labelTable.taskId, labelTable.name], 233 + }); 234 + } 235 + } 236 + } 237 + 238 + if (payload.action === "unlabeled") { 239 + const labelsToDelete = await db.query.labelTable.findMany({ 191 240 where: (table, { and, eq: e }) => 192 241 and( 193 - e(table.workspaceId, task.project.workspaceId), 242 + e(table.taskId, existingLink.taskId), 194 243 e(table.name, addedLabel.name), 195 - e(table.taskId, task.id), 196 244 ), 197 245 }); 198 246 199 - if (!existingLabel) { 200 - const color = addedLabel.color 201 - ? `#${addedLabel.color.replace(/^#/, "")}` 202 - : "#6B7280"; 203 - await db 204 - .insert(labelTable) 205 - .values({ 206 - name: addedLabel.name, 207 - color, 208 - taskId: task.id, 209 - workspaceId: task.project.workspaceId, 210 - }) 211 - .onConflictDoNothing({ 212 - target: [labelTable.taskId, labelTable.name], 213 - }); 247 + for (const label of labelsToDelete) { 248 + await db.delete(labelTable).where(eq(labelTable.id, label.id)); 214 249 } 215 250 } 216 - } 217 - 218 - if (payload.action === "unlabeled") { 219 - const labelsToDelete = await db.query.labelTable.findMany({ 220 - where: (table, { and, eq: e }) => 221 - and( 222 - e(table.taskId, existingLink.taskId), 223 - e(table.name, addedLabel.name), 224 - ), 251 + } catch (error) { 252 + console.error("Gitea issue_labeled handler failed for integration", { 253 + integrationId: integration.id, 254 + issueNumber: issue.number, 255 + repository: `${owner}/${repository.name}`, 256 + error, 225 257 }); 226 - 227 - for (const label of labelsToDelete) { 228 - await db.delete(labelTable).where(eq(labelTable.id, label.id)); 229 - } 230 258 } 231 259 } 232 260 }
+23 -11
apps/api/src/plugins/gitea/webhooks/issue-reopened.ts
··· 1 1 import { and, eq } from "drizzle-orm"; 2 2 import db from "../../../database"; 3 3 import { externalLinkTable, taskTable } from "../../../database/schema"; 4 + import { publishEvent } from "../../../events"; 4 5 import { updateExternalLink } from "../../github/services/link-manager"; 5 6 import { updateTaskStatus } from "../../github/services/task-service"; 6 7 import { 7 8 findAllIntegrationsByGiteaRepo, 8 9 repoOwnerLogin, 9 10 } from "../services/integration-lookup"; 11 + import { 12 + OUTBOUND_STATE_ECHO_WINDOW_MS, 13 + parseIssueUpdatedAtMs, 14 + } from "../utils/outbound-echo"; 10 15 import { resolveTargetStatus } from "../utils/resolve-column"; 11 16 import { baseUrlFromRepositoryHtmlUrl } from "../utils/webhook-repo"; 12 - 13 - /** Skip reopen sync when it likely echoes our own outbound state update (webhook vs API). */ 14 - const OUTBOUND_STATE_ECHO_WINDOW_MS = 5000; 15 - 16 - function parseIssueUpdatedAtMs(issue: { updated_at?: string }): number | null { 17 - const raw = issue.updated_at; 18 - if (!raw || typeof raw !== "string") return null; 19 - const t = Date.parse(raw); 20 - return Number.isNaN(t) ? null : t; 21 - } 22 17 23 18 type IssueReopenedPayload = { 24 19 action: string; ··· 108 103 "to-do", 109 104 ); 110 105 111 - await updateTaskStatus(task.id, targetStatus); 106 + const statusResult = await updateTaskStatus(task.id, targetStatus); 107 + if ( 108 + statusResult.applied && 109 + statusResult.before.status !== statusResult.after.status 110 + ) { 111 + await publishEvent("task.status_changed", { 112 + taskId: statusResult.after.id, 113 + projectId: statusResult.after.projectId, 114 + userId: null, 115 + oldStatus: statusResult.before.status, 116 + newStatus: statusResult.after.status, 117 + title: statusResult.after.title, 118 + assigneeId: statusResult.after.userId, 119 + type: "status_changed", 120 + }); 121 + } 112 122 113 123 await updateExternalLink(externalLink.id, { 114 124 metadata: { ··· 119 129 } catch (error) { 120 130 console.error("Gitea issue_reopened handler failed for integration", { 121 131 integrationId: integration.id, 132 + issueNumber: issue.number, 133 + repository: `${owner}/${repository.name}`, 122 134 error, 123 135 }); 124 136 }
+23 -3
apps/api/src/plugins/github/services/task-service.ts
··· 1 + import type { InferSelectModel } from "drizzle-orm"; 1 2 import { and, eq } from "drizzle-orm"; 2 3 import db from "../../../database"; 3 4 import { ··· 5 6 integrationTable, 6 7 taskTable, 7 8 } from "../../../database/schema"; 9 + 10 + export type TaskRow = InferSelectModel<typeof taskTable>; 11 + 12 + export type UpdateTaskStatusResult = 13 + | { applied: false } 14 + | { applied: true; before: TaskRow; after: TaskRow }; 8 15 9 16 const NON_COLUMN_STATUSES = new Set(["planned", "archived"]); 10 17 ··· 23 30 }); 24 31 } 25 32 26 - export async function updateTaskStatus(taskId: string, newStatus: string) { 33 + export async function updateTaskStatus( 34 + taskId: string, 35 + newStatus: string, 36 + ): Promise<UpdateTaskStatusResult> { 27 37 const task = await db.query.taskTable.findFirst({ 28 38 where: eq(taskTable.id, taskId), 29 39 }); 30 40 31 41 if (!task) { 32 - return; 42 + return { applied: false }; 33 43 } 34 44 35 45 let columnId: string | null = null; ··· 47 57 console.warn( 48 58 `[GitHub] Skipping status update for task ${taskId}: column "${newStatus}" not found in project ${task.projectId}`, 49 59 ); 50 - return; 60 + return { applied: false }; 51 61 } 52 62 53 63 await db 54 64 .update(taskTable) 55 65 .set({ status: newStatus, columnId }) 56 66 .where(eq(taskTable.id, taskId)); 67 + 68 + const after = await db.query.taskTable.findFirst({ 69 + where: eq(taskTable.id, taskId), 70 + }); 71 + 72 + if (!after) { 73 + return { applied: false }; 74 + } 75 + 76 + return { applied: true, before: task, after }; 57 77 } 58 78 59 79 export async function isTaskInFinalState(task: {