Provide task queue registration and service container injection.
This commit is contained in:
parent
0d2ad6e936
commit
912ae2ca6b
5 changed files with 96 additions and 78 deletions
|
|
@ -6,7 +6,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
|
@ -57,15 +56,10 @@ func main() {
|
|||
}
|
||||
}()
|
||||
|
||||
q := services.NewQueue[tasks.ExampleTask](
|
||||
func(ctx context.Context, task tasks.ExampleTask) error {
|
||||
slog.Info("Example task received", "message", task.Message)
|
||||
return nil
|
||||
},
|
||||
)
|
||||
c.Tasks.Register(q)
|
||||
// Register all task queues
|
||||
tasks.Register(c)
|
||||
|
||||
// Start the scheduler service to queue periodic tasks
|
||||
// Start the task runner to executed queued tasks
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.Tasks.StartRunner(ctx)
|
||||
|
||||
|
|
|
|||
|
|
@ -72,9 +72,9 @@ func NewContainer() *Container {
|
|||
|
||||
// Shutdown shuts the Container down and disconnects all connections
|
||||
func (c *Container) Shutdown() error {
|
||||
if err := c.Tasks.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
//if err := c.Tasks.Close(); err != nil {
|
||||
// return err
|
||||
//}
|
||||
if err := c.ORM.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,84 +22,44 @@ type (
|
|||
runner *jobs.Runner
|
||||
}
|
||||
|
||||
// Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber.
|
||||
// See pkg/tasks for an example of how this can be used with a queue.
|
||||
Task interface {
|
||||
Name() string
|
||||
}
|
||||
|
||||
// TaskOp handles task creation operations
|
||||
TaskOp struct {
|
||||
// TaskSaveOp handles task save operations
|
||||
TaskSaveOp struct {
|
||||
client *TaskClient
|
||||
task Task
|
||||
//payload any
|
||||
//periodic *string
|
||||
//queue *string
|
||||
//maxRetries *int
|
||||
//timeout *time.Duration
|
||||
//deadline *time.Time
|
||||
tx *sql.Tx
|
||||
at *time.Time
|
||||
wait *time.Duration
|
||||
//retain *time.Duration
|
||||
tx *sql.Tx
|
||||
at *time.Time
|
||||
wait *time.Duration
|
||||
}
|
||||
|
||||
// Queue is a queue that a Task can be pushed to for execution.
|
||||
// While this can be implemented directly, it's recommended to use NewQueue() which uses generics in
|
||||
// order to provide type-safe queues and queue subscriber callbacks for task execution.
|
||||
Queue interface {
|
||||
// Name returns the name of the task this queue processes
|
||||
Name() string
|
||||
|
||||
// Receive receives the Task payload to be processed
|
||||
Receive(ctx context.Context, payload []byte) error
|
||||
}
|
||||
|
||||
queue[T any] struct {
|
||||
// queue provides a type-safe implementation of Queue
|
||||
queue[T Task] struct {
|
||||
name string
|
||||
subscriber QueueSubscriber[T]
|
||||
}
|
||||
|
||||
QueueSubscriber[T any] func(context.Context, T) error
|
||||
// QueueSubscriber is a generic subscriber callback for a given queue to process Tasks
|
||||
QueueSubscriber[T Task] func(context.Context, T) error
|
||||
)
|
||||
|
||||
func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue {
|
||||
var task T
|
||||
|
||||
q := &queue[T]{
|
||||
name: task.Name(),
|
||||
subscriber: subscriber,
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *queue[T]) Name() string {
|
||||
return q.name
|
||||
}
|
||||
|
||||
func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
|
||||
var obj T
|
||||
err := json.Unmarshal(payload, &obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return q.subscriber(ctx, obj)
|
||||
}
|
||||
|
||||
// NewTaskClient creates a new task client
|
||||
func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
|
||||
//var connection string
|
||||
//
|
||||
//switch cfg.App.Environment {
|
||||
//case config.EnvTest:
|
||||
// connection = cfg.Tasks.TestConnection
|
||||
//default:
|
||||
// connection = cfg.Tasks.Connection
|
||||
//}
|
||||
//
|
||||
//db, err := openDB(cfg.Tasks.Driver, connection)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
//
|
||||
//// TODO is this correct?
|
||||
//db.SetMaxOpenConns(cfg.Tasks.Goroutines)
|
||||
//db.SetMaxIdleConns(cfg.Tasks.Goroutines)
|
||||
|
||||
// Install the schema
|
||||
if err := goqite.Setup(context.Background(), db); err != nil {
|
||||
// An error is returned if we already ran this and there's no better way to check.
|
||||
|
|
@ -140,38 +100,39 @@ func (t *TaskClient) StartRunner(ctx context.Context) {
|
|||
t.runner.Start(ctx)
|
||||
}
|
||||
|
||||
// Register registers a queue so tasks can be added to it and processed
|
||||
func (t *TaskClient) Register(queue Queue) {
|
||||
t.runner.Register(queue.Name(), queue.Receive)
|
||||
}
|
||||
|
||||
// New starts a task creation operation
|
||||
func (t *TaskClient) New(task Task) *TaskOp {
|
||||
return &TaskOp{
|
||||
// New starts a task save operation
|
||||
func (t *TaskClient) New(task Task) *TaskSaveOp {
|
||||
return &TaskSaveOp{
|
||||
client: t,
|
||||
task: task,
|
||||
}
|
||||
}
|
||||
|
||||
// At sets the exact date and time the task should be executed
|
||||
func (t *TaskOp) At(processAt time.Time) *TaskOp {
|
||||
func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp {
|
||||
t.Wait(time.Until(processAt))
|
||||
return t
|
||||
}
|
||||
|
||||
// Wait instructs the task to wait a given duration before it is executed
|
||||
func (t *TaskOp) Wait(duration time.Duration) *TaskOp {
|
||||
func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp {
|
||||
t.wait = &duration
|
||||
return t
|
||||
}
|
||||
|
||||
// Tx will insert the task as part of a given database transaction
|
||||
func (t *TaskOp) Tx(tx *sql.Tx) *TaskOp {
|
||||
// Tx will include the task as part of a given database transaction
|
||||
func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp {
|
||||
t.tx = tx
|
||||
return t
|
||||
}
|
||||
|
||||
// Save saves the task so it can be executed
|
||||
func (t *TaskOp) Save() error {
|
||||
// Save saves the task so it can be queued for execution
|
||||
func (t *TaskSaveOp) Save() error {
|
||||
// Build the payload
|
||||
// TODO use gob?
|
||||
payload, err := json.Marshal(t.task)
|
||||
|
|
@ -194,3 +155,29 @@ func (t *TaskOp) Save() error {
|
|||
return jobs.CreateTx(context.Background(), t.tx, t.client.queue, t.task.Name(), payload)
|
||||
}
|
||||
}
|
||||
|
||||
// NewQueue queues a new type-safe Queue of a given Task type
|
||||
func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue {
|
||||
var task T
|
||||
|
||||
q := &queue[T]{
|
||||
name: task.Name(),
|
||||
subscriber: subscriber,
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *queue[T]) Name() string {
|
||||
return q.name
|
||||
}
|
||||
|
||||
func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
|
||||
var obj T
|
||||
err := json.Unmarshal(payload, &obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return q.subscriber(ctx, obj)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,36 @@
|
|||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/mikestefanello/pagoda/pkg/log"
|
||||
"github.com/mikestefanello/pagoda/pkg/services"
|
||||
)
|
||||
|
||||
// ExampleTask is an example implementation of services.Task
|
||||
// This represents the task that can be queued for execution via the task client and should contain everything
|
||||
// that your queue subscriber needs to process the task.
|
||||
type ExampleTask struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
// Name satisfies the services.Task interface by proviing a unique name for this Task type
|
||||
func (t ExampleTask) Name() string {
|
||||
return "example_task"
|
||||
}
|
||||
|
||||
// NewExampleTaskQueue provides a Queue that can process ExampleTask tasks
|
||||
// The service container is provided so the subscriber can have access to the app dependencies.
|
||||
// All queues must be registered in the Register() function.
|
||||
// Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution.
|
||||
func NewExampleTaskQueue(c *services.Container) services.Queue {
|
||||
return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
|
||||
log.Default().Info("Example task received",
|
||||
"message", task.Message,
|
||||
)
|
||||
log.Default().Info("This can access the container for dependencies",
|
||||
"echo", c.Web.Reverse("home"),
|
||||
)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
|||
10
pkg/tasks/register.go
Normal file
10
pkg/tasks/register.go
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
package tasks
|
||||
|
||||
import (
|
||||
"github.com/mikestefanello/pagoda/pkg/services"
|
||||
)
|
||||
|
||||
// Register registers all task queues with the task client
|
||||
func Register(c *services.Container) {
|
||||
c.Tasks.Register(NewExampleTaskQueue(c))
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue