bring back yahoo pipes!
1package transforms
2
3import (
4 "context"
5 "fmt"
6 "sort"
7
8 "github.com/kierank/pipes/nodes"
9)
10
// MergeNode is a node that combines several input feeds into one output
// feed. It holds no state of its own.
type MergeNode struct{}

// Type returns the node's type identifier, "merge".
func (n *MergeNode) Type() string {
	return "merge"
}

// Label returns the display name, "Merge".
func (n *MergeNode) Label() string {
	return "Merge"
}

// Description returns a one-line summary of what the node does.
func (n *MergeNode) Description() string {
	return "Combine multiple feeds into one"
}

// Category returns the category the node is listed under, "transform".
func (n *MergeNode) Category() string {
	return "transform"
}

// Inputs returns the declared number of inputs, 2.
func (n *MergeNode) Inputs() int {
	return 2
}

// Outputs returns the declared number of outputs, 1.
func (n *MergeNode) Outputs() int {
	return 1
}
19
20func (n *MergeNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) {
21 if len(inputs) == 0 {
22 return []interface{}{}, nil
23 }
24
25 var merged []interface{}
26 for _, input := range inputs {
27 merged = append(merged, input...)
28 }
29
30 // Optionally dedupe by a field
31 dedupeField, _ := config["dedupe_field"].(string)
32 if dedupeField != "" {
33 merged = dedupeByField(merged, dedupeField)
34 }
35
36 // Optionally sort by a field
37 sortField, _ := config["sort_field"].(string)
38 sortOrder, _ := config["sort_order"].(string)
39 if sortField != "" {
40 sortItems(merged, sortField, sortOrder == "desc")
41 }
42
43 execCtx.Log("merge", "info", fmt.Sprintf("Merged %d inputs into %d items", len(inputs), len(merged)))
44
45 return merged, nil
46}
47
// dedupeByField returns items with duplicates removed, keeping the first
// occurrence of each distinct value of field and preserving input order.
//
// Map items are keyed by the "%v" rendering of item[field], so values of
// different types that print identically (for example 1 and "1") count as
// duplicates of each other. Items that cannot be keyed are always kept:
// non-map items, and maps in which field is absent or nil. Keying those maps
// would render every one of them as "<nil>" and collapse unrelated items
// into a single entry.
//
// The returned slice is always non-nil and does not share storage with items.
func dedupeByField(items []interface{}, field string) []interface{} {
	seen := make(map[string]struct{}, len(items))
	result := make([]interface{}, 0, len(items))

	for _, item := range items {
		itemMap, ok := item.(map[string]interface{})
		if !ok {
			result = append(result, item)
			continue
		}

		value := itemMap[field]
		if value == nil {
			// Missing or nil field: there is nothing to compare on.
			result = append(result, item)
			continue
		}

		key := fmt.Sprintf("%v", value)
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		result = append(result, item)
	}

	return result
}
68
69func sortItems(items []interface{}, field string, desc bool) {
70 sort.SliceStable(items, func(i, j int) bool {
71 iMap, iOk := items[i].(map[string]interface{})
72 jMap, jOk := items[j].(map[string]interface{})
73 if !iOk || !jOk {
74 return false
75 }
76
77 iRaw := getNestedValue(iMap, field)
78 jRaw := getNestedValue(jMap, field)
79
80 // Try numeric comparison first (for timestamps, etc.)
81 iNum, iIsNum := toFloat(iRaw)
82 jNum, jIsNum := toFloat(jRaw)
83
84 if iIsNum && jIsNum {
85 if desc {
86 return iNum > jNum
87 }
88 return iNum < jNum
89 }
90
91 // Fall back to string comparison
92 iVal := fmt.Sprintf("%v", iRaw)
93 jVal := fmt.Sprintf("%v", jRaw)
94
95 if desc {
96 return iVal > jVal
97 }
98 return iVal < jVal
99 })
100}
101
102func (n *MergeNode) ValidateConfig(config map[string]interface{}) error {
103 return nil
104}
105
106func (n *MergeNode) GetConfigSchema() *nodes.ConfigSchema {
107 return &nodes.ConfigSchema{
108 Fields: []nodes.ConfigField{
109 {
110 Name: "dedupe_field",
111 Label: "Dedupe Field",
112 Type: "text",
113 Required: false,
114 Placeholder: "link",
115 HelpText: "Remove duplicates based on this field (e.g., link, guid)",
116 },
117 {
118 Name: "sort_field",
119 Label: "Sort By",
120 Type: "text",
121 Required: false,
122 Placeholder: "published_at",
123 HelpText: "Field to sort merged results by (use published_at for date sorting)",
124 },
125 {
126 Name: "sort_order",
127 Label: "Sort Order",
128 Type: "select",
129 Required: false,
130 Options: []nodes.FieldOption{
131 {Value: "desc", Label: "Newest First"},
132 {Value: "asc", Label: "Oldest First"},
133 },
134 },
135 },
136 }
137}