my prefect server setup prefect-metrics.waow.tech
python orchestration
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

parameterize all bare dict type annotations across flows

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+22 -19
+4 -3
flows/brief.py
··· 4 4 from dataclasses import dataclass 5 5 from datetime import datetime, timedelta, timezone 6 6 from pathlib import Path 7 + from typing import Any 7 8 8 9 import duckdb 9 10 from pydantic_ai import Agent ··· 24 25 def compute_key( 25 26 self, 26 27 task_ctx: TaskRunContext, 27 - inputs: dict, 28 - flow_parameters: dict, 29 - **kwargs, 28 + inputs: dict[str, Any], 29 + flow_parameters: dict[str, Any], 30 + **kwargs: Any, 30 31 ) -> str | None: 31 32 items_text = inputs.get("items_text") 32 33 if items_text is None:
+3 -3
flows/cleanup.py
··· 6 6 7 7 import asyncio 8 8 from datetime import datetime, timedelta, timezone 9 - from typing import Literal 9 + from typing import Any, Literal 10 10 11 11 from prefect import flow, get_run_logger, task 12 12 from prefect.client.orchestration import get_client ··· 55 55 56 56 57 57 @task 58 - async def delete_old_flow_runs(config: RetentionConfig) -> dict: 58 + async def delete_old_flow_runs(config: RetentionConfig) -> dict[str, Any]: 59 59 logger = get_run_logger() 60 60 cutoff = datetime.now(timezone.utc) - timedelta(days=config.days_to_keep) 61 61 logger.info(f"cutoff: {cutoff.strftime('%Y-%m-%d %H:%M')} UTC") ··· 128 128 129 129 130 130 @flow(name="cleanup", flow_run_name=_cleanup_run_name, log_prints=True) 131 - async def cleanup(config: RetentionConfig = RetentionConfig()) -> dict: 131 + async def cleanup(config: RetentionConfig = RetentionConfig()) -> dict[str, Any]: 132 132 """Delete old terminal flow runs. 133 133 134 134 Defaults to dry_run=True — set dry_run=False to actually delete.
+11 -10
flows/compact.py
··· 11 11 import os 12 12 import shutil 13 13 from datetime import datetime, timedelta, timezone 14 + from typing import Any 14 15 15 16 import duckdb 16 17 import httpx ··· 49 50 def compute_key( 50 51 self, 51 52 task_ctx: TaskRunContext, 52 - inputs: dict, 53 - flow_parameters: dict, 54 - **kwargs, 53 + inputs: dict[str, Any], 54 + flow_parameters: dict[str, Any], 55 + **kwargs: Any, 55 56 ) -> str | None: 56 57 handle = inputs.get("handle") 57 58 observations_text = inputs.get("observations_text") ··· 70 71 71 72 72 73 @task 73 - def load_user_profiles(snap_path: str) -> list[dict]: 74 + def load_user_profiles(snap_path: str) -> list[dict[str, Any]]: 74 75 """Read per-user profiles from the dbt enrichment model.""" 75 76 db = duckdb.connect(snap_path, read_only=True) 76 77 rows = db.execute( ··· 126 127 127 128 128 129 @task 129 - def resolve_bsky_profile(handle: str) -> dict | None: 130 + def resolve_bsky_profile(handle: str) -> dict[str, str] | None: 130 131 """Fetch display name and bio from the public Bluesky API.""" 131 132 try: 132 133 resp = httpx.get( ··· 145 146 return None 146 147 147 148 148 - def _format_stats(profile: dict) -> str: 149 + def _format_stats(profile: dict[str, Any]) -> str: 149 150 tags = ", ".join(profile.get("top_tags") or []) 150 151 return ( 151 152 f"observations: {profile['observation_count']}, " ··· 169 170 observations_text: str, 170 171 interactions_text: str, 171 172 api_key: str, 172 - bsky_profile: dict | None = None, 173 + bsky_profile: dict[str, str] | None = None, 173 174 ) -> str: 174 175 """LLM synthesis of a relationship summary. Cached by observations hash.""" 175 176 model = AnthropicModel("claude-haiku-4-5", provider=AnthropicProvider(api_key=api_key)) ··· 203 204 def compute_key( 204 205 self, 205 206 task_ctx: TaskRunContext, 206 - inputs: dict, 207 - flow_parameters: dict, 208 - **kwargs, 207 + inputs: dict[str, Any], 208 + flow_parameters: dict[str, Any], 209 + **kwargs: Any, 209 210 ) -> str | None: 210 211 handle = inputs.get("handle") 211 212 summary = inputs.get("summary")
+4 -3
flows/ingest.py
··· 14 14 import datetime 15 15 import os 16 16 from dataclasses import dataclass 17 + from typing import Any 17 18 18 19 import httpx 19 20 from prefect import flow, get_run_logger, task, unmapped ··· 51 52 def compute_key( 52 53 self, 53 54 task_ctx: TaskRunContext, 54 - inputs: dict, 55 - flow_parameters: dict, 56 - **kwargs, 55 + inputs: dict[str, Any], 56 + flow_parameters: dict[str, Any], 57 + **kwargs: Any, 57 58 ) -> str | None: 58 59 ref: IssueRef | None = inputs.get("ref") 59 60 if ref is None: