diff --git a/cmd/web/main.go b/cmd/web/main.go index c71130f..cca1047 100644 --- a/cmd/web/main.go +++ b/cmd/web/main.go @@ -3,6 +3,7 @@ package main import ( "context" "crypto/tls" + "errors" "fmt" "log" "log/slog" @@ -51,7 +52,7 @@ func main() { } } - if err := c.Web.StartServer(&srv); err != http.ErrServerClosed { + if err := c.Web.StartServer(&srv); errors.Is(err, http.ErrServerClosed) { log.Fatalf("shutting down the server: %v", err) } }() diff --git a/pkg/middleware/log_test.go b/pkg/middleware/log_test.go index 0549e24..7028806 100644 --- a/pkg/middleware/log_test.go +++ b/pkg/middleware/log_test.go @@ -77,7 +77,7 @@ func TestLogRequest(t *testing.T) { h := new(mockLogHandler) exec := func() { - ctx, _ := tests.NewContext(c.Web, "http://test.localhost/abc") + ctx, _ := tests.NewContext(c.Web, "http://test.localhost/abc?d=1") logger := slog.New(h).With("previous", "param") log.Set(ctx, logger) ctx.Request().Header.Set("Referer", "ref.com") @@ -101,7 +101,7 @@ func TestLogRequest(t *testing.T) { assert.Equal(t, "5", h.GetAttr("bytes_out")) assert.NotEmpty(t, h.GetAttr("latency")) assert.Equal(t, "INFO", h.level) - assert.Equal(t, "GET /abc", h.msg) + assert.Equal(t, "GET /abc?d=1", h.msg) statusCode = 500 exec() diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go index 9eae30b..ace015e 100644 --- a/pkg/services/tasks.go +++ b/pkg/services/tasks.go @@ -14,7 +14,9 @@ import ( ) type ( - // TaskClient is that client that allows you to queue or schedule task execution + // TaskClient is that client that allows you to queue or schedule task execution. + // Under the hood we create only a single queue using goqite for all tasks because we do not want more than one + // runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues. TaskClient struct { queue *goqite.Queue runner *jobs.Runner @@ -40,12 +42,12 @@ type ( //retain *time.Duration } - Queuable interface { + Queue interface { Name() string Receive(ctx context.Context, payload []byte) error } - Queue[T any] struct { + queue[T any] struct { name string subscriber QueueSubscriber[T] } @@ -53,10 +55,10 @@ type ( QueueSubscriber[T any] func(context.Context, T) error ) -func NewQueue[T Task](subscriber QueueSubscriber[T]) *Queue[T] { +func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue { var task T - q := &Queue[T]{ + q := &queue[T]{ name: task.Name(), subscriber: subscriber, } @@ -64,11 +66,11 @@ func NewQueue[T Task](subscriber QueueSubscriber[T]) *Queue[T] { return q } -func (q *Queue[T]) Name() string { +func (q *queue[T]) Name() string { return q.name } -func (q *Queue[T]) Receive(ctx context.Context, payload []byte) error { +func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { var obj T err := json.Unmarshal(payload, &obj) if err != nil { @@ -94,8 +96,9 @@ func NewTaskClient(cfg *config.Config) (*TaskClient, error) { return nil, err } - //db.SetMaxOpenConns(1) - //db.SetMaxIdleConns(1) + // TODO is this correct? + db.SetMaxOpenConns(cfg.Tasks.Goroutines) + db.SetMaxIdleConns(cfg.Tasks.Goroutines) // Install the schema if err := goqite.Setup(context.Background(), db); err != nil { @@ -109,7 +112,7 @@ func NewTaskClient(cfg *config.Config) (*TaskClient, error) { t := &TaskClient{ queue: goqite.New(goqite.NewOpts{ DB: db, - Name: "jobs", + Name: "tasks", MaxReceive: cfg.Tasks.MaxRetries, }), db: db, @@ -137,11 +140,7 @@ func (t *TaskClient) StartRunner(ctx context.Context) { t.runner.Start(ctx) } -//func (t *TaskClient) Register(name string, processor jobs.Func) { -// t.runner.Register(name, processor) -//} - -func (t *TaskClient) Register(queue Queuable) { +func (t *TaskClient) Register(queue Queue) { t.runner.Register(queue.Name(), queue.Receive) } @@ -153,47 +152,9 @@ func (t *TaskClient) New(task Task) *TaskOp { } } -//// Payload sets the task payload data which will be sent to the task handler -//func (t *TaskOp) Payload(payload Task) *TaskOp { -// t.payload = payload -// return t -//} - -// // Periodic sets the task to execute periodically according to a given interval -// // The interval can be either in cron form ("*/5 * * * *") or "@every 30s" -// -// func (t *TaskOp) Periodic(interval string) *TaskOp { -// t.periodic = &interval -// return t -// } -// -// // Queue specifies the name of the queue to add the task to -// // The default queue will be used if this is not set -// -// func (t *TaskOp) Queue(queue string) *TaskOp { -// t.queue = &queue -// return t -// } -// -// // Timeout sets the task timeout, meaning the task must execute within a given duration -// -// func (t *TaskOp) Timeout(timeout time.Duration) *TaskOp { -// t.timeout = &timeout -// return t -// } -// -// // Deadline sets the task execution deadline to a specific date and time -// -// func (t *TaskOp) Deadline(deadline time.Time) *TaskOp { -// t.deadline = &deadline -// return t -// } -// - // At sets the exact date and time the task should be executed func (t *TaskOp) At(processAt time.Time) *TaskOp { - until := time.Until(processAt) - t.wait = &until + t.Wait(time.Until(processAt)) return t } @@ -203,53 +164,14 @@ func (t *TaskOp) Wait(duration time.Duration) *TaskOp { return t } -// -//// Retain instructs the task service to retain the task data for a given duration after execution is complete -//func (t *TaskOp) Retain(duration time.Duration) *TaskOp { -// t.retain = &duration -// return t -//} -// -//// MaxRetries sets the maximum amount of times to retry executing the task in the event of a failure -//func (t *TaskOp) MaxRetries(retries int) *TaskOp { -// t.maxRetries = &retries -// return t -//} - // Save saves the task so it can be executed func (t *TaskOp) Save() error { - var err error - // Build the payload payload, err := json.Marshal(t.task) if err != nil { return err } - // Build the task options - //opts := make([]asynq.Option, 0) - //if t.queue != nil { - // opts = append(opts, asynq.Queue(*t.queue)) - //} - //if t.maxRetries != nil { - // opts = append(opts, asynq.MaxRetry(*t.maxRetries)) - //} - //if t.timeout != nil { - // opts = append(opts, asynq.Timeout(*t.timeout)) - //} - //if t.deadline != nil { - // opts = append(opts, asynq.Deadline(*t.deadline)) - //} - //if t.wait != nil { - // opts = append(opts, asynq.ProcessIn(*t.wait)) - //} - //if t.retain != nil { - // opts = append(opts, asynq.Retention(*t.retain)) - //} - //if t.at != nil { - // opts = append(opts, asynq.ProcessAt(*t.at)) - //} - msg := goqite.Message{ Body: payload, }