diff --git a/cmd/web/main.go b/cmd/web/main.go index abec315..c71130f 100644 --- a/cmd/web/main.go +++ b/cmd/web/main.go @@ -3,7 +3,6 @@ package main import ( "context" "crypto/tls" - "encoding/json" "fmt" "log" "log/slog" @@ -57,24 +56,25 @@ func main() { } }() - c.Tasks.Register(tasks.TypeExample, func(ctx context.Context, m []byte) error { - var t tasks.ExampleTask - if err := json.Unmarshal(m, &t); err != nil { - return err - } - slog.Info("Example task received", "message", t.Message) - return nil - }) + q := services.NewQueue[tasks.ExampleTask]( + func(ctx context.Context, task tasks.ExampleTask) error { + slog.Info("Example task received", "message", task.Message) + return nil + }, + ) + c.Tasks.Register(q) // Start the scheduler service to queue periodic tasks - c.Tasks.StartRunner(context.Background()) // use main context + ctx, cancel := context.WithCancel(context.Background()) + c.Tasks.StartRunner(ctx) - // Wait for interrupt signal to gracefully shutdown the server with a timeout of 10 seconds. + // Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds. quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) signal.Notify(quit, os.Kill) <-quit - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + cancel() + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := c.Web.Shutdown(ctx); err != nil { log.Fatal(err) diff --git a/config/config.go b/config/config.go index abcb11f..69f7344 100644 --- a/config/config.go +++ b/config/config.go @@ -57,6 +57,7 @@ type ( App AppConfig Cache CacheConfig Database DatabaseConfig + Tasks TasksConfig Mail MailConfig } @@ -103,6 +104,16 @@ type ( TestConnection string } + // TasksConfig stores the tasks configuration + TasksConfig struct { + Driver string + Connection string + TestConnection string + PollInterval time.Duration + MaxRetries int + Goroutines int + } + // MailConfig stores the mail configuration MailConfig struct { Hostname string diff --git a/config/config.yaml b/config/config.yaml index 3ab2d2a..1ae4ba9 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -31,6 +31,14 @@ database: connection: "dbs/main.db?_journal=WAL&_timeout=5000&_fk=true" testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true" +tasks: + driver: "sqlite3" + connection: "dbs/jobs.db?_journal=WAL&_timeout=5000&_fk=true" + testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true" + pollInterval: "1s" + maxRetries: 10 + goRoutines: 1 + mail: hostname: "localhost" port: 25 diff --git a/pkg/handlers/contact.go b/pkg/handlers/contact.go index 5fb20f9..1a865d7 100644 --- a/pkg/handlers/contact.go +++ b/pkg/handlers/contact.go @@ -72,10 +72,9 @@ func (h *Contact) Submit(ctx echo.Context) error { return err } - err = h.tasks.New(tasks.TypeExample). - Payload(tasks.ExampleTask{ - Message: input.Message, - }). + err = h.tasks.New(tasks.ExampleTask{ + Message: input.Message, + }). Wait(30 * time.Second). Save() if err != nil { diff --git a/pkg/log/log.go b/pkg/log/log.go index b609815..ebafd28 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -18,5 +18,10 @@ func Ctx(ctx echo.Context) *slog.Logger { return l } + return Default() +} + +// Default returns the default logger +func Default() *slog.Logger { return slog.Default() } diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go index 50586b3..9eae30b 100644 --- a/pkg/services/tasks.go +++ b/pkg/services/tasks.go @@ -4,13 +4,13 @@ import ( "context" "database/sql" "encoding/json" - "log/slog" "strings" "time" "github.com/maragudk/goqite" "github.com/maragudk/goqite/jobs" "github.com/mikestefanello/pagoda/config" + "github.com/mikestefanello/pagoda/pkg/log" ) type ( @@ -21,50 +21,51 @@ type ( db *sql.DB } - // task handles task creation operations - task struct { - client *TaskClient - typ string - payload any - periodic *string - queue *string - maxRetries *int - timeout *time.Duration - deadline *time.Time - at *time.Time - wait *time.Duration - retain *time.Duration + Task interface { + Name() string + } + + // TaskOp handles task creation operations + TaskOp struct { + client *TaskClient + task Task + //payload any + //periodic *string + //queue *string + //maxRetries *int + //timeout *time.Duration + //deadline *time.Time + at *time.Time + wait *time.Duration + //retain *time.Duration + } + + Queuable interface { + Name() string + Receive(ctx context.Context, payload []byte) error } Queue[T any] struct { name string - q *goqite.Queue - subscriber func(context.Context, T) error + subscriber QueueSubscriber[T] } + + QueueSubscriber[T any] func(context.Context, T) error ) -var queues = make(map[string]Queuable) +func NewQueue[T Task](subscriber QueueSubscriber[T]) *Queue[T] { + var task T + + q := &Queue[T]{ + name: task.Name(), + subscriber: subscriber, + } -func NewQueue[T any](name string) *Queue[T] { - q := &Queue[T]{name: name} - queues[name] = q return q } -func GetQueue[T any](name string) *Queue[T] { - return queues[name].(*Queue[T]) -} - -type Queuable interface { - Receive(ctx context.Context, payload []byte) error -} - -func (q *Queue[T]) Add(item T) error { - b, err := json.Marshal(item) - if err != nil { - return err - } - return jobs.Create(context.Background(), q.q, q.name, b) +func (q *Queue[T]) Name() string { + return q.name } func (q *Queue[T]) Receive(ctx context.Context, payload []byte) error { @@ -77,46 +78,47 @@ func (q *Queue[T]) Receive(ctx context.Context, payload []byte) error { return q.subscriber(ctx, obj) } -func (q *Queue[T]) Register(r *jobs.Runner) { - r.Register(q.name, q.Receive) -} - // NewTaskClient creates a new task client func NewTaskClient(cfg *config.Config) (*TaskClient, error) { - db, err := openDB("sqlite3", "dbs/tasks.db?_journal=WAL&_timeout=5000&_fk=true") + var connection string + + switch cfg.App.Environment { + case config.EnvTest: + connection = cfg.Tasks.TestConnection + default: + connection = cfg.Tasks.Connection + } + + db, err := openDB(cfg.Tasks.Driver, connection) if err != nil { return nil, err } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) + + //db.SetMaxOpenConns(1) + //db.SetMaxIdleConns(1) // Install the schema if err := goqite.Setup(context.Background(), db); err != nil { - // An error is returned if we already ran this + // An error is returned if we already ran this and there's no better way to check. + // You can and probably should handle this via migrations if !strings.Contains(err.Error(), "already exists") { return nil, err } } - // Determine the database based on the environment - //db := cfg.Cache.Database - //if cfg.App.Environment == config.EnvTest { - // db = cfg.Cache.TestDatabase - //} - // TODO test db t := &TaskClient{ queue: goqite.New(goqite.NewOpts{ DB: db, Name: "jobs", - MaxReceive: 10, + MaxReceive: cfg.Tasks.MaxRetries, }), db: db, } t.runner = jobs.NewRunner(jobs.NewRunnerOpts{ - Limit: 1, - Log: slog.Default(), - PollInterval: 10 * time.Millisecond, + Limit: cfg.Tasks.Goroutines, + Log: log.Default(), + PollInterval: cfg.Tasks.PollInterval, Queue: t.queue, }) @@ -125,6 +127,7 @@ func NewTaskClient(cfg *config.Config) (*TaskClient, error) { // Close closes the connection to the task service func (t *TaskClient) Close() error { + // TODO close the runner return t.db.Close() } @@ -134,28 +137,32 @@ func (t *TaskClient) StartRunner(ctx context.Context) { t.runner.Start(ctx) } -func (t *TaskClient) Register(name string, processor jobs.Func) { - t.runner.Register(name, processor) +//func (t *TaskClient) Register(name string, processor jobs.Func) { +// t.runner.Register(name, processor) +//} + +func (t *TaskClient) Register(queue Queuable) { + t.runner.Register(queue.Name(), queue.Receive) } // New starts a task creation operation -func (t *TaskClient) New(typ string) *task { - return &task{ +func (t *TaskClient) New(task Task) *TaskOp { + return &TaskOp{ client: t, - typ: typ, + task: task, } } -// Payload sets the task payload data which will be sent to the task handler -func (t *task) Payload(payload any) *task { - t.payload = payload - return t -} +//// Payload sets the task payload data which will be sent to the task handler +//func (t *TaskOp) Payload(payload Task) *TaskOp { +// t.payload = payload +// return t +//} // // Periodic sets the task to execute periodically according to a given interval // // The interval can be either in cron form ("*/5 * * * *") or "@every 30s" // -// func (t *task) Periodic(interval string) *task { +// func (t *TaskOp) Periodic(interval string) *TaskOp { // t.periodic = &interval // return t // } @@ -163,61 +170,60 @@ func (t *task) Payload(payload any) *task { // // Queue specifies the name of the queue to add the task to // // The default queue will be used if this is not set // -// func (t *task) Queue(queue string) *task { +// func (t *TaskOp) Queue(queue string) *TaskOp { // t.queue = &queue // return t // } // // // Timeout sets the task timeout, meaning the task must execute within a given duration // -// func (t *task) Timeout(timeout time.Duration) *task { +// func (t *TaskOp) Timeout(timeout time.Duration) *TaskOp { // t.timeout = &timeout // return t // } // // // Deadline sets the task execution deadline to a specific date and time // -// func (t *task) Deadline(deadline time.Time) *task { +// func (t *TaskOp) Deadline(deadline time.Time) *TaskOp { // t.deadline = &deadline // return t // } // -// // At sets the exact date and time the task should be executed -// -// func (t *task) At(processAt time.Time) *task { -// t.at = &processAt -// return t -// } -// + +// At sets the exact date and time the task should be executed +func (t *TaskOp) At(processAt time.Time) *TaskOp { + until := time.Until(processAt) + t.wait = &until + return t +} + // Wait instructs the task to wait a given duration before it is executed -func (t *task) Wait(duration time.Duration) *task { +func (t *TaskOp) Wait(duration time.Duration) *TaskOp { t.wait = &duration return t } // //// Retain instructs the task service to retain the task data for a given duration after execution is complete -//func (t *task) Retain(duration time.Duration) *task { +//func (t *TaskOp) Retain(duration time.Duration) *TaskOp { // t.retain = &duration // return t //} // //// MaxRetries sets the maximum amount of times to retry executing the task in the event of a failure -//func (t *task) MaxRetries(retries int) *task { +//func (t *TaskOp) MaxRetries(retries int) *TaskOp { // t.maxRetries = &retries // return t //} // Save saves the task so it can be executed -func (t *task) Save() error { +func (t *TaskOp) Save() error { var err error // Build the payload - var payload []byte - if t.payload != nil { - if payload, err = json.Marshal(t.payload); err != nil { - return err - } + payload, err := json.Marshal(t.task) + if err != nil { + return err } // Build the task options @@ -251,6 +257,6 @@ func (t *task) Save() error { if t.wait != nil { msg.Delay = *t.wait } - return t.client.queue.Send(context.Background(), msg) - //return jobs.Create(context.Background(), t.client.queue, t.typ, payload) + //return t.client.queue.Send(context.Background(), msg) + return jobs.Create(context.Background(), t.client.queue, t.task.Name(), payload) } diff --git a/pkg/tasks/example.go b/pkg/tasks/example.go index c4be06c..159e6fa 100644 --- a/pkg/tasks/example.go +++ b/pkg/tasks/example.go @@ -1,9 +1,9 @@ package tasks -// TypeExample is the type for the example task. -// This is what is passed in to TaskClient.New() when creating a new task -const TypeExample = "example_task" - type ExampleTask struct { Message string } + +func (t ExampleTask) Name() string { + return "example_task" +}