bring back yahoo pipes!
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 137 lines 3.3 kB view raw
1package transforms 2 3import ( 4 "context" 5 "fmt" 6 "sort" 7 8 "github.com/kierank/pipes/nodes" 9) 10 11type MergeNode struct{} 12 13func (n *MergeNode) Type() string { return "merge" } 14func (n *MergeNode) Label() string { return "Merge" } 15func (n *MergeNode) Description() string { return "Combine multiple feeds into one" } 16func (n *MergeNode) Category() string { return "transform" } 17func (n *MergeNode) Inputs() int { return 2 } 18func (n *MergeNode) Outputs() int { return 1 } 19 20func (n *MergeNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 21 if len(inputs) == 0 { 22 return []interface{}{}, nil 23 } 24 25 var merged []interface{} 26 for _, input := range inputs { 27 merged = append(merged, input...) 28 } 29 30 // Optionally dedupe by a field 31 dedupeField, _ := config["dedupe_field"].(string) 32 if dedupeField != "" { 33 merged = dedupeByField(merged, dedupeField) 34 } 35 36 // Optionally sort by a field 37 sortField, _ := config["sort_field"].(string) 38 sortOrder, _ := config["sort_order"].(string) 39 if sortField != "" { 40 sortItems(merged, sortField, sortOrder == "desc") 41 } 42 43 execCtx.Log("merge", "info", fmt.Sprintf("Merged %d inputs into %d items", len(inputs), len(merged))) 44 45 return merged, nil 46} 47 48func dedupeByField(items []interface{}, field string) []interface{} { 49 seen := make(map[string]bool) 50 var result []interface{} 51 52 for _, item := range items { 53 itemMap, ok := item.(map[string]interface{}) 54 if !ok { 55 result = append(result, item) 56 continue 57 } 58 59 key := fmt.Sprintf("%v", itemMap[field]) 60 if !seen[key] { 61 seen[key] = true 62 result = append(result, item) 63 } 64 } 65 66 return result 67} 68 69func sortItems(items []interface{}, field string, desc bool) { 70 sort.SliceStable(items, func(i, j int) bool { 71 iMap, iOk := items[i].(map[string]interface{}) 72 jMap, jOk := items[j].(map[string]interface{}) 73 if !iOk || !jOk { 74 return false 75 } 76 77 iRaw := getNestedValue(iMap, field) 78 jRaw := getNestedValue(jMap, field) 79 80 // Try numeric comparison first (for timestamps, etc.) 81 iNum, iIsNum := toFloat(iRaw) 82 jNum, jIsNum := toFloat(jRaw) 83 84 if iIsNum && jIsNum { 85 if desc { 86 return iNum > jNum 87 } 88 return iNum < jNum 89 } 90 91 // Fall back to string comparison 92 iVal := fmt.Sprintf("%v", iRaw) 93 jVal := fmt.Sprintf("%v", jRaw) 94 95 if desc { 96 return iVal > jVal 97 } 98 return iVal < jVal 99 }) 100} 101 102func (n *MergeNode) ValidateConfig(config map[string]interface{}) error { 103 return nil 104} 105 106func (n *MergeNode) GetConfigSchema() *nodes.ConfigSchema { 107 return &nodes.ConfigSchema{ 108 Fields: []nodes.ConfigField{ 109 { 110 Name: "dedupe_field", 111 Label: "Dedupe Field", 112 Type: "text", 113 Required: false, 114 Placeholder: "link", 115 HelpText: "Remove duplicates based on this field (e.g., link, guid)", 116 }, 117 { 118 Name: "sort_field", 119 Label: "Sort By", 120 Type: "text", 121 Required: false, 122 Placeholder: "published_at", 123 HelpText: "Field to sort merged results by (use published_at for date sorting)", 124 }, 125 { 126 Name: "sort_order", 127 Label: "Sort Order", 128 Type: "select", 129 Required: false, 130 Options: []nodes.FieldOption{ 131 {Value: "desc", Label: "Newest First"}, 132 {Value: "asc", Label: "Oldest First"}, 133 }, 134 }, 135 }, 136 } 137}