Openstatus www.openstatus.dev
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

🔥 improve tcp (#1026)

* 🔥 improve tcp

* ci: apply automated fixes

* 🧪

* 🧪

* ci: apply automated fixes

* 📝 update doc

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>

authored by

Thibault Le Ouay
autofix-ci[bot]
and committed by
GitHub
6854555f ebbc0920

+173 -66
+1 -1
apps/checker/handlers/checker.go
··· 281 281 log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird") 282 282 } 283 283 284 - if req.Status == "active" { 284 + if req.Status != "error" { 285 285 checker.UpdateStatus(ctx, checker.UpdateData{ 286 286 MonitorId: req.MonitorID, 287 287 Status: "error",
+103 -31
apps/checker/handlers/tcp.go
··· 1 1 package handlers 2 2 3 3 import ( 4 + "encoding/json" 4 5 "fmt" 5 6 "net/http" 6 7 "strconv" ··· 22 23 Timing checker.TCPResponseTiming `json:"timing"` 23 24 } 24 25 26 + // Only used for Tinybird 27 + type TCPData struct { 28 + Timing string `json:"timing"` 29 + ErrorMessage string `json:"error"` 30 + Region string `json:"region"` 31 + 32 + RequestId int64 `json:"requestId,omitempty"` 33 + WorkspaceID int64 `json:"workspaceId"` 34 + MonitorID int64 `json:"monitorId"` 35 + Timestamp int64 `json:"timestamp"` 36 + Latency int64 `json:"latency"` 37 + 38 + Error uint8 `json:"errorMessage"` 39 + } 40 + 25 41 func (h Handler) TCPHandler(c *gin.Context) { 26 42 ctx := c.Request.Context() 27 43 dataSourceName := "tcp_response__v0" ··· 50 66 51 67 return 52 68 } 69 + 53 70 workspaceId, err := strconv.ParseInt(req.WorkspaceID, 10, 64) 54 71 55 72 if err != nil { ··· 57 74 58 75 return 59 76 } 77 + 60 78 monitorId, err := strconv.ParseInt(req.MonitorID, 10, 64) 61 79 62 80 if err != nil { ··· 66 84 } 67 85 68 86 var called int 87 + 69 88 op := func() error { 70 89 called++ 71 90 res, err := checker.PingTcp(int(req.Timeout), req.URL) ··· 74 93 return fmt.Errorf("unable to check tcp %s", err) 75 94 } 76 95 77 - r := TCPResponse{ 78 - WorkspaceID: workspaceId, 79 - Timestamp: req.CronTimestamp, 80 - Timing: checker.TCPResponseTiming{ 81 - TCPStart: res.TCPStart, 82 - TCPDone: res.TCPDone, 83 - }, 84 - Region: h.Region, 85 - MonitorID: monitorId, 96 + timingAsString, err := json.Marshal(res) 97 + if err != nil { 98 + return fmt.Errorf("error while parsing timing data %s: %w", req.URL, err) 86 99 } 100 + 87 101 latency := res.TCPDone - res.TCPStart 88 102 103 + data := TCPData{ 104 + WorkspaceID: workspaceId, 105 + Timestamp: req.CronTimestamp, 106 + Error: 0, 107 + ErrorMessage: "", 108 + Region: h.Region, 109 + MonitorID: monitorId, 110 + Timing: string(timingAsString), 111 + Latency: latency, 112 + } 113 + 89 114 if req.Status == "active" && req.DegradedAfter > 0 && latency > req.DegradedAfter { 90 115 checker.UpdateStatus(ctx, checker.UpdateData{ 91 116 MonitorId: req.MonitorID, ··· 105 130 } 106 131 107 132 if req.Status == "error" { 108 - checker.UpdateStatus(ctx, checker.UpdateData{ 109 - MonitorId: req.MonitorID, 110 - Status: "active", 111 - Region: h.Region, 112 - CronTimestamp: req.CronTimestamp, 113 - }) 133 + if req.DegradedAfter == 0 || (req.DegradedAfter > 0 && latency < req.DegradedAfter) { 134 + checker.UpdateStatus(ctx, checker.UpdateData{ 135 + MonitorId: req.MonitorID, 136 + Status: "active", 137 + Region: h.Region, 138 + CronTimestamp: req.CronTimestamp, 139 + }) 140 + } 141 + 142 + if req.DegradedAfter > 0 && latency > req.DegradedAfter { 143 + checker.UpdateStatus(ctx, checker.UpdateData{ 144 + MonitorId: req.MonitorID, 145 + Status: "degraded", 146 + Region: h.Region, 147 + CronTimestamp: req.CronTimestamp, 148 + }) 149 + } 150 + 114 151 } 115 152 116 - if err := h.TbClient.SendEvent(ctx, r, dataSourceName); err != nil { 153 + if err := h.TbClient.SendEvent(ctx, data, dataSourceName); err != nil { 117 154 log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird") 118 155 } 119 156 ··· 121 158 } 122 159 123 160 if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil { 124 - if err := h.TbClient.SendEvent(ctx, TCPResponse{ 125 - WorkspaceID: workspaceId, 126 - Timestamp: req.CronTimestamp, 127 - Error: err.Error(), 128 - Region: h.Region, 129 - MonitorID: monitorId, 161 + if err := h.TbClient.SendEvent(ctx, TCPData{ 162 + WorkspaceID: workspaceId, 163 + Timestamp: req.CronTimestamp, 164 + ErrorMessage: err.Error(), 165 + Region: h.Region, 166 + MonitorID: monitorId, 167 + Error: 1, 130 168 }, dataSourceName); err != nil { 131 169 log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird") 132 170 } 133 171 134 - if req.Status == "active" { 172 + if req.Status != "error" { 135 173 checker.UpdateStatus(ctx, checker.UpdateData{ 136 174 MonitorId: req.MonitorID, 137 175 Status: "error", ··· 152 190 region := c.Param("region") 153 191 if region == "" { 154 192 c.String(http.StatusBadRequest, "region is required") 193 + 155 194 return 156 195 } 157 196 158 197 if c.GetHeader("Authorization") != fmt.Sprintf("Basic %s", h.Secret) { 159 198 c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) 199 + 160 200 return 161 201 } 162 202 ··· 166 206 if region != "" && region != h.Region { 167 207 c.Header("fly-replay", fmt.Sprintf("region=%s", region)) 168 208 c.String(http.StatusAccepted, "Forwarding request to %s", region) 209 + 169 210 return 170 211 } 171 212 } 213 + 172 214 var req request.TCPCheckerRequest 215 + 173 216 if err := c.ShouldBindJSON(&req); err != nil { 174 217 log.Ctx(ctx).Error().Err(err).Msg("failed to decode checker request") 175 218 c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) 219 + 176 220 return 177 221 } 178 222 179 223 workspaceId, err := strconv.ParseInt(req.WorkspaceID, 10, 64) 180 224 if err != nil { 181 225 c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) 226 + 182 227 return 183 228 } 229 + 184 230 monitorId, err := strconv.ParseInt(req.MonitorID, 10, 64) 185 231 if err != nil { 186 232 c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) 233 + 187 234 return 188 235 } 189 236 190 237 var called int 238 + 191 239 var response TCPResponse 240 + 192 241 op := func() error { 193 242 called++ 194 243 res, err := checker.PingTcp(int(req.Timeout), req.URL) 244 + 195 245 if err != nil { 196 246 return fmt.Errorf("unable to check tcp %s", err) 197 247 } ··· 207 257 MonitorID: monitorId, 208 258 } 209 259 260 + timingAsString, err := json.Marshal(res) 261 + if err != nil { 262 + return fmt.Errorf("error while parsing timing data %s: %w", req.URL, err) 263 + } 264 + 265 + latency := res.TCPDone - res.TCPStart 266 + 267 + data := TCPData{ 268 + WorkspaceID: workspaceId, 269 + Timestamp: req.CronTimestamp, 270 + Error: 0, 271 + ErrorMessage: "", 272 + Region: h.Region, 273 + MonitorID: monitorId, 274 + Timing: string(timingAsString), 275 + Latency: latency, 276 + RequestId: req.RequestId, 277 + } 278 + 210 279 if req.RequestId != 0 { 211 - if err := h.TbClient.SendEvent(ctx, response, dataSourceName); err != nil { 280 + if err := h.TbClient.SendEvent(ctx, data, dataSourceName); err != nil { 212 281 log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird") 213 282 } 214 283 } ··· 216 285 return nil 217 286 } 218 287 219 - if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil { 220 - if err := h.TbClient.SendEvent(ctx, TCPResponse{ 221 - WorkspaceID: workspaceId, 222 - Timestamp: req.CronTimestamp, 223 - Error: err.Error(), 224 - Region: h.Region, 225 - MonitorID: monitorId, 288 + if err := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 3)); err != nil && req.RequestId != 0 { 289 + if err := h.TbClient.SendEvent(ctx, TCPData{ 290 + WorkspaceID: workspaceId, 291 + Timestamp: req.CronTimestamp, 292 + ErrorMessage: err.Error(), 293 + Region: h.Region, 294 + MonitorID: monitorId, 295 + Error: 1, 296 + RequestId: req.RequestId, 226 297 }, dataSourceName); err != nil { 227 298 log.Ctx(ctx).Error().Err(err).Msg("failed to send event to tinybird") 228 299 } 229 300 } 301 + 230 302 c.JSON(http.StatusOK, response) 231 303 }
+4
apps/docs/api-reference/check/http/post-http.mdx
··· 1 + --- 2 + title: Create a Check run for a HTTP request 3 + openapi: post /check/http 4 + ---
-4
apps/docs/api-reference/check/post-check.mdx
··· 1 - --- 2 - title: Create a Check run 3 - openapi: post /check 4 - ---
+2 -2
apps/server/src/v1/check/index.ts
··· 3 3 import type { Variables } from "../index"; 4 4 5 5 import { handleZodError } from "../../libs/errors"; 6 - import { registerPostCheck } from "./post"; 6 + import { registerHTTPPostCheck } from "./http/post"; 7 7 8 8 const checkAPI = new OpenAPIHono<{ Variables: Variables }>({ 9 9 defaultHook: handleZodError, 10 10 }); 11 11 12 - registerPostCheck(checkAPI); 12 + registerHTTPPostCheck(checkAPI); 13 13 14 14 export { checkAPI };
+2 -2
apps/server/src/v1/check/post.test.ts apps/server/src/v1/check/http/post.test.ts
··· 1 1 import { expect, test } from "bun:test"; 2 2 3 - import { api } from "../index"; 3 + import { api } from "../../index"; 4 4 5 5 import { afterEach, mock } from "bun:test"; 6 6 ··· 30 30 ), 31 31 ); 32 32 33 - const res = await api.request("/check", { 33 + const res = await api.request("/check/http", { 34 34 method: "POST", 35 35 headers: { 36 36 "x-openstatus-key": "1",
+5 -5
apps/server/src/v1/check/post.ts apps/server/src/v1/check/http/post.ts
··· 3 3 import { db } from "@openstatus/db"; 4 4 import { check } from "@openstatus/db/src/schema/check"; 5 5 import percentile from "percentile"; 6 - import { env } from "../../env"; 7 - import { openApiErrorResponses } from "../../libs/errors/openapi-error-responses"; 8 - import type { checkAPI } from "./index"; 6 + import { env } from "../../../env"; 7 + import { openApiErrorResponses } from "../../../libs/errors/openapi-error-responses"; 8 + import type { checkAPI } from "../index"; 9 9 import { 10 10 AggregatedResponseSchema, 11 11 AggregatedResult, ··· 18 18 method: "post", 19 19 tags: ["page"], 20 20 description: "Run a single check", 21 - path: "/", 21 + path: "/http", 22 22 request: { 23 23 body: { 24 24 description: "The run request to create", ··· 42 42 }, 43 43 }); 44 44 45 - export function registerPostCheck(api: typeof checkAPI) { 45 + export function registerHTTPPostCheck(api: typeof checkAPI) { 46 46 return api.openapi(postRoute, async (c) => { 47 47 const data = c.req.valid("json"); 48 48 const workspaceId = Number(c.get("workspaceId"));
+1 -1
apps/server/src/v1/check/schema.ts apps/server/src/v1/check/http/schema.ts
··· 1 1 import { z } from "zod"; 2 - import { MonitorSchema } from "../monitors/schema"; 2 + import { MonitorSchema } from "../../monitors/schema"; 3 3 4 4 export const CheckSchema = MonitorSchema.pick({ 5 5 url: true,
+37 -14
apps/web/src/app/api/checker/cron/_cron.ts
··· 15 15 } from "@openstatus/db/src/schema"; 16 16 17 17 import { env } from "@/env"; 18 - import type { payloadSchema } from "../schema"; 18 + import type { httpPayloadSchema, tpcPayloadSchema } from "../schema"; 19 19 20 20 const periodicityAvailable = selectMonitorSchema.pick({ periodicity: true }); 21 21 ··· 156 156 status: MonitorStatus; 157 157 region: string; 158 158 }) => { 159 - const payload: z.infer<typeof payloadSchema> = { 160 - workspaceId: String(row.workspaceId), 161 - monitorId: String(row.id), 162 - url: row.url, 163 - method: row.method || "GET", 164 - cronTimestamp: timestamp, 165 - body: row.body, 166 - headers: row.headers, 167 - status: status, 168 - assertions: row.assertions ? JSON.parse(row.assertions) : null, 169 - degradedAfter: row.degradedAfter, 170 - timeout: row.timeout, 171 - }; 159 + let payload: 160 + | z.infer<typeof httpPayloadSchema> 161 + | z.infer<typeof tpcPayloadSchema> 162 + | null = null; 163 + // 164 + if (row.jobType === "http") { 165 + payload = { 166 + workspaceId: String(row.workspaceId), 167 + monitorId: String(row.id), 168 + url: row.url, 169 + method: row.method || "GET", 170 + cronTimestamp: timestamp, 171 + body: row.body, 172 + headers: row.headers, 173 + status: status, 174 + assertions: row.assertions ? JSON.parse(row.assertions) : null, 175 + degradedAfter: row.degradedAfter, 176 + timeout: row.timeout, 177 + }; 178 + } 179 + if (row.jobType === "tcp") { 180 + payload = { 181 + workspaceId: String(row.workspaceId), 182 + monitorId: String(row.id), 183 + url: row.url, 184 + status: status, 185 + assertions: row.assertions ? JSON.parse(row.assertions) : null, 186 + cronTimestamp: timestamp, 187 + degradedAfter: row.degradedAfter, 188 + timeout: row.timeout, 189 + }; 190 + } 191 + 192 + if (!payload) { 193 + throw new Error("Invalid jobType"); 194 + } 172 195 173 196 const newTask: google.cloud.tasks.v2beta3.ITask = { 174 197 httpRequest: {
+13 -2
apps/web/src/app/api/checker/schema.ts
··· 3 3 import { base } from "@openstatus/assertions"; 4 4 import { monitorMethods, monitorStatus } from "@openstatus/db/src/schema"; 5 5 6 - export const payloadSchema = z.object({ 6 + export const httpPayloadSchema = z.object({ 7 7 workspaceId: z.string(), 8 8 monitorId: z.string(), 9 9 method: z.enum(monitorMethods), ··· 17 17 degradedAfter: z.number().nullable(), 18 18 }); 19 19 20 - export type Payload = z.infer<typeof payloadSchema>; 20 + export type HttpPayload = z.infer<typeof httpPayloadSchema>; 21 + 22 + export const tpcPayloadSchema = z.object({ 23 + status: z.enum(monitorStatus), 24 + workspaceId: z.string(), 25 + url: z.string(), 26 + monitorId: z.string(), 27 + assertions: z.array(base).nullable(), 28 + cronTimestamp: z.number(), 29 + timeout: z.number().default(45000), 30 + degradedAfter: z.number().nullable(), 31 + });
+2 -2
apps/web/src/app/api/checker/test/http/route.ts
··· 4 4 import { monitorFlyRegionSchema } from "@openstatus/db/src/schema/constants"; 5 5 6 6 import { checkRegion } from "@/components/ping-response-analysis/utils"; 7 - import { payloadSchema } from "../../schema"; 7 + import { httpPayloadSchema } from "../../schema"; 8 8 import { isAnInvalidTestUrl } from "../../utils"; 9 9 10 10 export const runtime = "edge"; ··· 19 19 export async function POST(request: Request) { 20 20 try { 21 21 const json = await request.json(); 22 - const _valid = payloadSchema 22 + const _valid = httpPayloadSchema 23 23 .pick({ url: true, method: true, headers: true, body: true }) 24 24 .merge(z.object({ region: monitorFlyRegionSchema.default("ams") })) 25 25 .safeParse(json);
+2 -2
apps/web/src/app/api/checker/test/route.ts
··· 4 4 import { monitorFlyRegionSchema } from "@openstatus/db/src/schema/constants"; 5 5 6 6 import { checkRegion } from "@/components/ping-response-analysis/utils"; 7 - import { payloadSchema } from "../schema"; 7 + import { httpPayloadSchema } from "../schema"; 8 8 import { isAnInvalidTestUrl } from "../utils"; 9 9 10 10 export const runtime = "edge"; ··· 19 19 export async function POST(request: Request) { 20 20 try { 21 21 const json = await request.json(); 22 - const _valid = payloadSchema 22 + const _valid = httpPayloadSchema 23 23 .pick({ url: true, method: true, headers: true, body: true }) 24 24 .merge(z.object({ region: monitorFlyRegionSchema.default("ams") })) 25 25 .safeParse(json);
+1
packages/db/src/schema/monitors/validation.ts
··· 49 49 }).extend({ 50 50 headers: headersToArraySchema.default([]), 51 51 body: bodyToStringSchema.default(""), 52 + // for tcp monitors the method is not needed 52 53 method: monitorMethodsSchema.default("GET"), 53 54 }); 54 55