bring back yahoo pipes!
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: refactor to go and fix indiko

+3662 -169
+8 -8
.env.example
··· 1 - ORIGIN=https://pipes.yourdomain.com 2 - PORT=3000 3 - NODE_ENV=production 4 - DATABASE_URL=data/pipes.db 1 + # Pipes Secrets 2 + # All other configuration is in config.yaml 3 + # Copy this file to .env and fill in the secrets 5 4 6 - # Indiko OAuth Configuration 7 - INDIKO_CLIENT_ID=ikc_xxxxxxxxxxxxxxxxxxxxx 8 - INDIKO_CLIENT_SECRET=iks_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 9 - INDIKO_ORIGIN=https://indiko.dunkirk.sh 5 + # OAuth (Indiko) 6 + INDIKO_CLIENT_SECRET=your_client_secret_here 7 + 8 + # Session (generate with: openssl rand -base64 32) 9 + SESSION_SECRET=your_random_secret_here
+17 -6
.gitignore
··· 1 - node_modules/ 2 - .wrangler/ 3 - dist/ 4 - .dev.vars 1 + # Go 2 + pipes 3 + *.exe 4 + *.dll 5 + *.so 6 + *.dylib 7 + 8 + # Environment and config 5 9 .env 6 - bun.lockb 7 - data/ 10 + config.yaml 11 + 12 + # Database 8 13 *.db 9 14 *.db-shm 10 15 *.db-wal 16 + data/ 11 17 18 + # OS 12 19 .DS_Store 20 + 21 + # IDE 22 + .idea/ 23 + .vscode/
+72
CLAUDE.md
··· 1 + # Pipes - Project Instructions 2 + 3 + This is a Go application following Herald's architecture patterns. 4 + 5 + ## Tech Stack 6 + 7 + - **Language**: Go 1.24+ 8 + - **Database**: SQLite with direct SQL (no ORM) 9 + - **Auth**: Indiko OAuth 2.0 server 10 + - **Logging**: charmbracelet/log for structured logging 11 + - **Frontend**: Go html/template + Vanilla JavaScript 12 + - **Deployment**: Single static binary 13 + 14 + ## Development Commands 15 + 16 + ```bash 17 + # Build the project 18 + go build -o pipes . 19 + 20 + # Run in development 21 + ./pipes serve -c config.yaml 22 + 23 + # Run with hot reload (using a tool like air) 24 + air 25 + 26 + # Initialize config files 27 + ./pipes init 28 + 29 + # Run tests 30 + go test ./... 31 + ``` 32 + 33 + ## Configuration 34 + 35 + - **YAML Config** (config.yaml): All non-sensitive configuration 36 + - **Environment** (.env): Secrets only (INDIKO_CLIENT_SECRET, SESSION_SECRET) 37 + - YAML supports env var expansion: `${VAR}` syntax 38 + 39 + See config.yaml.example and .env.example for templates. 40 + 41 + ## Architecture 42 + 43 + Follow Herald's patterns: 44 + - Clean separation of concerns (config/, store/, auth/, engine/, nodes/, web/) 45 + - SQLite with WAL mode 46 + - Structured logging with charm log 47 + - Graceful shutdown with signal handling 48 + - Session-based authentication with Indiko OAuth 49 + 50 + ## Code Style 51 + 52 + - Use `gofmt` for formatting 53 + - Structured logging: `logger.Info("message", "key", value)` 54 + - Error wrapping: `fmt.Errorf("context: %w", err)` 55 + - Context propagation for cancellation 56 + - Foreign key constraints in SQLite 57 + 58 + ## Design Aesthetic 59 + 60 + **Neo-Brutalism** - Bold, geometric design: 61 + - Space Grotesk font 62 + - Hard borders (3-4px solid) 63 + - Hard box shadows (no blur) 64 + - High contrast colors 65 + - Sharp, geometric shapes 66 + - Color palette: 67 + - Primary: #2563eb (blue) 68 + - Secondary: #ff6b35 (orange) 69 + - Auth/Indiko: #AB4967 (pink) 70 + - Dark: #26242b (near-black) 71 + - Background: #f5f5f0 (warm off-white) 72 + - White: #fff
+187
CRUSH.md
··· 1 + # Crush Memory - Pipes Project 2 + 3 + ## User Preferences 4 + 5 + - Follow Herald's Go architecture patterns 6 + - Use direct SQL for all database operations (no ORM) 7 + - Follow neo-brutalist design aesthetic (blue/orange for app, pink only for auth) 8 + - Use Indiko for authentication (OAuth 2.0 with PKCE) 9 + - Run on port 3001 (Indiko runs on 3000) 10 + - Use charmbracelet/log for structured logging 11 + 12 + ## Architecture Patterns 13 + 14 + ### Authentication Flow 15 + - OAuth 2.0 client that uses Indiko for authentication 16 + - PKCE flow with code verifier/challenge (required by IndieAuth spec) 17 + - Auto-registration with Indiko (client ID = app URL) 18 + - Session-based auth with 30-day cookies 19 + - Automatic token refresh using refresh tokens 20 + 21 + ### Database (SQLite) 22 + - Direct SQL queries (no ORM like Kysely or GORM) 23 + - SQLite with WAL mode for concurrency 24 + - Foreign key constraints enabled 25 + - Schema created automatically on startup 26 + 27 + ### Project Structure 28 + ``` 29 + pipes/ 30 + ├── main.go # Entry point, CLI setup 31 + ├── go.mod # Dependencies 32 + ├── config/ 33 + │ ├── app.go # Config struct & loading (like Herald) 34 + │ └── validate.go # Config validation 35 + ├── store/ 36 + │ ├── db.go # SQLite setup & schema 37 + │ ├── users.go # User operations 38 + │ ├── pipes.go # Pipe CRUD 39 + │ ├── executions.go # Execution history 40 + │ └── cache.go # Source cache operations 41 + ├── auth/ 42 + │ ├── oauth.go # OAuth 2.0 client (Indiko) 43 + │ ├── session.go # Session management 44 + │ └── middleware.go # Auth middleware 45 + ├── engine/ 46 + │ ├── executor.go # Pipeline execution engine 47 + │ ├── scheduler.go # Cron-based scheduler (Herald pattern) 48 + │ └── registry.go # Node type registry 49 + ├── nodes/ 50 + │ ├── node.go # Node interface 51 + │ ├── sources/ 52 + │ │ ├── rss.go # RSS/Atom source 53 + │ │ └── http.go # HTTP API source 54 + │ └── transforms/ 55 + │ ├── filter.go # Filter transform 56 + │ ├── sort.go # Sort transform 57 + │ ├── limit.go # Limit transform 58 + │ ├── merge.go # Merge sources 59 + │ ├── dedupe.go # Deduplicate items 60 + │ └── extract.go # Extract/transform fields 61 + └── web/ 62 + ├── server.go # HTTP server setup 63 + ├── handlers.go # Route handlers 64 + ├── api.go # JSON API endpoints 65 + └── templates/ 66 + ├── layout.html # Base layout 67 + ├── index.html # Landing page 68 + ├── dashboard.html # User dashboard 69 + ├── editor.html # Visual editor 70 + └── style.css # Frutiger Aero styles 71 + ``` 72 + 73 + ### Database Schema 74 + - **users**: id, indiko_sub, username, name, email, photo, url, role, created_at, updated_at 75 + - `indiko_sub` is the "sub" field from Indiko's userinfo endpoint 76 + - `role` is either "user" or "admin" (synced from Indiko) 77 + - **sessions**: id, user_id, access_token, refresh_token, expires_at, created_at 78 + - 30-day sessions with automatic token refresh 79 + - **pipes**: id, user_id, name, description, config (JSON), is_public, created_at, updated_at 80 + - `config` is JSON: {version, nodes[], connections[], settings} 81 + - **scheduled_jobs**: id, pipe_id, cron_expression, next_run_at, last_run_at, enabled, created_at, updated_at 82 + - **pipe_executions**: id, pipe_id, status, trigger_type, started_at, completed_at, duration_ms, items_processed, error_message, metadata 83 + - **execution_logs**: id, execution_id, node_id, level, message, timestamp, metadata 84 + - **source_cache**: id, pipe_id, node_id, cache_key, data, etag, last_modified, expires_at, created_at 85 + 86 + ### OAuth Configuration 87 + - **Client ID**: App's URL (e.g., `http://localhost:3001`) 88 + - **Client Secret**: Optional, for pre-registered clients (stored in .env) 89 + - **Scopes**: `profile email` 90 + - **PKCE**: Required (S256 code challenge method) 91 + - **Callback URL**: `{ORIGIN}/auth/callback` 92 + 93 + ## Configuration 94 + 95 + ### YAML Config (config.yaml) 96 + Contains all non-sensitive configuration: 97 + ```yaml 98 + # Server settings 99 + host: localhost 100 + port: 3001 101 + origin: http://localhost:3001 102 + env: development 103 + log_level: info # debug, info, warn, error, fatal 104 + 105 + # Database 106 + db_path: pipes.db 107 + 108 + # OAuth (Indiko) 109 + indiko_url: http://localhost:3000 110 + indiko_client_id: http://localhost:3001 111 + indiko_client_secret: ${INDIKO_CLIENT_SECRET} # Loaded from .env 112 + oauth_callback_url: http://localhost:3001/auth/callback 113 + 114 + # Session 115 + session_secret: ${SESSION_SECRET} # Loaded from .env 116 + session_cookie_name: pipes_session 117 + ``` 118 + 119 + ### Environment Variables (.env) 120 + Contains **only secrets**: 121 + ```env 122 + # OAuth (Indiko) 123 + INDIKO_CLIENT_SECRET=your_client_secret_here 124 + 125 + # Session (generate with: openssl rand -base64 32) 126 + SESSION_SECRET=your_random_secret_here 127 + ``` 128 + 129 + ## Routes 130 + 131 + ### Public 132 + - `GET /` - Landing page (redirects to /dashboard if authenticated) 133 + - `GET /auth/login` - Start OAuth flow 134 + - `GET /auth/callback` - OAuth callback handler 135 + 136 + ### Authenticated 137 + - `GET /dashboard` - User dashboard (requires auth) 138 + - `GET /pipes/:id/edit` - Visual editor (requires auth) 139 + - `POST /auth/logout` - End session 140 + - `GET /api/me` - Get current user info 141 + - `GET /api/pipes` - List user's pipes 142 + - `GET /api/pipes/:id` - Get pipe config 143 + - `POST /api/pipes` - Create pipe 144 + - `PUT /api/pipes/:id` - Update pipe 145 + - `DELETE /api/pipes/:id` - Delete pipe 146 + - `POST /api/pipes/:id/execute` - Execute pipe manually 147 + - `GET /api/pipes/:id/executions` - Execution history 148 + - `GET /api/executions/:id/logs` - Execution logs 149 + - `GET /api/node-types` - Available node types 150 + 151 + ## Code Style 152 + 153 + - Use `gofmt` for formatting 154 + - Structured logging with charmbracelet/log: `logger.Info("message", "key", value)` 155 + - Error wrapping: `fmt.Errorf("context: %w", err)` 156 + - Context propagation for cancellation 157 + - Session cookies named `pipes_session` 158 + - Authorization header: `Bearer {token}` 159 + 160 + ## Design Aesthetic 161 + 162 + **Neo-Brutalism** - Bold, geometric design: 163 + - Space Grotesk font family 164 + - Hard borders (3-4px solid #26242b) 165 + - Hard box shadows (6-12px offset, no blur) 166 + - High contrast, sharp edges 167 + - Uppercase text, tight letter-spacing 168 + - Color palette: 169 + - **App colors**: 170 + - Primary: #2563eb (blue) - used for headings, secondary buttons 171 + - Secondary: #ff6b35 (orange) - used for primary buttons, accents 172 + - Background: #f5f5f0 (warm off-white) 173 + - Dark: #26242b (near-black) 174 + - White: #fff 175 + - **Auth/Indiko colors** (login, error pages): 176 + - Auth primary: #AB4967 (muted pink) 177 + - Auth text: #fff (white - for text on pink buttons) 178 + 179 + ## Commands 180 + 181 + ```bash 182 + go build -o pipes . # Build 183 + ./pipes serve # Run server 184 + ./pipes init # Initialize config files 185 + ./pipes help # Show help 186 + ./pipes version # Show version 187 + ```
+187 -23
README.md
··· 1 1 # Pipes 2 2 3 - This is my interperitation of yahoo pipes from back in the day! It is designed to allow you to string together pipelines of data and do cool stuff! 3 + This is my interpretation of Yahoo Pipes from back in the day! It is designed to allow you to string together pipelines of data and do cool stuff with a modern Frutiger Aero aesthetic! 4 4 5 5 The canonical repo for this is hosted on tangled over at [`dunkirk.sh/pipes`](https://tangled.org/@dunkirk.sh/pipes) 6 6 7 + ## Features 8 + 9 + - 🔐 **Passwordless Authentication** - Uses Indiko for OAuth 2.0 authentication with passkeys 10 + - 🌊 **Visual Pipeline Builder** - Create data flows with an intuitive drag-and-drop interface 11 + - ⚡ **Scheduled Execution** - Pipes run automatically on cron schedules 12 + - 📊 **Data Sources** - RSS/Atom feeds and HTTP/REST APIs 13 + - 🔄 **Transform Operations** - Filter, sort, limit, merge, dedupe, and extract data 14 + - 🎨 **Neo-Brutalist Design** - Bold, geometric UI matching Indiko's aesthetic 15 + - 👥 **Role-based Access** - User and admin roles powered by Indiko 16 + 17 + ## Tech Stack 18 + 19 + - **Language**: Go 1.24+ 20 + - **Database**: SQLite with direct SQL 21 + - **Auth**: [Indiko](https://github.com/taciturnaxolotl/indiko) OAuth 2.0 server 22 + - **Frontend**: Go html/template + Vanilla JavaScript 23 + - **Deployment**: Single static binary 24 + 7 25 ## Installation 8 26 9 27 1. Clone the repository: ··· 13 31 cd pipes 14 32 ``` 15 33 16 - 2. Install dependencies: 34 + 2. Build the binary: 35 + 36 + ```bash 37 + go build -o pipes . 38 + ``` 39 + 40 + 3. Initialize configuration: 17 41 18 42 ```bash 19 - bun install 43 + ./pipes init 20 44 ``` 21 45 22 - 3. Create a `.env` file: 46 + This creates a `config.yaml` file with sample configuration and a `.env.example` file for secrets. 47 + 48 + Copy the example and add your secrets: 23 49 24 50 ```bash 25 51 cp .env.example .env 52 + # Edit .env with your actual secrets 26 53 ``` 27 54 28 - Configure the following environment variables: 55 + Example `.env` file: 29 56 30 57 ```env 31 - ORIGIN=https://pipes.yourdomain.com 32 - PORT=3000 33 - NODE_ENV=production 34 - DATABASE_URL=data/pipes.db 58 + # Pipes Secrets 59 + # All other configuration is in config.yaml 60 + # Copy this file to .env and fill in the secrets 61 + 62 + # OAuth (Indiko) 63 + INDIKO_CLIENT_SECRET=your_client_secret_here 35 64 36 - # Indiko OAuth Configuration 37 - INDIKO_CLIENT_ID=ikc_xxxxxxxxxxxxxxxxxxxxx 38 - INDIKO_CLIENT_SECRET=iks_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 39 - INDIKO_ORIGIN=https://indiko.dunkirk.sh 40 - INDIKO_REDIRECT_URI=https://pipes.yourdomain.com/auth/callback 65 + # Session (generate with: openssl rand -base64 32) 66 + SESSION_SECRET=your_random_secret_here 41 67 ``` 42 68 43 - The database will be automatically created at `./data/pipes.db` on first run. 69 + The database will be automatically created at `./pipes.db` on first run. 44 70 45 71 4. Set up Indiko OAuth: 46 - - Go to your Indiko instance 47 - - Navigate to Admin → OAuth Clients 48 - - Create a new client with the redirect URI matching your `INDIKO_REDIRECT_URI` 49 - - Copy the Client ID and Secret to your `.env` file 72 + 73 + Pipes uses auto-registration with Indiko, so you can start using it immediately! The client ID is just your app's URL (`http://localhost:3001`). 74 + 75 + For production or to use role-based access control, ask your Indiko admin to pre-register your client with a client secret. 50 76 51 77 5. Start the server: 52 78 53 79 ```bash 54 - # Development (with hot reload) 55 - bun run dev 80 + ./pipes serve -c config.yaml 81 + ``` 82 + 83 + Or run without specifying a config file (uses environment variables from `.env`): 56 84 57 - # Production 58 - bun run start 85 + ```bash 86 + ./pipes serve 59 87 ``` 60 88 89 + Visit `http://localhost:3001` and sign in with your Indiko account! 90 + 91 + ## Configuration 92 + 93 + Pipes uses a two-file configuration approach (just like Herald): 94 + 95 + ### YAML Config File (config.yaml) 96 + 97 + Contains all non-sensitive configuration: 98 + 99 + ```bash 100 + ./pipes init # Creates config.yaml and .env.example 101 + ./pipes serve -c config.yaml 102 + ``` 103 + 104 + Example `config.yaml`: 105 + ```yaml 106 + # Server settings 107 + host: localhost 108 + port: 3001 109 + origin: http://localhost:3001 110 + env: development 111 + log_level: info # debug, info, warn, error, fatal 112 + 113 + # Database 114 + db_path: pipes.db 115 + 116 + # OAuth (Indiko) 117 + indiko_url: http://localhost:3000 118 + indiko_client_id: http://localhost:3001 119 + indiko_client_secret: ${INDIKO_CLIENT_SECRET} # Loaded from .env 120 + oauth_callback_url: http://localhost:3001/auth/callback 121 + 122 + # Session 123 + session_secret: ${SESSION_SECRET} # Loaded from .env 124 + session_cookie_name: pipes_session 125 + ``` 126 + 127 + ### Environment Variables (.env) 128 + 129 + Contains **only secrets** (never commit this file): 130 + 131 + ```env 132 + # OAuth (Indiko) 133 + INDIKO_CLIENT_SECRET=your_client_secret_here 134 + 135 + # Session (generate with: openssl rand -base64 32) 136 + SESSION_SECRET=your_random_secret_here 137 + ``` 138 + 139 + YAML config supports environment variable expansion using `${VAR}` syntax. Variables are loaded from `.env` file and can be overridden by system environment variables. 140 + 141 + **Configuration precedence:** Environment variables > YAML config > defaults 142 + 143 + ### Log Levels 144 + 145 + Set `LOG_LEVEL` (or `log_level` in YAML) to: 146 + - `debug` - Verbose output for troubleshooting 147 + - `info` - Standard operational messages (default) 148 + - `warn` - Warning messages 149 + - `error` - Error messages only 150 + - `fatal` - Fatal errors (exits immediately) 151 + 152 + Example structured logging output: 153 + ``` 154 + 2026/01/10 10:24:05 INFO starting pipes host=localhost port=3001 db_path=pipes.db 155 + 2026/01/10 10:24:05 INFO user authenticated name="John Doe" email="john@example.com" 156 + ``` 157 + 158 + ## Architecture 159 + 160 + Pipes follows Herald's clean architecture patterns: 161 + 162 + ``` 163 + pipes/ 164 + ├── main.go # CLI entry point 165 + ├── config/ # Configuration management 166 + ├── store/ # Database operations 167 + ├── auth/ # OAuth 2.0 client & session management 168 + ├── engine/ # Pipeline executor & scheduler 169 + ├── nodes/ # Node type definitions 170 + │ ├── sources/ # RSS, HTTP API sources 171 + │ └── transforms/ # Filter, sort, limit operations 172 + └── web/ # HTTP server & handlers 173 + └── templates/ # HTML templates 174 + ``` 175 + 176 + ## OAuth Flow 177 + 178 + 1. User clicks "Sign in with Indiko" 179 + 2. Redirect to Indiko authorization endpoint with PKCE 180 + 3. User authenticates with passkey on Indiko 181 + 4. User approves scopes (profile, email) 182 + 5. Indiko redirects back with authorization code 183 + 6. Exchange code for access + refresh tokens 184 + 7. Create/update user in local database 185 + 8. Create session with 30-day cookie 186 + 187 + ## Pipeline Execution 188 + 189 + Pipelines are executed using topological sort (Kahn's algorithm): 190 + 191 + 1. Parse pipe configuration (nodes + connections) 192 + 2. Build dependency graph 193 + 3. Execute nodes in order, passing data between them 194 + 4. Log execution progress 195 + 5. Store results in database 196 + 197 + The scheduler runs every minute, checking for pipes that need to execute based on their cron schedules. 198 + 199 + ## Available Node Types 200 + 201 + **Sources:** 202 + - RSS Feed - Fetch items from RSS/Atom feeds 203 + - HTTP API - Fetch JSON data from REST APIs (coming soon) 204 + 205 + **Transforms:** 206 + - Filter - Filter items based on field conditions 207 + - Sort - Sort items by field values 208 + - Limit - Limit the number of output items 209 + - Merge - Combine multiple data sources (coming soon) 210 + - Dedupe - Remove duplicate items (coming soon) 211 + - Extract - Transform/extract fields (coming soon) 212 + 213 + ## Development 214 + 215 + Build and run: 216 + 217 + ```bash 218 + go build -o pipes . 219 + ./pipes serve 220 + ``` 221 + 222 + The database schema is automatically created on first run. 223 + 61 224 <p align="center"> 62 225 <img src="https://raw.githubusercontent.com/taciturnaxolotl/carriage/main/.github/images/line-break.svg" /> 63 226 </p> ··· 69 232 <p align="center"> 70 233 <a href="https://tangled.org/dunkirk.sh/indiko/blob/main/LICENSE.md"><img src="https://img.shields.io/static/v1.svg?style=for-the-badge&label=License&message=O'Saasy&logoColor=d9e0ee&colorA=363a4f&colorB=b7bdf8"/></a> 71 234 </p> 235 +
+31
auth/middleware.go
··· 1 + package auth 2 + 3 + import ( 4 + "context" 5 + "net/http" 6 + 7 + "github.com/kierank/pipes/store" 8 + ) 9 + 10 + type contextKey string 11 + 12 + const userContextKey contextKey = "user" 13 + 14 + func (sm *SessionManager) RequireAuth(next http.HandlerFunc) http.HandlerFunc { 15 + return func(w http.ResponseWriter, r *http.Request) { 16 + user, err := sm.GetCurrentUser(r) 17 + if err != nil || user == nil { 18 + http.Redirect(w, r, "/auth/login", http.StatusSeeOther) 19 + return 20 + } 21 + 22 + // Add user to context 23 + ctx := context.WithValue(r.Context(), userContextKey, user) 24 + next(w, r.WithContext(ctx)) 25 + } 26 + } 27 + 28 + func GetUserFromContext(ctx context.Context) *store.User { 29 + user, _ := ctx.Value(userContextKey).(*store.User) 30 + return user 31 + }
+242
auth/oauth.go
··· 1 + package auth 2 + 3 + import ( 4 + "crypto/rand" 5 + "crypto/sha256" 6 + "encoding/base64" 7 + "encoding/json" 8 + "fmt" 9 + "io" 10 + "net/http" 11 + "net/url" 12 + "strings" 13 + "time" 14 + 15 + "github.com/kierank/pipes/config" 16 + "github.com/kierank/pipes/store" 17 + ) 18 + 19 + type OAuthClient struct { 20 + cfg *config.Config 21 + db *store.DB 22 + states map[string]*PKCEState // In-memory for MVP; use Redis in production 23 + } 24 + 25 + type PKCEState struct { 26 + CodeVerifier string 27 + RedirectURI string 28 + CreatedAt time.Time 29 + } 30 + 31 + type TokenResponse struct { 32 + AccessToken string `json:"access_token"` 33 + TokenType string `json:"token_type"` 34 + ExpiresIn int `json:"expires_in"` 35 + RefreshToken string `json:"refresh_token,omitempty"` 36 + Scope string `json:"scope,omitempty"` 37 + } 38 + 39 + type UserInfo struct { 40 + Sub string `json:"sub"` 41 + Username string `json:"username,omitempty"` 42 + Name string `json:"name,omitempty"` 43 + Email string `json:"email,omitempty"` 44 + Photo string `json:"picture,omitempty"` 45 + URL string `json:"profile,omitempty"` 46 + } 47 + 48 + func NewOAuthClient(cfg *config.Config, db *store.DB) *OAuthClient { 49 + return &OAuthClient{ 50 + cfg: cfg, 51 + db: db, 52 + states: make(map[string]*PKCEState), 53 + } 54 + } 55 + 56 + func (c *OAuthClient) GetAuthorizationURL() (string, error) { 57 + state, err := generateRandomString(32) 58 + if err != nil { 59 + return "", fmt.Errorf("generate state: %w", err) 60 + } 61 + 62 + codeVerifier, err := generateRandomString(64) 63 + if err != nil { 64 + return "", fmt.Errorf("generate code verifier: %w", err) 65 + } 66 + 67 + codeChallenge := generateCodeChallenge(codeVerifier) 68 + 69 + // Store PKCE state (in-memory for now) 70 + c.states[state] = &PKCEState{ 71 + CodeVerifier: codeVerifier, 72 + RedirectURI: c.cfg.OAuthCallbackURL, 73 + CreatedAt: time.Now(), 74 + } 75 + 76 + // Clean up old states (older than 10 minutes) 77 + go c.cleanupStates() 78 + 79 + authURL := fmt.Sprintf("%s/auth/authorize?"+ 80 + "response_type=code&"+ 81 + "client_id=%s&"+ 82 + "redirect_uri=%s&"+ 83 + "state=%s&"+ 84 + "code_challenge=%s&"+ 85 + "code_challenge_method=S256&"+ 86 + "scope=profile%%20email", 87 + c.cfg.IndikoURL, 88 + url.QueryEscape(c.cfg.IndikoClientID), 89 + url.QueryEscape(c.cfg.OAuthCallbackURL), 90 + state, 91 + codeChallenge, 92 + ) 93 + 94 + return authURL, nil 95 + } 96 + 97 + func (c *OAuthClient) HandleCallback(state, code string) (*store.User, *store.Session, error) { 98 + // Verify state 99 + pkceState, ok := c.states[state] 100 + if !ok { 101 + return nil, nil, fmt.Errorf("invalid state") 102 + } 103 + 104 + delete(c.states, state) 105 + 106 + // Exchange code for token 107 + tokenResp, err := c.exchangeCode(code, pkceState.CodeVerifier, pkceState.RedirectURI) 108 + if err != nil { 109 + return nil, nil, fmt.Errorf("exchange code: %w", err) 110 + } 111 + 112 + // Fetch user info 113 + userInfo, err := c.fetchUserInfo(tokenResp.AccessToken) 114 + if err != nil { 115 + return nil, nil, fmt.Errorf("fetch user info: %w", err) 116 + } 117 + 118 + // Create or update user 119 + user, err := c.db.GetUserByIndikoSub(userInfo.Sub) 120 + if err != nil { 121 + return nil, nil, fmt.Errorf("get user: %w", err) 122 + } 123 + 124 + if user == nil { 125 + user, err = c.db.CreateUser(userInfo.Sub, userInfo.Username, userInfo.Name, userInfo.Email, userInfo.Photo, userInfo.URL) 126 + if err != nil { 127 + return nil, nil, fmt.Errorf("create user: %w", err) 128 + } 129 + } else { 130 + // Update user info 131 + user.Username = userInfo.Username 132 + user.Name = userInfo.Name 133 + user.Email = userInfo.Email 134 + user.Photo = userInfo.Photo 135 + user.URL = userInfo.URL 136 + if err := c.db.UpdateUser(user); err != nil { 137 + return nil, nil, fmt.Errorf("update user: %w", err) 138 + } 139 + } 140 + 141 + // Create session 142 + expiresAt := time.Now().Add(30 * 24 * time.Hour).Unix() // 30 days 143 + session, err := c.db.CreateSession(user.ID, tokenResp.AccessToken, tokenResp.RefreshToken, expiresAt) 144 + if err != nil { 145 + return nil, nil, fmt.Errorf("create session: %w", err) 146 + } 147 + 148 + return user, session, nil 149 + } 150 + 151 + func (c *OAuthClient) exchangeCode(code, codeVerifier, redirectURI string) (*TokenResponse, error) { 152 + data := url.Values{} 153 + data.Set("grant_type", "authorization_code") 154 + data.Set("code", code) 155 + data.Set("redirect_uri", redirectURI) 156 + data.Set("client_id", c.cfg.IndikoClientID) 157 + data.Set("code_verifier", codeVerifier) 158 + 159 + if c.cfg.IndikoClientSecret != "" { 160 + data.Set("client_secret", c.cfg.IndikoClientSecret) 161 + } 162 + 163 + tokenURL := fmt.Sprintf("%s/auth/token", c.cfg.IndikoURL) 164 + 165 + // Create request with explicit headers 166 + req, err := http.NewRequest("POST", tokenURL, strings.NewReader(data.Encode())) 167 + if err != nil { 168 + return nil, fmt.Errorf("create request: %w", err) 169 + } 170 + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 171 + 172 + client := &http.Client{Timeout: 10 * time.Second} 173 + resp, err := client.Do(req) 174 + if err != nil { 175 + return nil, fmt.Errorf("do request: %w", err) 176 + } 177 + defer resp.Body.Close() 178 + 179 + if resp.StatusCode != http.StatusOK { 180 + body, _ := io.ReadAll(resp.Body) 181 + return nil, fmt.Errorf("token request failed (URL: %s): %s - %s", tokenURL, resp.Status, string(body)) 182 + } 183 + 184 + var tokenResp TokenResponse 185 + if err := json.NewDecoder(resp.Body).Decode(&tokenResp); err != nil { 186 + return nil, fmt.Errorf("decode response: %w", err) 187 + } 188 + 189 + return &tokenResp, nil 190 + } 191 + 192 + func (c *OAuthClient) fetchUserInfo(accessToken string) (*UserInfo, error) { 193 + userInfoURL := fmt.Sprintf("%s/userinfo", c.cfg.IndikoURL) 194 + 195 + req, err := http.NewRequest("GET", userInfoURL, nil) 196 + if err != nil { 197 + return nil, fmt.Errorf("create request: %w", err) 198 + } 199 + 200 + req.Header.Set("Authorization", "Bearer "+accessToken) 201 + 202 + client := &http.Client{Timeout: 10 * time.Second} 203 + resp, err := client.Do(req) 204 + if err != nil { 205 + return nil, fmt.Errorf("do request: %w", err) 206 + } 207 + defer resp.Body.Close() 208 + 209 + if resp.StatusCode != http.StatusOK { 210 + body, _ := io.ReadAll(resp.Body) 211 + return nil, fmt.Errorf("userinfo request failed: %s - %s", resp.Status, string(body)) 212 + } 213 + 214 + var userInfo UserInfo 215 + if err := json.NewDecoder(resp.Body).Decode(&userInfo); err != nil { 216 + return nil, fmt.Errorf("decode response: %w", err) 217 + } 218 + 219 + return &userInfo, nil 220 + } 221 + 222 + func (c *OAuthClient) cleanupStates() { 223 + cutoff := time.Now().Add(-10 * time.Minute) 224 + for state, pkceState := range c.states { 225 + if pkceState.CreatedAt.Before(cutoff) { 226 + delete(c.states, state) 227 + } 228 + } 229 + } 230 + 231 + func generateRandomString(length int) (string, error) { 232 + bytes := make([]byte, length) 233 + if _, err := rand.Read(bytes); err != nil { 234 + return "", err 235 + } 236 + return base64.RawURLEncoding.EncodeToString(bytes), nil 237 + } 238 + 239 + func generateCodeChallenge(verifier string) string { 240 + hash := sha256.Sum256([]byte(verifier)) 241 + return base64.RawURLEncoding.EncodeToString(hash[:]) 242 + }
+77
auth/session.go
··· 1 + package auth 2 + 3 + import ( 4 + "net/http" 5 + 6 + "github.com/gorilla/sessions" 7 + "github.com/kierank/pipes/config" 8 + "github.com/kierank/pipes/store" 9 + ) 10 + 11 + type SessionManager struct { 12 + store *sessions.CookieStore 13 + db *store.DB 14 + cfg *config.Config 15 + } 16 + 17 + func NewSessionManager(cfg *config.Config, db *store.DB) *SessionManager { 18 + store := sessions.NewCookieStore([]byte(cfg.SessionSecret)) 19 + store.Options = &sessions.Options{ 20 + Path: "/", 21 + MaxAge: 30 * 24 * 60 * 60, // 30 days 22 + HttpOnly: true, 23 + SameSite: http.SameSiteLaxMode, 24 + Secure: cfg.Env == "production", 25 + } 26 + 27 + return &SessionManager{ 28 + store: store, 29 + db: db, 30 + cfg: cfg, 31 + } 32 + } 33 + 34 + func (sm *SessionManager) SetSession(w http.ResponseWriter, r *http.Request, sessionID string) error { 35 + session, _ := sm.store.Get(r, sm.cfg.SessionCookieName) 36 + session.Values["session_id"] = sessionID 37 + return session.Save(r, w) 38 + } 39 + 40 + func (sm *SessionManager) GetSessionID(r *http.Request) (string, error) { 41 + session, err := sm.store.Get(r, sm.cfg.SessionCookieName) 42 + if err != nil { 43 + return "", err 44 + } 45 + 46 + sessionID, ok := session.Values["session_id"].(string) 47 + if !ok { 48 + return "", nil 49 + } 50 + 51 + return sessionID, nil 52 + } 53 + 54 + func (sm *SessionManager) ClearSession(w http.ResponseWriter, r *http.Request) error { 55 + session, _ := sm.store.Get(r, sm.cfg.SessionCookieName) 56 + session.Options.MaxAge = -1 57 + return session.Save(r, w) 58 + } 59 + 60 + func (sm *SessionManager) GetCurrentUser(r *http.Request) (*store.User, error) { 61 + sessionID, err := sm.GetSessionID(r) 62 + if err != nil || sessionID == "" { 63 + return nil, nil 64 + } 65 + 66 + session, err := sm.db.GetSessionByID(sessionID) 67 + if err != nil || session == nil { 68 + return nil, err 69 + } 70 + 71 + user, err := sm.db.GetUserByID(session.UserID) 72 + if err != nil { 73 + return nil, err 74 + } 75 + 76 + return user, nil 77 + }
+23
config.yaml.example
··· 1 + # Pipes Configuration 2 + # Secrets are loaded from .env file (see .env.example) 3 + # Environment variables override values in this file 4 + 5 + # Server settings 6 + host: localhost 7 + port: 3001 8 + origin: http://localhost:3001 9 + env: development 10 + log_level: info # debug, info, warn, error, fatal 11 + 12 + # Database 13 + db_path: pipes.db 14 + 15 + # OAuth (Indiko) 16 + indiko_url: https://indiko.example.com # Use HTTPS (HTTP redirects cause issues) 17 + indiko_client_id: http://localhost:3001 18 + indiko_client_secret: ${INDIKO_CLIENT_SECRET} # Loaded from .env 19 + oauth_callback_url: http://localhost:3001/auth/callback 20 + 21 + # Session 22 + session_secret: ${SESSION_SECRET} # Loaded from .env 23 + session_cookie_name: pipes_session
+161
config/app.go
··· 1 + package config 2 + 3 + import ( 4 + "fmt" 5 + "os" 6 + "path/filepath" 7 + "strconv" 8 + 9 + "github.com/joho/godotenv" 10 + "gopkg.in/yaml.v3" 11 + ) 12 + 13 + type Config struct { 14 + // Server 15 + Origin string `yaml:"origin"` 16 + Host string `yaml:"host"` 17 + Port int `yaml:"port"` 18 + Env string `yaml:"env"` 19 + LogLevel string `yaml:"log_level"` 20 + 21 + // Database 22 + DatabasePath string `yaml:"db_path"` 23 + 24 + // OAuth (Indiko) 25 + IndikoURL string `yaml:"indiko_url"` 26 + IndikoClientID string `yaml:"indiko_client_id"` 27 + IndikoClientSecret string `yaml:"indiko_client_secret"` 28 + OAuthCallbackURL string `yaml:"oauth_callback_url"` 29 + 30 + // Session 31 + SessionSecret string `yaml:"session_secret"` 32 + SessionCookieName string `yaml:"session_cookie_name"` 33 + } 34 + 35 + // Default returns a Config with sensible defaults 36 + func Default() *Config { 37 + return &Config{ 38 + Origin: "http://localhost:3001", 39 + Host: "localhost", 40 + Port: 3001, 41 + Env: "development", 42 + LogLevel: "info", 43 + DatabasePath: "pipes.db", 44 + IndikoURL: "http://localhost:3000", 45 + OAuthCallbackURL: "http://localhost:3001/auth/callback", 46 + SessionCookieName: "pipes_session", 47 + } 48 + } 49 + 50 + // Load loads configuration from YAML file (if provided) and environment variables 51 + func Load(path string) (*Config, error) { 52 + cfg := Default() 53 + 54 + // Load .env file if it exists (silently ignore if not found) 55 + if envPath := findEnvFile(path); envPath != "" { 56 + _ = godotenv.Load(envPath) 57 + } 58 + 59 + // Load from YAML config file if provided 60 + if path != "" { 61 + data, err := os.ReadFile(path) 62 + if err != nil { 63 + return nil, fmt.Errorf("failed to read config file: %w", err) 64 + } 65 + 66 + // Expand environment variables in YAML (e.g., ${DATABASE_PATH}) 67 + expanded := os.Expand(string(data), func(key string) string { 68 + return os.Getenv(key) 69 + }) 70 + 71 + if err := yaml.Unmarshal([]byte(expanded), cfg); err != nil { 72 + return nil, fmt.Errorf("failed to parse config file: %w", err) 73 + } 74 + } 75 + 76 + // Apply environment variable overrides 77 + applyEnvOverrides(cfg) 78 + 79 + if err := cfg.Validate(); err != nil { 80 + return nil, err 81 + } 82 + 83 + return cfg, nil 84 + } 85 + 86 + func (c *Config) Validate() error { 87 + if c.SessionSecret == "" { 88 + return fmt.Errorf("session_secret is required (set SESSION_SECRET env var)") 89 + } 90 + 91 + if c.IndikoClientID == "" { 92 + return fmt.Errorf("indiko_client_id is required (set INDIKO_CLIENT_ID env var)") 93 + } 94 + 95 + if c.IndikoURL == "" { 96 + return fmt.Errorf("indiko_url is required (set INDIKO_URL env var)") 97 + } 98 + 99 + return nil 100 + } 101 + 102 + // findEnvFile looks for .env file in the config file's directory or current directory 103 + func findEnvFile(configPath string) string { 104 + // If config path provided, look in its directory 105 + if configPath != "" { 106 + dir := filepath.Dir(configPath) 107 + envPath := filepath.Join(dir, ".env") 108 + if _, err := os.Stat(envPath); err == nil { 109 + return envPath 110 + } 111 + } 112 + 113 + // Look in current directory 114 + if _, err := os.Stat(".env"); err == nil { 115 + return ".env" 116 + } 117 + 118 + return "" 119 + } 120 + 121 + // applyEnvOverrides applies environment variable overrides to config 122 + func applyEnvOverrides(cfg *Config) { 123 + if v := os.Getenv("ORIGIN"); v != "" { 124 + cfg.Origin = v 125 + } 126 + if v := os.Getenv("HOST"); v != "" { 127 + cfg.Host = v 128 + } 129 + if v := os.Getenv("PORT"); v != "" { 130 + if port, err := strconv.Atoi(v); err == nil { 131 + cfg.Port = port 132 + } 133 + } 134 + if v := os.Getenv("NODE_ENV"); v != "" { 135 + cfg.Env = v 136 + } 137 + if v := os.Getenv("LOG_LEVEL"); v != "" { 138 + cfg.LogLevel = v 139 + } 140 + if v := os.Getenv("DATABASE_PATH"); v != "" { 141 + cfg.DatabasePath = v 142 + } 143 + if v := os.Getenv("INDIKO_URL"); v != "" { 144 + cfg.IndikoURL = v 145 + } 146 + if v := os.Getenv("INDIKO_CLIENT_ID"); v != "" { 147 + cfg.IndikoClientID = v 148 + } 149 + if v := os.Getenv("INDIKO_CLIENT_SECRET"); v != "" { 150 + cfg.IndikoClientSecret = v 151 + } 152 + if v := os.Getenv("OAUTH_CALLBACK_URL"); v != "" { 153 + cfg.OAuthCallbackURL = v 154 + } 155 + if v := os.Getenv("SESSION_SECRET"); v != "" { 156 + cfg.SessionSecret = v 157 + } 158 + if v := os.Getenv("SESSION_COOKIE_NAME"); v != "" { 159 + cfg.SessionCookieName = v 160 + } 161 + }
+218
engine/executor.go
··· 1 + package engine 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "time" 8 + 9 + "github.com/google/uuid" 10 + "github.com/kierank/pipes/nodes" 11 + "github.com/kierank/pipes/store" 12 + ) 13 + 14 + type PipeConfig struct { 15 + Version string `json:"version"` 16 + Nodes []Node `json:"nodes"` 17 + Connections []Connection `json:"connections"` 18 + Settings Settings `json:"settings"` 19 + } 20 + 21 + type Node struct { 22 + ID string `json:"id"` 23 + Type string `json:"type"` 24 + Position Position `json:"position"` 25 + Config map[string]interface{} `json:"config"` 26 + Label string `json:"label,omitempty"` 27 + } 28 + 29 + type Connection struct { 30 + ID string `json:"id"` 31 + Source string `json:"source"` 32 + Target string `json:"target"` 33 + SourceHandle string `json:"sourceHandle,omitempty"` 34 + TargetHandle string `json:"targetHandle,omitempty"` 35 + } 36 + 37 + type Position struct { 38 + X float64 `json:"x"` 39 + Y float64 `json:"y"` 40 + } 41 + 42 + type Settings struct { 43 + Schedule string `json:"schedule,omitempty"` 44 + Enabled bool `json:"enabled"` 45 + Timeout int `json:"timeout,omitempty"` 46 + RetryConfig *RetryConfig `json:"retryConfig,omitempty"` 47 + } 48 + 49 + type RetryConfig struct { 50 + MaxRetries int `json:"maxRetries"` 51 + BackoffMs int `json:"backoffMs"` 52 + } 53 + 54 + type Executor struct { 55 + db *store.DB 56 + registry *Registry 57 + } 58 + 59 + func NewExecutor(db *store.DB) *Executor { 60 + return &Executor{ 61 + db: db, 62 + registry: NewRegistry(), 63 + } 64 + } 65 + 66 + func (e *Executor) Execute(ctx context.Context, pipeID string, triggerType string) (string, error) { 67 + executionID := uuid.New().String() 68 + startedAt := time.Now().Unix() 69 + 70 + // Create execution record 71 + if err := e.db.CreateExecution(executionID, pipeID, triggerType, startedAt); err != nil { 72 + return "", fmt.Errorf("create execution: %w", err) 73 + } 74 + 75 + // Fetch pipe configuration 76 + pipe, err := e.db.GetPipe(pipeID) 77 + if err != nil { 78 + return "", fmt.Errorf("get pipe: %w", err) 79 + } 80 + 81 + if pipe == nil { 82 + return "", fmt.Errorf("pipe not found: %s", pipeID) 83 + } 84 + 85 + var config PipeConfig 86 + if err := json.Unmarshal([]byte(pipe.Config), &config); err != nil { 87 + return "", fmt.Errorf("parse config: %w", err) 88 + } 89 + 90 + // Execute pipeline 91 + itemCount, err := e.executePipeline(ctx, executionID, pipeID, &config) 92 + 93 + completedAt := time.Now().Unix() 94 + durationMs := (completedAt - startedAt) * 1000 95 + 96 + if err != nil { 97 + e.db.UpdateExecutionFailed(executionID, completedAt, durationMs, err.Error()) 98 + return executionID, err 99 + } 100 + 101 + e.db.UpdateExecutionSuccess(executionID, completedAt, durationMs, itemCount) 102 + return executionID, nil 103 + } 104 + 105 + func (e *Executor) executePipeline(ctx context.Context, executionID, pipeID string, config *PipeConfig) (int, error) { 106 + // Topological sort to determine execution order 107 + order, err := topologicalSort(config.Nodes, config.Connections) 108 + if err != nil { 109 + return 0, fmt.Errorf("topological sort: %w", err) 110 + } 111 + 112 + nodeResults := make(map[string][]interface{}) 113 + execCtx := nodes.NewContext(executionID, pipeID, e.db) 114 + 115 + for _, nodeID := range order { 116 + node := findNode(config.Nodes, nodeID) 117 + if node == nil { 118 + continue 119 + } 120 + 121 + // Get node implementation 122 + nodeImpl, err := e.registry.Get(node.Type) 123 + if err != nil { 124 + return 0, fmt.Errorf("get node type %s: %w", node.Type, err) 125 + } 126 + 127 + // Gather inputs from connected nodes 128 + inputs := e.gatherInputs(nodeID, config.Connections, nodeResults) 129 + 130 + // Execute node 131 + output, err := nodeImpl.Execute(ctx, node.Config, inputs, execCtx) 132 + if err != nil { 133 + e.db.LogExecution(executionID, nodeID, "error", fmt.Sprintf("Execution failed: %v", err)) 134 + return 0, fmt.Errorf("node %s (%s): %w", nodeID, node.Type, err) 135 + } 136 + 137 + nodeResults[nodeID] = output 138 + e.db.LogExecution(executionID, nodeID, "info", fmt.Sprintf("Processed %d items", len(output))) 139 + } 140 + 141 + // Return item count from last node 142 + if len(order) == 0 { 143 + return 0, nil 144 + } 145 + 146 + lastNodeID := order[len(order)-1] 147 + finalOutput := nodeResults[lastNodeID] 148 + return len(finalOutput), nil 149 + } 150 + 151 + func (e *Executor) gatherInputs(nodeID string, connections []Connection, nodeResults map[string][]interface{}) [][]interface{} { 152 + var inputs [][]interface{} 153 + 154 + for _, conn := range connections { 155 + if conn.Target == nodeID { 156 + if result, ok := nodeResults[conn.Source]; ok { 157 + inputs = append(inputs, result) 158 + } 159 + } 160 + } 161 + 162 + return inputs 163 + } 164 + 165 + func topologicalSort(nodes []Node, connections []Connection) ([]string, error) { 166 + // Kahn's algorithm for topological sorting 167 + inDegree := make(map[string]int) 168 + adjacency := make(map[string][]string) 169 + 170 + // Initialize 171 + for _, n := range nodes { 172 + inDegree[n.ID] = 0 173 + adjacency[n.ID] = []string{} 174 + } 175 + 176 + // Build graph 177 + for _, c := range connections { 178 + adjacency[c.Source] = append(adjacency[c.Source], c.Target) 179 + inDegree[c.Target]++ 180 + } 181 + 182 + // Find sources (nodes with no incoming edges) 183 + queue := []string{} 184 + for id, degree := range inDegree { 185 + if degree == 0 { 186 + queue = append(queue, id) 187 + } 188 + } 189 + 190 + sorted := []string{} 191 + for len(queue) > 0 { 192 + node := queue[0] 193 + queue = queue[1:] 194 + sorted = append(sorted, node) 195 + 196 + for _, neighbor := range adjacency[node] { 197 + inDegree[neighbor]-- 198 + if inDegree[neighbor] == 0 { 199 + queue = append(queue, neighbor) 200 + } 201 + } 202 + } 203 + 204 + if len(sorted) != len(nodes) { 205 + return nil, fmt.Errorf("pipeline contains a cycle") 206 + } 207 + 208 + return sorted, nil 209 + } 210 + 211 + func findNode(nodes []Node, id string) *Node { 212 + for i := range nodes { 213 + if nodes[i].ID == id { 214 + return &nodes[i] 215 + } 216 + } 217 + return nil 218 + }
+59
engine/registry.go
··· 1 + package engine 2 + 3 + import ( 4 + "fmt" 5 + "sync" 6 + 7 + "github.com/kierank/pipes/nodes" 8 + "github.com/kierank/pipes/nodes/sources" 9 + "github.com/kierank/pipes/nodes/transforms" 10 + ) 11 + 12 + type Registry struct { 13 + mu sync.RWMutex 14 + nodeImpls map[string]nodes.Node 15 + } 16 + 17 + func NewRegistry() *Registry { 18 + r := &Registry{ 19 + nodeImpls: make(map[string]nodes.Node), 20 + } 21 + 22 + // Register built-in nodes 23 + r.Register(&sources.RSSSourceNode{}) 24 + r.Register(&transforms.FilterNode{}) 25 + r.Register(&transforms.SortNode{}) 26 + r.Register(&transforms.LimitNode{}) 27 + 28 + return r 29 + } 30 + 31 + func (r *Registry) Register(node nodes.Node) { 32 + r.mu.Lock() 33 + defer r.mu.Unlock() 34 + r.nodeImpls[node.Type()] = node 35 + } 36 + 37 + func (r *Registry) Get(nodeType string) (nodes.Node, error) { 38 + r.mu.RLock() 39 + defer r.mu.RUnlock() 40 + 41 + node, ok := r.nodeImpls[nodeType] 42 + if !ok { 43 + return nil, fmt.Errorf("unknown node type: %s", nodeType) 44 + } 45 + 46 + return node, nil 47 + } 48 + 49 + func (r *Registry) GetAll() []nodes.Node { 50 + r.mu.RLock() 51 + defer r.mu.RUnlock() 52 + 53 + nodeList := make([]nodes.Node, 0, len(r.nodeImpls)) 54 + for _, node := range r.nodeImpls { 55 + nodeList = append(nodeList, node) 56 + } 57 + 58 + return nodeList 59 + }
+93
engine/scheduler.go
··· 1 + package engine 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/charmbracelet/log" 8 + "github.com/kierank/pipes/store" 9 + ) 10 + 11 + type Scheduler struct { 12 + db *store.DB 13 + executor *Executor 14 + ticker *time.Ticker 15 + done chan struct{} 16 + logger *log.Logger 17 + } 18 + 19 + func NewScheduler(db *store.DB, logger *log.Logger) *Scheduler { 20 + return &Scheduler{ 21 + db: db, 22 + executor: NewExecutor(db), 23 + done: make(chan struct{}), 24 + logger: logger, 25 + } 26 + } 27 + 28 + func (s *Scheduler) Start() { 29 + s.logger.Info("scheduler starting") 30 + 31 + s.ticker = time.NewTicker(1 * time.Minute) 32 + 33 + // Run immediately on start 34 + go s.tick() 35 + 36 + // Then run every minute 37 + go func() { 38 + for { 39 + select { 40 + case <-s.ticker.C: 41 + s.tick() 42 + case <-s.done: 43 + return 44 + } 45 + } 46 + }() 47 + } 48 + 49 + func (s *Scheduler) tick() { 50 + ctx := context.Background() 51 + now := time.Now().Unix() 52 + 53 + jobs, err := s.db.GetDueJobs(now) 54 + if err != nil { 55 + s.logger.Error("error fetching jobs", "error", err) 56 + return 57 + } 58 + 59 + if len(jobs) > 0 { 60 + s.logger.Info("found jobs to execute", "count", len(jobs)) 61 + } 62 + 63 + for _, job := range jobs { 64 + if err := s.executeJob(ctx, job); err != nil { 65 + s.logger.Error("job execution failed", "job_id", job.ID, "error", err) 66 + } 67 + } 68 + } 69 + 70 + func (s *Scheduler) executeJob(ctx context.Context, job *store.ScheduledJob) error { 71 + // Execute pipeline 72 + _, err := s.executor.Execute(ctx, job.PipeID, "scheduled") 73 + if err != nil { 74 + s.logger.Error("pipeline execution failed", "pipe_id", job.PipeID, "error", err) 75 + } 76 + 77 + // Calculate next run time (simplified: add 1 hour for now) 78 + // In production, use a proper cron parser 79 + nextRun := time.Now().Add(1 * time.Hour).Unix() 80 + 81 + // Update job 82 + now := time.Now().Unix() 83 + return s.db.UpdateJobAfterRun(job.ID, now, nextRun) 84 + } 85 + 86 + func (s *Scheduler) Stop() { 87 + s.logger.Info("scheduler stopping") 88 + if s.ticker != nil { 89 + s.ticker.Stop() 90 + } 91 + close(s.done) 92 + s.logger.Info("scheduler stopped") 93 + }
+40
go.mod
··· 1 + module github.com/kierank/pipes 2 + 3 + go 1.24 4 + 5 + require ( 6 + github.com/google/uuid v1.6.0 7 + github.com/gorilla/sessions v1.4.0 8 + github.com/mattn/go-sqlite3 v1.14.24 9 + github.com/mmcdole/gofeed v1.3.0 10 + ) 11 + 12 + require ( 13 + github.com/PuerkitoBio/goquery v1.8.0 // indirect 14 + github.com/andybalholm/cascadia v1.3.1 // indirect 15 + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect 16 + github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect 17 + github.com/charmbracelet/lipgloss v1.1.0 // indirect 18 + github.com/charmbracelet/log v0.4.2 // indirect 19 + github.com/charmbracelet/x/ansi v0.8.0 // indirect 20 + github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd // indirect 21 + github.com/charmbracelet/x/term v0.2.1 // indirect 22 + github.com/go-logfmt/logfmt v0.6.0 // indirect 23 + github.com/gorilla/securecookie v1.1.2 // indirect 24 + github.com/joho/godotenv v1.5.1 // indirect 25 + github.com/json-iterator/go v1.1.12 // indirect 26 + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect 27 + github.com/mattn/go-isatty v0.0.20 // indirect 28 + github.com/mattn/go-runewidth v0.0.16 // indirect 29 + github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 // indirect 30 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect 31 + github.com/modern-go/reflect2 v1.0.2 // indirect 32 + github.com/muesli/termenv v0.16.0 // indirect 33 + github.com/rivo/uniseg v0.4.7 // indirect 34 + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect 35 + golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect 36 + golang.org/x/net v0.4.0 // indirect 37 + golang.org/x/sys v0.30.0 // indirect 38 + golang.org/x/text v0.5.0 // indirect 39 + gopkg.in/yaml.v3 v3.0.1 // indirect 40 + )
+84
go.sum
··· 1 + github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U= 2 + github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI= 3 + github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c= 4 + github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= 5 + github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= 6 + github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= 7 + github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc h1:4pZI35227imm7yK2bGPcfpFEmuY1gc2YSTShr4iJBfs= 8 + github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc/go.mod h1:X4/0JoqgTIPSFcRA/P6INZzIuyqdFY5rm8tb41s9okk= 9 + github.com/charmbracelet/lipgloss v1.1.0 h1:vYXsiLHVkK7fp74RkV7b2kq9+zDLoEU4MZoFqR/noCY= 10 + github.com/charmbracelet/lipgloss v1.1.0/go.mod h1:/6Q8FR2o+kj8rz4Dq0zQc3vYf7X+B0binUUBwA0aL30= 11 + github.com/charmbracelet/log v0.4.2 h1:hYt8Qj6a8yLnvR+h7MwsJv/XvmBJXiueUcI3cIxsyig= 12 + github.com/charmbracelet/log v0.4.2/go.mod h1:qifHGX/tc7eluv2R6pWIpyHDDrrb/AG71Pf2ysQu5nw= 13 + github.com/charmbracelet/x/ansi v0.8.0 h1:9GTq3xq9caJW8ZrBTe0LIe2fvfLR/bYXKTx2llXn7xE= 14 + github.com/charmbracelet/x/ansi v0.8.0/go.mod h1:wdYl/ONOLHLIVmQaxbIYEC/cRKOQyjTkowiI4blgS9Q= 15 + github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd h1:vy0GVL4jeHEwG5YOXDmi86oYw2yuYUGqz6a8sLwg0X8= 16 + github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd/go.mod h1:xe0nKWGd3eJgtqZRaN9RjMtK7xUYchjzPr7q6kcvCCs= 17 + github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQaGIAQ= 18 + github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= 19 + github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 20 + github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 21 + github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 22 + github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= 23 + github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= 24 + github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= 25 + github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= 26 + github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= 27 + github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 28 + github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 29 + github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kXD8ePA= 30 + github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= 31 + github.com/gorilla/sessions v1.4.0 h1:kpIYOp/oi6MG/p5PgxApU8srsSw9tuFbt46Lt7auzqQ= 32 + github.com/gorilla/sessions v1.4.0/go.mod h1:FLWm50oby91+hl7p/wRxDth9bWSuk0qVL2emc7lT5ik= 33 + github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 34 + github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 35 + github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= 36 + github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= 37 + github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= 38 + github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= 39 + github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 40 + github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 41 + github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= 42 + github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= 43 + github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= 44 + github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 45 + github.com/mmcdole/gofeed v1.3.0 h1:5yn+HeqlcvjMeAI4gu6T+crm7d0anY85+M+v6fIFNG4= 46 + github.com/mmcdole/gofeed v1.3.0/go.mod h1:9TGv2LcJhdXePDzxiuMnukhV2/zb6VtnZt1mS+SjkLE= 47 + github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 h1:Zr92CAlFhy2gL+V1F+EyIuzbQNbSgP4xhTODZtrXUtk= 48 + github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23/go.mod h1:v+25+lT2ViuQ7mVxcncQ8ch1URund48oH+jhjiwEgS8= 49 + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= 50 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= 51 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= 52 + github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= 53 + github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= 54 + github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc= 55 + github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk= 56 + github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 57 + github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 58 + github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= 59 + github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= 60 + github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= 61 + github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 62 + github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= 63 + github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= 64 + github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 65 + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= 66 + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= 67 + golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= 68 + golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= 69 + golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= 70 + golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= 71 + golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= 72 + golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 73 + golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 74 + golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 75 + golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= 76 + golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 77 + golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= 78 + golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 79 + golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= 80 + golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= 81 + golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= 82 + gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 83 + gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 84 + gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+243
main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "crypto/rand" 6 + "encoding/base64" 7 + "errors" 8 + "fmt" 9 + "net/http" 10 + "os" 11 + "os/signal" 12 + "strings" 13 + "syscall" 14 + "time" 15 + 16 + "github.com/charmbracelet/log" 17 + "github.com/kierank/pipes/config" 18 + "github.com/kierank/pipes/engine" 19 + "github.com/kierank/pipes/store" 20 + "github.com/kierank/pipes/web" 21 + ) 22 + 23 + var ( 24 + version = "dev" 25 + commitHash = "dev" 26 + logger *log.Logger 27 + ) 28 + 29 + func main() { 30 + // Initialize logger with default level 31 + logger = log.NewWithOptions(os.Stderr, log.Options{ 32 + ReportTimestamp: true, 33 + Level: log.InfoLevel, 34 + }) 35 + 36 + if len(os.Args) < 2 { 37 + printUsage() 38 + os.Exit(1) 39 + } 40 + 41 + command := os.Args[1] 42 + 43 + switch command { 44 + case "serve": 45 + configPath := "" 46 + // Check for -c or --config flag 47 + for i := 2; i < len(os.Args); i++ { 48 + if (os.Args[i] == "-c" || os.Args[i] == "--config") && i+1 < len(os.Args) { 49 + configPath = os.Args[i+1] 50 + break 51 + } 52 + } 53 + serve(configPath) 54 + case "init": 55 + initConfig() 56 + case "help", "--help", "-h": 57 + printUsage() 58 + case "version", "--version", "-v": 59 + fmt.Printf("pipes %s (%s)\n", version, commitHash) 60 + default: 61 + fmt.Printf("Unknown command: %s\n\n", command) 62 + printUsage() 63 + os.Exit(1) 64 + } 65 + } 66 + 67 + func printUsage() { 68 + fmt.Println("Pipes - Visual data pipeline builder") 69 + fmt.Println() 70 + fmt.Println("Usage:") 71 + fmt.Println(" pipes <command> [flags]") 72 + fmt.Println() 73 + fmt.Println("Commands:") 74 + fmt.Println(" serve Start the server") 75 + fmt.Println(" init [path] Create a sample config file (default: config.yaml)") 76 + fmt.Println(" version Show version information") 77 + fmt.Println(" help Show this help message") 78 + fmt.Println() 79 + fmt.Println("Serve Flags:") 80 + fmt.Println(" -c, --config PATH Path to config file (optional, uses .env if not specified)") 81 + fmt.Println() 82 + fmt.Println("Examples:") 83 + fmt.Println(" pipes init") 84 + fmt.Println(" pipes serve -c config.yaml") 85 + fmt.Println(" pipes serve # Uses .env file") 86 + fmt.Println() 87 + } 88 + 89 + func serve(configPath string) { 90 + // Load configuration 91 + cfg, err := config.Load(configPath) 92 + if err != nil { 93 + logger.Fatal("failed to load config", "error", err) 94 + } 95 + 96 + // Set log level from config 97 + level := parseLogLevel(cfg.LogLevel) 98 + logger.SetLevel(level) 99 + 100 + logger.Info("starting pipes", 101 + "host", cfg.Host, 102 + "port", cfg.Port, 103 + "db_path", cfg.DatabasePath, 104 + "log_level", cfg.LogLevel, 105 + ) 106 + 107 + // Initialize database 108 + db, err := store.New(cfg.DatabasePath) 109 + if err != nil { 110 + logger.Fatal("failed to initialize database", "error", err) 111 + } 112 + defer db.Close() 113 + 114 + logger.Info("database initialized successfully") 115 + 116 + // Initialize scheduler 117 + scheduler := engine.NewScheduler(db, logger) 118 + scheduler.Start() 119 + defer scheduler.Stop() 120 + 121 + logger.Info("scheduler started") 122 + 123 + // Initialize web server 124 + server := web.NewServer(cfg, db, logger) 125 + 126 + // Start server in goroutine 127 + serverErr := make(chan error, 1) 128 + go func() { 129 + logger.Info("starting server", "address", fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)) 130 + if err := server.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { 131 + serverErr <- err 132 + } 133 + }() 134 + 135 + // Wait for interrupt signal or server error 136 + sigChan := make(chan os.Signal, 1) 137 + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 138 + 139 + select { 140 + case <-sigChan: 141 + logger.Info("shutting down gracefully...") 142 + case err := <-serverErr: 143 + logger.Fatal("server error", "error", err) 144 + } 145 + 146 + // Graceful shutdown 147 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 148 + defer cancel() 149 + 150 + if err := server.Shutdown(ctx); err != nil { 151 + logger.Error("server shutdown error", "error", err) 152 + } 153 + 154 + logger.Info("shutdown complete") 155 + } 156 + 157 + func initConfig() { 158 + configPath := "config.yaml" 159 + if len(os.Args) > 2 { 160 + configPath = os.Args[2] 161 + } 162 + 163 + if _, err := os.Stat(configPath); err == nil { 164 + fmt.Printf("Config file already exists at %s\n", configPath) 165 + fmt.Println("Remove it first or specify a different path:") 166 + fmt.Printf(" pipes init %s.new\n", configPath) 167 + os.Exit(1) 168 + } 169 + 170 + secret, err := generateSecret() 171 + if err != nil { 172 + logger.Fatal("failed to generate secret", "error", err) 173 + } 174 + 175 + configContent := `# Pipes Configuration 176 + # See https://github.com/yourusername/pipes for documentation 177 + 178 + # Server settings 179 + host: localhost 180 + port: 3001 181 + origin: http://localhost:3001 182 + env: development 183 + log_level: info # debug, info, warn, error, fatal 184 + 185 + # Database 186 + db_path: pipes.db 187 + 188 + # OAuth (Indiko) 189 + # Set these environment variables or replace with actual values: 190 + indiko_url: ${INDIKO_URL} 191 + indiko_client_id: ${INDIKO_CLIENT_ID} 192 + indiko_client_secret: ${INDIKO_CLIENT_SECRET} 193 + oauth_callback_url: http://localhost:3001/auth/callback 194 + 195 + # Session 196 + session_secret: ` + secret + ` 197 + session_cookie_name: pipes_session 198 + ` 199 + 200 + if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil { 201 + logger.Fatal("failed to write config", "error", err) 202 + } 203 + 204 + fmt.Printf("✓ Config file created at %s\n\n", configPath) 205 + fmt.Println("Next steps:") 206 + fmt.Println(" 1. Set your Indiko OAuth environment variables:") 207 + fmt.Println(" export INDIKO_URL=http://localhost:3000") 208 + fmt.Println(" export INDIKO_CLIENT_ID=http://localhost:3001") 209 + fmt.Println() 210 + fmt.Println(" 2. Or edit the config file directly to replace ${VAR} placeholders") 211 + fmt.Println() 212 + fmt.Println(" 3. Start the server:") 213 + fmt.Printf(" pipes serve -c %s\n", configPath) 214 + fmt.Println() 215 + fmt.Println(" Or use environment variables with a .env file instead:") 216 + fmt.Println(" cp .env.example .env") 217 + fmt.Println(" pipes serve") 218 + } 219 + 220 + func generateSecret() (string, error) { 221 + bytes := make([]byte, 32) 222 + if _, err := rand.Read(bytes); err != nil { 223 + return "", err 224 + } 225 + return base64.URLEncoding.EncodeToString(bytes), nil 226 + } 227 + 228 + func parseLogLevel(levelStr string) log.Level { 229 + switch strings.ToLower(levelStr) { 230 + case "debug": 231 + return log.DebugLevel 232 + case "info": 233 + return log.InfoLevel 234 + case "warn", "warning": 235 + return log.WarnLevel 236 + case "error": 237 + return log.ErrorLevel 238 + case "fatal": 239 + return log.FatalLevel 240 + default: 241 + return log.InfoLevel 242 + } 243 + }
+61
nodes/node.go
··· 1 + package nodes 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/kierank/pipes/store" 7 + ) 8 + 9 + type Node interface { 10 + Type() string 11 + Label() string 12 + Description() string 13 + Category() string // source|transform|output 14 + 15 + Inputs() int 16 + Outputs() int 17 + 18 + Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *Context) ([]interface{}, error) 19 + 20 + ValidateConfig(config map[string]interface{}) error 21 + 22 + GetConfigSchema() *ConfigSchema 23 + } 24 + 25 + type ConfigSchema struct { 26 + Fields []ConfigField `json:"fields"` 27 + } 28 + 29 + type ConfigField struct { 30 + Name string `json:"name"` 31 + Label string `json:"label"` 32 + Type string `json:"type"` // text|url|number|select|textarea|checkbox 33 + Required bool `json:"required,omitempty"` 34 + DefaultValue interface{} `json:"defaultValue,omitempty"` 35 + Options []FieldOption `json:"options,omitempty"` 36 + Placeholder string `json:"placeholder,omitempty"` 37 + HelpText string `json:"helpText,omitempty"` 38 + } 39 + 40 + type FieldOption struct { 41 + Value string `json:"value"` 42 + Label string `json:"label"` 43 + } 44 + 45 + type Context struct { 46 + ExecutionID string 47 + PipeID string 48 + DB *store.DB 49 + } 50 + 51 + func NewContext(executionID, pipeID string, db *store.DB) *Context { 52 + return &Context{ 53 + ExecutionID: executionID, 54 + PipeID: pipeID, 55 + DB: db, 56 + } 57 + } 58 + 59 + func (c *Context) Log(nodeID, level, message string) { 60 + c.DB.LogExecution(c.ExecutionID, nodeID, level, message) 61 + }
+93
nodes/sources/rss.go
··· 1 + package sources 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/mmcdole/gofeed" 8 + 9 + "github.com/kierank/pipes/nodes" 10 + ) 11 + 12 + type RSSSourceNode struct{} 13 + 14 + func (n *RSSSourceNode) Type() string { return "rss-source" } 15 + func (n *RSSSourceNode) Label() string { return "RSS Feed" } 16 + func (n *RSSSourceNode) Description() string { return "Fetch items from an RSS or Atom feed" } 17 + func (n *RSSSourceNode) Category() string { return "source" } 18 + func (n *RSSSourceNode) Inputs() int { return 0 } 19 + func (n *RSSSourceNode) Outputs() int { return 1 } 20 + 21 + func (n *RSSSourceNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 22 + url, ok := config["url"].(string) 23 + if !ok || url == "" { 24 + return nil, fmt.Errorf("url is required") 25 + } 26 + 27 + execCtx.Log("rss-source", "info", fmt.Sprintf("Fetching %s", url)) 28 + 29 + // Parse feed 30 + fp := gofeed.NewParser() 31 + feed, err := fp.ParseURLWithContext(url, ctx) 32 + if err != nil { 33 + return nil, fmt.Errorf("parse feed: %w", err) 34 + } 35 + 36 + // Convert feed items to generic interface{} slices 37 + var items []interface{} 38 + for _, item := range feed.Items { 39 + items = append(items, map[string]interface{}{ 40 + "title": item.Title, 41 + "description": item.Description, 42 + "link": item.Link, 43 + "author": item.Author, 44 + "published": item.Published, 45 + "updated": item.Updated, 46 + "guid": item.GUID, 47 + "categories": item.Categories, 48 + }) 49 + } 50 + 51 + // Apply limit if specified 52 + if limit, ok := config["limit"].(float64); ok && limit > 0 { 53 + if int(limit) < len(items) { 54 + items = items[:int(limit)] 55 + } 56 + } 57 + 58 + execCtx.Log("rss-source", "info", fmt.Sprintf("Retrieved %d items", len(items))) 59 + 60 + return items, nil 61 + } 62 + 63 + func (n *RSSSourceNode) ValidateConfig(config map[string]interface{}) error { 64 + url, ok := config["url"].(string) 65 + if !ok || url == "" { 66 + return fmt.Errorf("url is required") 67 + } 68 + 69 + return nil 70 + } 71 + 72 + func (n *RSSSourceNode) GetConfigSchema() *nodes.ConfigSchema { 73 + return &nodes.ConfigSchema{ 74 + Fields: []nodes.ConfigField{ 75 + { 76 + Name: "url", 77 + Label: "Feed URL", 78 + Type: "url", 79 + Required: true, 80 + Placeholder: "https://example.com/feed.xml", 81 + HelpText: "URL of the RSS or Atom feed", 82 + }, 83 + { 84 + Name: "limit", 85 + Label: "Item Limit", 86 + Type: "number", 87 + Required: false, 88 + DefaultValue: 50, 89 + HelpText: "Maximum number of items to fetch", 90 + }, 91 + }, 92 + } 93 + }
+123
nodes/transforms/filter.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "regexp" 7 + "strings" 8 + 9 + "github.com/kierank/pipes/nodes" 10 + ) 11 + 12 + type FilterNode struct{} 13 + 14 + func (n *FilterNode) Type() string { return "filter" } 15 + func (n *FilterNode) Label() string { return "Filter" } 16 + func (n *FilterNode) Description() string { return "Filter items based on conditions" } 17 + func (n *FilterNode) Category() string { return "transform" } 18 + func (n *FilterNode) Inputs() int { return 1 } 19 + func (n *FilterNode) Outputs() int { return 1 } 20 + 21 + func (n *FilterNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 22 + if len(inputs) == 0 { 23 + return []interface{}{}, nil 24 + } 25 + 26 + items := inputs[0] 27 + 28 + field, _ := config["field"].(string) 29 + operator, _ := config["operator"].(string) 30 + value, _ := config["value"].(string) 31 + 32 + if field == "" || operator == "" { 33 + return items, nil 34 + } 35 + 36 + var filtered []interface{} 37 + for _, item := range items { 38 + if matchesFilter(item, field, operator, value) { 39 + filtered = append(filtered, item) 40 + } 41 + } 42 + 43 + execCtx.Log("filter", "info", fmt.Sprintf("Filtered %d -> %d items", len(items), len(filtered))) 44 + 45 + return filtered, nil 46 + } 47 + 48 + func matchesFilter(item interface{}, field, operator, value string) bool { 49 + itemMap, ok := item.(map[string]interface{}) 50 + if !ok { 51 + return false 52 + } 53 + 54 + fieldValue := getNestedValue(itemMap, field) 55 + fieldStr := fmt.Sprintf("%v", fieldValue) 56 + 57 + switch operator { 58 + case "contains": 59 + return strings.Contains(strings.ToLower(fieldStr), strings.ToLower(value)) 60 + case "equals": 61 + return fieldStr == value 62 + case "not-equals": 63 + return fieldStr != value 64 + case "regex": 65 + matched, _ := regexp.MatchString(value, fieldStr) 66 + return matched 67 + default: 68 + return true 69 + } 70 + } 71 + 72 + func getNestedValue(obj map[string]interface{}, path string) interface{} { 73 + parts := strings.Split(path, ".") 74 + var current interface{} = obj 75 + 76 + for _, part := range parts { 77 + if m, ok := current.(map[string]interface{}); ok { 78 + current = m[part] 79 + } else { 80 + return nil 81 + } 82 + } 83 + 84 + return current 85 + } 86 + 87 + func (n *FilterNode) ValidateConfig(config map[string]interface{}) error { 88 + return nil 89 + } 90 + 91 + func (n *FilterNode) GetConfigSchema() *nodes.ConfigSchema { 92 + return &nodes.ConfigSchema{ 93 + Fields: []nodes.ConfigField{ 94 + { 95 + Name: "field", 96 + Label: "Field Path", 97 + Type: "text", 98 + Required: true, 99 + Placeholder: "title", 100 + HelpText: "Field to filter on (use dot notation for nested: author.name)", 101 + }, 102 + { 103 + Name: "operator", 104 + Label: "Operator", 105 + Type: "select", 106 + Required: true, 107 + Options: []nodes.FieldOption{ 108 + {Value: "contains", Label: "Contains"}, 109 + {Value: "equals", Label: "Equals"}, 110 + {Value: "not-equals", Label: "Not Equals"}, 111 + {Value: "regex", Label: "Regex Match"}, 112 + }, 113 + }, 114 + { 115 + Name: "value", 116 + Label: "Value", 117 + Type: "text", 118 + Required: true, 119 + Placeholder: "search term", 120 + }, 121 + }, 122 + } 123 + }
+54
nodes/transforms/limit.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/kierank/pipes/nodes" 8 + ) 9 + 10 + type LimitNode struct{} 11 + 12 + func (n *LimitNode) Type() string { return "limit" } 13 + func (n *LimitNode) Label() string { return "Limit" } 14 + func (n *LimitNode) Description() string { return "Limit the number of items" } 15 + func (n *LimitNode) Category() string { return "transform" } 16 + func (n *LimitNode) Inputs() int { return 1 } 17 + func (n *LimitNode) Outputs() int { return 1 } 18 + 19 + func (n *LimitNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 20 + if len(inputs) == 0 { 21 + return []interface{}{}, nil 22 + } 23 + 24 + items := inputs[0] 25 + count, _ := config["count"].(float64) 26 + 27 + if count <= 0 || int(count) >= len(items) { 28 + return items, nil 29 + } 30 + 31 + limited := items[:int(count)] 32 + execCtx.Log("limit", "info", fmt.Sprintf("Limited %d -> %d items", len(items), len(limited))) 33 + 34 + return limited, nil 35 + } 36 + 37 + func (n *LimitNode) ValidateConfig(config map[string]interface{}) error { 38 + return nil 39 + } 40 + 41 + func (n *LimitNode) GetConfigSchema() *nodes.ConfigSchema { 42 + return &nodes.ConfigSchema{ 43 + Fields: []nodes.ConfigField{ 44 + { 45 + Name: "count", 46 + Label: "Count", 47 + Type: "number", 48 + Required: true, 49 + DefaultValue: 10, 50 + HelpText: "Maximum number of items to output", 51 + }, 52 + }, 53 + } 54 + }
+91
nodes/transforms/sort.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "sort" 7 + 8 + "github.com/kierank/pipes/nodes" 9 + ) 10 + 11 + type SortNode struct{} 12 + 13 + func (n *SortNode) Type() string { return "sort" } 14 + func (n *SortNode) Label() string { return "Sort" } 15 + func (n *SortNode) Description() string { return "Sort items by a field" } 16 + func (n *SortNode) Category() string { return "transform" } 17 + func (n *SortNode) Inputs() int { return 1 } 18 + func (n *SortNode) Outputs() int { return 1 } 19 + 20 + func (n *SortNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 21 + if len(inputs) == 0 { 22 + return []interface{}{}, nil 23 + } 24 + 25 + items := inputs[0] 26 + field, _ := config["field"].(string) 27 + order, _ := config["order"].(string) 28 + 29 + if field == "" { 30 + return items, nil 31 + } 32 + 33 + if order == "" { 34 + order = "asc" 35 + } 36 + 37 + // Create a sortable slice 38 + sorted := make([]interface{}, len(items)) 39 + copy(sorted, items) 40 + 41 + sort.SliceStable(sorted, func(i, j int) bool { 42 + iMap, iOk := sorted[i].(map[string]interface{}) 43 + jMap, jOk := sorted[j].(map[string]interface{}) 44 + 45 + if !iOk || !jOk { 46 + return false 47 + } 48 + 49 + iVal := fmt.Sprintf("%v", getNestedValue(iMap, field)) 50 + jVal := fmt.Sprintf("%v", getNestedValue(jMap, field)) 51 + 52 + if order == "desc" { 53 + return iVal > jVal 54 + } 55 + return iVal < jVal 56 + }) 57 + 58 + execCtx.Log("sort", "info", fmt.Sprintf("Sorted %d items by %s (%s)", len(sorted), field, order)) 59 + 60 + return sorted, nil 61 + } 62 + 63 + func (n *SortNode) ValidateConfig(config map[string]interface{}) error { 64 + return nil 65 + } 66 + 67 + func (n *SortNode) GetConfigSchema() *nodes.ConfigSchema { 68 + return &nodes.ConfigSchema{ 69 + Fields: []nodes.ConfigField{ 70 + { 71 + Name: "field", 72 + Label: "Field Path", 73 + Type: "text", 74 + Required: true, 75 + Placeholder: "published", 76 + HelpText: "Field to sort by", 77 + }, 78 + { 79 + Name: "order", 80 + Label: "Order", 81 + Type: "select", 82 + Required: false, 83 + DefaultValue: "asc", 84 + Options: []nodes.FieldOption{ 85 + {Value: "asc", Label: "Ascending"}, 86 + {Value: "desc", Label: "Descending"}, 87 + }, 88 + }, 89 + }, 90 + } 91 + }
-23
package.json
··· 1 - { 2 - "name": "pipes", 3 - "module": "index.ts", 4 - "type": "module", 5 - "private": true, 6 - "scripts": { 7 - "dev": "bun run --hot src/index.ts", 8 - "start": "bun run src/index.ts", 9 - "format": "bun run --bun biome check --write ." 10 - }, 11 - "devDependencies": { 12 - "@types/bun": "latest" 13 - }, 14 - "peerDependencies": { 15 - "typescript": "^5" 16 - }, 17 - "dependencies": { 18 - "bun-sqlite-migrations": "^1.0.2", 19 - "kysely": "^0.28.9", 20 - "kysely-bun-sqlite": "^0.4.0", 21 - "nanoid": "^5.1.6" 22 - } 23 - }
-54
src/index.ts
··· 1 - import { env } from "bun"; 2 - import indexHTML from "./pages/index.html"; 3 - 4 - (() => { 5 - const required = ["ORIGIN"]; 6 - 7 - const missing = required.filter((key) => !process.env[key]); 8 - 9 - if (missing.length > 0) { 10 - console.warn( 11 - `[Startup] Missing required environment variables: ${missing.join(", ")}`, 12 - ); 13 - process.exit(1); 14 - } 15 - 16 - // Validate ORIGIN is HTTPS in production 17 - const origin = process.env.ORIGIN as string; 18 - const nodeEnv = process.env.NODE_ENV || "development"; 19 - 20 - if (nodeEnv === "production" && !origin.startsWith("https://")) { 21 - console.error( 22 - `[Startup] ORIGIN must use HTTPS in production (got: ${origin})`, 23 - ); 24 - process.exit(1); 25 - } 26 - 27 - console.log(`[Startup] Environment validated (${nodeEnv} mode)`); 28 - })(); 29 - 30 - const server = Bun.serve({ 31 - port: env.PORT ? Number.parseInt(env.PORT, 10) : 3000, 32 - routes: { 33 - "/": indexHTML, 34 - }, 35 - development: process.env.NODE_ENV !== "production", 36 - }); 37 - 38 - console.log(`Pipes running on ${env.ORIGIN}`) 39 - 40 - let is_shutting_down = false; 41 - function shutdown(sig: string) { 42 - if (is_shutting_down) return; 43 - is_shutting_down = true; 44 - 45 - console.log(`[Shutdown] triggering shutdown due to ${sig}`); 46 - 47 - server.stop(); 48 - console.log("[Shutdown] stopped server"); 49 - 50 - process.exit(0); 51 - } 52 - 53 - process.on("SIGTERM", () => shutdown("SIGTERM")); 54 - process.on("SIGINT", () => shutdown("SIGINT"));
-15
src/pages/index.html
··· 1 - <!DOCTYPE html> 2 - <html lang="en"> 3 - 4 - <head> 5 - <meta charset="UTF-8"> 6 - <meta name="viewport" content="width=device-width, initial-scale=1.0"> 7 - <title>Pipes</title> 8 - <link rel="icon" type="image/svg+xml" href="../../public/favicon.svg"> 9 - </head> 10 - 11 - <body> 12 - <h1>Pipes</h1> 13 - </body> 14 - 15 - </html>
-7
src/types/env.d.ts
··· 1 - declare module "bun" { 2 - interface Env { 3 - ORIGIN: string; 4 - NODE_ENV?: "dev" | "production"; 5 - PORT?: string; 6 - } 7 - }
+148
store/db.go
··· 1 + package store 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + 7 + _ "github.com/mattn/go-sqlite3" 8 + ) 9 + 10 + type DB struct { 11 + *sql.DB 12 + } 13 + 14 + func New(path string) (*DB, error) { 15 + db, err := sql.Open("sqlite3", path) 16 + if err != nil { 17 + return nil, fmt.Errorf("open database: %w", err) 18 + } 19 + 20 + // Enable foreign keys and WAL mode 21 + if _, err := db.Exec("PRAGMA foreign_keys = ON"); err != nil { 22 + return nil, fmt.Errorf("enable foreign keys: %w", err) 23 + } 24 + 25 + if _, err := db.Exec("PRAGMA journal_mode = WAL"); err != nil { 26 + return nil, fmt.Errorf("enable WAL mode: %w", err) 27 + } 28 + 29 + store := &DB{DB: db} 30 + 31 + // Initialize schema 32 + if err := store.initSchema(); err != nil { 33 + return nil, fmt.Errorf("init schema: %w", err) 34 + } 35 + 36 + return store, nil 37 + } 38 + 39 + func (db *DB) initSchema() error { 40 + schema := ` 41 + -- Users (OAuth profiles) 42 + CREATE TABLE IF NOT EXISTS users ( 43 + id TEXT PRIMARY KEY, 44 + indiko_sub TEXT UNIQUE NOT NULL, 45 + username TEXT, 46 + name TEXT, 47 + email TEXT, 48 + photo TEXT, 49 + url TEXT, 50 + role TEXT NOT NULL DEFAULT 'user', 51 + created_at INTEGER NOT NULL, 52 + updated_at INTEGER NOT NULL 53 + ); 54 + 55 + -- Sessions (OAuth sessions) 56 + CREATE TABLE IF NOT EXISTS sessions ( 57 + id TEXT PRIMARY KEY, 58 + user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, 59 + access_token TEXT NOT NULL, 60 + refresh_token TEXT, 61 + expires_at INTEGER NOT NULL, 62 + created_at INTEGER NOT NULL 63 + ); 64 + 65 + CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id); 66 + 67 + -- Pipes (pipeline configurations) 68 + CREATE TABLE IF NOT EXISTS pipes ( 69 + id TEXT PRIMARY KEY, 70 + user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE, 71 + name TEXT NOT NULL, 72 + description TEXT, 73 + config TEXT NOT NULL, 74 + is_public INTEGER NOT NULL DEFAULT 0, 75 + created_at INTEGER NOT NULL, 76 + updated_at INTEGER NOT NULL 77 + ); 78 + 79 + CREATE INDEX IF NOT EXISTS idx_pipes_user_id ON pipes(user_id); 80 + 81 + -- Scheduled jobs 82 + CREATE TABLE IF NOT EXISTS scheduled_jobs ( 83 + id TEXT PRIMARY KEY, 84 + pipe_id TEXT NOT NULL UNIQUE REFERENCES pipes(id) ON DELETE CASCADE, 85 + cron_expression TEXT NOT NULL, 86 + next_run_at INTEGER NOT NULL, 87 + last_run_at INTEGER, 88 + enabled INTEGER NOT NULL DEFAULT 1, 89 + created_at INTEGER NOT NULL, 90 + updated_at INTEGER NOT NULL 91 + ); 92 + 93 + CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(next_run_at, enabled); 94 + 95 + -- Execution history 96 + CREATE TABLE IF NOT EXISTS pipe_executions ( 97 + id TEXT PRIMARY KEY, 98 + pipe_id TEXT NOT NULL REFERENCES pipes(id) ON DELETE CASCADE, 99 + status TEXT NOT NULL, 100 + trigger_type TEXT NOT NULL, 101 + started_at INTEGER NOT NULL, 102 + completed_at INTEGER, 103 + duration_ms INTEGER, 104 + items_processed INTEGER, 105 + error_message TEXT, 106 + metadata TEXT 107 + ); 108 + 109 + CREATE INDEX IF NOT EXISTS idx_executions_pipe_id ON pipe_executions(pipe_id); 110 + CREATE INDEX IF NOT EXISTS idx_executions_status ON pipe_executions(status); 111 + 112 + -- Execution logs (detailed step logs) 113 + CREATE TABLE IF NOT EXISTS execution_logs ( 114 + id TEXT PRIMARY KEY, 115 + execution_id TEXT NOT NULL REFERENCES pipe_executions(id) ON DELETE CASCADE, 116 + node_id TEXT NOT NULL, 117 + level TEXT NOT NULL, 118 + message TEXT NOT NULL, 119 + timestamp INTEGER NOT NULL, 120 + metadata TEXT 121 + ); 122 + 123 + CREATE INDEX IF NOT EXISTS idx_logs_execution_id ON execution_logs(execution_id); 124 + 125 + -- Source cache (avoid redundant fetches) 126 + CREATE TABLE IF NOT EXISTS source_cache ( 127 + id TEXT PRIMARY KEY, 128 + pipe_id TEXT NOT NULL REFERENCES pipes(id) ON DELETE CASCADE, 129 + node_id TEXT NOT NULL, 130 + cache_key TEXT NOT NULL, 131 + data TEXT NOT NULL, 132 + etag TEXT, 133 + last_modified TEXT, 134 + expires_at INTEGER NOT NULL, 135 + created_at INTEGER NOT NULL 136 + ); 137 + 138 + CREATE INDEX IF NOT EXISTS idx_cache_pipe_node ON source_cache(pipe_id, node_id); 139 + CREATE INDEX IF NOT EXISTS idx_cache_expires ON source_cache(expires_at); 140 + ` 141 + 142 + _, err := db.Exec(schema) 143 + if err != nil { 144 + return fmt.Errorf("execute schema: %w", err) 145 + } 146 + 147 + return nil 148 + }
+221
store/executions.go
··· 1 + package store 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + "time" 7 + 8 + "github.com/google/uuid" 9 + ) 10 + 11 + type PipeExecution struct { 12 + ID string 13 + PipeID string 14 + Status string 15 + TriggerType string 16 + StartedAt int64 17 + CompletedAt *int64 18 + DurationMs *int64 19 + ItemsProcessed *int 20 + ErrorMessage *string 21 + Metadata *string 22 + } 23 + 24 + type ExecutionLog struct { 25 + ID string 26 + ExecutionID string 27 + NodeID string 28 + Level string 29 + Message string 30 + Timestamp int64 31 + Metadata *string 32 + } 33 + 34 + func (db *DB) CreateExecution(id, pipeID, triggerType string, startedAt int64) error { 35 + _, err := db.Exec(` 36 + INSERT INTO pipe_executions (id, pipe_id, status, trigger_type, started_at) 37 + VALUES (?, ?, ?, ?, ?) 38 + `, id, pipeID, "running", triggerType, startedAt) 39 + 40 + if err != nil { 41 + return fmt.Errorf("insert execution: %w", err) 42 + } 43 + 44 + return nil 45 + } 46 + 47 + func (db *DB) UpdateExecutionSuccess(id string, completedAt, durationMs int64, itemsProcessed int) error { 48 + _, err := db.Exec(` 49 + UPDATE pipe_executions 50 + SET status = ?, completed_at = ?, duration_ms = ?, items_processed = ? 51 + WHERE id = ? 52 + `, "success", completedAt, durationMs, itemsProcessed, id) 53 + 54 + if err != nil { 55 + return fmt.Errorf("update execution: %w", err) 56 + } 57 + 58 + return nil 59 + } 60 + 61 + func (db *DB) UpdateExecutionFailed(id string, completedAt, durationMs int64, errorMessage string) error { 62 + _, err := db.Exec(` 63 + UPDATE pipe_executions 64 + SET status = ?, completed_at = ?, duration_ms = ?, error_message = ? 65 + WHERE id = ? 66 + `, "failed", completedAt, durationMs, errorMessage, id) 67 + 68 + if err != nil { 69 + return fmt.Errorf("update execution: %w", err) 70 + } 71 + 72 + return nil 73 + } 74 + 75 + func (db *DB) GetExecution(id string) (*PipeExecution, error) { 76 + exec := &PipeExecution{} 77 + var completedAt, durationMs sql.NullInt64 78 + var itemsProcessed sql.NullInt64 79 + var errorMessage, metadata sql.NullString 80 + 81 + err := db.QueryRow(` 82 + SELECT id, pipe_id, status, trigger_type, started_at, completed_at, duration_ms, items_processed, error_message, metadata 83 + FROM pipe_executions 84 + WHERE id = ? 85 + `, id).Scan(&exec.ID, &exec.PipeID, &exec.Status, &exec.TriggerType, &exec.StartedAt, &completedAt, &durationMs, &itemsProcessed, &errorMessage, &metadata) 86 + 87 + if err == sql.ErrNoRows { 88 + return nil, nil 89 + } 90 + 91 + if err != nil { 92 + return nil, fmt.Errorf("query execution: %w", err) 93 + } 94 + 95 + if completedAt.Valid { 96 + val := completedAt.Int64 97 + exec.CompletedAt = &val 98 + } 99 + 100 + if durationMs.Valid { 101 + val := durationMs.Int64 102 + exec.DurationMs = &val 103 + } 104 + 105 + if itemsProcessed.Valid { 106 + val := int(itemsProcessed.Int64) 107 + exec.ItemsProcessed = &val 108 + } 109 + 110 + if errorMessage.Valid { 111 + exec.ErrorMessage = &errorMessage.String 112 + } 113 + 114 + if metadata.Valid { 115 + exec.Metadata = &metadata.String 116 + } 117 + 118 + return exec, nil 119 + } 120 + 121 + func (db *DB) GetPipeExecutions(pipeID string, limit int) ([]*PipeExecution, error) { 122 + rows, err := db.Query(` 123 + SELECT id, pipe_id, status, trigger_type, started_at, completed_at, duration_ms, items_processed, error_message, metadata 124 + FROM pipe_executions 125 + WHERE pipe_id = ? 126 + ORDER BY started_at DESC 127 + LIMIT ? 128 + `, pipeID, limit) 129 + 130 + if err != nil { 131 + return nil, fmt.Errorf("query executions: %w", err) 132 + } 133 + defer rows.Close() 134 + 135 + var executions []*PipeExecution 136 + for rows.Next() { 137 + exec := &PipeExecution{} 138 + var completedAt, durationMs sql.NullInt64 139 + var itemsProcessed sql.NullInt64 140 + var errorMessage, metadata sql.NullString 141 + 142 + if err := rows.Scan(&exec.ID, &exec.PipeID, &exec.Status, &exec.TriggerType, &exec.StartedAt, &completedAt, &durationMs, &itemsProcessed, &errorMessage, &metadata); err != nil { 143 + return nil, fmt.Errorf("scan execution: %w", err) 144 + } 145 + 146 + if completedAt.Valid { 147 + val := completedAt.Int64 148 + exec.CompletedAt = &val 149 + } 150 + 151 + if durationMs.Valid { 152 + val := durationMs.Int64 153 + exec.DurationMs = &val 154 + } 155 + 156 + if itemsProcessed.Valid { 157 + val := int(itemsProcessed.Int64) 158 + exec.ItemsProcessed = &val 159 + } 160 + 161 + if errorMessage.Valid { 162 + exec.ErrorMessage = &errorMessage.String 163 + } 164 + 165 + if metadata.Valid { 166 + exec.Metadata = &metadata.String 167 + } 168 + 169 + executions = append(executions, exec) 170 + } 171 + 172 + return executions, nil 173 + } 174 + 175 + func (db *DB) LogExecution(executionID, nodeID, level, message string) error { 176 + logID := uuid.New().String() 177 + timestamp := time.Now().Unix() 178 + 179 + _, err := db.Exec(` 180 + INSERT INTO execution_logs (id, execution_id, node_id, level, message, timestamp) 181 + VALUES (?, ?, ?, ?, ?, ?) 182 + `, logID, executionID, nodeID, level, message, timestamp) 183 + 184 + if err != nil { 185 + return fmt.Errorf("insert log: %w", err) 186 + } 187 + 188 + return nil 189 + } 190 + 191 + func (db *DB) GetExecutionLogs(executionID string) ([]*ExecutionLog, error) { 192 + rows, err := db.Query(` 193 + SELECT id, execution_id, node_id, level, message, timestamp, metadata 194 + FROM execution_logs 195 + WHERE execution_id = ? 196 + ORDER BY timestamp ASC 197 + `, executionID) 198 + 199 + if err != nil { 200 + return nil, fmt.Errorf("query logs: %w", err) 201 + } 202 + defer rows.Close() 203 + 204 + var logs []*ExecutionLog 205 + for rows.Next() { 206 + log := &ExecutionLog{} 207 + var metadata sql.NullString 208 + 209 + if err := rows.Scan(&log.ID, &log.ExecutionID, &log.NodeID, &log.Level, &log.Message, &log.Timestamp, &metadata); err != nil { 210 + return nil, fmt.Errorf("scan log: %w", err) 211 + } 212 + 213 + if metadata.Valid { 214 + log.Metadata = &metadata.String 215 + } 216 + 217 + logs = append(logs, log) 218 + } 219 + 220 + return logs, nil 221 + }
+212
store/pipes.go
··· 1 + package store 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + "time" 7 + 8 + "github.com/google/uuid" 9 + ) 10 + 11 + type Pipe struct { 12 + ID string 13 + UserID string 14 + Name string 15 + Description string 16 + Config string 17 + IsPublic bool 18 + CreatedAt int64 19 + UpdatedAt int64 20 + } 21 + 22 + type ScheduledJob struct { 23 + ID string 24 + PipeID string 25 + CronExpression string 26 + NextRunAt int64 27 + LastRunAt *int64 28 + Enabled bool 29 + CreatedAt int64 30 + UpdatedAt int64 31 + } 32 + 33 + func (db *DB) CreatePipe(userID, name, description, config string, isPublic bool) (*Pipe, error) { 34 + now := time.Now().Unix() 35 + pipe := &Pipe{ 36 + ID: uuid.New().String(), 37 + UserID: userID, 38 + Name: name, 39 + Description: description, 40 + Config: config, 41 + IsPublic: isPublic, 42 + CreatedAt: now, 43 + UpdatedAt: now, 44 + } 45 + 46 + _, err := db.Exec(` 47 + INSERT INTO pipes (id, user_id, name, description, config, is_public, created_at, updated_at) 48 + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 49 + `, pipe.ID, pipe.UserID, pipe.Name, pipe.Description, pipe.Config, btoi(pipe.IsPublic), pipe.CreatedAt, pipe.UpdatedAt) 50 + 51 + if err != nil { 52 + return nil, fmt.Errorf("insert pipe: %w", err) 53 + } 54 + 55 + return pipe, nil 56 + } 57 + 58 + func (db *DB) GetPipe(id string) (*Pipe, error) { 59 + pipe := &Pipe{} 60 + var isPublic int 61 + 62 + err := db.QueryRow(` 63 + SELECT id, user_id, name, description, config, is_public, created_at, updated_at 64 + FROM pipes 65 + WHERE id = ? 66 + `, id).Scan(&pipe.ID, &pipe.UserID, &pipe.Name, &pipe.Description, &pipe.Config, &isPublic, &pipe.CreatedAt, &pipe.UpdatedAt) 67 + 68 + if err == sql.ErrNoRows { 69 + return nil, nil 70 + } 71 + 72 + if err != nil { 73 + return nil, fmt.Errorf("query pipe: %w", err) 74 + } 75 + 76 + pipe.IsPublic = isPublic == 1 77 + return pipe, nil 78 + } 79 + 80 + func (db *DB) GetUserPipes(userID string) ([]*Pipe, error) { 81 + rows, err := db.Query(` 82 + SELECT id, user_id, name, description, config, is_public, created_at, updated_at 83 + FROM pipes 84 + WHERE user_id = ? 85 + ORDER BY updated_at DESC 86 + `, userID) 87 + 88 + if err != nil { 89 + return nil, fmt.Errorf("query pipes: %w", err) 90 + } 91 + defer rows.Close() 92 + 93 + var pipes []*Pipe 94 + for rows.Next() { 95 + pipe := &Pipe{} 96 + var isPublic int 97 + 98 + if err := rows.Scan(&pipe.ID, &pipe.UserID, &pipe.Name, &pipe.Description, &pipe.Config, &isPublic, &pipe.CreatedAt, &pipe.UpdatedAt); err != nil { 99 + return nil, fmt.Errorf("scan pipe: %w", err) 100 + } 101 + 102 + pipe.IsPublic = isPublic == 1 103 + pipes = append(pipes, pipe) 104 + } 105 + 106 + return pipes, nil 107 + } 108 + 109 + func (db *DB) UpdatePipe(pipe *Pipe) error { 110 + pipe.UpdatedAt = time.Now().Unix() 111 + 112 + _, err := db.Exec(` 113 + UPDATE pipes 114 + SET name = ?, description = ?, config = ?, is_public = ?, updated_at = ? 115 + WHERE id = ? 116 + `, pipe.Name, pipe.Description, pipe.Config, btoi(pipe.IsPublic), pipe.UpdatedAt, pipe.ID) 117 + 118 + if err != nil { 119 + return fmt.Errorf("update pipe: %w", err) 120 + } 121 + 122 + return nil 123 + } 124 + 125 + func (db *DB) DeletePipe(id string) error { 126 + _, err := db.Exec("DELETE FROM pipes WHERE id = ?", id) 127 + if err != nil { 128 + return fmt.Errorf("delete pipe: %w", err) 129 + } 130 + return nil 131 + } 132 + 133 + func (db *DB) CreateScheduledJob(pipeID, cronExpression string, nextRunAt int64) (*ScheduledJob, error) { 134 + now := time.Now().Unix() 135 + job := &ScheduledJob{ 136 + ID: uuid.New().String(), 137 + PipeID: pipeID, 138 + CronExpression: cronExpression, 139 + NextRunAt: nextRunAt, 140 + Enabled: true, 141 + CreatedAt: now, 142 + UpdatedAt: now, 143 + } 144 + 145 + _, err := db.Exec(` 146 + INSERT INTO scheduled_jobs (id, pipe_id, cron_expression, next_run_at, enabled, created_at, updated_at) 147 + VALUES (?, ?, ?, ?, ?, ?, ?) 148 + `, job.ID, job.PipeID, job.CronExpression, job.NextRunAt, btoi(job.Enabled), job.CreatedAt, job.UpdatedAt) 149 + 150 + if err != nil { 151 + return nil, fmt.Errorf("insert scheduled job: %w", err) 152 + } 153 + 154 + return job, nil 155 + } 156 + 157 + func (db *DB) GetDueJobs(now int64) ([]*ScheduledJob, error) { 158 + rows, err := db.Query(` 159 + SELECT id, pipe_id, cron_expression, next_run_at, last_run_at, enabled, created_at, updated_at 160 + FROM scheduled_jobs 161 + WHERE enabled = 1 AND next_run_at <= ? 162 + `, now) 163 + 164 + if err != nil { 165 + return nil, fmt.Errorf("query due jobs: %w", err) 166 + } 167 + defer rows.Close() 168 + 169 + var jobs []*ScheduledJob 170 + for rows.Next() { 171 + job := &ScheduledJob{} 172 + var enabled int 173 + var lastRunAt sql.NullInt64 174 + 175 + if err := rows.Scan(&job.ID, &job.PipeID, &job.CronExpression, &job.NextRunAt, &lastRunAt, &enabled, &job.CreatedAt, &job.UpdatedAt); err != nil { 176 + return nil, fmt.Errorf("scan job: %w", err) 177 + } 178 + 179 + job.Enabled = enabled == 1 180 + if lastRunAt.Valid { 181 + val := lastRunAt.Int64 182 + job.LastRunAt = &val 183 + } 184 + 185 + jobs = append(jobs, job) 186 + } 187 + 188 + return jobs, nil 189 + } 190 + 191 + func (db *DB) UpdateJobAfterRun(id string, lastRunAt, nextRunAt int64) error { 192 + now := time.Now().Unix() 193 + 194 + _, err := db.Exec(` 195 + UPDATE scheduled_jobs 196 + SET last_run_at = ?, next_run_at = ?, updated_at = ? 197 + WHERE id = ? 198 + `, lastRunAt, nextRunAt, now, id) 199 + 200 + if err != nil { 201 + return fmt.Errorf("update job: %w", err) 202 + } 203 + 204 + return nil 205 + } 206 + 207 + func btoi(b bool) int { 208 + if b { 209 + return 1 210 + } 211 + return 0 212 + }
+170
store/users.go
··· 1 + package store 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + "time" 7 + 8 + "github.com/google/uuid" 9 + ) 10 + 11 + type User struct { 12 + ID string 13 + IndikoSub string 14 + Username string 15 + Name string 16 + Email string 17 + Photo string 18 + URL string 19 + Role string 20 + CreatedAt int64 21 + UpdatedAt int64 22 + } 23 + 24 + type Session struct { 25 + ID string 26 + UserID string 27 + AccessToken string 28 + RefreshToken string 29 + ExpiresAt int64 30 + CreatedAt int64 31 + } 32 + 33 + func (db *DB) CreateUser(indikoSub, username, name, email, photo, url string) (*User, error) { 34 + now := time.Now().Unix() 35 + user := &User{ 36 + ID: uuid.New().String(), 37 + IndikoSub: indikoSub, 38 + Username: username, 39 + Name: name, 40 + Email: email, 41 + Photo: photo, 42 + URL: url, 43 + Role: "user", 44 + CreatedAt: now, 45 + UpdatedAt: now, 46 + } 47 + 48 + _, err := db.Exec(` 49 + INSERT INTO users (id, indiko_sub, username, name, email, photo, url, role, created_at, updated_at) 50 + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 51 + `, user.ID, user.IndikoSub, user.Username, user.Name, user.Email, user.Photo, user.URL, user.Role, user.CreatedAt, user.UpdatedAt) 52 + 53 + if err != nil { 54 + return nil, fmt.Errorf("insert user: %w", err) 55 + } 56 + 57 + return user, nil 58 + } 59 + 60 + func (db *DB) GetUserByIndikoSub(indikoSub string) (*User, error) { 61 + user := &User{} 62 + err := db.QueryRow(` 63 + SELECT id, indiko_sub, username, name, email, photo, url, role, created_at, updated_at 64 + FROM users 65 + WHERE indiko_sub = ? 66 + `, indikoSub).Scan(&user.ID, &user.IndikoSub, &user.Username, &user.Name, &user.Email, &user.Photo, &user.URL, &user.Role, &user.CreatedAt, &user.UpdatedAt) 67 + 68 + if err == sql.ErrNoRows { 69 + return nil, nil 70 + } 71 + 72 + if err != nil { 73 + return nil, fmt.Errorf("query user: %w", err) 74 + } 75 + 76 + return user, nil 77 + } 78 + 79 + func (db *DB) GetUserByID(id string) (*User, error) { 80 + user := &User{} 81 + err := db.QueryRow(` 82 + SELECT id, indiko_sub, username, name, email, photo, url, role, created_at, updated_at 83 + FROM users 84 + WHERE id = ? 85 + `, id).Scan(&user.ID, &user.IndikoSub, &user.Username, &user.Name, &user.Email, &user.Photo, &user.URL, &user.Role, &user.CreatedAt, &user.UpdatedAt) 86 + 87 + if err == sql.ErrNoRows { 88 + return nil, nil 89 + } 90 + 91 + if err != nil { 92 + return nil, fmt.Errorf("query user: %w", err) 93 + } 94 + 95 + return user, nil 96 + } 97 + 98 + func (db *DB) UpdateUser(user *User) error { 99 + user.UpdatedAt = time.Now().Unix() 100 + 101 + _, err := db.Exec(` 102 + UPDATE users 103 + SET username = ?, name = ?, email = ?, photo = ?, url = ?, updated_at = ? 104 + WHERE id = ? 105 + `, user.Username, user.Name, user.Email, user.Photo, user.URL, user.UpdatedAt, user.ID) 106 + 107 + if err != nil { 108 + return fmt.Errorf("update user: %w", err) 109 + } 110 + 111 + return nil 112 + } 113 + 114 + func (db *DB) CreateSession(userID, accessToken, refreshToken string, expiresAt int64) (*Session, error) { 115 + session := &Session{ 116 + ID: uuid.New().String(), 117 + UserID: userID, 118 + AccessToken: accessToken, 119 + RefreshToken: refreshToken, 120 + ExpiresAt: expiresAt, 121 + CreatedAt: time.Now().Unix(), 122 + } 123 + 124 + _, err := db.Exec(` 125 + INSERT INTO sessions (id, user_id, access_token, refresh_token, expires_at, created_at) 126 + VALUES (?, ?, ?, ?, ?, ?) 127 + `, session.ID, session.UserID, session.AccessToken, session.RefreshToken, session.ExpiresAt, session.CreatedAt) 128 + 129 + if err != nil { 130 + return nil, fmt.Errorf("insert session: %w", err) 131 + } 132 + 133 + return session, nil 134 + } 135 + 136 + func (db *DB) GetSessionByID(id string) (*Session, error) { 137 + session := &Session{} 138 + err := db.QueryRow(` 139 + SELECT id, user_id, access_token, refresh_token, expires_at, created_at 140 + FROM sessions 141 + WHERE id = ? 142 + `, id).Scan(&session.ID, &session.UserID, &session.AccessToken, &session.RefreshToken, &session.ExpiresAt, &session.CreatedAt) 143 + 144 + if err == sql.ErrNoRows { 145 + return nil, nil 146 + } 147 + 148 + if err != nil { 149 + return nil, fmt.Errorf("query session: %w", err) 150 + } 151 + 152 + return session, nil 153 + } 154 + 155 + func (db *DB) DeleteSession(id string) error { 156 + _, err := db.Exec("DELETE FROM sessions WHERE id = ?", id) 157 + if err != nil { 158 + return fmt.Errorf("delete session: %w", err) 159 + } 160 + return nil 161 + } 162 + 163 + func (db *DB) DeleteExpiredSessions() error { 164 + now := time.Now().Unix() 165 + _, err := db.Exec("DELETE FROM sessions WHERE expires_at < ?", now) 166 + if err != nil { 167 + return fmt.Errorf("delete expired sessions: %w", err) 168 + } 169 + return nil 170 + }
-33
tsconfig.json
··· 1 - { 2 - "compilerOptions": { 3 - // Environment setup & latest features 4 - "lib": ["ESNext", "DOM", "DOM.Iterable"], 5 - "target": "ESNext", 6 - "module": "Preserve", 7 - "moduleDetection": "force", 8 - "jsx": "preserve", 9 - "allowJs": true, 10 - 11 - // Bundler mode 12 - "moduleResolution": "bundler", 13 - "allowImportingTsExtensions": true, 14 - "verbatimModuleSyntax": true, 15 - "noEmit": true, 16 - 17 - // Decorators 18 - "experimentalDecorators": true, 19 - "useDefineForClassFields": false, 20 - 21 - // Best practices 22 - "strict": true, 23 - "skipLibCheck": true, 24 - "noFallthroughCasesInSwitch": true, 25 - "noUncheckedIndexedAccess": true, 26 - "noImplicitOverride": true, 27 - 28 - // Some stricter flags (disabled by default) 29 - "noUnusedLocals": false, 30 - "noUnusedParameters": false, 31 - "noPropertyAccessFromIndexSignature": false 32 - } 33 - }
+362
web/server.go
··· 1 + package web 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "html/template" 8 + "net/http" 9 + 10 + "github.com/charmbracelet/log" 11 + "github.com/kierank/pipes/auth" 12 + "github.com/kierank/pipes/config" 13 + "github.com/kierank/pipes/engine" 14 + "github.com/kierank/pipes/store" 15 + ) 16 + 17 + type Server struct { 18 + cfg *config.Config 19 + db *store.DB 20 + server *http.Server 21 + sessionManager *auth.SessionManager 22 + oauthClient *auth.OAuthClient 23 + templates *template.Template 24 + logger *log.Logger 25 + } 26 + 27 + func NewServer(cfg *config.Config, db *store.DB, logger *log.Logger) *Server { 28 + return &Server{ 29 + cfg: cfg, 30 + db: db, 31 + sessionManager: auth.NewSessionManager(cfg, db), 32 + oauthClient: auth.NewOAuthClient(cfg, db), 33 + logger: logger, 34 + } 35 + } 36 + 37 + func (s *Server) Start() error { 38 + // Load templates 39 + tmpl, err := template.ParseGlob("web/templates/*.html") 40 + if err != nil { 41 + return fmt.Errorf("failed to load templates: %w", err) 42 + } 43 + s.templates = tmpl 44 + 45 + mux := http.NewServeMux() 46 + 47 + // Static files 48 + mux.Handle("/public/", http.StripPrefix("/public/", http.FileServer(http.Dir("public")))) 49 + 50 + // Public routes 51 + mux.HandleFunc("/", s.handleIndex) 52 + mux.HandleFunc("/health", s.handleHealth) 53 + 54 + // Auth routes 55 + mux.HandleFunc("/auth/login", s.handleLogin) 56 + mux.HandleFunc("/auth/callback", s.handleCallback) 57 + mux.HandleFunc("/auth/logout", s.handleLogout) 58 + 59 + // Protected routes 60 + mux.HandleFunc("/dashboard", s.sessionManager.RequireAuth(s.handleDashboard)) 61 + mux.HandleFunc("/pipes/", s.sessionManager.RequireAuth(s.handlePipeEditor)) 62 + 63 + // API routes 64 + mux.HandleFunc("/api/me", s.sessionManager.RequireAuth(s.handleAPIMe)) 65 + mux.HandleFunc("/api/pipes", s.sessionManager.RequireAuth(s.handleAPIPipes)) 66 + mux.HandleFunc("/api/pipes/", s.sessionManager.RequireAuth(s.handleAPIPipe)) 67 + mux.HandleFunc("/api/node-types", s.handleAPINodeTypes) 68 + 69 + s.server = &http.Server{ 70 + Addr: fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port), 71 + Handler: mux, 72 + } 73 + 74 + return s.server.ListenAndServe() 75 + } 76 + 77 + func (s *Server) Shutdown(ctx context.Context) error { 78 + if s.server != nil { 79 + return s.server.Shutdown(ctx) 80 + } 81 + return nil 82 + } 83 + 84 + // Handlers 85 + 86 + func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { 87 + // Check if user is authenticated 88 + user, _ := s.sessionManager.GetCurrentUser(r) 89 + if user != nil { 90 + http.Redirect(w, r, "/dashboard", http.StatusSeeOther) 91 + return 92 + } 93 + 94 + w.Header().Set("Content-Type", "text/html") 95 + s.templates.ExecuteTemplate(w, "index.html", nil) 96 + } 97 + 98 + func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { 99 + w.Write([]byte("OK")) 100 + } 101 + 102 + func (s *Server) handleLogin(w http.ResponseWriter, r *http.Request) { 103 + authURL, err := s.oauthClient.GetAuthorizationURL() 104 + if err != nil { 105 + s.logger.Error("failed to generate auth URL", "error", err) 106 + s.renderError(w, "Configuration Error", "Failed to start authentication process. Please contact the administrator.", err.Error()) 107 + return 108 + } 109 + 110 + http.Redirect(w, r, authURL, http.StatusSeeOther) 111 + } 112 + 113 + func (s *Server) handleCallback(w http.ResponseWriter, r *http.Request) { 114 + code := r.URL.Query().Get("code") 115 + state := r.URL.Query().Get("state") 116 + 117 + if code == "" || state == "" { 118 + s.renderError(w, "Invalid Request", "Missing authorization code or state parameter.", "") 119 + return 120 + } 121 + 122 + user, session, err := s.oauthClient.HandleCallback(state, code) 123 + if err != nil { 124 + s.logger.Error("oauth callback error", "error", err) 125 + s.renderError(w, "Authentication Failed", "We couldn't sign you in with Indiko. Please try again.", err.Error()) 126 + return 127 + } 128 + 129 + if err := s.sessionManager.SetSession(w, r, session.ID); err != nil { 130 + s.logger.Error("failed to set session", "error", err) 131 + s.renderError(w, "Session Error", "Authentication succeeded, but we couldn't create your session.", err.Error()) 132 + return 133 + } 134 + 135 + s.logger.Info("user authenticated", "name", user.Name, "email", user.Email) 136 + http.Redirect(w, r, "/dashboard", http.StatusSeeOther) 137 + } 138 + 139 + func (s *Server) handleLogout(w http.ResponseWriter, r *http.Request) { 140 + sessionID, _ := s.sessionManager.GetSessionID(r) 141 + if sessionID != "" { 142 + s.db.DeleteSession(sessionID) 143 + } 144 + 145 + s.sessionManager.ClearSession(w, r) 146 + http.Redirect(w, r, "/", http.StatusSeeOther) 147 + } 148 + 149 + func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { 150 + user := auth.GetUserFromContext(r.Context()) 151 + if user == nil { 152 + http.Redirect(w, r, "/auth/login", http.StatusSeeOther) 153 + return 154 + } 155 + 156 + pipes, err := s.db.GetUserPipes(user.ID) 157 + if err != nil { 158 + s.logger.Error("failed to get pipes", "user_id", user.ID, "error", err) 159 + http.Error(w, "Failed to load pipes", http.StatusInternalServerError) 160 + return 161 + } 162 + 163 + data := map[string]interface{}{ 164 + "User": user, 165 + "Pipes": pipes, 166 + } 167 + 168 + w.Header().Set("Content-Type", "text/html") 169 + s.templates.ExecuteTemplate(w, "dashboard.html", data) 170 + } 171 + 172 + func (s *Server) handlePipeEditor(w http.ResponseWriter, r *http.Request) { 173 + // TODO: Implement pipe editor 174 + w.Write([]byte("Pipe editor - coming soon!")) 175 + } 176 + 177 + func (s *Server) handleAPIMe(w http.ResponseWriter, r *http.Request) { 178 + user := auth.GetUserFromContext(r.Context()) 179 + if user == nil { 180 + http.Error(w, "Unauthorized", http.StatusUnauthorized) 181 + return 182 + } 183 + 184 + w.Header().Set("Content-Type", "application/json") 185 + json.NewEncoder(w).Encode(user) 186 + } 187 + 188 + func (s *Server) handleAPIPipes(w http.ResponseWriter, r *http.Request) { 189 + user := auth.GetUserFromContext(r.Context()) 190 + if user == nil { 191 + http.Error(w, "Unauthorized", http.StatusUnauthorized) 192 + return 193 + } 194 + 195 + switch r.Method { 196 + case "GET": 197 + pipes, err := s.db.GetUserPipes(user.ID) 198 + if err != nil { 199 + http.Error(w, "Failed to load pipes", http.StatusInternalServerError) 200 + return 201 + } 202 + 203 + w.Header().Set("Content-Type", "application/json") 204 + json.NewEncoder(w).Encode(pipes) 205 + 206 + case "POST": 207 + var req struct { 208 + Name string `json:"name"` 209 + Description string `json:"description"` 210 + Config string `json:"config"` 211 + } 212 + 213 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 214 + http.Error(w, "Invalid request", http.StatusBadRequest) 215 + return 216 + } 217 + 218 + if req.Config == "" { 219 + req.Config = `{"version":"1","nodes":[],"connections":[],"settings":{"enabled":false}}` 220 + } 221 + 222 + pipe, err := s.db.CreatePipe(user.ID, req.Name, req.Description, req.Config, false) 223 + if err != nil { 224 + http.Error(w, "Failed to create pipe", http.StatusInternalServerError) 225 + return 226 + } 227 + 228 + w.Header().Set("Content-Type", "application/json") 229 + w.WriteHeader(http.StatusCreated) 230 + json.NewEncoder(w).Encode(pipe) 231 + 232 + default: 233 + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) 234 + } 235 + } 236 + 237 + func (s *Server) handleAPIPipe(w http.ResponseWriter, r *http.Request) { 238 + user := auth.GetUserFromContext(r.Context()) 239 + if user == nil { 240 + http.Error(w, "Unauthorized", http.StatusUnauthorized) 241 + return 242 + } 243 + 244 + // Extract pipe ID from path 245 + pipeID := r.URL.Path[len("/api/pipes/"):] 246 + 247 + switch r.Method { 248 + case "GET": 249 + pipe, err := s.db.GetPipe(pipeID) 250 + if err != nil || pipe == nil { 251 + http.Error(w, "Pipe not found", http.StatusNotFound) 252 + return 253 + } 254 + 255 + if pipe.UserID != user.ID && !pipe.IsPublic { 256 + http.Error(w, "Forbidden", http.StatusForbidden) 257 + return 258 + } 259 + 260 + w.Header().Set("Content-Type", "application/json") 261 + json.NewEncoder(w).Encode(pipe) 262 + 263 + case "PUT": 264 + pipe, err := s.db.GetPipe(pipeID) 265 + if err != nil || pipe == nil { 266 + http.Error(w, "Pipe not found", http.StatusNotFound) 267 + return 268 + } 269 + 270 + if pipe.UserID != user.ID { 271 + http.Error(w, "Forbidden", http.StatusForbidden) 272 + return 273 + } 274 + 275 + var req struct { 276 + Name string `json:"name"` 277 + Description string `json:"description"` 278 + Config map[string]interface{} `json:"config"` 279 + } 280 + 281 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 282 + http.Error(w, "Invalid request", http.StatusBadRequest) 283 + return 284 + } 285 + 286 + if req.Name != "" { 287 + pipe.Name = req.Name 288 + } 289 + if req.Description != "" { 290 + pipe.Description = req.Description 291 + } 292 + if req.Config != nil { 293 + configJSON, _ := json.Marshal(req.Config) 294 + pipe.Config = string(configJSON) 295 + } 296 + 297 + if err := s.db.UpdatePipe(pipe); err != nil { 298 + http.Error(w, "Failed to update pipe", http.StatusInternalServerError) 299 + return 300 + } 301 + 302 + w.Header().Set("Content-Type", "application/json") 303 + json.NewEncoder(w).Encode(map[string]bool{"success": true}) 304 + 305 + case "DELETE": 306 + pipe, err := s.db.GetPipe(pipeID) 307 + if err != nil || pipe == nil { 308 + http.Error(w, "Pipe not found", http.StatusNotFound) 309 + return 310 + } 311 + 312 + if pipe.UserID != user.ID { 313 + http.Error(w, "Forbidden", http.StatusForbidden) 314 + return 315 + } 316 + 317 + if err := s.db.DeletePipe(pipeID); err != nil { 318 + http.Error(w, "Failed to delete pipe", http.StatusInternalServerError) 319 + return 320 + } 321 + 322 + w.Header().Set("Content-Type", "application/json") 323 + json.NewEncoder(w).Encode(map[string]bool{"success": true}) 324 + 325 + default: 326 + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) 327 + } 328 + } 329 + 330 + func (s *Server) handleAPINodeTypes(w http.ResponseWriter, r *http.Request) { 331 + registry := engine.NewRegistry() 332 + nodes := registry.GetAll() 333 + 334 + var nodeTypes []map[string]interface{} 335 + for _, node := range nodes { 336 + nodeTypes = append(nodeTypes, map[string]interface{}{ 337 + "type": node.Type(), 338 + "label": node.Label(), 339 + "description": node.Description(), 340 + "category": node.Category(), 341 + "schema": node.GetConfigSchema(), 342 + }) 343 + } 344 + 345 + w.Header().Set("Content-Type", "application/json") 346 + json.NewEncoder(w).Encode(nodeTypes) 347 + } 348 + 349 + // Helper functions 350 + 351 + func (s *Server) renderError(w http.ResponseWriter, title, message, details string) { 352 + w.Header().Set("Content-Type", "text/html") 353 + w.WriteHeader(http.StatusBadRequest) 354 + 355 + data := map[string]interface{}{ 356 + "Title": title, 357 + "Message": message, 358 + "Details": details, 359 + } 360 + 361 + s.templates.ExecuteTemplate(w, "error.html", data) 362 + }
+194
web/templates/dashboard.html
··· 1 + <!DOCTYPE html> 2 + <html lang="en"> 3 + <head> 4 + <meta charset="UTF-8"> 5 + <meta name="viewport" content="width=device-width, initial-scale=1.0"> 6 + <title>Dashboard - Pipes</title> 7 + <link rel="icon" type="image/svg+xml" href="/public/favicon.svg"> 8 + <link rel="preconnect" href="https://fonts.googleapis.com"> 9 + <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> 10 + <link href="https://fonts.googleapis.com/css2?family=Space+Grotesk:wght@300..700&display=swap" rel="stylesheet"> 11 + <style> 12 + * { margin: 0; padding: 0; box-sizing: border-box; } 13 + body { 14 + font-family: 'Space Grotesk', sans-serif; 15 + background: #f5f5f0; 16 + min-height: 100vh; 17 + padding: 40px 20px; 18 + } 19 + .container { 20 + max-width: 1200px; 21 + margin: 0 auto; 22 + } 23 + header { 24 + background: #fff; 25 + border: 4px solid #26242b; 26 + padding: 20px 30px; 27 + margin-bottom: 30px; 28 + display: flex; 29 + justify-content: space-between; 30 + align-items: center; 31 + box-shadow: 8px 8px 0 #26242b; 32 + } 33 + h1 { 34 + color: #26242b; 35 + font-size: 32px; 36 + font-weight: 700; 37 + text-transform: uppercase; 38 + letter-spacing: -0.02em; 39 + } 40 + h1 .accent { 41 + color: #2563eb; 42 + } 43 + .user-info { 44 + display: flex; 45 + align-items: center; 46 + gap: 20px; 47 + } 48 + .user-name { 49 + color: #26242b; 50 + font-weight: 600; 51 + font-size: 16px; 52 + } 53 + .btn { 54 + display: inline-block; 55 + padding: 12px 24px; 56 + background: #ff6b35; 57 + color: #fff; 58 + border: 3px solid #26242b; 59 + font-size: 14px; 60 + font-weight: 700; 61 + text-decoration: none; 62 + font-family: 'Space Grotesk', sans-serif; 63 + text-transform: uppercase; 64 + letter-spacing: 0.05rem; 65 + box-shadow: 4px 4px 0 #26242b; 66 + cursor: pointer; 67 + transition: all 0.15s ease; 68 + } 69 + .btn:hover { 70 + transform: translate(2px, 2px); 71 + box-shadow: 2px 2px 0 #26242b; 72 + } 73 + .btn:active { 74 + transform: translate(4px, 4px); 75 + box-shadow: 0 0 0 #26242b; 76 + } 77 + .btn-secondary { 78 + background: #2563eb; 79 + color: #fff; 80 + } 81 + .btn-auth { 82 + background: #AB4967; 83 + color: #fff; 84 + } 85 + .content { 86 + background: #fff; 87 + border: 4px solid #26242b; 88 + padding: 40px; 89 + box-shadow: 8px 8px 0 #26242b; 90 + } 91 + h2 { 92 + color: #26242b; 93 + font-size: 28px; 94 + margin-bottom: 30px; 95 + text-transform: uppercase; 96 + font-weight: 700; 97 + letter-spacing: -0.02em; 98 + } 99 + h2 .accent { 100 + color: #ff6b35; 101 + } 102 + .pipes-list { 103 + display: grid; 104 + grid-template-columns: repeat(auto-fill, minmax(320px, 1fr)); 105 + gap: 24px; 106 + } 107 + .pipe-card { 108 + background: #fff; 109 + border: 3px solid #26242b; 110 + padding: 24px; 111 + box-shadow: 6px 6px 0 #26242b; 112 + transition: all 0.15s ease; 113 + } 114 + .pipe-card:hover { 115 + transform: translate(-2px, -2px); 116 + box-shadow: 8px 8px 0 #26242b; 117 + } 118 + .pipe-name { 119 + font-size: 20px; 120 + font-weight: 700; 121 + margin-bottom: 12px; 122 + color: #26242b; 123 + text-transform: uppercase; 124 + letter-spacing: -0.01em; 125 + } 126 + .pipe-desc { 127 + font-size: 14px; 128 + color: #666; 129 + margin-bottom: 20px; 130 + line-height: 1.5; 131 + } 132 + .empty-state { 133 + text-align: center; 134 + padding: 80px 20px; 135 + } 136 + .empty-state p { 137 + font-size: 20px; 138 + margin-bottom: 30px; 139 + color: #666; 140 + font-weight: 500; 141 + } 142 + </style> 143 + </head> 144 + <body> 145 + <div class="container"> 146 + <header> 147 + <h1><span class="accent">Pipes</span> Dashboard</h1> 148 + <div class="user-info"> 149 + <span class="user-name">{{ .User.Name }}</span> 150 + <form action="/auth/logout" method="post" style="display: inline;"> 151 + <button type="submit" class="btn btn-auth">Logout</button> 152 + </form> 153 + </div> 154 + </header> 155 + 156 + <div class="content"> 157 + <h2>Your <span class="accent">Pipes</span></h2> 158 + {{if .Pipes}} 159 + <div class="pipes-list"> 160 + {{range .Pipes}} 161 + <div class="pipe-card"> 162 + <div class="pipe-name">{{.Name}}</div> 163 + {{if .Description}} 164 + <div class="pipe-desc">{{.Description}}</div> 165 + {{end}} 166 + <a href="/pipes/{{.ID}}/edit" class="btn btn-secondary">Edit</a> 167 + </div> 168 + {{end}} 169 + </div> 170 + {{else}} 171 + <div class="empty-state"> 172 + <p>You haven't created any pipes yet!</p> 173 + <button class="btn" onclick="createPipe()">Create Your First Pipe</button> 174 + </div> 175 + {{end}} 176 + </div> 177 + </div> 178 + 179 + <script> 180 + function createPipe() { 181 + fetch('/api/pipes', { 182 + method: 'POST', 183 + headers: { 'Content-Type': 'application/json' }, 184 + body: JSON.stringify({ name: 'Untitled Pipe' }) 185 + }) 186 + .then(r => r.json()) 187 + .then(pipe => { 188 + window.location.href = '/pipes/' + pipe.id + '/edit'; 189 + }) 190 + .catch(err => alert('Failed to create pipe: ' + err)); 191 + } 192 + </script> 193 + </body> 194 + </html>
+112
web/templates/error.html
··· 1 + <!DOCTYPE html> 2 + <html lang="en"> 3 + <head> 4 + <meta charset="UTF-8"> 5 + <meta name="viewport" content="width=device-width, initial-scale=1.0"> 6 + <title>Error - Pipes</title> 7 + <link rel="icon" type="image/svg+xml" href="/public/favicon.svg"> 8 + <link rel="preconnect" href="https://fonts.googleapis.com"> 9 + <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> 10 + <link href="https://fonts.googleapis.com/css2?family=Space+Grotesk:wght@300..700&display=swap" rel="stylesheet"> 11 + <style> 12 + * { margin: 0; padding: 0; box-sizing: border-box; } 13 + body { 14 + font-family: 'Space Grotesk', sans-serif; 15 + background: #f5f5f0; 16 + min-height: 100vh; 17 + display: flex; 18 + align-items: center; 19 + justify-content: center; 20 + padding: 20px; 21 + } 22 + .container { 23 + background: #fff; 24 + border: 4px solid #26242b; 25 + padding: 60px 80px; 26 + box-shadow: 12px 12px 0 #26242b; 27 + text-align: center; 28 + max-width: 600px; 29 + } 30 + h1 { 31 + color: #AB4967; 32 + font-size: 48px; 33 + font-weight: 700; 34 + margin-bottom: 16px; 35 + text-transform: uppercase; 36 + letter-spacing: -0.02em; 37 + } 38 + .error-title { 39 + color: #26242b; 40 + font-size: 24px; 41 + font-weight: 700; 42 + margin-bottom: 16px; 43 + text-transform: uppercase; 44 + } 45 + .error-message { 46 + color: #4a4a4a; 47 + font-size: 16px; 48 + margin-bottom: 40px; 49 + font-weight: 500; 50 + line-height: 1.6; 51 + } 52 + .error-details { 53 + background: #f5f5f0; 54 + border: 3px solid #26242b; 55 + padding: 16px; 56 + margin-bottom: 40px; 57 + text-align: left; 58 + font-family: monospace; 59 + font-size: 14px; 60 + color: #666; 61 + word-break: break-word; 62 + } 63 + .btn { 64 + display: inline-block; 65 + padding: 1rem 2rem; 66 + background: #AB4967; 67 + color: #fff; 68 + border: 4px solid #26242b; 69 + font-size: 1rem; 70 + font-weight: 700; 71 + text-decoration: none; 72 + font-family: 'Space Grotesk', sans-serif; 73 + text-transform: uppercase; 74 + letter-spacing: 0.1rem; 75 + box-shadow: 6px 6px 0 #26242b; 76 + transition: all 0.15s ease; 77 + } 78 + .btn:hover { 79 + transform: translate(3px, 3px); 80 + box-shadow: 3px 3px 0 #26242b; 81 + } 82 + .btn:active { 83 + transform: translate(6px, 6px); 84 + box-shadow: 0 0 0 #26242b; 85 + } 86 + .btn-secondary { 87 + background: #fff; 88 + color: #26242b; 89 + } 90 + .button-group { 91 + display: flex; 92 + flex-direction: column; 93 + gap: 16px; 94 + align-items: center; 95 + } 96 + </style> 97 + </head> 98 + <body> 99 + <div class="container"> 100 + <h1>Error</h1> 101 + <div class="error-title">{{.Title}}</div> 102 + <div class="error-message">{{.Message}}</div> 103 + {{if .Details}} 104 + <div class="error-details">{{.Details}}</div> 105 + {{end}} 106 + <div class="button-group"> 107 + <a href="/" class="btn btn-secondary">Go Home</a> 108 + <a href="/auth/login" class="btn">Try Again</a> 109 + </div> 110 + </div> 111 + </body> 112 + </html>
+79
web/templates/index.html
··· 1 + <!DOCTYPE html> 2 + <html lang="en"> 3 + <head> 4 + <meta charset="UTF-8"> 5 + <meta name="viewport" content="width=device-width, initial-scale=1.0"> 6 + <title>Pipes - Visual Data Pipeline Builder</title> 7 + <link rel="icon" type="image/svg+xml" href="/public/favicon.svg"> 8 + <link rel="preconnect" href="https://fonts.googleapis.com"> 9 + <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> 10 + <link href="https://fonts.googleapis.com/css2?family=Space+Grotesk:wght@300..700&display=swap" rel="stylesheet"> 11 + <style> 12 + * { margin: 0; padding: 0; box-sizing: border-box; } 13 + body { 14 + font-family: 'Space Grotesk', sans-serif; 15 + background: #f5f5f0; 16 + min-height: 100vh; 17 + display: flex; 18 + align-items: center; 19 + justify-content: center; 20 + padding: 20px; 21 + } 22 + .container { 23 + background: #fff; 24 + border: 4px solid #26242b; 25 + padding: 60px 80px; 26 + box-shadow: 12px 12px 0 #26242b; 27 + text-align: center; 28 + max-width: 600px; 29 + } 30 + h1 { 31 + color: #26242b; 32 + font-size: 64px; 33 + font-weight: 700; 34 + margin-bottom: 16px; 35 + text-transform: uppercase; 36 + letter-spacing: -0.02em; 37 + } 38 + p { 39 + color: #4a4a4a; 40 + font-size: 20px; 41 + margin-bottom: 40px; 42 + font-weight: 500; 43 + } 44 + .btn { 45 + display: inline-block; 46 + padding: 1rem 2rem; 47 + background: #AB4967; 48 + color: #fff; 49 + border: 4px solid #26242b; 50 + font-size: 1rem; 51 + font-weight: 700; 52 + text-decoration: none; 53 + font-family: 'Space Grotesk', sans-serif; 54 + text-transform: uppercase; 55 + letter-spacing: 0.1rem; 56 + box-shadow: 6px 6px 0 #26242b; 57 + transition: all 0.15s ease; 58 + } 59 + .btn:hover { 60 + transform: translate(3px, 3px); 61 + box-shadow: 3px 3px 0 #26242b; 62 + } 63 + .btn:active { 64 + transform: translate(6px, 6px); 65 + box-shadow: 0 0 0 #26242b; 66 + } 67 + .accent { 68 + color: #AB4967; 69 + } 70 + </style> 71 + </head> 72 + <body> 73 + <div class="container"> 74 + <h1><span class="accent">Pipes</span></h1> 75 + <p>A visual data pipeline builder inspired by Yahoo Pipes</p> 76 + <a href="/auth/login" class="btn">Sign in with Indiko</a> 77 + </div> 78 + </body> 79 + </html>