From 912ae2ca6b5eb4be227ef6d43119bb875a84f3f7 Mon Sep 17 00:00:00 2001 From: mikestefanello Date: Wed, 19 Jun 2024 13:53:44 -0400 Subject: [PATCH] Provide task queue registration and service container injection. --- cmd/web/main.go | 12 +--- pkg/services/container.go | 6 +- pkg/services/tasks.go | 119 +++++++++++++++++--------------------- pkg/tasks/example.go | 27 +++++++++ pkg/tasks/register.go | 10 ++++ 5 files changed, 96 insertions(+), 78 deletions(-) create mode 100644 pkg/tasks/register.go diff --git a/cmd/web/main.go b/cmd/web/main.go index cca1047..1335023 100644 --- a/cmd/web/main.go +++ b/cmd/web/main.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log" - "log/slog" "net/http" "os" "os/signal" @@ -57,15 +56,10 @@ func main() { } }() - q := services.NewQueue[tasks.ExampleTask]( - func(ctx context.Context, task tasks.ExampleTask) error { - slog.Info("Example task received", "message", task.Message) - return nil - }, - ) - c.Tasks.Register(q) + // Register all task queues + tasks.Register(c) - // Start the scheduler service to queue periodic tasks + // Start the task runner to executed queued tasks ctx, cancel := context.WithCancel(context.Background()) c.Tasks.StartRunner(ctx) diff --git a/pkg/services/container.go b/pkg/services/container.go index 5fafac0..b2c6a5c 100644 --- a/pkg/services/container.go +++ b/pkg/services/container.go @@ -72,9 +72,9 @@ func NewContainer() *Container { // Shutdown shuts the Container down and disconnects all connections func (c *Container) Shutdown() error { - if err := c.Tasks.Close(); err != nil { - return err - } + //if err := c.Tasks.Close(); err != nil { + // return err + //} if err := c.ORM.Close(); err != nil { return err } diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go index c170a8a..d18dc22 100644 --- a/pkg/services/tasks.go +++ b/pkg/services/tasks.go @@ -22,84 +22,44 @@ type ( runner *jobs.Runner } + // Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber. + // See pkg/tasks for an example of how this can be used with a queue. Task interface { Name() string } - // TaskOp handles task creation operations - TaskOp struct { + // TaskSaveOp handles task save operations + TaskSaveOp struct { client *TaskClient task Task - //payload any - //periodic *string - //queue *string - //maxRetries *int - //timeout *time.Duration - //deadline *time.Time - tx *sql.Tx - at *time.Time - wait *time.Duration - //retain *time.Duration + tx *sql.Tx + at *time.Time + wait *time.Duration } + // Queue is a queue that a Task can be pushed to for execution. + // While this can be implemented directly, it's recommended to use NewQueue() which uses generics in + // order to provide type-safe queues and queue subscriber callbacks for task execution. Queue interface { + // Name returns the name of the task this queue processes Name() string + + // Receive receives the Task payload to be processed Receive(ctx context.Context, payload []byte) error } - queue[T any] struct { + // queue provides a type-safe implementation of Queue + queue[T Task] struct { name string subscriber QueueSubscriber[T] } - QueueSubscriber[T any] func(context.Context, T) error + // QueueSubscriber is a generic subscriber callback for a given queue to process Tasks + QueueSubscriber[T Task] func(context.Context, T) error ) -func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue { - var task T - - q := &queue[T]{ - name: task.Name(), - subscriber: subscriber, - } - - return q -} - -func (q *queue[T]) Name() string { - return q.name -} - -func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { - var obj T - err := json.Unmarshal(payload, &obj) - if err != nil { - return err - } - - return q.subscriber(ctx, obj) -} - // NewTaskClient creates a new task client func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) { - //var connection string - // - //switch cfg.App.Environment { - //case config.EnvTest: - // connection = cfg.Tasks.TestConnection - //default: - // connection = cfg.Tasks.Connection - //} - // - //db, err := openDB(cfg.Tasks.Driver, connection) - //if err != nil { - // return nil, err - //} - // - //// TODO is this correct? - //db.SetMaxOpenConns(cfg.Tasks.Goroutines) - //db.SetMaxIdleConns(cfg.Tasks.Goroutines) - // Install the schema if err := goqite.Setup(context.Background(), db); err != nil { // An error is returned if we already ran this and there's no better way to check. @@ -140,38 +100,39 @@ func (t *TaskClient) StartRunner(ctx context.Context) { t.runner.Start(ctx) } +// Register registers a queue so tasks can be added to it and processed func (t *TaskClient) Register(queue Queue) { t.runner.Register(queue.Name(), queue.Receive) } -// New starts a task creation operation -func (t *TaskClient) New(task Task) *TaskOp { - return &TaskOp{ +// New starts a task save operation +func (t *TaskClient) New(task Task) *TaskSaveOp { + return &TaskSaveOp{ client: t, task: task, } } // At sets the exact date and time the task should be executed -func (t *TaskOp) At(processAt time.Time) *TaskOp { +func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp { t.Wait(time.Until(processAt)) return t } // Wait instructs the task to wait a given duration before it is executed -func (t *TaskOp) Wait(duration time.Duration) *TaskOp { +func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp { t.wait = &duration return t } -// Tx will insert the task as part of a given database transaction -func (t *TaskOp) Tx(tx *sql.Tx) *TaskOp { +// Tx will include the task as part of a given database transaction +func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp { t.tx = tx return t } -// Save saves the task so it can be executed -func (t *TaskOp) Save() error { +// Save saves the task so it can be queued for execution +func (t *TaskSaveOp) Save() error { // Build the payload // TODO use gob? payload, err := json.Marshal(t.task) @@ -194,3 +155,29 @@ func (t *TaskOp) Save() error { return jobs.CreateTx(context.Background(), t.tx, t.client.queue, t.task.Name(), payload) } } + +// NewQueue queues a new type-safe Queue of a given Task type +func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue { + var task T + + q := &queue[T]{ + name: task.Name(), + subscriber: subscriber, + } + + return q +} + +func (q *queue[T]) Name() string { + return q.name +} + +func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { + var obj T + err := json.Unmarshal(payload, &obj) + if err != nil { + return err + } + + return q.subscriber(ctx, obj) +} diff --git a/pkg/tasks/example.go b/pkg/tasks/example.go index 159e6fa..3ef4b60 100644 --- a/pkg/tasks/example.go +++ b/pkg/tasks/example.go @@ -1,9 +1,36 @@ package tasks +import ( + "context" + + "github.com/mikestefanello/pagoda/pkg/log" + "github.com/mikestefanello/pagoda/pkg/services" +) + +// ExampleTask is an example implementation of services.Task +// This represents the task that can be queued for execution via the task client and should contain everything +// that your queue subscriber needs to process the task. type ExampleTask struct { Message string } +// Name satisfies the services.Task interface by proviing a unique name for this Task type func (t ExampleTask) Name() string { return "example_task" } + +// NewExampleTaskQueue provides a Queue that can process ExampleTask tasks +// The service container is provided so the subscriber can have access to the app dependencies. +// All queues must be registered in the Register() function. +// Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution. +func NewExampleTaskQueue(c *services.Container) services.Queue { + return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error { + log.Default().Info("Example task received", + "message", task.Message, + ) + log.Default().Info("This can access the container for dependencies", + "echo", c.Web.Reverse("home"), + ) + return nil + }) +} diff --git a/pkg/tasks/register.go b/pkg/tasks/register.go new file mode 100644 index 0000000..895b974 --- /dev/null +++ b/pkg/tasks/register.go @@ -0,0 +1,10 @@ +package tasks + +import ( + "github.com/mikestefanello/pagoda/pkg/services" +) + +// Register registers all task queues with the task client +func Register(c *services.Container) { + c.Tasks.Register(NewExampleTaskQueue(c)) +}