bring back yahoo pipes!
1package main
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/base64"
7 "errors"
8 "fmt"
9 "net/http"
10 "os"
11 "os/signal"
12 "strings"
13 "syscall"
14 "time"
15
16 "github.com/charmbracelet/log"
17 "github.com/kierank/pipes/config"
18 "github.com/kierank/pipes/engine"
19 "github.com/kierank/pipes/store"
20 "github.com/kierank/pipes/web"
21)
22
23var (
24 version = "dev"
25 commitHash = "dev"
26 logger *log.Logger
27)
28
29func main() {
30 // Initialize logger with default level
31 logger = log.NewWithOptions(os.Stderr, log.Options{
32 ReportTimestamp: true,
33 Level: log.InfoLevel,
34 })
35
36 if len(os.Args) < 2 {
37 printUsage()
38 os.Exit(1)
39 }
40
41 command := os.Args[1]
42
43 switch command {
44 case "serve":
45 configPath := ""
46 // Check for -c or --config flag
47 for i := 2; i < len(os.Args); i++ {
48 if (os.Args[i] == "-c" || os.Args[i] == "--config") && i+1 < len(os.Args) {
49 configPath = os.Args[i+1]
50 break
51 }
52 }
53 serve(configPath)
54 case "init":
55 initConfig()
56 case "help", "--help", "-h":
57 printUsage()
58 case "version", "--version", "-v":
59 fmt.Printf("pipes %s (%s)\n", version, commitHash)
60 default:
61 fmt.Printf("Unknown command: %s\n\n", command)
62 printUsage()
63 os.Exit(1)
64 }
65}
66
// printUsage writes the command-line help text to standard output, one entry
// per line. Empty entries produce the blank separator lines.
func printUsage() {
	usage := []string{
		"Pipes - Visual data pipeline builder",
		"",
		"Usage:",
		" pipes <command> [flags]",
		"",
		"Commands:",
		" serve Start the server",
		" init [path] Create a sample config file (default: config.yaml)",
		" version Show version information",
		" help Show this help message",
		"",
		"Serve Flags:",
		" -c, --config PATH Path to config file (optional, uses .env if not specified)",
		"",
		"Examples:",
		" pipes init",
		" pipes serve -c config.yaml",
		" pipes serve # Uses .env file",
		"",
	}
	for _, line := range usage {
		fmt.Println(line)
	}
}
88
89func serve(configPath string) {
90 // Load configuration
91 cfg, err := config.Load(configPath)
92 if err != nil {
93 logger.Fatal("failed to load config", "error", err)
94 }
95
96 // Set log level from config
97 level := parseLogLevel(cfg.LogLevel)
98 logger.SetLevel(level)
99
100 logger.Info("starting pipes",
101 "host", cfg.Host,
102 "port", cfg.Port,
103 "db_path", cfg.DatabasePath,
104 "log_level", cfg.LogLevel,
105 )
106
107 // Initialize database
108 db, err := store.New(cfg.DatabasePath)
109 if err != nil {
110 logger.Fatal("failed to initialize database", "error", err)
111 }
112 defer db.Close()
113
114 logger.Info("database initialized successfully")
115
116 // Initialize scheduler
117 scheduler := engine.NewScheduler(db, logger)
118 scheduler.Start()
119 defer scheduler.Stop()
120
121 logger.Info("scheduler started")
122
123 // Initialize web server
124 server := web.NewServer(cfg, db, logger)
125
126 // Start server in goroutine
127 serverErr := make(chan error, 1)
128 go func() {
129 logger.Info("starting server", "address", fmt.Sprintf("%s:%d", cfg.Host, cfg.Port))
130 if err := server.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) {
131 serverErr <- err
132 }
133 }()
134
135 // Wait for interrupt signal or server error
136 sigChan := make(chan os.Signal, 1)
137 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
138
139 select {
140 case <-sigChan:
141 logger.Info("shutting down gracefully...")
142 case err := <-serverErr:
143 logger.Fatal("server error", "error", err)
144 }
145
146 // Graceful shutdown
147 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
148 defer cancel()
149
150 if err := server.Shutdown(ctx); err != nil {
151 logger.Error("server shutdown error", "error", err)
152 }
153
154 logger.Info("shutdown complete")
155}
156
157func initConfig() {
158 configPath := "config.yaml"
159 if len(os.Args) > 2 {
160 configPath = os.Args[2]
161 }
162
163 if _, err := os.Stat(configPath); err == nil {
164 fmt.Printf("Config file already exists at %s\n", configPath)
165 fmt.Println("Remove it first or specify a different path:")
166 fmt.Printf(" pipes init %s.new\n", configPath)
167 os.Exit(1)
168 }
169
170 secret, err := generateSecret()
171 if err != nil {
172 logger.Fatal("failed to generate secret", "error", err)
173 }
174
175 configContent := `# Pipes Configuration
176# See https://github.com/yourusername/pipes for documentation
177
178# Server settings
179host: localhost
180port: 3001
181origin: http://localhost:3001
182env: development
183log_level: info # debug, info, warn, error, fatal
184
185# Database
186db_path: pipes.db
187
188# OAuth (Indiko)
189# Set these environment variables or replace with actual values:
190indiko_url: ${INDIKO_URL}
191indiko_client_id: ${INDIKO_CLIENT_ID}
192indiko_client_secret: ${INDIKO_CLIENT_SECRET}
193oauth_callback_url: http://localhost:3001/auth/callback
194
195# Session
196session_secret: ` + secret + `
197session_cookie_name: pipes_session
198`
199
200 if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
201 logger.Fatal("failed to write config", "error", err)
202 }
203
204 fmt.Printf("✓ Config file created at %s\n\n", configPath)
205 fmt.Println("Next steps:")
206 fmt.Println(" 1. Set your Indiko OAuth environment variables:")
207 fmt.Println(" export INDIKO_URL=http://localhost:3000")
208 fmt.Println(" export INDIKO_CLIENT_ID=http://localhost:3001")
209 fmt.Println()
210 fmt.Println(" 2. Or edit the config file directly to replace ${VAR} placeholders")
211 fmt.Println()
212 fmt.Println(" 3. Start the server:")
213 fmt.Printf(" pipes serve -c %s\n", configPath)
214 fmt.Println()
215 fmt.Println(" Or use environment variables with a .env file instead:")
216 fmt.Println(" cp .env.example .env")
217 fmt.Println(" pipes serve")
218}
219
// generateSecret returns 32 bytes read from crypto/rand, encoded with the
// padded URL-safe base64 alphabet. If the random source fails, it returns an
// empty string and that error.
func generateSecret() (string, error) {
	var raw [32]byte
	_, err := rand.Read(raw[:])
	if err != nil {
		return "", err
	}
	secret := base64.URLEncoding.EncodeToString(raw[:])
	return secret, nil
}
227
228func parseLogLevel(levelStr string) log.Level {
229 switch strings.ToLower(levelStr) {
230 case "debug":
231 return log.DebugLevel
232 case "info":
233 return log.InfoLevel
234 case "warn", "warning":
235 return log.WarnLevel
236 case "error":
237 return log.ErrorLevel
238 case "fatal":
239 return log.FatalLevel
240 default:
241 return log.InfoLevel
242 }
243}