bring back yahoo pipes!
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add more nodes

+963 -44
+6
engine/registry.go
··· 23 23 // Register built-in nodes 24 24 // Sources 25 25 r.Register(&sources.RSSSourceNode{}) 26 + r.Register(&sources.HTTPSourceNode{}) 26 27 27 28 // Transforms 28 29 r.Register(&transforms.FilterNode{}) 29 30 r.Register(&transforms.SortNode{}) 30 31 r.Register(&transforms.LimitNode{}) 32 + r.Register(&transforms.MergeNode{}) 33 + r.Register(&transforms.MapNode{}) 34 + r.Register(&transforms.RegexNode{}) 35 + r.Register(&transforms.TruncateNode{}) 31 36 32 37 // Outputs 33 38 r.Register(&outputs.JSONOutputNode{}) 34 39 r.Register(&outputs.RSSOutputNode{}) 40 + r.Register(&outputs.WebhookOutputNode{}) 35 41 36 42 return r 37 43 }
+109
nodes/outputs/webhook.go
··· 1 + package outputs 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "net/http" 9 + "strings" 10 + "time" 11 + 12 + "github.com/kierank/pipes/nodes" 13 + ) 14 + 15 + type WebhookOutputNode struct{} 16 + 17 + func (n *WebhookOutputNode) Type() string { return "webhook-output" } 18 + func (n *WebhookOutputNode) Label() string { return "Webhook" } 19 + func (n *WebhookOutputNode) Description() string { return "POST data to a webhook URL" } 20 + func (n *WebhookOutputNode) Category() string { return "output" } 21 + func (n *WebhookOutputNode) Inputs() int { return 1 } 22 + func (n *WebhookOutputNode) Outputs() int { return 0 } 23 + 24 + func (n *WebhookOutputNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 25 + if len(inputs) == 0 || len(inputs[0]) == 0 { 26 + execCtx.Log("webhook-output", "info", "No input data") 27 + return nil, nil 28 + } 29 + 30 + url, ok := config["url"].(string) 31 + if !ok || url == "" { 32 + return nil, fmt.Errorf("webhook URL is required") 33 + } 34 + 35 + data := inputs[0] 36 + 37 + payload := map[string]interface{}{ 38 + "count": len(data), 39 + "items": data, 40 + } 41 + 42 + jsonData, err := json.Marshal(payload) 43 + if err != nil { 44 + return nil, fmt.Errorf("marshal payload: %w", err) 45 + } 46 + 47 + client := &http.Client{Timeout: 30 * time.Second} 48 + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData)) 49 + if err != nil { 50 + return nil, fmt.Errorf("create request: %w", err) 51 + } 52 + 53 + req.Header.Set("Content-Type", "application/json") 54 + req.Header.Set("User-Agent", "Pipes/1.0") 55 + 56 + // Add custom headers 57 + if headers, ok := config["headers"].(string); ok && headers != "" { 58 + for _, line := range strings.Split(headers, "\n") { 59 + if parts := strings.SplitN(strings.TrimSpace(line), ":", 2); len(parts) == 2 { 60 + req.Header.Set(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])) 61 + } 62 + } 63 + } 64 + 65 + resp, err := client.Do(req) 66 + if err != nil { 67 + return nil, fmt.Errorf("webhook request failed: %w", err) 68 + } 69 + defer resp.Body.Close() 70 + 71 + if resp.StatusCode >= 400 { 72 + return nil, fmt.Errorf("webhook returned HTTP %d", resp.StatusCode) 73 + } 74 + 75 + execCtx.Log("webhook-output", "info", fmt.Sprintf("Posted %d items to webhook (HTTP %d)", len(data), resp.StatusCode)) 76 + 77 + return data, nil 78 + } 79 + 80 + func (n *WebhookOutputNode) ValidateConfig(config map[string]interface{}) error { 81 + url, ok := config["url"].(string) 82 + if !ok || url == "" { 83 + return fmt.Errorf("webhook URL is required") 84 + } 85 + return nil 86 + } 87 + 88 + func (n *WebhookOutputNode) GetConfigSchema() *nodes.ConfigSchema { 89 + return &nodes.ConfigSchema{ 90 + Fields: []nodes.ConfigField{ 91 + { 92 + Name: "url", 93 + Label: "Webhook URL", 94 + Type: "url", 95 + Required: true, 96 + Placeholder: "https://example.com/webhook", 97 + HelpText: "URL to POST data to", 98 + }, 99 + { 100 + Name: "headers", 101 + Label: "Headers", 102 + Type: "textarea", 103 + Required: false, 104 + Placeholder: "Authorization: Bearer token", 105 + HelpText: "Custom headers, one per line as Header: Value", 106 + }, 107 + }, 108 + } 109 + }
+165
nodes/sources/http.go
··· 1 + package sources 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "strings" 10 + "time" 11 + 12 + "github.com/kierank/pipes/nodes" 13 + ) 14 + 15 + type HTTPSourceNode struct{} 16 + 17 + func (n *HTTPSourceNode) Type() string { return "http-source" } 18 + func (n *HTTPSourceNode) Label() string { return "HTTP/JSON" } 19 + func (n *HTTPSourceNode) Description() string { return "Fetch data from a JSON API" } 20 + func (n *HTTPSourceNode) Category() string { return "source" } 21 + func (n *HTTPSourceNode) Inputs() int { return 0 } 22 + func (n *HTTPSourceNode) Outputs() int { return 1 } 23 + 24 + func (n *HTTPSourceNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 25 + url, ok := config["url"].(string) 26 + if !ok || url == "" { 27 + return nil, fmt.Errorf("url is required") 28 + } 29 + 30 + execCtx.Log("http-source", "info", fmt.Sprintf("Fetching %s", url)) 31 + 32 + client := &http.Client{Timeout: 30 * time.Second} 33 + 34 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 35 + if err != nil { 36 + return nil, fmt.Errorf("create request: %w", err) 37 + } 38 + 39 + // Add custom headers 40 + if headers, ok := config["headers"].(string); ok && headers != "" { 41 + for _, line := range strings.Split(headers, "\n") { 42 + if parts := strings.SplitN(strings.TrimSpace(line), ":", 2); len(parts) == 2 { 43 + req.Header.Set(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])) 44 + } 45 + } 46 + } 47 + 48 + req.Header.Set("User-Agent", "Pipes/1.0") 49 + 50 + resp, err := client.Do(req) 51 + if err != nil { 52 + return nil, fmt.Errorf("fetch: %w", err) 53 + } 54 + defer resp.Body.Close() 55 + 56 + if resp.StatusCode != http.StatusOK { 57 + return nil, fmt.Errorf("HTTP %d", resp.StatusCode) 58 + } 59 + 60 + body, err := io.ReadAll(resp.Body) 61 + if err != nil { 62 + return nil, fmt.Errorf("read body: %w", err) 63 + } 64 + 65 + // Parse JSON 66 + var data interface{} 67 + if err := json.Unmarshal(body, &data); err != nil { 68 + return nil, fmt.Errorf("parse JSON: %w", err) 69 + } 70 + 71 + // Extract items from a path if specified 72 + itemsPath, _ := config["items_path"].(string) 73 + if itemsPath != "" { 74 + data = extractPath(data, itemsPath) 75 + } 76 + 77 + // Convert to array 78 + var items []interface{} 79 + switch v := data.(type) { 80 + case []interface{}: 81 + items = v 82 + case map[string]interface{}: 83 + items = []interface{}{v} 84 + default: 85 + items = []interface{}{data} 86 + } 87 + 88 + // Apply limit 89 + if limit, ok := config["limit"].(float64); ok && limit > 0 && int(limit) < len(items) { 90 + items = items[:int(limit)] 91 + } 92 + 93 + execCtx.Log("http-source", "info", fmt.Sprintf("Retrieved %d items", len(items))) 94 + return items, nil 95 + } 96 + 97 + func extractPath(data interface{}, path string) interface{} { 98 + parts := strings.Split(path, ".") 99 + current := data 100 + 101 + for _, part := range parts { 102 + if m, ok := current.(map[string]interface{}); ok { 103 + current = m[part] 104 + } else if arr, ok := current.([]interface{}); ok { 105 + // Try to access array by index if part is numeric 106 + var idx int 107 + if _, err := fmt.Sscanf(part, "%d", &idx); err == nil && idx < len(arr) { 108 + current = arr[idx] 109 + } else { 110 + return nil 111 + } 112 + } else { 113 + return nil 114 + } 115 + } 116 + 117 + return current 118 + } 119 + 120 + func (n *HTTPSourceNode) ValidateConfig(config map[string]interface{}) error { 121 + url, ok := config["url"].(string) 122 + if !ok || url == "" { 123 + return fmt.Errorf("url is required") 124 + } 125 + return nil 126 + } 127 + 128 + func (n *HTTPSourceNode) GetConfigSchema() *nodes.ConfigSchema { 129 + return &nodes.ConfigSchema{ 130 + Fields: []nodes.ConfigField{ 131 + { 132 + Name: "url", 133 + Label: "URL", 134 + Type: "url", 135 + Required: true, 136 + Placeholder: "https://api.example.com/data.json", 137 + HelpText: "URL of the JSON API endpoint", 138 + }, 139 + { 140 + Name: "items_path", 141 + Label: "Items Path", 142 + Type: "text", 143 + Required: false, 144 + Placeholder: "data.items", 145 + HelpText: "Dot-notation path to the array of items (e.g., results, data.posts)", 146 + }, 147 + { 148 + Name: "headers", 149 + Label: "Headers", 150 + Type: "textarea", 151 + Required: false, 152 + Placeholder: "Authorization: Bearer token\nAccept: application/json", 153 + HelpText: "Custom headers, one per line as Header: Value", 154 + }, 155 + { 156 + Name: "limit", 157 + Label: "Limit", 158 + Type: "number", 159 + Required: false, 160 + DefaultValue: 50, 161 + HelpText: "Maximum number of items", 162 + }, 163 + }, 164 + } 165 + }
+82 -9
nodes/sources/rss.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "time" 6 7 7 8 "github.com/mmcdole/gofeed" 8 9 ··· 41 42 if item.Author != nil { 42 43 author = item.Author.Name 43 44 } 44 - 45 + 46 + // Parse dates to Unix timestamps for proper sorting 47 + var publishedAt int64 48 + var updatedAt int64 49 + if item.PublishedParsed != nil { 50 + publishedAt = item.PublishedParsed.Unix() 51 + } else if item.Published != "" { 52 + if t, err := parseDate(item.Published); err == nil { 53 + publishedAt = t.Unix() 54 + } 55 + } 56 + if item.UpdatedParsed != nil { 57 + updatedAt = item.UpdatedParsed.Unix() 58 + } else if item.Updated != "" { 59 + if t, err := parseDate(item.Updated); err == nil { 60 + updatedAt = t.Unix() 61 + } 62 + } 63 + 64 + // Extract content - prefer Content over Description 65 + content := item.Description 66 + if item.Content != "" { 67 + content = item.Content 68 + } 69 + 70 + // Build enclosures array (for media like images, audio, video) 71 + var enclosures []map[string]interface{} 72 + if len(item.Enclosures) > 0 { 73 + for _, enc := range item.Enclosures { 74 + enclosures = append(enclosures, map[string]interface{}{ 75 + "url": enc.URL, 76 + "type": enc.Type, 77 + "length": enc.Length, 78 + }) 79 + } 80 + } 81 + 82 + // Extract image URL if available 83 + var imageURL string 84 + if item.Image != nil { 85 + imageURL = item.Image.URL 86 + } 87 + 45 88 items = append(items, map[string]interface{}{ 46 - "title": item.Title, 47 - "description": item.Description, 48 - "link": item.Link, 49 - "author": author, 50 - "published": item.Published, 51 - "updated": item.Updated, 52 - "guid": item.GUID, 53 - "categories": item.Categories, 89 + "title": item.Title, 90 + "description": item.Description, 91 + "content": content, 92 + "link": item.Link, 93 + "author": author, 94 + "published": item.Published, 95 + "published_at": publishedAt, 96 + "updated": item.Updated, 97 + "updated_at": updatedAt, 98 + "guid": item.GUID, 99 + "categories": item.Categories, 100 + "enclosures": enclosures, 101 + "image": imageURL, 54 102 }) 55 103 } 56 104 ··· 97 145 }, 98 146 } 99 147 } 148 + 149 + // parseDate tries multiple date formats 150 + func parseDate(s string) (time.Time, error) { 151 + formats := []string{ 152 + time.RFC1123Z, 153 + time.RFC1123, 154 + time.RFC3339, 155 + time.RFC822Z, 156 + time.RFC822, 157 + "Mon, 2 Jan 2006 15:04:05 MST", 158 + "Mon, 2 Jan 2006 15:04:05 -0700", 159 + "2006-01-02T15:04:05Z", 160 + "2006-01-02T15:04:05-07:00", 161 + "2006-01-02 15:04:05", 162 + "2006-01-02", 163 + } 164 + 165 + for _, format := range formats { 166 + if t, err := time.Parse(format, s); err == nil { 167 + return t, nil 168 + } 169 + } 170 + 171 + return time.Time{}, fmt.Errorf("unable to parse date: %s", s) 172 + }
-15
nodes/transforms/filter.go
··· 69 69 } 70 70 } 71 71 72 - func getNestedValue(obj map[string]interface{}, path string) interface{} { 73 - parts := strings.Split(path, ".") 74 - var current interface{} = obj 75 - 76 - for _, part := range parts { 77 - if m, ok := current.(map[string]interface{}); ok { 78 - current = m[part] 79 - } else { 80 - return nil 81 - } 82 - } 83 - 84 - return current 85 - } 86 - 87 72 func (n *FilterNode) ValidateConfig(config map[string]interface{}) error { 88 73 return nil 89 74 }
+37
nodes/transforms/helpers.go
··· 1 + package transforms 2 + 3 + import "strings" 4 + 5 + // getNestedValue retrieves a value from a nested map using dot notation 6 + func getNestedValue(obj map[string]interface{}, path string) interface{} { 7 + parts := strings.Split(path, ".") 8 + var current interface{} = obj 9 + 10 + for _, part := range parts { 11 + if m, ok := current.(map[string]interface{}); ok { 12 + current = m[part] 13 + } else { 14 + return nil 15 + } 16 + } 17 + 18 + return current 19 + } 20 + 21 + // toFloat attempts to convert various numeric types to float64 22 + func toFloat(v interface{}) (float64, bool) { 23 + switch val := v.(type) { 24 + case float64: 25 + return val, true 26 + case float32: 27 + return float64(val), true 28 + case int: 29 + return float64(val), true 30 + case int64: 31 + return float64(val), true 32 + case int32: 33 + return float64(val), true 34 + default: 35 + return 0, false 36 + } 37 + }
+104
nodes/transforms/map.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + 8 + "github.com/kierank/pipes/nodes" 9 + ) 10 + 11 + type MapNode struct{} 12 + 13 + func (n *MapNode) Type() string { return "map" } 14 + func (n *MapNode) Label() string { return "Map Fields" } 15 + func (n *MapNode) Description() string { return "Rename, extract, or create new fields" } 16 + func (n *MapNode) Category() string { return "transform" } 17 + func (n *MapNode) Inputs() int { return 1 } 18 + func (n *MapNode) Outputs() int { return 1 } 19 + 20 + func (n *MapNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 21 + if len(inputs) == 0 || len(inputs[0]) == 0 { 22 + return []interface{}{}, nil 23 + } 24 + 25 + items := inputs[0] 26 + mappings, _ := config["mappings"].(string) 27 + keepOriginal, _ := config["keep_original"].(bool) 28 + 29 + if mappings == "" { 30 + return items, nil 31 + } 32 + 33 + // Parse mappings: "newField:sourceField, title:name" 34 + fieldMap := parseMappings(mappings) 35 + 36 + var result []interface{} 37 + for _, item := range items { 38 + itemMap, ok := item.(map[string]interface{}) 39 + if !ok { 40 + result = append(result, item) 41 + continue 42 + } 43 + 44 + var newItem map[string]interface{} 45 + if keepOriginal { 46 + newItem = make(map[string]interface{}) 47 + for k, v := range itemMap { 48 + newItem[k] = v 49 + } 50 + } else { 51 + newItem = make(map[string]interface{}) 52 + } 53 + 54 + for newField, sourceField := range fieldMap { 55 + if val := getNestedValue(itemMap, sourceField); val != nil { 56 + newItem[newField] = val 57 + } 58 + } 59 + 60 + result = append(result, newItem) 61 + } 62 + 63 + execCtx.Log("map", "info", fmt.Sprintf("Mapped %d items", len(result))) 64 + return result, nil 65 + } 66 + 67 + func parseMappings(s string) map[string]string { 68 + result := make(map[string]string) 69 + parts := strings.Split(s, ",") 70 + for _, part := range parts { 71 + part = strings.TrimSpace(part) 72 + if kv := strings.SplitN(part, ":", 2); len(kv) == 2 { 73 + result[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) 74 + } 75 + } 76 + return result 77 + } 78 + 79 + func (n *MapNode) ValidateConfig(config map[string]interface{}) error { 80 + return nil 81 + } 82 + 83 + func (n *MapNode) GetConfigSchema() *nodes.ConfigSchema { 84 + return &nodes.ConfigSchema{ 85 + Fields: []nodes.ConfigField{ 86 + { 87 + Name: "mappings", 88 + Label: "Field Mappings", 89 + Type: "textarea", 90 + Required: true, 91 + Placeholder: "title:name, url:link, summary:description", 92 + HelpText: "Map fields as newField:sourceField, separated by commas. Use dot notation for nested fields.", 93 + }, 94 + { 95 + Name: "keep_original", 96 + Label: "Keep Original Fields", 97 + Type: "checkbox", 98 + Required: false, 99 + DefaultValue: true, 100 + HelpText: "Keep all original fields in addition to mapped ones", 101 + }, 102 + }, 103 + } 104 + }
+137
nodes/transforms/merge.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "sort" 7 + 8 + "github.com/kierank/pipes/nodes" 9 + ) 10 + 11 + type MergeNode struct{} 12 + 13 + func (n *MergeNode) Type() string { return "merge" } 14 + func (n *MergeNode) Label() string { return "Merge" } 15 + func (n *MergeNode) Description() string { return "Combine multiple feeds into one" } 16 + func (n *MergeNode) Category() string { return "transform" } 17 + func (n *MergeNode) Inputs() int { return 2 } 18 + func (n *MergeNode) Outputs() int { return 1 } 19 + 20 + func (n *MergeNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 21 + if len(inputs) == 0 { 22 + return []interface{}{}, nil 23 + } 24 + 25 + var merged []interface{} 26 + for _, input := range inputs { 27 + merged = append(merged, input...) 28 + } 29 + 30 + // Optionally dedupe by a field 31 + dedupeField, _ := config["dedupe_field"].(string) 32 + if dedupeField != "" { 33 + merged = dedupeByField(merged, dedupeField) 34 + } 35 + 36 + // Optionally sort by a field 37 + sortField, _ := config["sort_field"].(string) 38 + sortOrder, _ := config["sort_order"].(string) 39 + if sortField != "" { 40 + sortItems(merged, sortField, sortOrder == "desc") 41 + } 42 + 43 + execCtx.Log("merge", "info", fmt.Sprintf("Merged %d inputs into %d items", len(inputs), len(merged))) 44 + 45 + return merged, nil 46 + } 47 + 48 + func dedupeByField(items []interface{}, field string) []interface{} { 49 + seen := make(map[string]bool) 50 + var result []interface{} 51 + 52 + for _, item := range items { 53 + itemMap, ok := item.(map[string]interface{}) 54 + if !ok { 55 + result = append(result, item) 56 + continue 57 + } 58 + 59 + key := fmt.Sprintf("%v", itemMap[field]) 60 + if !seen[key] { 61 + seen[key] = true 62 + result = append(result, item) 63 + } 64 + } 65 + 66 + return result 67 + } 68 + 69 + func sortItems(items []interface{}, field string, desc bool) { 70 + sort.SliceStable(items, func(i, j int) bool { 71 + iMap, iOk := items[i].(map[string]interface{}) 72 + jMap, jOk := items[j].(map[string]interface{}) 73 + if !iOk || !jOk { 74 + return false 75 + } 76 + 77 + iRaw := getNestedValue(iMap, field) 78 + jRaw := getNestedValue(jMap, field) 79 + 80 + // Try numeric comparison first (for timestamps, etc.) 81 + iNum, iIsNum := toFloat(iRaw) 82 + jNum, jIsNum := toFloat(jRaw) 83 + 84 + if iIsNum && jIsNum { 85 + if desc { 86 + return iNum > jNum 87 + } 88 + return iNum < jNum 89 + } 90 + 91 + // Fall back to string comparison 92 + iVal := fmt.Sprintf("%v", iRaw) 93 + jVal := fmt.Sprintf("%v", jRaw) 94 + 95 + if desc { 96 + return iVal > jVal 97 + } 98 + return iVal < jVal 99 + }) 100 + } 101 + 102 + func (n *MergeNode) ValidateConfig(config map[string]interface{}) error { 103 + return nil 104 + } 105 + 106 + func (n *MergeNode) GetConfigSchema() *nodes.ConfigSchema { 107 + return &nodes.ConfigSchema{ 108 + Fields: []nodes.ConfigField{ 109 + { 110 + Name: "dedupe_field", 111 + Label: "Dedupe Field", 112 + Type: "text", 113 + Required: false, 114 + Placeholder: "link", 115 + HelpText: "Remove duplicates based on this field (e.g., link, guid)", 116 + }, 117 + { 118 + Name: "sort_field", 119 + Label: "Sort By", 120 + Type: "text", 121 + Required: false, 122 + Placeholder: "published_at", 123 + HelpText: "Field to sort merged results by (use published_at for date sorting)", 124 + }, 125 + { 126 + Name: "sort_order", 127 + Label: "Sort Order", 128 + Type: "select", 129 + Required: false, 130 + Options: []nodes.FieldOption{ 131 + {Value: "desc", Label: "Newest First"}, 132 + {Value: "asc", Label: "Oldest First"}, 133 + }, 134 + }, 135 + }, 136 + } 137 + }
+108
nodes/transforms/regex.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "regexp" 7 + 8 + "github.com/kierank/pipes/nodes" 9 + ) 10 + 11 + type RegexNode struct{} 12 + 13 + func (n *RegexNode) Type() string { return "regex" } 14 + func (n *RegexNode) Label() string { return "Regex Replace" } 15 + func (n *RegexNode) Description() string { return "Search and replace text using regex" } 16 + func (n *RegexNode) Category() string { return "transform" } 17 + func (n *RegexNode) Inputs() int { return 1 } 18 + func (n *RegexNode) Outputs() int { return 1 } 19 + 20 + func (n *RegexNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 21 + if len(inputs) == 0 || len(inputs[0]) == 0 { 22 + return []interface{}{}, nil 23 + } 24 + 25 + items := inputs[0] 26 + field, _ := config["field"].(string) 27 + pattern, _ := config["pattern"].(string) 28 + replacement, _ := config["replacement"].(string) 29 + 30 + if field == "" || pattern == "" { 31 + return items, nil 32 + } 33 + 34 + re, err := regexp.Compile(pattern) 35 + if err != nil { 36 + return nil, fmt.Errorf("invalid regex: %w", err) 37 + } 38 + 39 + var result []interface{} 40 + modified := 0 41 + 42 + for _, item := range items { 43 + itemMap, ok := item.(map[string]interface{}) 44 + if !ok { 45 + result = append(result, item) 46 + continue 47 + } 48 + 49 + newItem := make(map[string]interface{}) 50 + for k, v := range itemMap { 51 + newItem[k] = v 52 + } 53 + 54 + if val, ok := newItem[field].(string); ok { 55 + newVal := re.ReplaceAllString(val, replacement) 56 + if newVal != val { 57 + modified++ 58 + } 59 + newItem[field] = newVal 60 + } 61 + 62 + result = append(result, newItem) 63 + } 64 + 65 + execCtx.Log("regex", "info", fmt.Sprintf("Modified %d of %d items", modified, len(result))) 66 + return result, nil 67 + } 68 + 69 + func (n *RegexNode) ValidateConfig(config map[string]interface{}) error { 70 + pattern, _ := config["pattern"].(string) 71 + if pattern != "" { 72 + if _, err := regexp.Compile(pattern); err != nil { 73 + return fmt.Errorf("invalid regex pattern: %w", err) 74 + } 75 + } 76 + return nil 77 + } 78 + 79 + func (n *RegexNode) GetConfigSchema() *nodes.ConfigSchema { 80 + return &nodes.ConfigSchema{ 81 + Fields: []nodes.ConfigField{ 82 + { 83 + Name: "field", 84 + Label: "Field", 85 + Type: "text", 86 + Required: true, 87 + Placeholder: "title", 88 + HelpText: "Field to apply regex to", 89 + }, 90 + { 91 + Name: "pattern", 92 + Label: "Pattern", 93 + Type: "text", 94 + Required: true, 95 + Placeholder: "\\[.*?\\]", 96 + HelpText: "Regex pattern to match", 97 + }, 98 + { 99 + Name: "replacement", 100 + Label: "Replacement", 101 + Type: "text", 102 + Required: false, 103 + Placeholder: "", 104 + HelpText: "Text to replace matches with (use $1, $2 for groups)", 105 + }, 106 + }, 107 + } 108 + }
+24 -8
nodes/transforms/sort.go
··· 31 31 } 32 32 33 33 if order == "" { 34 - order = "asc" 34 + order = "desc" 35 35 } 36 36 37 37 // Create a sortable slice ··· 46 46 return false 47 47 } 48 48 49 - iVal := fmt.Sprintf("%v", getNestedValue(iMap, field)) 50 - jVal := fmt.Sprintf("%v", getNestedValue(jMap, field)) 49 + iRaw := getNestedValue(iMap, field) 50 + jRaw := getNestedValue(jMap, field) 51 + 52 + // Try numeric comparison first (for timestamps, etc.) 53 + iNum, iIsNum := toFloat(iRaw) 54 + jNum, jIsNum := toFloat(jRaw) 55 + 56 + if iIsNum && jIsNum { 57 + if order == "desc" { 58 + return iNum > jNum 59 + } 60 + return iNum < jNum 61 + } 62 + 63 + // Fall back to string comparison 64 + iVal := fmt.Sprintf("%v", iRaw) 65 + jVal := fmt.Sprintf("%v", jRaw) 51 66 52 67 if order == "desc" { 53 68 return iVal > jVal ··· 72 87 Label: "Field Path", 73 88 Type: "text", 74 89 Required: true, 75 - Placeholder: "published", 76 - HelpText: "Field to sort by", 90 + Placeholder: "published_at", 91 + HelpText: "Field to sort by (use published_at or updated_at for date sorting)", 77 92 }, 78 93 { 79 94 Name: "order", 80 95 Label: "Order", 81 96 Type: "select", 82 97 Required: false, 83 - DefaultValue: "asc", 98 + DefaultValue: "desc", 99 + HelpText: "Descending = newest first (for dates), Ascending = oldest first", 84 100 Options: []nodes.FieldOption{ 85 - {Value: "asc", Label: "Ascending"}, 86 - {Value: "desc", Label: "Descending"}, 101 + {Value: "desc", Label: "Descending (newest first)"}, 102 + {Value: "asc", Label: "Ascending (oldest first)"}, 87 103 }, 88 104 }, 89 105 },
+123
nodes/transforms/truncate.go
··· 1 + package transforms 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "strings" 7 + 8 + "github.com/kierank/pipes/nodes" 9 + ) 10 + 11 + type TruncateNode struct{} 12 + 13 + func (n *TruncateNode) Type() string { return "truncate" } 14 + func (n *TruncateNode) Label() string { return "Truncate" } 15 + func (n *TruncateNode) Description() string { return "Limit text length in a field" } 16 + func (n *TruncateNode) Category() string { return "transform" } 17 + func (n *TruncateNode) Inputs() int { return 1 } 18 + func (n *TruncateNode) Outputs() int { return 1 } 19 + 20 + func (n *TruncateNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 21 + if len(inputs) == 0 || len(inputs[0]) == 0 { 22 + return []interface{}{}, nil 23 + } 24 + 25 + items := inputs[0] 26 + field, _ := config["field"].(string) 27 + maxLength := 200 28 + if ml, ok := config["max_length"].(float64); ok { 29 + maxLength = int(ml) 30 + } 31 + suffix, _ := config["suffix"].(string) 32 + if suffix == "" { 33 + suffix = "..." 34 + } 35 + 36 + if field == "" { 37 + return items, nil 38 + } 39 + 40 + var result []interface{} 41 + for _, item := range items { 42 + itemMap, ok := item.(map[string]interface{}) 43 + if !ok { 44 + result = append(result, item) 45 + continue 46 + } 47 + 48 + newItem := make(map[string]interface{}) 49 + for k, v := range itemMap { 50 + newItem[k] = v 51 + } 52 + 53 + if val, ok := newItem[field].(string); ok { 54 + // Strip HTML tags first 55 + val = stripHTML(val) 56 + if len(val) > maxLength { 57 + // Find last space before maxLength to avoid cutting words 58 + cutoff := maxLength 59 + if idx := strings.LastIndex(val[:maxLength], " "); idx > maxLength/2 { 60 + cutoff = idx 61 + } 62 + newItem[field] = strings.TrimSpace(val[:cutoff]) + suffix 63 + } else { 64 + newItem[field] = val 65 + } 66 + } 67 + 68 + result = append(result, newItem) 69 + } 70 + 71 + execCtx.Log("truncate", "info", fmt.Sprintf("Truncated %d items", len(result))) 72 + return result, nil 73 + } 74 + 75 + func stripHTML(s string) string { 76 + var result strings.Builder 77 + inTag := false 78 + for _, r := range s { 79 + if r == '<' { 80 + inTag = true 81 + } else if r == '>' { 82 + inTag = false 83 + } else if !inTag { 84 + result.WriteRune(r) 85 + } 86 + } 87 + return strings.TrimSpace(result.String()) 88 + } 89 + 90 + func (n *TruncateNode) ValidateConfig(config map[string]interface{}) error { 91 + return nil 92 + } 93 + 94 + func (n *TruncateNode) GetConfigSchema() *nodes.ConfigSchema { 95 + return &nodes.ConfigSchema{ 96 + Fields: []nodes.ConfigField{ 97 + { 98 + Name: "field", 99 + Label: "Field", 100 + Type: "text", 101 + Required: true, 102 + Placeholder: "description", 103 + HelpText: "Field to truncate", 104 + }, 105 + { 106 + Name: "max_length", 107 + Label: "Max Length", 108 + Type: "number", 109 + Required: false, 110 + DefaultValue: 200, 111 + HelpText: "Maximum character length", 112 + }, 113 + { 114 + Name: "suffix", 115 + Label: "Suffix", 116 + Type: "text", 117 + Required: false, 118 + DefaultValue: "...", 119 + HelpText: "Text to append when truncated", 120 + }, 121 + }, 122 + } 123 + }
+25
web/server.go
··· 14 14 "github.com/kierank/pipes/config" 15 15 "github.com/kierank/pipes/engine" 16 16 "github.com/kierank/pipes/store" 17 + "github.com/mmcdole/gofeed" 17 18 ) 18 19 19 20 type Server struct { ··· 68 69 mux.HandleFunc("/api/pipes/", s.sessionManager.RequireAuth(s.handleAPIPipe)) 69 70 mux.HandleFunc("/api/node-types", s.handleAPINodeTypes) 70 71 mux.HandleFunc("/api/executions/", s.sessionManager.RequireAuth(s.handleAPIExecution)) 72 + mux.HandleFunc("/api/feed-info", s.sessionManager.RequireAuth(s.handleAPIFeedInfo)) 71 73 72 74 // Public feed routes 73 75 mux.HandleFunc("/feeds/", s.handlePublicFeed) ··· 398 400 399 401 w.Header().Set("Content-Type", "application/json") 400 402 json.NewEncoder(w).Encode(nodeTypes) 403 + } 404 + 405 + func (s *Server) handleAPIFeedInfo(w http.ResponseWriter, r *http.Request) { 406 + url := r.URL.Query().Get("url") 407 + if url == "" { 408 + http.Error(w, "url parameter required", http.StatusBadRequest) 409 + return 410 + } 411 + 412 + fp := gofeed.NewParser() 413 + feed, err := fp.ParseURLWithContext(url, r.Context()) 414 + if err != nil { 415 + http.Error(w, fmt.Sprintf("Failed to parse feed: %v", err), http.StatusBadRequest) 416 + return 417 + } 418 + 419 + w.Header().Set("Content-Type", "application/json") 420 + json.NewEncoder(w).Encode(map[string]interface{}{ 421 + "title": feed.Title, 422 + "description": feed.Description, 423 + "link": feed.Link, 424 + "item_count": len(feed.Items), 425 + }) 401 426 } 402 427 403 428 func (s *Server) handlePipeExecute(w http.ResponseWriter, r *http.Request, pipeID string, user *store.User) {
+43 -12
web/templates/editor.html
··· 818 818 if (!draggedNode) return; 819 819 820 820 const containerRect = document.getElementById('canvas-container').getBoundingClientRect(); 821 - draggedNode.position.x = e.clientX - containerRect.left - dragOffset.x; 822 - draggedNode.position.y = e.clientY - containerRect.top - dragOffset.y; 821 + draggedNode.position.x = e.clientX - containerRect.left - dragOffset.x - panOffset.x; 822 + draggedNode.position.y = e.clientY - containerRect.top - dragOffset.y - panOffset.y; 823 823 824 824 render(); 825 825 } ··· 867 867 868 868 function onConnectionMouseMove(e) { 869 869 const containerRect = document.getElementById('canvas-container').getBoundingClientRect(); 870 - const mouseX = e.clientX - containerRect.left; 871 - const mouseY = e.clientY - containerRect.top; 870 + const mouseX = e.clientX - containerRect.left - panOffset.x; 871 + const mouseY = e.clientY - containerRect.top - panOffset.y; 872 872 873 873 connectionDrag = { x: mouseX, y: mouseY }; 874 874 ··· 992 992 } 993 993 994 994 input.value = node.config[field.name] || field.defaultValue || ''; 995 - input.addEventListener('change', (e) => { 995 + input.addEventListener('change', async (e) => { 996 996 node.config[field.name] = e.target.value; 997 + 998 + // Auto-fetch feed title for RSS source URL field 999 + if (node.type === 'rss-source' && field.name === 'url' && e.target.value) { 1000 + try { 1001 + const res = await fetch(`/api/feed-info?url=${encodeURIComponent(e.target.value)}`); 1002 + if (res.ok) { 1003 + const info = await res.json(); 1004 + if (info.title && node.label === 'RSS Feed') { 1005 + node.label = info.title; 1006 + render(); 1007 + showToast(`Feed: ${info.title}`, 'success'); 1008 + } 1009 + } 1010 + } catch (err) { 1011 + // Ignore fetch errors 1012 + } 1013 + } 997 1014 }); 998 1015 999 1016 group.appendChild(input); ··· 1232 1249 } 1233 1250 1234 1251 async function executePipe() { 1252 + // Save first 1253 + await savePipe(); 1254 + 1235 1255 const res = await fetch(`/api/pipes/${pipeID}/execute`, { 1236 1256 method: 'POST' 1237 1257 }); ··· 1249 1269 // Poll for completion 1250 1270 pollExecutionStatus(data.executionId); 1251 1271 } else { 1252 - showToast('Failed to execute pipe', 'error'); 1272 + const errorText = await res.text(); 1273 + showToast(errorText || 'Failed to execute pipe', 'error'); 1253 1274 } 1254 1275 } 1255 1276 ··· 1315 1336 const toast = document.createElement('div'); 1316 1337 toast.className = `toast ${type}`; 1317 1338 toast.textContent = message; 1339 + 1340 + // Click to dismiss 1341 + toast.style.cursor = 'pointer'; 1342 + toast.onclick = () => { 1343 + toast.classList.add('exit'); 1344 + setTimeout(() => container.removeChild(toast), 200); 1345 + }; 1318 1346 1319 1347 container.appendChild(toast); 1320 1348 1321 - // Auto-dismiss after 3 seconds 1349 + // Auto-dismiss: errors stay longer 1350 + const duration = type === 'error' ? 8000 : 3000; 1322 1351 setTimeout(() => { 1323 - toast.classList.add('exit'); 1324 - setTimeout(() => { 1325 - container.removeChild(toast); 1326 - }, 200); // Match animation duration 1327 - }, 3000); 1352 + if (toast.parentNode) { 1353 + toast.classList.add('exit'); 1354 + setTimeout(() => { 1355 + if (toast.parentNode) container.removeChild(toast); 1356 + }, 200); 1357 + } 1358 + }, duration); 1328 1359 } 1329 1360 </script> 1330 1361