package webhook import ( "github.com/google/uuid" "github.com/jmoiron/sqlx" "log" "net/http" "strings" "time" ) type WebhookService struct { DB *sqlx.DB Events <-chan Event Authorization string } func (w *WebhookService) Listen() { for event := range w.Events { webhooks, _ := w.Find() var usedWebhooks []Webhook for _, webhook := range webhooks { usedWebhooks = append(usedWebhooks, webhook) } w.CreateJobs(usedWebhooks, event) } } func (w *WebhookService) Run() { for { jobs, err := w.findRunningJobs() if err != nil { log.Printf("failed fetching running jobs: %v", err) } for _, job := range jobs { job := w.ExecuteJob(job) if err := w.UpdateJob(job); err != nil { log.Printf("failed updating job %v: %v", job.Id, err) } } time.Sleep(2 * time.Hour) } } func (w *WebhookService) Find() ([]Webhook, error) { var webhooks []Webhook return webhooks, w.DB.Select(&webhooks, `SELECT * FROM "webhook"`) } func (w *WebhookService) FindAllJobs() ([]Job, error) { var jobs []Job return jobs, w.DB.Select(&jobs, `SELECT * FROM "job"`) } func (w *WebhookService) FindJobs(webhookId uuid.UUID) ([]Job, error) { var jobs []Job return jobs, w.DB.Select(&jobs, `SELECT * FROM "job" WHERE id = $1`, webhookId) } func (w *WebhookService) findRunningJobs() ([]Job, error) { var jobs []Job return jobs, w.DB.Select(&jobs, `SELECT * FROM "job" WHERE retry_count > 0 ORDER BY created asc`) } func (w *WebhookService) Create(webhook Webhook) (Webhook, error) { _, err := w.DB.Exec("INSERT INTO webhook (id, created, url, country, retry_count, on_created, on_edited, on_deleted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &webhook.Id, &webhook.Created, &webhook.Url, &webhook.Country, &webhook.RetryCount, &webhook.OnCreated, &webhook.OnEdited, &webhook.OnDeleted, ) return webhook, err } func (w *WebhookService) Update(webhook Webhook) (Webhook, error) { _, err := w.DB.Exec("UPDATE webhook SET url = $2, country = $3, retry_count = $4, on_created = $5, on_edited = $6, on_deleted = $7 WHERE id = $1", &webhook.Id, &webhook.Url, &webhook.Country, &webhook.RetryCount, &webhook.OnCreated, &webhook.OnEdited, &webhook.OnDeleted, ) return webhook, err } func (w *WebhookService) CreateJobs(webhooks []Webhook, event Event) { json := event.Json() tx, _ := w.DB.Begin() for _, webhook := range webhooks { job := Job{ Id: uuid.Must(uuid.NewRandom()), WebhookId: webhook.Id, Created: time.Now(), Success: false, SuccessTime: nil, RetryCount: webhook.RetryCount, Content: json, } _, err := tx.Exec("INSERT INTO job (id, webhook_id, created, success, success_time, retry_count, content) VALUES ($1, $2, $3, $4, $5, $6, $7)", &job.Id, &job.WebhookId, &job.Created, &job.Success, &job.SuccessTime, &job.RetryCount, &job.Content, ) if err != nil { tx.Rollback() return } } tx.Commit() } func (w *WebhookService) ExecuteJob(job Job) Job { webhook, err := w.FindById(job.WebhookId) if err != nil { panic(err) } log.Printf("executing job [%d:%d] %v: POST %s", webhook.RetryCount-job.RetryCount, webhook.RetryCount, job.Id, webhook.Url) request, err := http.NewRequest("POST", webhook.Url, strings.NewReader(job.Content)) request.Header.Add("authorization", "Api "+w.Authorization) response, err := http.DefaultClient.Do(request) if err != nil || response.StatusCode != http.StatusOK { job.RetryCount-- } else { job.Success = true currentTime := time.Now() job.SuccessTime = ¤tTime } return job } func (w *WebhookService) FindById(webhookId uuid.UUID) (Webhook, error) { var webhook Webhook return webhook, w.DB.Get(&webhook, `SELECT * FROM "webhook" WHERE id = $1`, webhookId) } func (w *WebhookService) UpdateJob(job Job) error { _, err := w.DB.Exec(`UPDATE job SET success = $2, success_time = $3, retry_count = $4 WHERE id = $1`, &job.Id, &job.Success, &job.SuccessTime, &job.RetryCount, ) return err } func (w *WebhookService) Delete(webhookId uuid.UUID) error { _, err := w.DB.Exec(`DELETE FROM "webhook" WHERE id = $1`, &webhookId, ) return err }