my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add cleanup flow for old terminal flow runs

adapted from PrefectHQ/canary-flows database-cleanup.py — API-only
(no SQL/events path needed at this scale). dry_run=true by default,
runs weekly sundays at 2am UTC, keeps last 30 days.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

zzstoatzz 36718f34 79e446a4

+148
+134
flows/cleanup.py
··· 1 + """ 2 + Flow run cleanup — deletes old terminal flow runs via the Prefect API. 3 + 4 + Adapted from https://github.com/PrefectHQ/canary-flows/blob/main/flows/database-cleanup.py 5 + """ 6 + 7 + import asyncio 8 + from datetime import datetime, timedelta, timezone 9 + from typing import Literal 10 + 11 + from prefect import flow, get_run_logger, task 12 + from prefect.client.orchestration import get_client 13 + from prefect.client.schemas.filters import ( 14 + FlowRunFilter, 15 + FlowRunFilterStartTime, 16 + FlowRunFilterState, 17 + FlowRunFilterStateName, 18 + ) 19 + from prefect.exceptions import ObjectNotFound 20 + from pydantic import BaseModel, Field 21 + 22 + 23 + class RetentionConfig(BaseModel): 24 + days_to_keep: int = Field( 25 + default=30, 26 + ge=1, 27 + le=365, 28 + description="Days of flow run history to retain (1-365)", 29 + json_schema_extra={"position": 0}, 30 + ) 31 + states_to_clean: list[Literal["Completed", "Failed", "Cancelled", "Crashed"]] = Field( 32 + default=["Completed", "Failed", "Cancelled", "Crashed"], 33 + description="Terminal states to include in cleanup", 34 + json_schema_extra={"position": 1}, 35 + ) 36 + batch_size: int = Field( 37 + default=100, 38 + ge=10, 39 + le=1000, 40 + description="Deletes per batch (10-1000)", 41 + json_schema_extra={"position": 2}, 42 + ) 43 + rate_limit_delay: float = Field( 44 + default=0.5, 45 + ge=0.0, 46 + le=10.0, 47 + description="Seconds between batches (0-10)", 48 + json_schema_extra={"position": 3}, 49 + ) 50 + dry_run: bool = Field( 51 + default=True, 52 + description="Preview only — no deletions", 53 + json_schema_extra={"position": 4}, 54 + ) 55 + 56 + 57 + @task 58 + async def delete_old_flow_runs(config: RetentionConfig) -> dict: 59 + logger = get_run_logger() 60 + cutoff = datetime.now(timezone.utc) - timedelta(days=config.days_to_keep) 61 + logger.info(f"cutoff: {cutoff.strftime('%Y-%m-%d %H:%M')} UTC") 62 + logger.info(f"states: {', '.join(config.states_to_clean)}") 63 + 64 + async with get_client() as client: 65 + flow_run_filter = FlowRunFilter( 66 + start_time=FlowRunFilterStartTime(before_=cutoff), 67 + state=FlowRunFilterState( 68 + name=FlowRunFilterStateName(any_=config.states_to_clean) 69 + ), 70 + ) 71 + 72 + flow_runs = await client.read_flow_runs( 73 + flow_run_filter=flow_run_filter, limit=config.batch_size 74 + ) 75 + 76 + if not flow_runs: 77 + logger.info("nothing to clean up") 78 + return {"deleted": 0, "failed": 0} 79 + 80 + more = "+" if len(flow_runs) == config.batch_size else "" 81 + logger.info(f"found: {len(flow_runs)}{more} runs to clean") 82 + 83 + if config.dry_run: 84 + for fr in flow_runs[:5]: 85 + logger.info(f" [dry run] would delete: {fr.name} ({fr.state.name}, started {fr.start_time})") 86 + if len(flow_runs) > 5: 87 + logger.info(f" ... and {len(flow_runs) - 5} more") 88 + return {"deleted": 0, "failed": 0, "dry_run": True} 89 + 90 + deleted_total = failed_total = 0 91 + 92 + while flow_runs: 93 + async def _delete(fr_id): 94 + try: 95 + await client.delete_flow_run(fr_id) 96 + return None 97 + except ObjectNotFound: 98 + return None # already gone, idempotent 99 + except Exception as e: 100 + return str(e) 101 + 102 + results = await asyncio.gather(*[_delete(fr.id) for fr in flow_runs]) 103 + errors = [e for e in results if e] 104 + deleted_total += len(flow_runs) - len(errors) 105 + failed_total += len(errors) 106 + logger.info(f"batch: {len(flow_runs) - len(errors)}/{len(flow_runs)} | total deleted: {deleted_total:,}") 107 + for err in errors[:3]: 108 + logger.warning(f" error: {err}") 109 + 110 + if config.rate_limit_delay > 0: 111 + await asyncio.sleep(config.rate_limit_delay) 112 + 113 + flow_runs = await client.read_flow_runs( 114 + flow_run_filter=flow_run_filter, limit=config.batch_size 115 + ) 116 + 117 + logger.info(f"done: {deleted_total:,} deleted, {failed_total:,} failed") 118 + return {"deleted": deleted_total, "failed": failed_total} 119 + 120 + 121 + @flow(name="cleanup", log_prints=True) 122 + async def cleanup(config: RetentionConfig = RetentionConfig()) -> dict: 123 + """Delete old terminal flow runs. 124 + 125 + Defaults to dry_run=True — set dry_run=False to actually delete. 126 + """ 127 + logger = get_run_logger() 128 + mode = "DRY RUN" if config.dry_run else "LIVE" 129 + logger.info(f"mode: {mode} | retention: {config.days_to_keep} days") 130 + return await delete_old_flow_runs(config) 131 + 132 + 133 + if __name__ == "__main__": 134 + asyncio.run(cleanup())
+14
prefect.yaml
··· 14 14 name: kubernetes-pool 15 15 schedules: 16 16 - cron: "*/5 * * * *" 17 + 18 + - name: cleanup 19 + entrypoint: flows/cleanup.py:cleanup 20 + work_pool: 21 + name: kubernetes-pool 22 + schedules: 23 + - cron: "0 2 * * 0" 24 + parameters: 25 + config: 26 + days_to_keep: 30 27 + states_to_clean: ["Completed", "Failed", "Cancelled", "Crashed"] 28 + batch_size: 100 29 + rate_limit_delay: 0.5 30 + dry_run: true