bring back yahoo pipes!
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add better outputs

+228 -1
+4
nodes/node.go
··· 59 59 func (c *Context) Log(nodeID, level, message string) { 60 60 c.DB.LogExecution(c.ExecutionID, nodeID, level, message) 61 61 } 62 + 63 + func (c *Context) SaveOutput(format, content, contentType string) error { 64 + return c.DB.SavePipeOutput(c.PipeID, format, content, contentType) 65 + }
+5 -1
nodes/outputs/json.go
··· 33 33 return nil, err 34 34 } 35 35 36 + // Save output to database for public access 37 + if err := execCtx.SaveOutput("json", string(jsonData), "application/json"); err != nil { 38 + execCtx.Log("json-output", "error", "Failed to save output: "+err.Error()) 39 + } 40 + 36 41 execCtx.Log("json-output", "info", string(jsonData)) 37 42 38 - // Return the data (for potential chaining) 39 43 return data, nil 40 44 } 41 45
+6
nodes/outputs/rss.go
··· 87 87 } 88 88 89 89 rssOutput := fmt.Sprintf("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n%s", string(xmlData)) 90 + 91 + // Save output to database for public access 92 + if err := execCtx.SaveOutput("rss", rssOutput, "application/rss+xml"); err != nil { 93 + execCtx.Log("rss-output", "error", "Failed to save output: "+err.Error()) 94 + } 95 + 90 96 execCtx.Log("rss-output", "info", rssOutput) 91 97 92 98 return data, nil
+13
store/db.go
··· 78 78 79 79 CREATE INDEX IF NOT EXISTS idx_pipes_user_id ON pipes(user_id); 80 80 81 + -- Pipe outputs (cached output for public feeds) 82 + CREATE TABLE IF NOT EXISTS pipe_outputs ( 83 + id TEXT PRIMARY KEY, 84 + pipe_id TEXT NOT NULL REFERENCES pipes(id) ON DELETE CASCADE, 85 + format TEXT NOT NULL, 86 + content TEXT NOT NULL, 87 + content_type TEXT NOT NULL, 88 + created_at INTEGER NOT NULL 89 + ); 90 + 91 + CREATE INDEX IF NOT EXISTS idx_outputs_pipe_id ON pipe_outputs(pipe_id); 92 + CREATE UNIQUE INDEX IF NOT EXISTS idx_outputs_pipe_format ON pipe_outputs(pipe_id, format); 93 + 81 94 -- Scheduled jobs 82 95 CREATE TABLE IF NOT EXISTS scheduled_jobs ( 83 96 id TEXT PRIMARY KEY,
+49
store/pipes.go
··· 19 19 UpdatedAt int64 `json:"updated_at"` 20 20 } 21 21 22 + type PipeOutput struct { 23 + ID string `json:"id"` 24 + PipeID string `json:"pipe_id"` 25 + Format string `json:"format"` 26 + Content string `json:"content"` 27 + ContentType string `json:"content_type"` 28 + CreatedAt int64 `json:"created_at"` 29 + } 30 + 22 31 type ScheduledJob struct { 23 32 ID string 24 33 PipeID string ··· 128 137 return fmt.Errorf("delete pipe: %w", err) 129 138 } 130 139 return nil 140 + } 141 + 142 + func (db *DB) SavePipeOutput(pipeID, format, content, contentType string) error { 143 + now := time.Now().Unix() 144 + id := uuid.New().String() 145 + 146 + _, err := db.Exec(` 147 + INSERT INTO pipe_outputs (id, pipe_id, format, content, content_type, created_at) 148 + VALUES (?, ?, ?, ?, ?, ?) 149 + ON CONFLICT(pipe_id, format) DO UPDATE SET 150 + content = excluded.content, 151 + content_type = excluded.content_type, 152 + created_at = excluded.created_at 153 + `, id, pipeID, format, content, contentType, now) 154 + 155 + if err != nil { 156 + return fmt.Errorf("save pipe output: %w", err) 157 + } 158 + 159 + return nil 160 + } 161 + 162 + func (db *DB) GetPipeOutput(pipeID, format string) (*PipeOutput, error) { 163 + output := &PipeOutput{} 164 + 165 + err := db.QueryRow(` 166 + SELECT id, pipe_id, format, content, content_type, created_at 167 + FROM pipe_outputs 168 + WHERE pipe_id = ? AND format = ? 169 + `, pipeID, format).Scan(&output.ID, &output.PipeID, &output.Format, &output.Content, &output.ContentType, &output.CreatedAt) 170 + 171 + if err == sql.ErrNoRows { 172 + return nil, nil 173 + } 174 + 175 + if err != nil { 176 + return nil, fmt.Errorf("get pipe output: %w", err) 177 + } 178 + 179 + return output, nil 131 180 } 132 181 133 182 func (db *DB) CreateScheduledJob(pipeID, cronExpression string, nextRunAt int64) (*ScheduledJob, error) {
+78
web/server.go
··· 7 7 "html/template" 8 8 "net/http" 9 9 "strconv" 10 + "strings" 10 11 11 12 "github.com/charmbracelet/log" 12 13 "github.com/kierank/pipes/auth" ··· 67 68 mux.HandleFunc("/api/pipes/", s.sessionManager.RequireAuth(s.handleAPIPipe)) 68 69 mux.HandleFunc("/api/node-types", s.handleAPINodeTypes) 69 70 mux.HandleFunc("/api/executions/", s.sessionManager.RequireAuth(s.handleAPIExecution)) 71 + 72 + // Public feed routes 73 + mux.HandleFunc("/feeds/", s.handlePublicFeed) 70 74 71 75 s.server = &http.Server{ 72 76 Addr: fmt.Sprintf("%s:%d", s.cfg.Host, s.cfg.Port), ··· 322 326 Name string `json:"name"` 323 327 Description string `json:"description"` 324 328 Config map[string]interface{} `json:"config"` 329 + IsPublic *bool `json:"is_public"` 325 330 } 326 331 327 332 if err := json.NewDecoder(r.Body).Decode(&req); err != nil { ··· 339 344 configJSON, _ := json.Marshal(req.Config) 340 345 pipe.Config = string(configJSON) 341 346 } 347 + if req.IsPublic != nil { 348 + pipe.IsPublic = *req.IsPublic 349 + } 342 350 343 351 if err := s.db.UpdatePipe(pipe); err != nil { 344 352 http.Error(w, "Failed to update pipe", http.StatusInternalServerError) ··· 518 526 519 527 w.Header().Set("Content-Type", "application/json") 520 528 json.NewEncoder(w).Encode(logs) 529 + } 530 + 531 + func (s *Server) handlePublicFeed(w http.ResponseWriter, r *http.Request) { 532 + // Parse path: /feeds/{id}.{format} or /feeds/{id}/{format} 533 + path := strings.TrimPrefix(r.URL.Path, "/feeds/") 534 + if path == "" { 535 + http.Error(w, "Not found", http.StatusNotFound) 536 + return 537 + } 538 + 539 + var pipeID, format string 540 + 541 + // Check for extension format: id.json or id.rss 542 + if strings.Contains(path, ".") { 543 + parts := strings.SplitN(path, ".", 2) 544 + pipeID = parts[0] 545 + format = parts[1] 546 + } else if strings.Contains(path, "/") { 547 + // Check for path format: id/json or id/rss 548 + parts := strings.SplitN(path, "/", 2) 549 + pipeID = parts[0] 550 + format = parts[1] 551 + } else { 552 + // Default to json if no format specified 553 + pipeID = path 554 + format = "json" 555 + } 556 + 557 + // Look up pipe by ID 558 + pipe, err := s.db.GetPipe(pipeID) 559 + if err != nil { 560 + s.logger.Error("failed to get pipe", "pipe_id", pipeID, "error", err) 561 + http.Error(w, "Internal server error", http.StatusInternalServerError) 562 + return 563 + } 564 + 565 + if pipe == nil || !pipe.IsPublic { 566 + http.Error(w, "Feed not found", http.StatusNotFound) 567 + return 568 + } 569 + 570 + // Get the cached output 571 + output, err := s.db.GetPipeOutput(pipe.ID, format) 572 + if err != nil { 573 + s.logger.Error("failed to get pipe output", "pipe_id", pipe.ID, "format", format, "error", err) 574 + http.Error(w, "Internal server error", http.StatusInternalServerError) 575 + return 576 + } 577 + 578 + // Auto-run if no output exists 579 + if output == nil { 580 + executor := engine.NewExecutor(s.db) 581 + _, err := executor.Execute(r.Context(), pipe.ID, "auto") 582 + if err != nil { 583 + s.logger.Error("auto-execute failed", "pipe_id", pipe.ID, "error", err) 584 + http.Error(w, "Failed to generate feed", http.StatusInternalServerError) 585 + return 586 + } 587 + 588 + // Try to get output again 589 + output, err = s.db.GetPipeOutput(pipe.ID, format) 590 + if err != nil || output == nil { 591 + http.Error(w, "Feed not available in requested format", http.StatusNotFound) 592 + return 593 + } 594 + } 595 + 596 + w.Header().Set("Content-Type", output.ContentType) 597 + w.Header().Set("Cache-Control", "public, max-age=300") 598 + w.Write([]byte(output.Content)) 521 599 } 522 600 523 601 // Helper functions
+73
web/templates/editor.html
··· 373 373 <span class="accent">{{.Pipe.Name}}</span> 374 374 </div> 375 375 <div class="header-actions"> 376 + <label style="display: flex; align-items: center; gap: 6px; font-size: 12px; cursor: pointer;"> 377 + <input type="checkbox" id="is-public" onchange="togglePublic()" {{if .Pipe.IsPublic}}checked{{end}}> 378 + Public 379 + </label> 376 380 <button onclick="executePipe()" class="btn btn-small">▶ Run</button> 377 381 <button onclick="savePipe()" class="btn btn-small btn-secondary">💾 Save</button> 378 382 <a href="/dashboard" class="btn btn-small" style="text-decoration: none;">← Back</a> ··· 1004 1008 form.appendChild(group); 1005 1009 }); 1006 1010 1011 + // Show feed URL for output nodes 1012 + if (nodeType.category === 'output') { 1013 + const feedSection = document.createElement('div'); 1014 + feedSection.className = 'output-section'; 1015 + feedSection.style.marginTop = '20px'; 1016 + 1017 + const feedTitle = document.createElement('div'); 1018 + feedTitle.className = 'output-title'; 1019 + feedTitle.textContent = 'Public Feed URL'; 1020 + feedSection.appendChild(feedTitle); 1021 + 1022 + const format = node.type === 'rss-output' ? 'rss' : 'json'; 1023 + const feedUrl = `${window.location.origin}/feeds/${pipeID}.${format}`; 1024 + 1025 + const feedLink = document.createElement('div'); 1026 + feedLink.style.cssText = 'display: flex; gap: 8px; align-items: center;'; 1027 + 1028 + const feedInput = document.createElement('input'); 1029 + feedInput.className = 'form-input'; 1030 + feedInput.type = 'text'; 1031 + feedInput.value = feedUrl; 1032 + feedInput.readOnly = true; 1033 + feedInput.style.flex = '1'; 1034 + feedLink.appendChild(feedInput); 1035 + 1036 + const copyBtn = document.createElement('button'); 1037 + copyBtn.className = 'btn btn-small'; 1038 + copyBtn.textContent = '📋'; 1039 + copyBtn.title = 'Copy URL'; 1040 + copyBtn.onclick = () => { 1041 + navigator.clipboard.writeText(feedUrl); 1042 + showToast('URL copied!', 'success'); 1043 + }; 1044 + feedLink.appendChild(copyBtn); 1045 + 1046 + const openBtn = document.createElement('button'); 1047 + openBtn.className = 'btn btn-small'; 1048 + openBtn.textContent = '↗'; 1049 + openBtn.title = 'Open feed'; 1050 + openBtn.onclick = () => window.open(feedUrl, '_blank'); 1051 + feedLink.appendChild(openBtn); 1052 + 1053 + feedSection.appendChild(feedLink); 1054 + 1055 + const feedHelp = document.createElement('div'); 1056 + feedHelp.className = 'form-help'; 1057 + feedHelp.textContent = 'Make pipe public and run it to generate the feed.'; 1058 + feedSection.appendChild(feedHelp); 1059 + 1060 + form.appendChild(feedSection); 1061 + } 1062 + 1007 1063 // Data display section (always visible) 1008 1064 const dataSection = document.createElement('div'); 1009 1065 dataSection.className = 'output-section'; ··· 1057 1113 } 1058 1114 1059 1115 let executionStatusInterval = null; 1116 + 1117 + async function togglePublic() { 1118 + const isPublic = document.getElementById('is-public').checked; 1119 + 1120 + const res = await fetch(`/api/pipes/${pipeID}`, { 1121 + method: 'PUT', 1122 + headers: { 'Content-Type': 'application/json' }, 1123 + body: JSON.stringify({ is_public: isPublic }) 1124 + }); 1125 + 1126 + if (res.ok) { 1127 + showToast(isPublic ? 'Pipe is now public' : 'Pipe is now private', 'success'); 1128 + } else { 1129 + showToast('Failed to update visibility', 'error'); 1130 + document.getElementById('is-public').checked = !isPublic; 1131 + } 1132 + } 1060 1133 1061 1134 async function viewNodeData(nodeID) { 1062 1135 const dataContent = document.getElementById(`data-content-${nodeID}`);