bring back yahoo pipes!
1package outputs
2
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/kierank/pipes/nodes"
)
14
// WebhookOutputNode is a field-less node type that describes itself as
// "POST data to a webhook URL". The methods below report its static metadata.
type WebhookOutputNode struct{}

// Type returns the node's type name, "webhook-output".
func (n *WebhookOutputNode) Type() string {
	return "webhook-output"
}

// Label returns the short display name, "Webhook".
func (n *WebhookOutputNode) Label() string {
	return "Webhook"
}

// Description returns a one-line summary of what the node does.
func (n *WebhookOutputNode) Description() string {
	return "POST data to a webhook URL"
}

// Category returns the category this node is filed under, "output".
func (n *WebhookOutputNode) Category() string {
	return "output"
}

// Inputs returns the number of inputs the node declares, which is 1.
func (n *WebhookOutputNode) Inputs() int {
	return 1
}

// Outputs returns the number of outputs the node declares, which is 0.
func (n *WebhookOutputNode) Outputs() int {
	return 0
}
23
24func (n *WebhookOutputNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) {
25 if len(inputs) == 0 || len(inputs[0]) == 0 {
26 execCtx.Log("webhook-output", "info", "No input data")
27 return nil, nil
28 }
29
30 url, ok := config["url"].(string)
31 if !ok || url == "" {
32 return nil, fmt.Errorf("webhook URL is required")
33 }
34
35 data := inputs[0]
36
37 payload := map[string]interface{}{
38 "count": len(data),
39 "items": data,
40 }
41
42 jsonData, err := json.Marshal(payload)
43 if err != nil {
44 return nil, fmt.Errorf("marshal payload: %w", err)
45 }
46
47 client := &http.Client{Timeout: 30 * time.Second}
48 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData))
49 if err != nil {
50 return nil, fmt.Errorf("create request: %w", err)
51 }
52
53 req.Header.Set("Content-Type", "application/json")
54 req.Header.Set("User-Agent", "Pipes/1.0")
55
56 // Add custom headers
57 if headers, ok := config["headers"].(string); ok && headers != "" {
58 for _, line := range strings.Split(headers, "\n") {
59 if parts := strings.SplitN(strings.TrimSpace(line), ":", 2); len(parts) == 2 {
60 req.Header.Set(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]))
61 }
62 }
63 }
64
65 resp, err := client.Do(req)
66 if err != nil {
67 return nil, fmt.Errorf("webhook request failed: %w", err)
68 }
69 defer resp.Body.Close()
70
71 if resp.StatusCode >= 400 {
72 return nil, fmt.Errorf("webhook returned HTTP %d", resp.StatusCode)
73 }
74
75 execCtx.Log("webhook-output", "info", fmt.Sprintf("Posted %d items to webhook (HTTP %d)", len(data), resp.StatusCode))
76
77 return data, nil
78}
79
80func (n *WebhookOutputNode) ValidateConfig(config map[string]interface{}) error {
81 url, ok := config["url"].(string)
82 if !ok || url == "" {
83 return fmt.Errorf("webhook URL is required")
84 }
85 return nil
86}
87
88func (n *WebhookOutputNode) GetConfigSchema() *nodes.ConfigSchema {
89 return &nodes.ConfigSchema{
90 Fields: []nodes.ConfigField{
91 {
92 Name: "url",
93 Label: "Webhook URL",
94 Type: "url",
95 Required: true,
96 Placeholder: "https://example.com/webhook",
97 HelpText: "URL to POST data to",
98 },
99 {
100 Name: "headers",
101 Label: "Headers",
102 Type: "textarea",
103 Required: false,
104 Placeholder: "Authorization: Bearer token",
105 HelpText: "Custom headers, one per line as Header: Value",
106 },
107 },
108 }
109}