removeedd waitgroup, using sync/errgroup instead (to pass error messages back). returning errors in test notifications endpoint payload. Adding failure tests for webhooks, scripts & shoutrr.

This commit is contained in:
Jason Kulatunga
2020-10-03 10:40:27 -06:00
parent 732eb039da
commit a3438297e6
5 changed files with 111 additions and 43 deletions
+36 -43
View File
@@ -3,17 +3,18 @@ package notify
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/analogj/go-util/utils"
"github.com/analogj/scrutiny/webapp/backend/pkg/config"
"github.com/containrrr/shoutrrr"
shoutrrrTypes "github.com/containrrr/shoutrrr/pkg/types"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
)
@@ -102,62 +103,65 @@ func (n *Notify) Send() error {
n.Logger.Debugf("Configured shoutrrr: %v", notifyShoutrrr)
//run all scripts, webhooks and shoutrr commands in parallel
var wg sync.WaitGroup
//var wg sync.WaitGroup
var eg errgroup.Group
for _, notifyWebhook := range notifyWebhooks {
// execute collection in parallel go-routines
wg.Add(1)
go n.SendWebhookNotification(&wg, notifyWebhook)
eg.Go(func() error { return n.SendWebhookNotification(notifyWebhook) })
}
for _, notifyScript := range notifyScripts {
// execute collection in parallel go-routines
wg.Add(1)
go n.SendScriptNotification(&wg, notifyScript)
eg.Go(func() error { return n.SendScriptNotification(notifyScript) })
}
for _, shoutrrrUrl := range notifyShoutrrr {
wg.Add(1)
go n.SendShoutrrrNotification(&wg, shoutrrrUrl)
eg.Go(func() error { return n.SendShoutrrrNotification(shoutrrrUrl) })
}
//and wait for completion, error or timeout.
n.Logger.Debugf("Main: waiting for notifications to complete.")
//wg.Wait()
if waitTimeout(&wg, time.Minute) { //wait for 1 minute
fmt.Println("Timed out while sending notifications")
if err := eg.Wait(); err == nil {
n.Logger.Info("Successfully sent notifications. Check logs for more information.")
return nil
} else {
fmt.Println("Sent notifications. Check logs for more information.")
n.Logger.Error("One or more notifications failed to send successfully. See logs for more information.")
return err
}
return nil
////wg.Wait()
//if waitTimeout(&wg, time.Minute) { //wait for 1 minute
// fmt.Println("Timed out while sending notifications")
//} else {
//}
//return nil
}
func (n *Notify) SendWebhookNotification(wg *sync.WaitGroup, webhookUrl string) {
defer wg.Done()
func (n *Notify) SendWebhookNotification(webhookUrl string) error {
n.Logger.Infof("Sending Webhook to %s", webhookUrl)
requestBody, err := json.Marshal(n.Payload)
if err != nil {
n.Logger.Errorf("An error occurred while sending Webhook to %s: %v", webhookUrl, err)
return
return err
}
resp, err := http.Post(webhookUrl, "application/json", bytes.NewBuffer(requestBody))
if err != nil {
n.Logger.Errorf("An error occurred while sending Webhook to %s: %v", webhookUrl, err)
return
return err
}
defer resp.Body.Close()
//we don't care about resp body content, but maybe we should log it?
return nil
}
func (n *Notify) SendScriptNotification(wg *sync.WaitGroup, scriptUrl string) {
defer wg.Done()
func (n *Notify) SendScriptNotification(scriptUrl string) error {
//check if the script exists.
scriptPath := strings.TrimPrefix(scriptUrl, "script://")
n.Logger.Infof("Executing Script %s", scriptPath)
if !utils.FileExists(scriptPath) {
n.Logger.Errorf("Script does not exist: %s", scriptPath)
return
return errors.New(fmt.Sprintf("custom script path does not exist: %s", scriptPath))
}
copyEnv := os.Environ()
@@ -171,20 +175,20 @@ func (n *Notify) SendScriptNotification(wg *sync.WaitGroup, scriptUrl string) {
err := utils.CmdExec(scriptPath, []string{}, "", copyEnv, "")
if err != nil {
n.Logger.Errorf("An error occurred while executing script %s: %v", scriptPath, err)
return err
}
return
return nil
}
func (n *Notify) SendShoutrrrNotification(wg *sync.WaitGroup, shoutrrrUrl string) {
func (n *Notify) SendShoutrrrNotification(shoutrrrUrl string) error {
fmt.Printf("Sending Notifications to %v", shoutrrrUrl)
n.Logger.Infof("Sending notifications to %v", shoutrrrUrl)
defer wg.Done()
sender, err := shoutrrr.CreateSender(shoutrrrUrl)
if err != nil {
n.Logger.Errorf("An error occurred while sending notifications %v: %v", shoutrrrUrl, err)
return
return err
}
//sender.SetLogger(n.Logger.)
@@ -193,15 +197,21 @@ func (n *Notify) SendShoutrrrNotification(wg *sync.WaitGroup, shoutrrrUrl string
if err != nil {
n.Logger.Errorf("An error occurred occurred while generating notification payload for %s:\n %v", serviceName, shoutrrrUrl, err)
return err
}
errs := sender.Send(n.Payload.Message, params)
if len(errs) > 0 {
n.Logger.Errorf("One or more errors occurred occurred while sending notifications for %s:\n %v", shoutrrrUrl, errs)
n.Logger.Errorf("One or more errors occurred occurred while sending notifications for %s:", shoutrrrUrl)
var errstrings []string
for _, err := range errs {
n.Logger.Error(err)
errstrings = append(errstrings, err.Error())
}
return errors.New(strings.Join(errstrings, "\n"))
}
return nil
}
func (n *Notify) GenShoutrrrNotificationParams(shoutrrrUrl string) (string, *shoutrrrTypes.Params, error) {
@@ -243,20 +253,3 @@ func (n *Notify) GenShoutrrrNotificationParams(shoutrrrUrl string) (string, *sho
return serviceName, params, nil
}
//utility functions
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}