diff --git a/config/config.go b/config/config.go index 69f7344..7cb69f7 100644 --- a/config/config.go +++ b/config/config.go @@ -106,6 +106,7 @@ type ( // TasksConfig stores the tasks configuration TasksConfig struct { + // TODO remove separate DB? Driver string Connection string TestConnection string diff --git a/pkg/services/container.go b/pkg/services/container.go index 4ba6ca9..5fafac0 100644 --- a/pkg/services/container.go +++ b/pkg/services/container.go @@ -177,7 +177,9 @@ func (c *Container) initMail() { // initTasks initializes the task client func (c *Container) initTasks() { var err error - c.Tasks, err = NewTaskClient(c.Config) + // You could use a separate database for tasks, if you'd like. but using one + // makes transaction support easier + c.Tasks, err = NewTaskClient(c.Config.Tasks, c.Database) if err != nil { panic(fmt.Sprintf("failed to create task client: %v", err)) } diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go index ace015e..c170a8a 100644 --- a/pkg/services/tasks.go +++ b/pkg/services/tasks.go @@ -20,7 +20,6 @@ type ( TaskClient struct { queue *goqite.Queue runner *jobs.Runner - db *sql.DB } Task interface { @@ -37,6 +36,7 @@ type ( //maxRetries *int //timeout *time.Duration //deadline *time.Time + tx *sql.Tx at *time.Time wait *time.Duration //retain *time.Duration @@ -81,24 +81,24 @@ func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { } // NewTaskClient creates a new task client -func NewTaskClient(cfg *config.Config) (*TaskClient, error) { - var connection string - - switch cfg.App.Environment { - case config.EnvTest: - connection = cfg.Tasks.TestConnection - default: - connection = cfg.Tasks.Connection - } - - db, err := openDB(cfg.Tasks.Driver, connection) - if err != nil { - return nil, err - } - - // TODO is this correct? - db.SetMaxOpenConns(cfg.Tasks.Goroutines) - db.SetMaxIdleConns(cfg.Tasks.Goroutines) +func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) { + //var connection string + // + //switch cfg.App.Environment { + //case config.EnvTest: + // connection = cfg.Tasks.TestConnection + //default: + // connection = cfg.Tasks.Connection + //} + // + //db, err := openDB(cfg.Tasks.Driver, connection) + //if err != nil { + // return nil, err + //} + // + //// TODO is this correct? + //db.SetMaxOpenConns(cfg.Tasks.Goroutines) + //db.SetMaxIdleConns(cfg.Tasks.Goroutines) // Install the schema if err := goqite.Setup(context.Background(), db); err != nil { @@ -113,29 +113,29 @@ func NewTaskClient(cfg *config.Config) (*TaskClient, error) { queue: goqite.New(goqite.NewOpts{ DB: db, Name: "tasks", - MaxReceive: cfg.Tasks.MaxRetries, + MaxReceive: cfg.MaxRetries, }), - db: db, } t.runner = jobs.NewRunner(jobs.NewRunnerOpts{ - Limit: cfg.Tasks.Goroutines, + Limit: cfg.Goroutines, Log: log.Default(), - PollInterval: cfg.Tasks.PollInterval, + PollInterval: cfg.PollInterval, Queue: t.queue, }) return t, nil } -// Close closes the connection to the task service -func (t *TaskClient) Close() error { - // TODO close the runner - return t.db.Close() -} +//// Close closes the connection to the task service +//func (t *TaskClient) Close() error { +// // TODO close the runner +// return t.db.Close() +//} -// StartRunner starts the scheduler service which adds scheduled tasks to the queue -// This must be running in order to queue tasks set for periodic execution +// StartRunner starts the scheduler service which adds scheduled tasks to the queue. +// This must be running in order to execute queued tasked. +// To stop the runner, cancel the context. func (t *TaskClient) StartRunner(ctx context.Context) { t.runner.Start(ctx) } @@ -164,21 +164,33 @@ func (t *TaskOp) Wait(duration time.Duration) *TaskOp { return t } +// Tx will insert the task as part of a given database transaction +func (t *TaskOp) Tx(tx *sql.Tx) *TaskOp { + t.tx = tx + return t +} + // Save saves the task so it can be executed func (t *TaskOp) Save() error { // Build the payload + // TODO use gob? payload, err := json.Marshal(t.task) if err != nil { return err } - msg := goqite.Message{ - Body: payload, - } - - if t.wait != nil { - msg.Delay = *t.wait - } + //msg := goqite.Message{ + // Body: payload, + //} + // + //if t.wait != nil { + // msg.Delay = *t.wait + //} + // TODO support delay //return t.client.queue.Send(context.Background(), msg) - return jobs.Create(context.Background(), t.client.queue, t.task.Name(), payload) + if t.tx == nil { + return jobs.Create(context.Background(), t.client.queue, t.task.Name(), payload) + } else { + return jobs.CreateTx(context.Background(), t.tx, t.client.queue, t.task.Name(), payload) + } }