bring back yahoo pipes!
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 109 lines 3.0 kB view raw
1package outputs 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "net/http" 9 "strings" 10 "time" 11 12 "github.com/kierank/pipes/nodes" 13) 14 15type WebhookOutputNode struct{} 16 17func (n *WebhookOutputNode) Type() string { return "webhook-output" } 18func (n *WebhookOutputNode) Label() string { return "Webhook" } 19func (n *WebhookOutputNode) Description() string { return "POST data to a webhook URL" } 20func (n *WebhookOutputNode) Category() string { return "output" } 21func (n *WebhookOutputNode) Inputs() int { return 1 } 22func (n *WebhookOutputNode) Outputs() int { return 0 } 23 24func (n *WebhookOutputNode) Execute(ctx context.Context, config map[string]interface{}, inputs [][]interface{}, execCtx *nodes.Context) ([]interface{}, error) { 25 if len(inputs) == 0 || len(inputs[0]) == 0 { 26 execCtx.Log("webhook-output", "info", "No input data") 27 return nil, nil 28 } 29 30 url, ok := config["url"].(string) 31 if !ok || url == "" { 32 return nil, fmt.Errorf("webhook URL is required") 33 } 34 35 data := inputs[0] 36 37 payload := map[string]interface{}{ 38 "count": len(data), 39 "items": data, 40 } 41 42 jsonData, err := json.Marshal(payload) 43 if err != nil { 44 return nil, fmt.Errorf("marshal payload: %w", err) 45 } 46 47 client := &http.Client{Timeout: 30 * time.Second} 48 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData)) 49 if err != nil { 50 return nil, fmt.Errorf("create request: %w", err) 51 } 52 53 req.Header.Set("Content-Type", "application/json") 54 req.Header.Set("User-Agent", "Pipes/1.0") 55 56 // Add custom headers 57 if headers, ok := config["headers"].(string); ok && headers != "" { 58 for _, line := range strings.Split(headers, "\n") { 59 if parts := strings.SplitN(strings.TrimSpace(line), ":", 2); len(parts) == 2 { 60 req.Header.Set(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])) 61 } 62 } 63 } 64 65 resp, err := client.Do(req) 66 if err != nil { 67 return nil, fmt.Errorf("webhook request failed: %w", err) 68 } 69 defer resp.Body.Close() 70 71 if resp.StatusCode >= 400 { 72 return nil, fmt.Errorf("webhook returned HTTP %d", resp.StatusCode) 73 } 74 75 execCtx.Log("webhook-output", "info", fmt.Sprintf("Posted %d items to webhook (HTTP %d)", len(data), resp.StatusCode)) 76 77 return data, nil 78} 79 80func (n *WebhookOutputNode) ValidateConfig(config map[string]interface{}) error { 81 url, ok := config["url"].(string) 82 if !ok || url == "" { 83 return fmt.Errorf("webhook URL is required") 84 } 85 return nil 86} 87 88func (n *WebhookOutputNode) GetConfigSchema() *nodes.ConfigSchema { 89 return &nodes.ConfigSchema{ 90 Fields: []nodes.ConfigField{ 91 { 92 Name: "url", 93 Label: "Webhook URL", 94 Type: "url", 95 Required: true, 96 Placeholder: "https://example.com/webhook", 97 HelpText: "URL to POST data to", 98 }, 99 { 100 Name: "headers", 101 Label: "Headers", 102 Type: "textarea", 103 Required: false, 104 Placeholder: "Authorization: Bearer token", 105 HelpText: "Custom headers, one per line as Header: Value", 106 }, 107 }, 108 } 109}