From a8bd9f8b2de64efa36376f84defb85a842e71d34 Mon Sep 17 00:00:00 2001 From: mikestefanello Date: Wed, 2 Feb 2022 21:24:52 -0500 Subject: [PATCH] Added asynq and a task client to the container to faciliate task queues. --- README.md | 138 ++++++++++++++++++++++++--- controller/controller_test.go | 11 ++- go.mod | 6 +- go.sum | 11 +++ main.go | 7 ++ middleware/middleware_test.go | 11 ++- routes/routes_test.go | 11 ++- services/container.go | 12 +++ services/container_test.go | 1 + services/services_test.go | 11 ++- services/tasks.go | 172 ++++++++++++++++++++++++++++++++++ services/tasks_test.go | 35 +++++++ 12 files changed, 392 insertions(+), 34 deletions(-) create mode 100644 services/tasks.go create mode 100644 services/tasks_test.go diff --git a/README.md b/README.md index 1b4aa19..b923dd2 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,10 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/mikestefanello/pagoda)](https://goreportcard.com/report/github.com/mikestefanello/pagoda) [![Test](https://github.com/mikestefanello/pagoda/actions/workflows/test.yml/badge.svg)](https://github.com/mikestefanello/pagoda/actions/workflows/test.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) +[![Go Reference](https://pkg.go.dev/badge/github.com/mikestefanello/pagoda.svg)](https://pkg.go.dev/github.com/mikestefanello/pagoda) [![GoT](https://img.shields.io/badge/Made%20with-Go-1f425f.svg)](https://go.dev) +

Logo

## Table of Contents @@ -80,6 +82,11 @@ * [Get data](#get-data) * [Flush data](#flush-data) * [Flush tags](#flush-tags) +* [Tasks](#tasks) + * [Queues](#queues) + * [Scheduled tasks](#scheduled-tasks) + * [Workers](#workers) + * [Monitoring](#monitoring) * [Static files](#static-files) * [Cache control headers](#cache-control-headers) * [Cache-buster](#cache-buster) @@ -180,6 +187,7 @@ The container is located at `services/container.go` and is meant to house all of - Authentication - Mail - Template renderer +- Tasks A new container can be created and initialized via `services.NewContainer()`. It can be later shutdown via `Shutdown()`. @@ -216,20 +224,21 @@ A helper function (`config.SwitchEnvironment`) is available to make switching th ```go func TestMain(m *testing.M) { - // Set the environment to test - config.SwitchEnvironment(config.EnvTest) + // Set the environment to test + config.SwitchEnvironment(config.EnvTest) - // Start a new container - c = services.NewContainer() - defer func() { - if err := c.Shutdown(); err != nil { - c.Web.Logger.Fatal(err) - } - }() + // Start a new container + c = services.NewContainer() - // Run tests - exitVal := m.Run() - os.Exit(exitVal) + // Run tests + exitVal := m.Run() + + // Shutdown the container + if err := c.Shutdown(); err != nil { + panic(err) + } + + os.Exit(exitVal) } ``` @@ -980,6 +989,110 @@ err := c.Cache. Execute(ctx) ``` +## Tasks + +Tasks are operations to be executed in the background, either in a queue, at a specfic time, after a given amount of time, or according to a periodic interval (like _cron_). Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, and so on. + +Since we're already using [Redis](https://redis.io) as a _cache_, it's available to act as a message broker as well and handle the processing of queued tasks. [Asynq](https://github.com/hibiken/asynq) is the library chosen to interface with Redis and handle queueing tasks and processing them asynchronously with workers. + +To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [asynq](https://github.com/hibiken/asynq). + +For more detailed information about [asynq](https://github.com/hibiken/asynq) and it's usage, review the [wiki](https://github.com/hibiken/asynq/wiki). + +### Queues + +All tasks must be placed in to queues in order to be executed by [workers](#workers). You are not required to specify a queue when creating a task, as it will be placed in the default queue if one is not provided. [Asynq](https://github.com/hibiken/asynq) supports multiple queues which allows for functionality such as [prioritization](https://github.com/hibiken/asynq/wiki/Queue-Priority). + +Creating a queued task is easy and at the minimum only requires the name of the task: + +```go +err := c.Tasks. + New("my_task"). + Save() +``` + +This will add a task to the _default_ queue with a task _name_ of `my_task`. The name is used to route the task to the correct [worker](#workers). + +#### Options + +Tasks can be created and queued with various chained options: + +```go +err := c.Tasks. + New("my_task"). + Payload(taskData). + Queue("critical"). + MaxRetries(5). + Timeout(30 * time.Second). + Wait(5 * time.Second). + Retain(2 * time.Hour). + Save() +``` + +In this example, this task will be: +- Assigned a task name of `my_task` +- The task worker will be sent `taskData` as the payload +- Put in to the `critical` queue +- Be retried up to 5 times in the event of a failure +- Timeout after 30 seconds of execution +- Wait 5 seconds before execution starts +- Retain the task data in Redis for 2 hours after execution completes + +### Scheduled tasks + +Tasks can be scheduled to execute at a single point in the future or at a periodic interval. These tasks can also use the options highlighted in the previous section. + +**To execute a task once at a specific time:** + +```go +err := c.Tasks. + New("my_task"). + At(time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC)). + Save() +``` + +**To execute a periodic task using a cron schedule:** + +```go +err := c.Tasks. + New("my_task"). + Periodic("*/10 * * * *") + Save() +``` + +**To execute a periodic task using a simple syntax:** + +```go +err := c.Tasks. + New("my_task"). + Periodic("@every 10m") + Save() +``` + +#### Scheduler + +A service needs to run in order to add periodic tasks to the queue at the specified intervals. When the application is started, this _scheduler_ service will also be started. In `main.go`, this is done with the following code: + +```go +go func() { + if err := c.Tasks.StartScheduler(); err != nil { + c.Web.Logger.Fatalf("scheduler shutdown: %v", err) + } +}() +``` + +In the event of an application restart, periodic tasks must be re-registered with the _scheduler_ in order to continue being queued for execution. + +### Workers + +Workers are what executes the queued tasks. No workers are included so you will have to implement your own for each task you need to support. You have the option of listening to and executing tasks within this application, or creating a separate application to faciliate this. + +The [asynq quickstarter](https://github.com/hibiken/asynq#quickstart) provides a clear example of how to go about implementing this by leveraging `asynq.NewServer` to listen for queued tasks and `asynq.NewServeMux` to route tasks to their workers much like an HTTP router does. + +### Monitoring + +[Asynq](https://github.com/hibiken/asynq) comes with two options to monitor your queues: 1) [Command-line tool](https://github.com/hibiken/asynq#command-line-tool) and 2) [Web UI](https://github.com/hibiken/asynqmon) + ## Static files Static files are currently configured in the router (`routes/router.go`) to be served from the `static` directory. If you wish to change the directory, alter the constant `config.StaticDir`. The URL prefix for static files is `/files` which is controlled via the `config.StaticPrefix` constant. @@ -1084,6 +1197,7 @@ Future work includes but is not limited to: Thank you to all of the following amazing projects for making this possible. - [alpinejs](https://github.com/alpinejs/alpine) +- [asynq](https://github.com/hibiken/asynq) - [bulma](https://github.com/jgthms/bulma) - [docker](https://www.docker.com/) - [echo](https://github.com/labstack/echo) diff --git a/controller/controller_test.go b/controller/controller_test.go index 6d313b5..a9d0e28 100644 --- a/controller/controller_test.go +++ b/controller/controller_test.go @@ -30,14 +30,15 @@ func TestMain(m *testing.M) { // Create a new container c = services.NewContainer() - defer func() { - if err := c.Shutdown(); err != nil { - c.Web.Logger.Fatal(err) - } - }() // Run tests exitVal := m.Run() + + // Shutdown the container + if err := c.Shutdown(); err != nil { + panic(err) + } + os.Exit(exitVal) } diff --git a/go.mod b/go.mod index b6ec4e4..c538ff9 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/gorilla/context v1.1.1 // indirect github.com/gorilla/securecookie v1.1.1 // indirect github.com/hashicorp/hcl/v2 v2.10.0 // indirect + github.com/hibiken/asynq v0.21.0 // indirect github.com/huandu/xstrings v1.3.2 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect @@ -69,6 +70,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sirupsen/logrus v1.6.0 // indirect github.com/spf13/cast v1.4.1 // indirect github.com/spf13/cobra v1.3.0 // indirect @@ -81,9 +83,9 @@ require ( golang.org/x/mod v0.5.1 // indirect golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect + golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect golang.org/x/tools v0.1.9-0.20211216111533-8d383106f7e7 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index cc37117..88f1bb1 100644 --- a/go.sum +++ b/go.sum @@ -246,6 +246,7 @@ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl github.com/go-playground/validator/v10 v10.9.0 h1:NgTtmN58D0m8+UuxtYmGztBJB7VnPgjj221I1QHci2A= github.com/go-playground/validator/v10 v10.9.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos= github.com/go-redis/redis/v8 v8.9.0/go.mod h1:ik7vb7+gm8Izylxu6kf6wG26/t2VljgCfSQ1DM4O1uU= +github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -343,6 +344,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -410,6 +412,8 @@ github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOn github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= +github.com/hibiken/asynq v0.21.0 h1:uH9XogJhjq/S39E0/DEPWLZQ6hHJ73UiblZTe4RzHwA= +github.com/hibiken/asynq v0.21.0/go.mod h1:tyc63ojaW8SJ5SBm8mvI4DDONsguP5HE85EEl4Qr5Ig= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw= github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= @@ -707,6 +711,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -833,6 +839,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -1082,6 +1089,8 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d h1:FjkYO/PPp4Wi0EAUOVLxePm7q golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo= +golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1101,6 +1110,8 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 h1:Hir2P/De0WpUhtrKGGjvSb2YxUgyZ7EFOSLIcSSpiwE= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index d2c2715..4b9ee59 100644 --- a/main.go +++ b/main.go @@ -51,6 +51,13 @@ func main() { } }() + // Start the scheduler service to queue periodic tasks + go func() { + if err := c.Tasks.StartScheduler(); err != nil { + c.Web.Logger.Fatalf("scheduler shutdown: %v", err) + } + }() + // Wait for interrupt signal to gracefully shutdown the server with a timeout of 10 seconds. quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) diff --git a/middleware/middleware_test.go b/middleware/middleware_test.go index 8752c9f..dc07986 100644 --- a/middleware/middleware_test.go +++ b/middleware/middleware_test.go @@ -21,11 +21,6 @@ func TestMain(m *testing.M) { // Create a new container c = services.NewContainer() - defer func() { - if err := c.Shutdown(); err != nil { - c.Web.Logger.Fatal(err) - } - }() // Create a user var err error @@ -35,5 +30,11 @@ func TestMain(m *testing.M) { // Run tests exitVal := m.Run() + + // Shutdown the container + if err = c.Shutdown(); err != nil { + panic(err) + } + os.Exit(exitVal) } diff --git a/routes/routes_test.go b/routes/routes_test.go index 7e2d308..f7409f2 100644 --- a/routes/routes_test.go +++ b/routes/routes_test.go @@ -28,11 +28,6 @@ func TestMain(m *testing.M) { // Start a new container c = services.NewContainer() - defer func() { - if err := c.Shutdown(); err != nil { - c.Web.Logger.Fatal(err) - } - }() // Start a test HTTP server BuildRouter(c) @@ -40,7 +35,13 @@ func TestMain(m *testing.M) { // Run tests exitVal := m.Run() + + // Shutdown the container and test server + if err := c.Shutdown(); err != nil { + panic(err) + } srv.Close() + os.Exit(exitVal) } diff --git a/services/container.go b/services/container.go index 9aabb70..14d0abd 100644 --- a/services/container.go +++ b/services/container.go @@ -46,6 +46,9 @@ type Container struct { // TemplateRenderer stores a service to easily render and cache templates TemplateRenderer *TemplateRenderer + + // Tasks stores the task client + Tasks *TaskClient } // NewContainer creates and initializes a new Container @@ -60,6 +63,7 @@ func NewContainer() *Container { c.initAuth() c.initTemplateRenderer() c.initMail() + c.initTasks() return c } @@ -74,6 +78,9 @@ func (c *Container) Shutdown() error { if err := c.Database.Close(); err != nil { return err } + if err := c.Tasks.Close(); err != nil { + return err + } return nil } @@ -182,3 +189,8 @@ func (c *Container) initMail() { panic(fmt.Sprintf("failed to create mail client: %v", err)) } } + +// initTasks initializes the task client +func (c *Container) initTasks() { + c.Tasks = NewTaskClient(c.Config.Cache) +} diff --git a/services/container_test.go b/services/container_test.go index dc875d9..bfe4cc3 100644 --- a/services/container_test.go +++ b/services/container_test.go @@ -16,4 +16,5 @@ func TestNewContainer(t *testing.T) { assert.NotNil(t, c.Mail) assert.NotNil(t, c.Auth) assert.NotNil(t, c.TemplateRenderer) + assert.NotNil(t, c.Tasks) } diff --git a/services/services_test.go b/services/services_test.go index 30e7387..9347f29 100644 --- a/services/services_test.go +++ b/services/services_test.go @@ -23,11 +23,6 @@ func TestMain(m *testing.M) { // Create a new container c = NewContainer() - defer func() { - if err := c.Shutdown(); err != nil { - c.Web.Logger.Fatal(err) - } - }() // Create a web context ctx, _ = tests.NewContext(c.Web, "/") @@ -41,5 +36,11 @@ func TestMain(m *testing.M) { // Run tests exitVal := m.Run() + + // Shutdown the container + if err = c.Shutdown(); err != nil { + panic(err) + } + os.Exit(exitVal) } diff --git a/services/tasks.go b/services/tasks.go new file mode 100644 index 0000000..00eb7ab --- /dev/null +++ b/services/tasks.go @@ -0,0 +1,172 @@ +package services + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/hibiken/asynq" + "github.com/mikestefanello/pagoda/config" +) + +type ( + // TaskClient is that client that allows you to queue or schedule task execution + TaskClient struct { + // client stores the asynq client + client *asynq.Client + + // scheduler stores the asynq scheduler + scheduler *asynq.Scheduler + } + + // task handles task creation operations + task struct { + client *TaskClient + name string + payload interface{} + periodic *string + queue *string + maxRetries *int + timeout *time.Duration + deadline *time.Time + at *time.Time + wait *time.Duration + retain *time.Duration + } +) + +// NewTaskClient creates a new task client +func NewTaskClient(cfg config.CacheConfig) *TaskClient { + conn := asynq.RedisClientOpt{ + Addr: fmt.Sprintf("%s:%d", cfg.Hostname, cfg.Port), + Password: cfg.Password, + } + + return &TaskClient{ + client: asynq.NewClient(conn), + scheduler: asynq.NewScheduler(conn, nil), + } +} + +// Close closes the connection to the task service +func (t *TaskClient) Close() error { + return t.client.Close() +} + +// StartScheduler starts the scheduler service which adds scheduled tasks to the queue +// This must be running in order to queue tasks set for periodic execution +func (t *TaskClient) StartScheduler() error { + return t.scheduler.Run() +} + +// New starts a task creation operation +func (t *TaskClient) New(name string) *task { + return &task{ + client: t, + name: name, + } +} + +// Payload sets the task payload data which will be sent to the task handler +func (t *task) Payload(payload interface{}) *task { + t.payload = payload + return t +} + +// Periodic sets the task to execute periodically according to a given interval +// The interval can be either in cron form ("*/5 * * * *") or "@every 30s" +func (t *task) Periodic(interval string) *task { + t.periodic = &interval + return t +} + +// Queue specifies the name of the queue to add the task to +// The default queue will be used if this is not set +func (t *task) Queue(queue string) *task { + t.queue = &queue + return t +} + +// Timeout sets the task timeout, meaning the task must execute within a given duration +func (t *task) Timeout(timeout time.Duration) *task { + t.timeout = &timeout + return t +} + +// Deadline sets the task execution deadline to a specific date and time +func (t *task) Deadline(deadline time.Time) *task { + t.deadline = &deadline + return t +} + +// At sets the exact date and time the task should be executed +func (t *task) At(processAt time.Time) *task { + t.at = &processAt + return t +} + +// Wait instructs the task to wait a given duration before it is executed +func (t *task) Wait(duration time.Duration) *task { + t.wait = &duration + return t +} + +// Retain instructs the task service to retain the task data for a given duration after execution is complete +func (t *task) Retain(duration time.Duration) *task { + t.retain = &duration + return t +} + +// MaxRetries sets the maximum amount of times to retry executing the task in the event of a failure +func (t *task) MaxRetries(retries int) *task { + t.maxRetries = &retries + return t +} + +// Save saves the task so it can be executed +func (t *task) Save() error { + var err error + + // Build the payload + var payload []byte + if t.payload != nil { + if payload, err = json.Marshal(t.payload); err != nil { + return err + } + } + + // Build the task options + opts := make([]asynq.Option, 0) + if t.queue != nil { + opts = append(opts, asynq.Queue(*t.queue)) + } + if t.maxRetries != nil { + opts = append(opts, asynq.MaxRetry(*t.maxRetries)) + } + if t.timeout != nil { + opts = append(opts, asynq.Timeout(*t.timeout)) + } + if t.deadline != nil { + opts = append(opts, asynq.Deadline(*t.deadline)) + } + if t.wait != nil { + opts = append(opts, asynq.ProcessIn(*t.wait)) + } + if t.retain != nil { + opts = append(opts, asynq.Retention(*t.retain)) + } + if t.at != nil { + opts = append(opts, asynq.ProcessAt(*t.at)) + } + + // Build the task + task := asynq.NewTask(t.name, payload, opts...) + + // Schedule, if needed + if t.periodic != nil { + _, err = t.client.scheduler.Register(*t.periodic, task) + } else { + _, err = t.client.client.Enqueue(task) + } + return err +} diff --git a/services/tasks_test.go b/services/tasks_test.go new file mode 100644 index 0000000..109e5b0 --- /dev/null +++ b/services/tasks_test.go @@ -0,0 +1,35 @@ +package services + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTaskClient_New(t *testing.T) { + now := time.Now() + tk := c.Tasks. + New("task1"). + Payload("payload"). + Queue("queue"). + Periodic("@every 5s"). + MaxRetries(5). + Timeout(5 * time.Second). + Deadline(now). + At(now). + Wait(6 * time.Second). + Retain(7 * time.Second) + + assert.Equal(t, "task1", tk.name) + assert.Equal(t, "payload", tk.payload.(string)) + assert.Equal(t, "queue", *tk.queue) + assert.Equal(t, "@every 5s", *tk.periodic) + assert.Equal(t, 5, *tk.maxRetries) + assert.Equal(t, 5*time.Second, *tk.timeout) + assert.Equal(t, now, *tk.deadline) + assert.Equal(t, now, *tk.at) + assert.Equal(t, 6*time.Second, *tk.wait) + assert.Equal(t, 7*time.Second, *tk.retain) + assert.NoError(t, tk.Save()) +}