holiday-api/webhook/webhook_service.go

142 lines
4.0 KiB
Go

package webhook
import (
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"log"
"net/http"
"strings"
"time"
)
type WebhookService struct {
DB *sqlx.DB
Events <-chan Event
Authorization string
}
func (w *WebhookService) Listen() {
for event := range w.Events {
webhooks, _ := w.Find()
var usedWebhooks []Webhook
for _, webhook := range webhooks {
usedWebhooks = append(usedWebhooks, webhook)
}
w.CreateJobs(usedWebhooks, event)
}
}
func (w *WebhookService) Run() {
for {
jobs, err := w.findRunningJobs()
if err != nil {
log.Printf("failed fetching running jobs: %v", err)
}
for _, job := range jobs {
job := w.ExecuteJob(job)
if err := w.UpdateJob(job); err != nil {
log.Printf("failed updating job %v: %v", job.Id, err)
}
}
time.Sleep(2 * time.Hour)
}
}
func (w *WebhookService) Find() ([]Webhook, error) {
var webhooks []Webhook
return webhooks, w.DB.Select(&webhooks, `SELECT * FROM "webhook"`)
}
func (w *WebhookService) FindAllJobs() ([]Job, error) {
var jobs []Job
return jobs, w.DB.Select(&jobs, `SELECT * FROM "job"`)
}
func (w *WebhookService) FindJobs(webhookId uuid.UUID) ([]Job, error) {
var jobs []Job
return jobs, w.DB.Select(&jobs, `SELECT * FROM "job" WHERE id = $1`, webhookId)
}
func (w *WebhookService) findRunningJobs() ([]Job, error) {
var jobs []Job
return jobs, w.DB.Select(&jobs, `SELECT * FROM "job" WHERE retry_count > 0 ORDER BY created asc`)
}
func (w *WebhookService) Create(webhook Webhook) (Webhook, error) {
_, err := w.DB.Exec("INSERT INTO webhook (id, created, url, country, retry_count, on_created, on_edited, on_deleted) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
&webhook.Id, &webhook.Created, &webhook.Url, &webhook.Country, &webhook.RetryCount, &webhook.OnCreated, &webhook.OnEdited, &webhook.OnDeleted,
)
return webhook, err
}
func (w *WebhookService) Update(webhook Webhook) (Webhook, error) {
_, err := w.DB.Exec("UPDATE webhook SET url = $2, country = $3, retry_count = $4, on_created = $5, on_edited = $6, on_deleted = $7 WHERE id = $1",
&webhook.Id, &webhook.Url, &webhook.Country, &webhook.RetryCount, &webhook.OnCreated, &webhook.OnEdited, &webhook.OnDeleted,
)
return webhook, err
}
func (w *WebhookService) CreateJobs(webhooks []Webhook, event Event) {
json := event.Json()
tx, _ := w.DB.Begin()
for _, webhook := range webhooks {
job := Job{
Id: uuid.Must(uuid.NewRandom()),
WebhookId: webhook.Id,
Created: time.Now(),
Success: false,
SuccessTime: nil,
RetryCount: webhook.RetryCount,
Content: json,
}
_, err := tx.Exec("INSERT INTO job (id, webhook_id, created, success, success_time, retry_count, content) VALUES ($1, $2, $3, $4, $5, $6, $7)",
&job.Id, &job.WebhookId, &job.Created, &job.Success, &job.SuccessTime, &job.RetryCount, &job.Content,
)
if err != nil {
tx.Rollback()
return
}
}
tx.Commit()
}
func (w *WebhookService) ExecuteJob(job Job) Job {
webhook, err := w.FindById(job.WebhookId)
if err != nil {
panic(err)
}
log.Printf("executing job [%d:%d] %v: POST %s", webhook.RetryCount-job.RetryCount, webhook.RetryCount, job.Id, webhook.Url)
request, err := http.NewRequest("POST", webhook.Url, strings.NewReader(job.Content))
request.Header.Add("authorization", "Api "+w.Authorization)
response, err := http.DefaultClient.Do(request)
if err != nil || response.StatusCode != http.StatusOK {
job.RetryCount--
} else {
job.Success = true
currentTime := time.Now()
job.SuccessTime = &currentTime
}
return job
}
func (w *WebhookService) FindById(webhookId uuid.UUID) (Webhook, error) {
var webhook Webhook
return webhook, w.DB.Get(&webhook, `SELECT * FROM "webhook" WHERE id = $1`, webhookId)
}
func (w *WebhookService) UpdateJob(job Job) error {
_, err := w.DB.Exec(`UPDATE job SET success = $2, success_time = $3, retry_count = $4 WHERE id = $1`,
&job.Id, &job.Success, &job.SuccessTime, &job.RetryCount,
)
return err
}
func (w *WebhookService) Delete(webhookId uuid.UUID) error {
_, err := w.DB.Exec(`DELETE FROM "webhook" WHERE id = $1`,
&webhookId,
)
return err
}