because I got bored of customising my CV for every job
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(server): add admin module with entity lookup and queue monitor

+739
+33
apps/server/src/modules/admin/admin.module.ts
··· 1 + import { AIModule } from "@cv/ai-provider"; 2 + import { BaseModule } from "@cv/system"; 3 + import { Module } from "@nestjs/common"; 4 + import { ApplicationStatusModule } from "@/modules/application/application-status/application-status.module"; 5 + import { EducationModule } from "@/modules/education/education.module"; 6 + import { CompanyModule } from "@/modules/job-experience/company/company.module"; 7 + import { LevelModule } from "@/modules/job-experience/level/level.module"; 8 + import { RoleModule } from "@/modules/job-experience/role/role.module"; 9 + import { SkillModule } from "@/modules/job-experience/skill/skill.module"; 10 + import { OrganizationModule } from "@/modules/organization/organization.module"; 11 + import { JobTypeModule } from "@/modules/vacancies/job-type/job-type.module"; 12 + import { DatabaseModule } from "@/modules/database/database.module"; 13 + import { AdminResolver } from "./admin.resolver"; 14 + import { AdminLookupResolver } from "./graphql/admin-lookup.resolver"; 15 + import { QueueMonitorService } from "./queue-monitor.service"; 16 + 17 + @Module({ 18 + imports: [ 19 + AIModule.forConfig(), 20 + BaseModule, 21 + DatabaseModule, 22 + SkillModule, 23 + CompanyModule, 24 + RoleModule, 25 + LevelModule, 26 + EducationModule, 27 + JobTypeModule, 28 + ApplicationStatusModule, 29 + OrganizationModule, 30 + ], 31 + providers: [AdminResolver, AdminLookupResolver, QueueMonitorService], 32 + }) 33 + export class AdminModule {}
+130
apps/server/src/modules/admin/admin.resolver.ts
··· 1 + import { 2 + AI_PROVIDER, 3 + type AIProvider, 4 + registeredProviderTypes, 5 + } from "@cv/ai-provider"; 6 + import { AdminGuard, JwtAuthGuard, VerifiedScopeGuard } from "@cv/auth"; 7 + import { Inject, Optional, UseGuards } from "@nestjs/common"; 8 + import { PageInfo } from "@cv/system"; 9 + import { Args, Int, Query, Resolver } from "@nestjs/graphql"; 10 + import { 11 + AiCallLogEntryType, 12 + AiProviderStatus, 13 + QueueMessageConnection, 14 + QueueStats, 15 + SystemStatus, 16 + WorkerHealth, 17 + } from "./admin.type"; 18 + import { AiCallLogService } from "./ai-call-log.service"; 19 + import { QueueMonitorService } from "./queue-monitor.service"; 20 + 21 + @Resolver() 22 + @UseGuards(JwtAuthGuard, VerifiedScopeGuard, AdminGuard) 23 + export class AdminResolver { 24 + constructor( 25 + @Inject(AI_PROVIDER) 26 + @Optional() 27 + private readonly globalProvider: AIProvider | undefined, 28 + private readonly aiCallLogService: AiCallLogService, 29 + private readonly queueMonitorService: QueueMonitorService, 30 + ) {} 31 + 32 + @Query(() => SystemStatus) 33 + async systemStatus(): Promise<SystemStatus> { 34 + const status = new SystemStatus(); 35 + status.serverUptime = process.uptime(); 36 + status.registeredProviderTypes = registeredProviderTypes(); 37 + 38 + if (!this.globalProvider?.getStatus) { 39 + status.platformProvider = this.globalProvider 40 + ? { 41 + healthy: await this.globalProvider.isHealthy(), 42 + providerName: this.globalProvider.name, 43 + detailsJson: JSON.stringify(null), 44 + } 45 + : null; 46 + 47 + return status; 48 + } 49 + 50 + const providerStatus = await this.globalProvider.getStatus(); 51 + const platformProvider = new AiProviderStatus(); 52 + platformProvider.healthy = providerStatus.healthy; 53 + platformProvider.providerName = providerStatus.providerName; 54 + platformProvider.detailsJson = JSON.stringify( 55 + providerStatus.details ?? null, 56 + ); 57 + status.platformProvider = platformProvider; 58 + 59 + return status; 60 + } 61 + 62 + @Query(() => [AiCallLogEntryType]) 63 + aiCallLog( 64 + @Args("limit", { type: () => Int, nullable: true }) limit?: number, 65 + @Args("status", { type: () => String, nullable: true }) status?: string, 66 + @Args("providerName", { type: () => String, nullable: true }) 67 + providerName?: string, 68 + ): AiCallLogEntryType[] { 69 + const entries = this.aiCallLogService.getEntries(limit ?? undefined); 70 + 71 + return entries.filter( 72 + (e) => 73 + (!status || e.status === status) && 74 + (!providerName || e.providerName === providerName), 75 + ); 76 + } 77 + 78 + @Query(() => [AiCallLogEntryType]) 79 + async aiCallLogHistory( 80 + @Args("limit", { type: () => Int, nullable: true }) limit?: number, 81 + @Args("status", { type: () => String, nullable: true }) status?: string, 82 + @Args("providerName", { type: () => String, nullable: true }) 83 + providerName?: string, 84 + ): Promise<AiCallLogEntryType[]> { 85 + const rows = await this.aiCallLogService.queryHistory({ 86 + ...(limit != null ? { limit } : {}), 87 + ...(status != null ? { status } : {}), 88 + ...(providerName != null ? { providerName } : {}), 89 + }); 90 + 91 + return rows.map((r) => ({ 92 + id: r.id, 93 + timestamp: r.createdAt.toISOString(), 94 + providerName: r.providerName, 95 + durationMs: r.durationMs, 96 + promptTokens: r.promptTokens ?? undefined, 97 + completionTokens: r.completionTokens ?? undefined, 98 + model: r.model ?? undefined, 99 + finishReason: r.finishReason ?? undefined, 100 + status: r.status, 101 + error: r.error ?? undefined, 102 + userId: r.userId ?? undefined, 103 + source: r.source ?? undefined, 104 + })); 105 + } 106 + 107 + @Query(() => QueueStats) 108 + async queueStats(): Promise<QueueStats> { 109 + const result = await this.queueMonitorService.getStats(); 110 + return QueueStats.fromDomain(result); 111 + } 112 + 113 + @Query(() => QueueMessageConnection) 114 + async queueMessages( 115 + @Args("limit", { type: () => Int, nullable: true }) limit?: number, 116 + ): Promise<InstanceType<typeof QueueMessageConnection>> { 117 + const result = await this.queueMonitorService.getMessages(limit ?? undefined); 118 + return QueueMessageConnection.fromPaginationResult({ 119 + edges: result.map((r, i) => ({ node: r, cursor: String(i) })), 120 + pageInfo: new PageInfo(false, false, null, null), 121 + totalCount: result.length, 122 + }); 123 + } 124 + 125 + @Query(() => [WorkerHealth]) 126 + async workerHealth(): Promise<WorkerHealth[]> { 127 + const results = await this.queueMonitorService.getWorkers(); 128 + return results.map(WorkerHealth.fromDomain); 129 + } 130 + }
+179
apps/server/src/modules/admin/admin.type.ts
··· 1 + import { Field, Float, ID, Int, ObjectType } from "@nestjs/graphql"; 2 + import { createConnection } from "@/modules/base/connection.factory"; 3 + import type { 4 + QueueMessageResult, 5 + QueueStatsResult, 6 + WorkerHealthResult, 7 + } from "./queue-monitor.service"; 8 + 9 + @ObjectType() 10 + export class QueueStats { 11 + @Field(() => Int) 12 + pending: number; 13 + 14 + @Field(() => Int) 15 + scheduled: number; 16 + 17 + @Field(() => Int) 18 + processing: number; 19 + 20 + @Field(() => Float, { nullable: true }) 21 + oldestPendingSeconds: number | null; 22 + 23 + constructor(data: { 24 + pending: number; 25 + scheduled: number; 26 + processing: number; 27 + oldestPendingSeconds: number | null; 28 + }) { 29 + this.pending = data.pending; 30 + this.scheduled = data.scheduled; 31 + this.processing = data.processing; 32 + this.oldestPendingSeconds = data.oldestPendingSeconds; 33 + } 34 + 35 + static fromDomain(result: QueueStatsResult): QueueStats { 36 + return new QueueStats(result); 37 + } 38 + } 39 + 40 + @ObjectType() 41 + export class QueueMessage { 42 + @Field(() => ID) 43 + id: string; 44 + 45 + @Field() 46 + queueName: string; 47 + 48 + @Field(() => String, { nullable: true }) 49 + messageName: string | null; 50 + 51 + @Field() 52 + status: string; 53 + 54 + @Field() 55 + createdAt: Date; 56 + 57 + constructor(data: { 58 + id: string; 59 + queueName: string; 60 + messageName: string | null; 61 + status: string; 62 + createdAt: Date; 63 + }) { 64 + this.id = data.id; 65 + this.queueName = data.queueName; 66 + this.messageName = data.messageName; 67 + this.status = data.status; 68 + this.createdAt = data.createdAt; 69 + } 70 + 71 + static fromDomain(result: QueueMessageResult): QueueMessage { 72 + return new QueueMessage(result); 73 + } 74 + } 75 + 76 + export const { 77 + Connection: QueueMessageConnection, 78 + Edge: QueueMessageEdge, 79 + } = createConnection<QueueMessage, QueueMessageResult>( 80 + QueueMessage, 81 + QueueMessage.fromDomain, 82 + ); 83 + 84 + export type QueueMessageConnection = InstanceType<typeof QueueMessageConnection>; 85 + export type QueueMessageEdge = InstanceType<typeof QueueMessageEdge>; 86 + 87 + @ObjectType() 88 + export class WorkerHealth { 89 + @Field() 90 + workerId: string; 91 + 92 + @Field() 93 + status: string; 94 + 95 + @Field() 96 + startedAt: Date; 97 + 98 + @Field() 99 + lastSeenAt: Date; 100 + 101 + constructor(data: { 102 + workerId: string; 103 + status: string; 104 + startedAt: Date; 105 + lastSeenAt: Date; 106 + }) { 107 + this.workerId = data.workerId; 108 + this.status = data.status; 109 + this.startedAt = data.startedAt; 110 + this.lastSeenAt = data.lastSeenAt; 111 + } 112 + 113 + static fromDomain(result: WorkerHealthResult): WorkerHealth { 114 + return new WorkerHealth(result); 115 + } 116 + } 117 + 118 + @ObjectType() 119 + export class AiProviderStatus { 120 + @Field() 121 + healthy!: boolean; 122 + 123 + @Field() 124 + providerName!: string; 125 + 126 + @Field({ description: "JSON-serialized provider details" }) 127 + detailsJson!: string; 128 + } 129 + 130 + @ObjectType() 131 + export class SystemStatus { 132 + @Field(() => AiProviderStatus, { nullable: true }) 133 + platformProvider!: AiProviderStatus | null; 134 + 135 + @Field(() => [String]) 136 + registeredProviderTypes!: string[]; 137 + 138 + @Field(() => Float, { description: "Server uptime in seconds" }) 139 + serverUptime!: number; 140 + } 141 + 142 + @ObjectType() 143 + export class AiCallLogEntryType { 144 + @Field() 145 + id!: string; 146 + 147 + @Field() 148 + timestamp!: string; 149 + 150 + @Field() 151 + providerName!: string; 152 + 153 + @Field(() => Int) 154 + durationMs!: number; 155 + 156 + @Field(() => Int, { nullable: true }) 157 + promptTokens?: number | undefined; 158 + 159 + @Field(() => Int, { nullable: true }) 160 + completionTokens?: number | undefined; 161 + 162 + @Field(() => String, { nullable: true }) 163 + model?: string | undefined; 164 + 165 + @Field(() => String, { nullable: true }) 166 + finishReason?: string | undefined; 167 + 168 + @Field() 169 + status!: string; 170 + 171 + @Field(() => String, { nullable: true }) 172 + error?: string | undefined; 173 + 174 + @Field(() => String, { nullable: true }) 175 + userId?: string | undefined; 176 + 177 + @Field(() => String, { nullable: true }) 178 + source?: string | undefined; 179 + }
+210
apps/server/src/modules/admin/graphql/admin-lookup.resolver.ts
··· 1 + import { AdminGuard, JwtAuthGuard, VerifiedScopeGuard } from "@cv/auth"; 2 + import type { NamedEntity } from "@cv/system"; 3 + import { ClockService, UuidFactoryService } from "@cv/system"; 4 + import { UseGuards } from "@nestjs/common"; 5 + import { Args, Mutation, Query, Resolver } from "@nestjs/graphql"; 6 + import { ApplicationStatusService } from "@/modules/application/application-status/application-status.service"; 7 + import { ApplicationStatus } from "@/modules/application/application-status/application-status.entity"; 8 + import { SkillService } from "@/modules/job-experience/skill/skill.service"; 9 + import { Skill } from "@/modules/job-experience/skill/skill.entity"; 10 + import { CompanyService } from "@/modules/job-experience/company/company.service"; 11 + import { Company } from "@/modules/job-experience/company/company.entity"; 12 + import { RoleService } from "@/modules/job-experience/role/role.service"; 13 + import { Role } from "@/modules/job-experience/role/role.entity"; 14 + import { LevelService } from "@/modules/job-experience/level/level.service"; 15 + import { Level } from "@/modules/job-experience/level/level.entity"; 16 + import { InstitutionService } from "@/modules/education/institution.service"; 17 + import { Institution } from "@/modules/education/institution.entity"; 18 + import { JobTypeService } from "@/modules/vacancies/job-type/job-type.service"; 19 + import { JobType } from "@/modules/vacancies/job-type/job-type.entity"; 20 + import { OrganizationService } from "@/modules/organization/organization.service"; 21 + import { Organization } from "@/modules/organization/organization.entity"; 22 + import { OrganizationRoleService } from "@/modules/organization/organization-role.service"; 23 + import { OrganizationRole } from "@/modules/organization/organization-role.entity"; 24 + import { AdminEntityType, AdminLookupEntity } from "./admin-lookup.type"; 25 + 26 + type EntityLike = { 27 + id: string; 28 + name: string; 29 + description?: string | null | undefined; 30 + createdAt: Date; 31 + updatedAt: Date; 32 + }; 33 + 34 + interface LookupAdapter { 35 + findMany(searchTerm?: string): Promise<EntityLike[]>; 36 + findByIdOrFail(id: string): Promise<EntityLike>; 37 + save(entity: EntityLike): Promise<EntityLike>; 38 + destroy(entity: EntityLike): Promise<void>; 39 + create(id: string, name: string, now: Date, description?: string): EntityLike; 40 + } 41 + 42 + const namedEntityAdapter = <T extends NamedEntity>( 43 + service: { 44 + findMany(filters?: { searchTerm?: string }): Promise<T[]>; 45 + findByIdOrFail(id: string): Promise<T>; 46 + save(entity: T): Promise<T>; 47 + destroy(entity: T): Promise<void>; 48 + }, 49 + EntityClass: new ( 50 + id: string, 51 + name: string, 52 + createdAt: Date, 53 + updatedAt: Date, 54 + description?: string, 55 + ) => T, 56 + ): LookupAdapter => ({ 57 + findMany: (searchTerm) => 58 + service.findMany(searchTerm ? { searchTerm } : undefined), 59 + findByIdOrFail: (id) => service.findByIdOrFail(id), 60 + save: (entity) => service.save(entity as T), 61 + destroy: (entity) => service.destroy(entity as T), 62 + create: (id, name, now, description) => 63 + new EntityClass(id, name, now, now, description), 64 + }); 65 + 66 + @Resolver() 67 + @UseGuards(JwtAuthGuard, VerifiedScopeGuard, AdminGuard) 68 + export class AdminLookupResolver { 69 + private readonly registry: Record<AdminEntityType, LookupAdapter>; 70 + 71 + constructor( 72 + private readonly uuid: UuidFactoryService, 73 + private readonly clock: ClockService, 74 + skillService: SkillService, 75 + companyService: CompanyService, 76 + roleService: RoleService, 77 + levelService: LevelService, 78 + institutionService: InstitutionService, 79 + jobTypeService: JobTypeService, 80 + applicationStatusService: ApplicationStatusService, 81 + organizationService: OrganizationService, 82 + organizationRoleService: OrganizationRoleService, 83 + ) { 84 + this.registry = { 85 + [AdminEntityType.SKILL]: namedEntityAdapter(skillService, Skill), 86 + [AdminEntityType.COMPANY]: namedEntityAdapter(companyService, Company), 87 + [AdminEntityType.ROLE]: namedEntityAdapter(roleService, Role), 88 + [AdminEntityType.LEVEL]: namedEntityAdapter(levelService, Level), 89 + [AdminEntityType.INSTITUTION]: namedEntityAdapter( 90 + institutionService, 91 + Institution, 92 + ), 93 + [AdminEntityType.JOB_TYPE]: namedEntityAdapter(jobTypeService, JobType), 94 + [AdminEntityType.APPLICATION_STATUS]: namedEntityAdapter( 95 + applicationStatusService, 96 + ApplicationStatus, 97 + ), 98 + [AdminEntityType.ORGANIZATION]: { 99 + findMany: (searchTerm) => 100 + organizationService.findMany().then((orgs) => 101 + searchTerm 102 + ? orgs.filter((o) => 103 + o.name.toLowerCase().includes(searchTerm.toLowerCase()), 104 + ) 105 + : orgs, 106 + ), 107 + findByIdOrFail: (id) => organizationService.findByIdOrFail(id), 108 + save: (entity) => 109 + organizationService.save(entity as unknown as Organization), 110 + destroy: (entity) => 111 + organizationService.destroy(entity as unknown as Organization), 112 + create: (id, name, now, description) => 113 + new Organization(id, name, now, now, description ?? null), 114 + }, 115 + [AdminEntityType.ORGANIZATION_ROLE]: { 116 + findMany: (searchTerm) => 117 + organizationRoleService.findAll().then((roles) => 118 + searchTerm 119 + ? roles.filter((r) => 120 + r.name.toLowerCase().includes(searchTerm.toLowerCase()), 121 + ) 122 + : roles, 123 + ), 124 + findByIdOrFail: (id) => organizationRoleService.findByIdOrFail(id), 125 + save: async (entity) => { 126 + const desc = 127 + entity.description != null ? entity.description : undefined; 128 + const existing = await organizationRoleService.findById(entity.id); 129 + return existing 130 + ? organizationRoleService.update(entity.id, { 131 + name: entity.name, 132 + ...(desc !== undefined ? { description: desc } : {}), 133 + }) 134 + : organizationRoleService.create({ 135 + name: entity.name, 136 + ...(desc !== undefined ? { description: desc } : {}), 137 + }); 138 + }, 139 + destroy: (entity) => organizationRoleService.delete(entity.id), 140 + create: (id, name, now, description) => 141 + new OrganizationRole(id, name, now, now, description ?? null), 142 + }, 143 + }; 144 + } 145 + 146 + @Query(() => [AdminLookupEntity]) 147 + async adminLookupEntities( 148 + @Args("entityType", { type: () => AdminEntityType }) 149 + entityType: AdminEntityType, 150 + @Args("searchTerm", { nullable: true }) searchTerm?: string, 151 + ): Promise<AdminLookupEntity[]> { 152 + const adapter = this.registry[entityType]; 153 + const entities = await adapter.findMany(searchTerm ?? undefined); 154 + return entities.map(AdminLookupEntity.fromDomain); 155 + } 156 + 157 + @Mutation(() => AdminLookupEntity) 158 + async adminCreateLookupEntity( 159 + @Args("entityType", { type: () => AdminEntityType }) 160 + entityType: AdminEntityType, 161 + @Args("name") name: string, 162 + @Args("description", { nullable: true }) description?: string, 163 + ): Promise<AdminLookupEntity> { 164 + const adapter = this.registry[entityType]; 165 + const now = this.clock.now(); 166 + const entity = adapter.create( 167 + this.uuid.generate(), 168 + name, 169 + now, 170 + description, 171 + ); 172 + const saved = await adapter.save(entity); 173 + return AdminLookupEntity.fromDomain(saved); 174 + } 175 + 176 + @Mutation(() => AdminLookupEntity) 177 + async adminUpdateLookupEntity( 178 + @Args("entityType", { type: () => AdminEntityType }) 179 + entityType: AdminEntityType, 180 + @Args("id") id: string, 181 + @Args("name", { nullable: true }) name?: string, 182 + @Args("description", { nullable: true }) description?: string, 183 + ): Promise<AdminLookupEntity> { 184 + const adapter = this.registry[entityType]; 185 + const existing = await adapter.findByIdOrFail(id); 186 + const updated = adapter.create( 187 + existing.id, 188 + name ?? existing.name, 189 + existing.createdAt, 190 + description !== undefined 191 + ? description 192 + : (existing.description ?? undefined), 193 + ); 194 + (updated as { updatedAt: Date }).updatedAt = this.clock.now(); 195 + const saved = await adapter.save(updated); 196 + return AdminLookupEntity.fromDomain(saved); 197 + } 198 + 199 + @Mutation(() => Boolean) 200 + async adminDeleteLookupEntity( 201 + @Args("entityType", { type: () => AdminEntityType }) 202 + entityType: AdminEntityType, 203 + @Args("id") id: string, 204 + ): Promise<boolean> { 205 + const adapter = this.registry[entityType]; 206 + const entity = await adapter.findByIdOrFail(id); 207 + await adapter.destroy(entity); 208 + return true; 209 + } 210 + }
+63
apps/server/src/modules/admin/graphql/admin-lookup.type.ts
··· 1 + import { Field, ID, ObjectType, registerEnumType } from "@nestjs/graphql"; 2 + 3 + export enum AdminEntityType { 4 + SKILL = "SKILL", 5 + COMPANY = "COMPANY", 6 + ROLE = "ROLE", 7 + LEVEL = "LEVEL", 8 + INSTITUTION = "INSTITUTION", 9 + JOB_TYPE = "JOB_TYPE", 10 + APPLICATION_STATUS = "APPLICATION_STATUS", 11 + ORGANIZATION = "ORGANIZATION", 12 + ORGANIZATION_ROLE = "ORGANIZATION_ROLE", 13 + } 14 + 15 + registerEnumType(AdminEntityType, { name: "AdminEntityType" }); 16 + 17 + @ObjectType() 18 + export class AdminLookupEntity { 19 + @Field(() => ID) 20 + id: string; 21 + 22 + @Field() 23 + name: string; 24 + 25 + @Field(() => String, { nullable: true }) 26 + description: string | null; 27 + 28 + @Field(() => Date) 29 + createdAt: Date; 30 + 31 + @Field(() => Date) 32 + updatedAt: Date; 33 + 34 + constructor( 35 + id: string, 36 + name: string, 37 + description: string | null, 38 + createdAt: Date, 39 + updatedAt: Date, 40 + ) { 41 + this.id = id; 42 + this.name = name; 43 + this.description = description; 44 + this.createdAt = createdAt; 45 + this.updatedAt = updatedAt; 46 + } 47 + 48 + static fromDomain(entity: { 49 + id: string; 50 + name: string; 51 + description?: string | null | undefined; 52 + createdAt: Date; 53 + updatedAt: Date; 54 + }): AdminLookupEntity { 55 + return new AdminLookupEntity( 56 + entity.id, 57 + entity.name, 58 + entity.description ?? null, 59 + entity.createdAt, 60 + entity.updatedAt, 61 + ); 62 + } 63 + }
+124
apps/server/src/modules/admin/queue-monitor.service.ts
··· 1 + import { Injectable } from "@nestjs/common"; 2 + import { PrismaService } from "@/modules/database/prisma.service"; 3 + 4 + interface QueueStatsRow { 5 + pending: bigint; 6 + scheduled: bigint; 7 + processing: bigint; 8 + oldest_pending_seconds: number | null; 9 + } 10 + 11 + interface QueueMessageRow { 12 + id: string; 13 + queue_name: string; 14 + message_name: string | null; 15 + created_at: Date; 16 + available_at: Date; 17 + delivered_at: Date | null; 18 + } 19 + 20 + interface WorkerHeartbeatRow { 21 + worker_id: string; 22 + started_at: Date; 23 + last_seen_at: Date; 24 + } 25 + 26 + export interface QueueStatsResult { 27 + pending: number; 28 + scheduled: number; 29 + processing: number; 30 + oldestPendingSeconds: number | null; 31 + } 32 + 33 + export interface QueueMessageResult { 34 + id: string; 35 + queueName: string; 36 + messageName: string | null; 37 + status: "pending" | "scheduled" | "processing"; 38 + createdAt: Date; 39 + } 40 + 41 + export interface WorkerHealthResult { 42 + workerId: string; 43 + status: "healthy" | "stale" | "dead"; 44 + startedAt: Date; 45 + lastSeenAt: Date; 46 + } 47 + 48 + const HEALTHY_THRESHOLD_SECONDS = 60; 49 + const STALE_THRESHOLD_SECONDS = 300; 50 + 51 + const deriveWorkerStatus = ( 52 + lastSeenAt: Date, 53 + ): "healthy" | "stale" | "dead" => { 54 + const ageSeconds = (Date.now() - lastSeenAt.getTime()) / 1000; 55 + if (ageSeconds <= HEALTHY_THRESHOLD_SECONDS) return "healthy"; 56 + if (ageSeconds <= STALE_THRESHOLD_SECONDS) return "stale"; 57 + return "dead"; 58 + }; 59 + 60 + const deriveMessageStatus = ( 61 + row: QueueMessageRow, 62 + ): "pending" | "scheduled" | "processing" => { 63 + if (row.delivered_at) return "processing"; 64 + return row.available_at > new Date() ? "scheduled" : "pending"; 65 + }; 66 + 67 + /** 68 + * Read-only monitoring service for the project-q queue tables. 69 + * Uses raw SQL since queue tables live in the `queue` schema (not managed by Prisma). 70 + */ 71 + @Injectable() 72 + export class QueueMonitorService { 73 + constructor(private readonly prisma: PrismaService) {} 74 + 75 + async getStats(): Promise<QueueStatsResult> { 76 + const rows = await this.prisma.$queryRaw<QueueStatsRow[]>` 77 + SELECT 78 + COUNT(*) FILTER (WHERE delivered_at IS NULL AND available_at <= now()) AS pending, 79 + COUNT(*) FILTER (WHERE delivered_at IS NULL AND available_at > now()) AS scheduled, 80 + COUNT(*) FILTER (WHERE delivered_at IS NOT NULL) AS processing, 81 + EXTRACT(EPOCH FROM now() - MIN(available_at) FILTER (WHERE delivered_at IS NULL AND available_at <= now())) AS oldest_pending_seconds 82 + FROM queue.messages 83 + `; 84 + 85 + const row = rows[0]; 86 + return { 87 + pending: Number(row?.pending ?? 0), 88 + scheduled: Number(row?.scheduled ?? 0), 89 + processing: Number(row?.processing ?? 0), 90 + oldestPendingSeconds: row?.oldest_pending_seconds ?? null, 91 + }; 92 + } 93 + 94 + async getMessages(limit = 50): Promise<QueueMessageResult[]> { 95 + const clampedLimit = Math.max(1, Math.min(limit, 500)); 96 + const rows = await this.prisma.$queryRaw<QueueMessageRow[]>` 97 + SELECT id, queue_name, body->'message'->>'name' AS message_name, 98 + created_at, available_at, delivered_at 99 + FROM queue.messages ORDER BY created_at DESC LIMIT ${clampedLimit} 100 + `; 101 + 102 + return rows.map((row) => ({ 103 + id: String(row.id), 104 + queueName: row.queue_name, 105 + messageName: row.message_name, 106 + status: deriveMessageStatus(row), 107 + createdAt: row.created_at, 108 + })); 109 + } 110 + 111 + async getWorkers(): Promise<WorkerHealthResult[]> { 112 + const rows = await this.prisma.$queryRaw<WorkerHeartbeatRow[]>` 113 + SELECT worker_id, started_at, last_seen_at 114 + FROM queue.worker_heartbeats ORDER BY started_at DESC 115 + `; 116 + 117 + return rows.map((row) => ({ 118 + workerId: row.worker_id, 119 + status: deriveWorkerStatus(row.last_seen_at), 120 + startedAt: row.started_at, 121 + lastSeenAt: row.last_seen_at, 122 + })); 123 + } 124 + }