bring back yahoo pipes!
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 243 lines 6.0 kB view raw
1package main 2 3import ( 4 "context" 5 "crypto/rand" 6 "encoding/base64" 7 "errors" 8 "fmt" 9 "net/http" 10 "os" 11 "os/signal" 12 "strings" 13 "syscall" 14 "time" 15 16 "github.com/charmbracelet/log" 17 "github.com/kierank/pipes/config" 18 "github.com/kierank/pipes/engine" 19 "github.com/kierank/pipes/store" 20 "github.com/kierank/pipes/web" 21) 22 23var ( 24 version = "dev" 25 commitHash = "dev" 26 logger *log.Logger 27) 28 29func main() { 30 // Initialize logger with default level 31 logger = log.NewWithOptions(os.Stderr, log.Options{ 32 ReportTimestamp: true, 33 Level: log.InfoLevel, 34 }) 35 36 if len(os.Args) < 2 { 37 printUsage() 38 os.Exit(1) 39 } 40 41 command := os.Args[1] 42 43 switch command { 44 case "serve": 45 configPath := "" 46 // Check for -c or --config flag 47 for i := 2; i < len(os.Args); i++ { 48 if (os.Args[i] == "-c" || os.Args[i] == "--config") && i+1 < len(os.Args) { 49 configPath = os.Args[i+1] 50 break 51 } 52 } 53 serve(configPath) 54 case "init": 55 initConfig() 56 case "help", "--help", "-h": 57 printUsage() 58 case "version", "--version", "-v": 59 fmt.Printf("pipes %s (%s)\n", version, commitHash) 60 default: 61 fmt.Printf("Unknown command: %s\n\n", command) 62 printUsage() 63 os.Exit(1) 64 } 65} 66 67func printUsage() { 68 fmt.Println("Pipes - Visual data pipeline builder") 69 fmt.Println() 70 fmt.Println("Usage:") 71 fmt.Println(" pipes <command> [flags]") 72 fmt.Println() 73 fmt.Println("Commands:") 74 fmt.Println(" serve Start the server") 75 fmt.Println(" init [path] Create a sample config file (default: config.yaml)") 76 fmt.Println(" version Show version information") 77 fmt.Println(" help Show this help message") 78 fmt.Println() 79 fmt.Println("Serve Flags:") 80 fmt.Println(" -c, --config PATH Path to config file (optional, uses .env if not specified)") 81 fmt.Println() 82 fmt.Println("Examples:") 83 fmt.Println(" pipes init") 84 fmt.Println(" pipes serve -c config.yaml") 85 fmt.Println(" pipes serve # Uses .env file") 86 fmt.Println() 87} 88 89func serve(configPath string) { 90 // Load configuration 91 cfg, err := config.Load(configPath) 92 if err != nil { 93 logger.Fatal("failed to load config", "error", err) 94 } 95 96 // Set log level from config 97 level := parseLogLevel(cfg.LogLevel) 98 logger.SetLevel(level) 99 100 logger.Info("starting pipes", 101 "host", cfg.Host, 102 "port", cfg.Port, 103 "db_path", cfg.DatabasePath, 104 "log_level", cfg.LogLevel, 105 ) 106 107 // Initialize database 108 db, err := store.New(cfg.DatabasePath) 109 if err != nil { 110 logger.Fatal("failed to initialize database", "error", err) 111 } 112 defer db.Close() 113 114 logger.Info("database initialized successfully") 115 116 // Initialize scheduler 117 scheduler := engine.NewScheduler(db, logger) 118 scheduler.Start() 119 defer scheduler.Stop() 120 121 logger.Info("scheduler started") 122 123 // Initialize web server 124 server := web.NewServer(cfg, db, logger) 125 126 // Start server in goroutine 127 serverErr := make(chan error, 1) 128 go func() { 129 logger.Info("starting server", "address", fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)) 130 if err := server.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { 131 serverErr <- err 132 } 133 }() 134 135 // Wait for interrupt signal or server error 136 sigChan := make(chan os.Signal, 1) 137 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 138 139 select { 140 case <-sigChan: 141 logger.Info("shutting down gracefully...") 142 case err := <-serverErr: 143 logger.Fatal("server error", "error", err) 144 } 145 146 // Graceful shutdown 147 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 148 defer cancel() 149 150 if err := server.Shutdown(ctx); err != nil { 151 logger.Error("server shutdown error", "error", err) 152 } 153 154 logger.Info("shutdown complete") 155} 156 157func initConfig() { 158 configPath := "config.yaml" 159 if len(os.Args) > 2 { 160 configPath = os.Args[2] 161 } 162 163 if _, err := os.Stat(configPath); err == nil { 164 fmt.Printf("Config file already exists at %s\n", configPath) 165 fmt.Println("Remove it first or specify a different path:") 166 fmt.Printf(" pipes init %s.new\n", configPath) 167 os.Exit(1) 168 } 169 170 secret, err := generateSecret() 171 if err != nil { 172 logger.Fatal("failed to generate secret", "error", err) 173 } 174 175 configContent := `# Pipes Configuration 176# See https://github.com/yourusername/pipes for documentation 177 178# Server settings 179host: localhost 180port: 3001 181origin: http://localhost:3001 182env: development 183log_level: info # debug, info, warn, error, fatal 184 185# Database 186db_path: pipes.db 187 188# OAuth (Indiko) 189# Set these environment variables or replace with actual values: 190indiko_url: ${INDIKO_URL} 191indiko_client_id: ${INDIKO_CLIENT_ID} 192indiko_client_secret: ${INDIKO_CLIENT_SECRET} 193oauth_callback_url: http://localhost:3001/auth/callback 194 195# Session 196session_secret: ` + secret + ` 197session_cookie_name: pipes_session 198` 199 200 if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil { 201 logger.Fatal("failed to write config", "error", err) 202 } 203 204 fmt.Printf("✓ Config file created at %s\n\n", configPath) 205 fmt.Println("Next steps:") 206 fmt.Println(" 1. Set your Indiko OAuth environment variables:") 207 fmt.Println(" export INDIKO_URL=http://localhost:3000") 208 fmt.Println(" export INDIKO_CLIENT_ID=http://localhost:3001") 209 fmt.Println() 210 fmt.Println(" 2. Or edit the config file directly to replace ${VAR} placeholders") 211 fmt.Println() 212 fmt.Println(" 3. Start the server:") 213 fmt.Printf(" pipes serve -c %s\n", configPath) 214 fmt.Println() 215 fmt.Println(" Or use environment variables with a .env file instead:") 216 fmt.Println(" cp .env.example .env") 217 fmt.Println(" pipes serve") 218} 219 220func generateSecret() (string, error) { 221 bytes := make([]byte, 32) 222 if _, err := rand.Read(bytes); err != nil { 223 return "", err 224 } 225 return base64.URLEncoding.EncodeToString(bytes), nil 226} 227 228func parseLogLevel(levelStr string) log.Level { 229 switch strings.ToLower(levelStr) { 230 case "debug": 231 return log.DebugLevel 232 case "info": 233 return log.InfoLevel 234 case "warn", "warning": 235 return log.WarnLevel 236 case "error": 237 return log.ErrorLevel 238 case "fatal": 239 return log.FatalLevel 240 default: 241 return log.InfoLevel 242 } 243}