Compare commits

...

11 commits

Author SHA1 Message Date
mikestefanello
bac8dc01cd Updated documentation. 2024-06-22 10:11:48 -04:00
mikestefanello
2d2c21df49 Updated documentation. 2024-06-22 09:57:02 -04:00
mikestefanello
b1c0ccf0c5 Use make in workflow. 2024-06-22 09:44:19 -04:00
mikestefanello
60a4a107ca Cleanup and documentation. 2024-06-22 09:43:24 -04:00
mikestefanello
2004d6b139 Added handler examples for caching and tasks. 2024-06-21 22:20:39 -04:00
mikestefanello
5707343d57 Added tests for the task client and runner. 2024-06-21 08:36:51 -04:00
mikestefanello
b68f9bdf3c Added optional delay to tasks. Pool buffers when encoding. 2024-06-20 22:11:53 -04:00
mikestefanello
912ae2ca6b Provide task queue registration and service container injection. 2024-06-19 13:53:44 -04:00
mikestefanello
0d2ad6e936 Merge branch 'main' of github.com:mikestefanello/pagoda into lite 2024-06-19 12:54:30 -04:00
mikestefanello
edf9af8fc4 Use same db for tasks. 2024-06-19 12:51:54 -04:00
mikestefanello
5e9e502b42 Log request URI rather than path. 2024-06-19 09:32:22 -04:00
24 changed files with 655 additions and 461 deletions

View file

@ -27,10 +27,5 @@ jobs:
restore-keys: | restore-keys: |
${{ runner.os }}-go- ${{ runner.os }}-go-
- name: Start containers
run: |
docker-compose up -d
sleep 3
- name: Test - name: Test
run: go test -p 1 ./... run: make test

View file

@ -1,32 +1,3 @@
# Determine if you have docker-compose or docker compose installed locally
# If this does not work on your system, just set the name of the executable you have installed
DCO_BIN := $(shell { command -v docker-compose || command -v docker compose; } 2>/dev/null)
# Connect to the primary database
.PHONY: db
db:
docker exec -it pagoda_db psql postgresql://admin:admin@localhost:5432/app
# Connect to the test database (you must run tests first before running this)
.PHONY: db-test
db-test:
docker exec -it pagoda_db psql postgresql://admin:admin@localhost:5432/app_test
# Connect to the primary cache
.PHONY: cache
cache:
docker exec -it pagoda_cache redis-cli
# Clear the primary cache
.PHONY: cache-clear
cache-clear:
docker exec -it pagoda_cache redis-cli flushall
# Connect to the test cache
.PHONY: cache-test
cache-test:
docker exec -it pagoda_cache redis-cli -n 1
# Install Ent code-generation module # Install Ent code-generation module
.PHONY: ent-install .PHONY: ent-install
ent-install: ent-install:
@ -42,28 +13,6 @@ ent-gen:
ent-new: ent-new:
go run entgo.io/ent/cmd/ent new $(name) go run entgo.io/ent/cmd/ent new $(name)
# Start the Docker containers
.PHONY: up
up:
$(DCO_BIN) up -d
sleep 3
# Stop the Docker containers
.PHONY: stop
stop:
$(DCO_BIN) stop
# Drop the Docker containers to wipe all data
.PHONY: down
down:
$(DCO_BIN) down
# Rebuild Docker containers to wipe all data
.PHONY: reset
reset:
$(DCO_BIN) down
make up
# Run the application # Run the application
.PHONY: run .PHONY: run
run: run:
@ -75,12 +24,6 @@ run:
test: test:
go test -count=1 -p 1 ./... go test -count=1 -p 1 ./...
# Run the worker
.PHONY: worker
worker:
clear
go run cmd/worker/main.go
# Check for direct dependency updates # Check for direct dependency updates
.PHONY: check-updates .PHONY: check-updates
check-updates: check-updates:

233
README.md
View file

@ -21,7 +21,6 @@
* [Dependencies](#dependencies) * [Dependencies](#dependencies)
* [Start the application](#start-the-application) * [Start the application](#start-the-application)
* [Running tests](#running-tests) * [Running tests](#running-tests)
* [Clients](#clients)
* [Service container](#service-container) * [Service container](#service-container)
* [Dependency injection](#dependency-injection) * [Dependency injection](#dependency-injection)
* [Test dependencies](#test-dependencies) * [Test dependencies](#test-dependencies)
@ -82,9 +81,8 @@
* [Flush tags](#flush-tags) * [Flush tags](#flush-tags)
* [Tasks](#tasks) * [Tasks](#tasks)
* [Queues](#queues) * [Queues](#queues)
* [Scheduled tasks](#scheduled-tasks) * [Runner](#runner)
* [Worker](#worker) * [Cron](#cron)
* [Monitoring](#monitoring)
* [Static files](#static-files) * [Static files](#static-files)
* [Cache control headers](#cache-control-headers) * [Cache control headers](#cache-control-headers)
* [Cache-buster](#cache-buster) * [Cache-buster](#cache-buster)
@ -123,8 +121,9 @@ Go server-side rendered HTML combined with the projects below enable you to crea
#### Storage #### Storage
- [PostgreSQL](https://www.postgresql.org/): The world's most advanced open source relational database. - [SQLite](https://sqlite.org/): A small, fast, self-contained, high-reliability, full-featured, SQL database engine and the most used database engine in the world.
- [Redis](https://redis.io/): In-memory data structure store, used as a database, cache, and message broker.
Originally, Postgres and Redis were chosen as defaults but since the aim of this project is rapid, simple development, it was changed to SQLite which now provides the primary data storage as well as persistent, background [task queues](#tasks). For [caching](#cache), a simple in-memory solution is provided. If you need to use something like Postgres or Redis, swapping those in can be done quickly and easily. For reference, [this branch](https://github.com/mikestefanello/pagoda/tree/postgres-redis) contains the code that included those (but is no longer maintained).
### Screenshots ### Screenshots
@ -144,40 +143,27 @@ Go server-side rendered HTML combined with the projects below enable you to crea
### Dependencies ### Dependencies
Ensure the following are installed on your system: Ensure that [Go](https://go.dev/) is installed on your system.
- [Go](https://go.dev/)
- [Docker](https://www.docker.com/)
- [Docker Compose](https://docs.docker.com/compose/install/)
### Start the application ### Start the application
After checking out the repository, from within the root, start the Docker containers for the database and cache by executing `make up`: After checking out the repository, from within the root, simply run `make run`:
``` ```
git clone git@github.com:mikestefanello/pagoda.git git clone git@github.com:mikestefanello/pagoda.git
cd pagoda cd pagoda
make up make run
``` ```
Since this repository is a _template_ and not a Go _library_, you **do not** use `go get`. Since this repository is a _template_ and not a Go _library_, you **do not** use `go get`.
Once that completes, you can start the application by executing `make run`. By default, you should be able to access the application in your browser at `localhost:8000`. By default, you should be able to access the application in your browser at `localhost:8000`. This can be changed via the [configuration](#configuration).
If you ever want to quickly drop the Docker containers and restart them in order to wipe all data, execute `make reset`. By default, your data will be stored within the `dbs` directory. If you ever want to quickly delete all data just remove this directory.
### Running tests ### Running tests
To run all tests in the application, execute `make test`. This ensures that the tests from each package are not run in parallel. This is required since many packages contain tests that connect to the test database which is dropped and recreated automatically for each package. To run all tests in the application, execute `make test`. This ensures that the tests from each package are not run in parallel. This is required since many packages contain tests that connect to the test database which is stored in memory and reset automatically for each package.
### Clients
The following _make_ commands are available to make it easy to connect to the database and cache.
- `make db`: Connects to the primary database
- `make db-test`: Connects to the test database
- `make cache`: Connects to the primary cache
- `make cache-test`: Connects to the test cache
## Service container ## Service container
@ -198,7 +184,7 @@ A new container can be created and initialized via `services.NewContainer()`. It
### Dependency injection ### Dependency injection
The container exists to faciliate easy dependency-injection both for services within the container as well as areas of your application that require any of these dependencies. For example, the container is automatically passed to the `Init()` method of your route handlers so that the handlers have full, easy access to all services. The container exists to faciliate easy dependency-injection both for services within the container as well as areas of your application that require any of these dependencies. For example, the container is automatically passed to the `Init()` method of your route [handlers](#handlers) so that the handlers have full, easy access to all services.
### Test dependencies ### Test dependencies
@ -217,11 +203,11 @@ Leveraging the functionality of [viper](https://github.com/spf13/viper) to manag
In `config/config.go`, the prefix is set as `pagoda` via `viper.SetEnvPrefix("pagoda")`. Nested fields require an underscore between levels. For example: In `config/config.go`, the prefix is set as `pagoda` via `viper.SetEnvPrefix("pagoda")`. Nested fields require an underscore between levels. For example:
```yaml ```yaml
cache: http:
port: 1234 port: 1234
``` ```
can be overridden by setting an environment variable with the name `PAGODA_CACHE_PORT`. can be overridden by setting an environment variable with the name `PAGODA_HTTP_PORT`.
### Environments ### Environments
@ -251,7 +237,7 @@ func TestMain(m *testing.M) {
## Database ## Database
The database currently used is [PostgreSQL](https://www.postgresql.org/) but you are free to use whatever you prefer. If you plan to continue using [Ent](https://entgo.io/), the incredible ORM, you can check their supported databases [here](https://entgo.io/docs/dialects). The database-driver and client is provided by [pgx](https://github.com/jackc/pgx/tree/v4) and included in the `Container`. The database currently used is [SQLite](https://sqlite.org/) but you are free to use whatever you prefer. If you plan to continue using [Ent](https://entgo.io/), the incredible ORM, you can check their supported databases [here](https://entgo.io/docs/dialects). The database driver is provided by [go-sqlite3](https://github.com/mattn/go-sqlite3). A reference to the database is included in the `Container` if direct access is required.
Database configuration can be found and managed within the `config` package. Database configuration can be found and managed within the `config` package.
@ -261,9 +247,11 @@ Database configuration can be found and managed within the `config` package.
### Separate test database ### Separate test database
Since many tests can require a database, this application supports a separate database specifically for tests. Within the `config`, the test database name can be specified at `Config.Database.TestDatabase`. Since many tests can require a database, this application supports a separate database specifically for tests. Within the `config`, the test database can be specified at `Config.Database.TestConnection`, which is the database connection string that will be used. By default, this will be an in-memory SQLite database.
When a `Container` is created, if the [environment](#environments) is set to `config.EnvTest`, the database client will connect to the test database instead, drop the database, recreate it, and run migrations so your tests start with a clean, ready-to-go database. Another benefit is that after the tests execute in a given package, you can connect to the test database to audit the data which can be useful for debugging. When a `Container` is created, if the [environment](#environments) is set to `config.EnvTest`, the database client will connect to the test database instead and run migrations so your tests start with a clean, ready-to-go database.
When this project was using Postgres, it would automatically drop and recreate the test database. Since the current default is in-memory, that is no longer needed. If you decide to use a test database not in-memory, you can alter the `Container` initialization code to do this for you.
## ORM ## ORM
@ -926,13 +914,11 @@ To include additional custom functions, add to the map in `NewFuncMap()` and def
## Cache ## Cache
As previously mentioned, [Redis](https://redis.io/) was chosen as the cache but it can be easily swapped out for something else. [go-redis](https://github.com/go-redis/redis) is used as the underlying client but the `Container` contains a custom client wrapper (`CacheClient`) that makes typical cache operations extremely simple. This wrapper does expose the [go-redis]() client however, at `CacheClient.Client`, in case you have a need for it. As previously mentioned, the default cache implementation is a simple in-memory store, backed by [otter](https://github.com/maypok86/otter), a lockless cache that uses [S3-FIFO](https://s3fifo.com/) eviction. The `Container` houses a `CacheClient` which is a useful, wrapper to interact with the cache (see examples below). Within the `CacheClient` is the underlying store interface `CacheStore`. If you wish to use a different store, such as Redis, and want to keep using the `CacheClient`, simply implement the `CacheStore` interface with a Redis library and adjust the `Container` initialization to use that.
The cache functionality within the `CacheClient` is powered by [gocache](https://github.com/eko/gocache) which was chosen because it makes interfacing with the cache service much easier, and it provides a consistent interface if you were to use a cache backend other than Redis. The built-in usage of the cache is currently only for optional [page caching](#cached-responses) and a simple example route located at `/cache` where you can set and view the value of a given cache entry.
The built-in usage of the cache is currently only for optional [page caching](#cached-responses) but it can be used for practically anything. See examples below: Since the current cache is in-memory, there's no need to adjust the `Container` during tests. When this project used Redis, the configuration had a separate database that would be used strictly for tests to avoid writing to your primary database. If you need that functionality, it is easy to add back in.
Similar to how there is a separate [test database](#separate-test-database) to avoid writing to your primary database when running tests, the cache supports a separate database as well for tests. Within the `config`, the test database number can be specified at `Config.Cache.TestDatabase`. By default, the primary database is `0` and the test database is `1`.
### Set data ### Set data
@ -943,6 +929,7 @@ err := c.Cache.
Set(). Set().
Key("my-key"). Key("my-key").
Data(myData). Data(myData).
Expiration(time.Hour * 2).
Save(ctx) Save(ctx)
``` ```
@ -953,6 +940,7 @@ err := c.Cache.
Set(). Set().
Group("my-group"). Group("my-group").
Key("my-key"). Key("my-key").
Expiration(time.Hour * 2).
Data(myData). Data(myData).
Save(ctx) Save(ctx)
``` ```
@ -964,16 +952,6 @@ err := c.Cache.
Set(). Set().
Key("my-key"). Key("my-key").
Tags("tag1", "tag2"). Tags("tag1", "tag2").
Data(myData).
Save(ctx)
```
**Include an expiration:**
```go
err := c.Cache.
Set().
Key("my-key").
Expiration(time.Hour * 2). Expiration(time.Hour * 2).
Data(myData). Data(myData).
Save(ctx) Save(ctx)
@ -986,12 +964,9 @@ data, err := c.Cache.
Get(). Get().
Group("my-group"). Group("my-group").
Key("my-key"). Key("my-key").
Type(myType).
Fetch(ctx) Fetch(ctx)
``` ```
The `Type` method tells the cache what type of data you stored so it can be cast afterwards with: `result, ok := data.(myType)`
### Flush data ### Flush data
```go ```go
@ -1013,29 +988,62 @@ err := c.Cache.
Execute(ctx) Execute(ctx)
``` ```
### Tagging
As shown in the previous examples, cache tags were provided because they can be convenient. However, maintaining them comes at a cost and it may not be a good fit for your application depending on your needs. When including tags, the `CacheClient` must lock in order to keep the tag index in sync. And since the tag index cannot support eviction, since that could result in a flush call not actually flushing the tag's keys, the maps that provide the index do not have a size limit. See the code for more details.
## Tasks ## Tasks
Tasks are operations to be executed in the background, either in a queue, at a specfic time, after a given amount of time, or according to a periodic interval (like _cron_). Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, and so on. Tasks are queued operations to be executed in the background, either immediately, at a specfic time, or after a given amount of time has passed. Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, etc.
Since we're already using [Redis](https://redis.io) as a _cache_, it's available to act as a message broker as well and handle the processing of queued tasks. [Asynq](https://github.com/hibiken/asynq) is the library chosen to interface with Redis and handle queueing tasks and processing them asynchronously with workers. Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Goqite](https://github.com/maragudk/goqite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously.
To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [asynq](https://github.com/hibiken/asynq). To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [goqite](https://github.com/maragudk/goqite) that supports type-safe tasks and queues.
For more detailed information about [asynq](https://github.com/hibiken/asynq) and it's usage, review the [wiki](https://github.com/hibiken/asynq/wiki).
### Queues ### Queues
All tasks must be placed in to queues in order to be executed by the [worker](#worker). You are not required to specify a queue when creating a task, as it will be placed in the default queue if one is not provided. [Asynq](https://github.com/hibiken/asynq) supports multiple queues which allows for functionality such as [prioritization](https://github.com/hibiken/asynq/wiki/Queue-Priority). A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`).
Creating a queued task is easy and at the minimum only requires the name of the task: A queue starts by declaring a `Task` _type_, which is the object that gets placed in to a queue and eventually passed to a queue subscriber (a callback function to process the task). A `Task` must implement the `Name()` method which returns a unique name for the task. For example:
```go ```go
err := c.Tasks. type MyTask struct {
New("my_task"). Text string
Save() Num int
}
func (t MyTask) Name() string {
return "my_task"
}
``` ```
This will add a task to the _default_ queue with a task _type_ of `my_task`. The type is used to route the task to the correct [worker](#worker). Then, create the queue for `MyTask` tasks:
```go
q := services.NewQueue[MyTask](func(ctx context.Context, task MyTask) error {
// This is where you process the task
fmt.Println("Processed %s task!", task.Text)
return nil
})
```
And finally, register the queue with the `TaskClient`:
```go
c.Tasks.Register(q)
```
See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue subscriber callbacks have access to all of your app's dependencies.
Now you can easily add a task to the queue using the `TaskClient`:
```go
task := MyTask{Text: "Hello world!", Num: 10}
err := c.Tasks.
New(task).
Save()
```
#### Options #### Options
@ -1043,98 +1051,26 @@ Tasks can be created and queued with various chained options:
```go ```go
err := c.Tasks. err := c.Tasks.
New("my_task"). New(task).
Payload(taskData). Wait(30 * time.Second). // Wait 30 seconds before passing the task to the subscriber
Queue("critical"). At(time.Date(...)). // Wait until a given date before passing the task to the subscriber
MaxRetries(5). Tx(tx). // Include the queueing of this task in a database transaction
Timeout(30 * time.Second).
Wait(5 * time.Second).
Retain(2 * time.Hour).
Save() Save()
``` ```
In this example, this task will be: ### Runner
- Assigned a task type of `my_task`
- The task worker will be sent `taskData` as the payload
- Put in to the `critical` queue
- Be retried up to 5 times in the event of a failure
- Timeout after 30 seconds of execution
- Wait 5 seconds before execution starts
- Retain the task data in Redis for 2 hours after execution completes
### Scheduled tasks The _task runner_ is what manages periodically polling the database for available queued tasks to process and passing them to the queue's subscriber callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task runner_ is started by using the `TaskClient`:
Tasks can be scheduled to execute at a single point in the future or at a periodic interval. These tasks can also use the options highlighted in the previous section.
**To execute a task once at a specific time:**
```go ```go
err := c.Tasks. go c.Tasks.StartRunner(ctx)
New("my_task").
At(time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC)).
Save()
``` ```
**To execute a periodic task using a cron schedule:** The app [configuration](#configuration) contains values to configure the runner including how often to poll the database for tasks, the maximum amount of retries for a given task, and the amount of tasks that can be processed concurrently.
```go ## Cron
err := c.Tasks.
New("my_task").
Periodic("*/10 * * * *")
Save()
```
**To execute a periodic task using a simple syntax:** By default, no cron solution is provided because it's very easy to add yourself if you need this. You can either use a [ticker](https://pkg.go.dev/time#Ticker) or a [library](https://github.com/robfig/cron).
```go
err := c.Tasks.
New("my_task").
Periodic("@every 10m")
Save()
```
#### Scheduler
A service needs to run in order to add periodic tasks to the queue at the specified intervals. When the application is started, this _scheduler_ service will also be started. In `cmd/web/main.go`, this is done with the following code:
```go
go func() {
if err := c.Tasks.StartScheduler(); err != nil {
log.Fatalf("scheduler shutdown: %v", err)
}
}()
```
In the event of an application restart, periodic tasks must be re-registered with the _scheduler_ in order to continue being queued for execution.
### Worker
The worker is a service that executes the queued tasks using task processors. Included is a basic implementation of a separate worker service that will listen for and execute tasks being added to the queues. If you prefer to move the worker so it runs alongside the web server, you can do that, though it's recommended to keep these processes separate for performance and scalability reasons.
The underlying functionality of the worker service is provided by [asynq](https://github.com/hibiken/asynq), so it's highly recommended that you review the documentation for that project first.
#### Starting the worker
A make target was added to allow you to start the worker service easily. From the root of the repository, execute `make worker`.
#### Understanding the service
The worker service is located in [cmd/worker/main.go](/cmd/worker/main.go) and starts with the creation of a new `*asynq.Server` provided by `asynq.NewServer()`. There are various configuration options available, so be sure to review them all.
Prior to starting the service, we need to route tasks according to their _type_ to their handlers which will process the tasks. This is done by using `async.ServeMux` much like you would use an HTTP router:
```go
mux := asynq.NewServeMux()
mux.Handle(tasks.TypeExample, new(tasks.ExampleProcessor))
```
In this example, all tasks of _type_ `tasks.TypeExample` will be routed to `ExampleProcessor` which is a struct that implements `ProcessTask()`. See the included [basic example](/pkg/tasks/example.go).
Finally, the service is started with `async.Server.Run(mux)`.
### Monitoring
[Asynq](https://github.com/hibiken/asynq) comes with two options to monitor your queues: 1) [Command-line tool](https://github.com/hibiken/asynq#command-line-tool) and 2) [Web UI](https://github.com/hibiken/asynqmon)
## Static files ## Static files
@ -1266,22 +1202,19 @@ Future work includes but is not limited to:
Thank you to all of the following amazing projects for making this possible. Thank you to all of the following amazing projects for making this possible.
- [alpinejs](https://github.com/alpinejs/alpine) - [alpinejs](https://github.com/alpinejs/alpine)
- [asynq](https://github.com/hibiken/asynq)
- [bulma](https://github.com/jgthms/bulma) - [bulma](https://github.com/jgthms/bulma)
- [docker](https://www.docker.com/)
- [echo](https://github.com/labstack/echo) - [echo](https://github.com/labstack/echo)
- [ent](https://github.com/ent/ent) - [ent](https://github.com/ent/ent)
- [go](https://go.dev/) - [go](https://go.dev/)
- [gocache](https://github.com/eko/gocache) - [go-sqlite3](https://github.com/mattn/go-sqlite3)
- [goqite](https://github.com/maragudk/goqite)
- [goquery](https://github.com/PuerkitoBio/goquery) - [goquery](https://github.com/PuerkitoBio/goquery)
- [go-redis](https://github.com/go-redis/redis)
- [htmx](https://github.com/bigskysoftware/htmx) - [htmx](https://github.com/bigskysoftware/htmx)
- [jwt](https://github.com/golang-jwt/jwt) - [jwt](https://github.com/golang-jwt/jwt)
- [pgx](https://github.com/jackc/pgx) - [otter](https://github.com/maypok86/otter)
- [postgresql](https://www.postgresql.org/)
- [redis](https://redis.io/)
- [sprig](https://github.com/Masterminds/sprig)
- [sessions](https://github.com/gorilla/sessions) - [sessions](https://github.com/gorilla/sessions)
- [sprig](https://github.com/Masterminds/sprig)
- [sqlite](https://sqlite.org/)
- [testify](https://github.com/stretchr/testify) - [testify](https://github.com/stretchr/testify)
- [validator](https://github.com/go-playground/validator) - [validator](https://github.com/go-playground/validator)
- [viper](https://github.com/spf13/viper) - [viper](https://github.com/spf13/viper)

View file

@ -6,7 +6,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"log/slog"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
@ -57,17 +56,12 @@ func main() {
} }
}() }()
q := services.NewQueue[tasks.ExampleTask]( // Register all task queues
func(ctx context.Context, task tasks.ExampleTask) error { tasks.Register(c)
slog.Info("Example task received", "message", task.Message)
return nil
},
)
c.Tasks.Register(q)
// Start the scheduler service to queue periodic tasks // Start the task runner to execute queued tasks
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
c.Tasks.StartRunner(ctx) go c.Tasks.StartRunner(ctx)
// Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds. // Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds.
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)

View file

@ -106,9 +106,6 @@ type (
// TasksConfig stores the tasks configuration // TasksConfig stores the tasks configuration
TasksConfig struct { TasksConfig struct {
Driver string
Connection string
TestConnection string
PollInterval time.Duration PollInterval time.Duration
MaxRetries int MaxRetries int
Goroutines int Goroutines int

View file

@ -32,12 +32,9 @@ database:
testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true" testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true"
tasks: tasks:
driver: "sqlite3"
connection: "dbs/jobs.db?_journal=WAL&_timeout=5000&_fk=true"
testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true"
pollInterval: "1s" pollInterval: "1s"
maxRetries: 10 maxRetries: 10
goRoutines: 1 goroutines: 1
mail: mail:
hostname: "localhost" hostname: "localhost"

View file

@ -1,18 +0,0 @@
version: "3"
services:
cache:
image: "redis:alpine"
container_name: pagoda_cache
ports:
- "127.0.0.1:6379:6379"
db:
# PG 16 is currently not supported https://github.com/ent/ent/issues/3750
image: postgres:15-alpine
container_name: pagoda_db
ports:
- "127.0.0.1:5432:5432"
environment:
- POSTGRES_USER=admin
- POSTGRES_PASSWORD=admin
- POSTGRES_DB=app

View file

@ -103,6 +103,8 @@ func (f *Submission) setErrorMessages(err error) {
message = "Enter a valid email address." message = "Enter a valid email address."
case "eqfield": case "eqfield":
message = "Does not match." message = "Does not match."
case "gte":
message = fmt.Sprintf("Must be greater than or equal to %v.", ve.Param())
default: default:
message = "Invalid value." message = "Invalid value."
} }

92
pkg/handlers/cache.go Normal file
View file

@ -0,0 +1,92 @@
package handlers
import (
"errors"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/pkg/form"
"github.com/mikestefanello/pagoda/pkg/page"
"github.com/mikestefanello/pagoda/pkg/services"
"github.com/mikestefanello/pagoda/templates"
"time"
)
const (
routeNameCache = "cache"
routeNameCacheSubmit = "cache.submit"
)
type (
Cache struct {
cache *services.CacheClient
*services.TemplateRenderer
}
cacheForm struct {
Value string `form:"value"`
form.Submission
}
)
func init() {
Register(new(Cache))
}
func (h *Cache) Init(c *services.Container) error {
h.TemplateRenderer = c.TemplateRenderer
h.cache = c.Cache
return nil
}
func (h *Cache) Routes(g *echo.Group) {
g.GET("/cache", h.Page).Name = routeNameCache
g.POST("/cache", h.Submit).Name = routeNameCacheSubmit
}
func (h *Cache) Page(ctx echo.Context) error {
p := page.New(ctx)
p.Layout = templates.LayoutMain
p.Name = templates.PageCache
p.Title = "Set a cache entry"
p.Form = form.Get[cacheForm](ctx)
// Fetch the value from the cache
value, err := h.cache.
Get().
Key("page_cache_example").
Fetch(ctx.Request().Context())
// Store the value in the page, so it can be rendered, if found
switch {
case err == nil:
p.Data = value.(string)
case errors.Is(err, services.ErrCacheMiss):
default:
return fail(err, "failed to fetch from cache")
}
return h.RenderPage(ctx, p)
}
func (h *Cache) Submit(ctx echo.Context) error {
var input cacheForm
if err := form.Submit(ctx, &input); err != nil {
return err
}
// Set the cache
err := h.cache.
Set().
Key("page_cache_example").
Data(input.Value).
Expiration(30 * time.Minute).
Save(ctx.Request().Context())
if err != nil {
return fail(err, "unable to set cache")
}
form.Clear(ctx)
return h.Page(ctx)
}

View file

@ -2,14 +2,11 @@ package handlers
import ( import (
"fmt" "fmt"
"time"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/pkg/form" "github.com/mikestefanello/pagoda/pkg/form"
"github.com/mikestefanello/pagoda/pkg/page" "github.com/mikestefanello/pagoda/pkg/page"
"github.com/mikestefanello/pagoda/pkg/services" "github.com/mikestefanello/pagoda/pkg/services"
"github.com/mikestefanello/pagoda/pkg/tasks"
"github.com/mikestefanello/pagoda/templates" "github.com/mikestefanello/pagoda/templates"
) )
@ -21,7 +18,6 @@ const (
type ( type (
Contact struct { Contact struct {
mail *services.MailClient mail *services.MailClient
tasks *services.TaskClient
*services.TemplateRenderer *services.TemplateRenderer
} }
@ -40,7 +36,6 @@ func init() {
func (h *Contact) Init(c *services.Container) error { func (h *Contact) Init(c *services.Container) error {
h.TemplateRenderer = c.TemplateRenderer h.TemplateRenderer = c.TemplateRenderer
h.mail = c.Mail h.mail = c.Mail
h.tasks = c.Tasks
return nil return nil
} }
@ -72,15 +67,6 @@ func (h *Contact) Submit(ctx echo.Context) error {
return err return err
} }
err = h.tasks.New(tasks.ExampleTask{
Message: input.Message,
}).
Wait(30 * time.Second).
Save()
if err != nil {
return err
}
err = h.mail. err = h.mail.
Compose(). Compose().
To(input.Email). To(input.Email).

88
pkg/handlers/task.go Normal file
View file

@ -0,0 +1,88 @@
package handlers
import (
"fmt"
"github.com/mikestefanello/pagoda/pkg/msg"
"time"
"github.com/go-playground/validator/v10"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/pkg/form"
"github.com/mikestefanello/pagoda/pkg/page"
"github.com/mikestefanello/pagoda/pkg/services"
"github.com/mikestefanello/pagoda/pkg/tasks"
"github.com/mikestefanello/pagoda/templates"
)
const (
routeNameTask = "task"
routeNameTaskSubmit = "task.submit"
)
type (
Task struct {
tasks *services.TaskClient
*services.TemplateRenderer
}
taskForm struct {
Delay int `form:"delay" validate:"gte=0"`
Message string `form:"message" validate:"required"`
form.Submission
}
)
func init() {
Register(new(Task))
}
func (h *Task) Init(c *services.Container) error {
h.TemplateRenderer = c.TemplateRenderer
h.tasks = c.Tasks
return nil
}
func (h *Task) Routes(g *echo.Group) {
g.GET("/task", h.Page).Name = routeNameTask
g.POST("/task", h.Submit).Name = routeNameTaskSubmit
}
func (h *Task) Page(ctx echo.Context) error {
p := page.New(ctx)
p.Layout = templates.LayoutMain
p.Name = templates.PageTask
p.Title = "Create a task"
p.Form = form.Get[taskForm](ctx)
return h.RenderPage(ctx, p)
}
func (h *Task) Submit(ctx echo.Context) error {
var input taskForm
err := form.Submit(ctx, &input)
switch err.(type) {
case nil:
case validator.ValidationErrors:
return h.Page(ctx)
default:
return err
}
// Insert the task
err = h.tasks.New(tasks.ExampleTask{
Message: input.Message,
}).
Wait(time.Duration(input.Delay) * time.Second).
Save()
if err != nil {
return fail(err, "unable to create a task")
}
msg.Success(ctx, fmt.Sprintf("The task has been created. Check the logs in %d seconds.", input.Delay))
form.Clear(ctx)
return h.Page(ctx)
}

View file

@ -77,7 +77,7 @@ func TestLogRequest(t *testing.T) {
h := new(mockLogHandler) h := new(mockLogHandler)
exec := func() { exec := func() {
ctx, _ := tests.NewContext(c.Web, "http://test.localhost/abc?d=1") ctx, _ := tests.NewContext(c.Web, "http://test.localhost/abc?d=1&e=2")
logger := slog.New(h).With("previous", "param") logger := slog.New(h).With("previous", "param")
log.Set(ctx, logger) log.Set(ctx, logger)
ctx.Request().Header.Set("Referer", "ref.com") ctx.Request().Header.Set("Referer", "ref.com")
@ -101,7 +101,7 @@ func TestLogRequest(t *testing.T) {
assert.Equal(t, "5", h.GetAttr("bytes_out")) assert.Equal(t, "5", h.GetAttr("bytes_out"))
assert.NotEmpty(t, h.GetAttr("latency")) assert.NotEmpty(t, h.GetAttr("latency"))
assert.Equal(t, "INFO", h.level) assert.Equal(t, "INFO", h.level)
assert.Equal(t, "GET /abc?d=1", h.msg) assert.Equal(t, "GET /abc?d=1&e=2", h.msg)
statusCode = 500 statusCode = 500
exec() exec()

View file

@ -71,8 +71,9 @@ type (
// locking, and we need to keep track of this index in order to keep everything in sync. // locking, and we need to keep track of this index in order to keep everything in sync.
// If using something like Redis for caching, you can leverage sets to store the index. // If using something like Redis for caching, you can leverage sets to store the index.
// Cache tags can be useful and convenient, so you should decide if your app benefits enough from this. // Cache tags can be useful and convenient, so you should decide if your app benefits enough from this.
// As it stands there, there is no limiting how much memory this will consume and it will track all keys // As it stands here, there is no limiting how much memory this will consume and it will track all keys
// and tags added and removed from the cache. // and tags added and removed from the cache. You could store these in the cache itself but allowing these to
// be evicted poses challenges.
tagIndex struct { tagIndex struct {
sync.Mutex sync.Mutex
tags map[string]map[string]struct{} // tag->keys tags map[string]map[string]struct{} // tag->keys
@ -314,7 +315,7 @@ func (i *tagIndex) purgeTags(tags ...string) []string {
keys := make([]string, 0) keys := make([]string, 0)
for _, tag := range tags { for _, tag := range tags {
tagKeys := i.tags[tag] if tagKeys, exists := i.tags[tag]; exists {
delete(i.tags, tag) delete(i.tags, tag)
for key := range tagKeys { for key := range tagKeys {
@ -326,6 +327,7 @@ func (i *tagIndex) purgeTags(tags ...string) []string {
keys = append(keys, key) keys = append(keys, key)
} }
} }
}
return keys return keys
} }
@ -335,7 +337,7 @@ func (i *tagIndex) purgeKeys(keys ...string) {
defer i.Unlock() defer i.Unlock()
for _, key := range keys { for _, key := range keys {
keyTags := i.keys[key] if keyTags, exists := i.keys[key]; exists {
delete(i.keys, key) delete(i.keys, key)
for tag := range keyTags { for tag := range keyTags {
@ -345,4 +347,5 @@ func (i *tagIndex) purgeKeys(keys ...string) {
} }
} }
} }
}
} }

View file

@ -13,6 +13,7 @@ func TestCacheClient(t *testing.T) {
type cacheTest struct { type cacheTest struct {
Value string Value string
} }
// Cache some data // Cache some data
data := cacheTest{Value: "abcdef"} data := cacheTest{Value: "abcdef"}
group := "testgroup" group := "testgroup"
@ -22,7 +23,7 @@ func TestCacheClient(t *testing.T) {
Group(group). Group(group).
Key(key). Key(key).
Data(data). Data(data).
Expiration(time.Hour). Expiration(500 * time.Millisecond).
Save(context.Background()) Save(context.Background())
require.NoError(t, err) require.NoError(t, err)
@ -53,7 +54,7 @@ func TestCacheClient(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// The data should be gone // The data should be gone
assertFlushed := func() { assertFlushed := func(key string) {
// The data should be gone // The data should be gone
_, err = c.Cache. _, err = c.Cache.
Get(). Get().
@ -62,20 +63,33 @@ func TestCacheClient(t *testing.T) {
Fetch(context.Background()) Fetch(context.Background())
assert.Equal(t, ErrCacheMiss, err) assert.Equal(t, ErrCacheMiss, err)
} }
assertFlushed() assertFlushed(key)
// Set with tags // Set with tags
key = "testkey2"
err = c.Cache. err = c.Cache.
Set(). Set().
Group(group). Group(group).
Key(key). Key(key).
Data(data). Data(data).
Tags("tag1"). Tags("tag1", "tag2").
Expiration(time.Hour). Expiration(time.Hour).
Save(context.Background()) Save(context.Background())
require.NoError(t, err) require.NoError(t, err)
// Flush the tag // Check the tag index
index := c.Cache.store.(*inMemoryCacheStore).tagIndex
gk := c.Cache.cacheKey(group, key)
_, exists := index.tags["tag1"][gk]
assert.True(t, exists)
_, exists = index.tags["tag2"][gk]
assert.True(t, exists)
_, exists = index.keys[gk]["tag1"]
assert.True(t, exists)
_, exists = index.keys[gk]["tag2"]
assert.True(t, exists)
// Flush one of tags
err = c.Cache. err = c.Cache.
Flush(). Flush().
Tags("tag1"). Tags("tag1").
@ -83,22 +97,9 @@ func TestCacheClient(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// The data should be gone // The data should be gone
assertFlushed() assertFlushed(key)
// Set with expiration // The index should be empty
err = c.Cache. assert.Empty(t, index.tags)
Set(). assert.Empty(t, index.keys)
Group(group).
Key(key).
Data(data).
Expiration(time.Millisecond).
Save(context.Background())
require.NoError(t, err)
// Wait for expiration
// TODO why does this need to wait so long?
time.Sleep(time.Millisecond * 500)
// The data should be gone
assertFlushed()
} }

View file

@ -70,11 +70,9 @@ func NewContainer() *Container {
return c return c
} }
// Shutdown shuts the Container down and disconnects all connections // Shutdown shuts the Container down and disconnects all connections.
// If the task runner was started, cancel the context to shut it down prior to calling this.
func (c *Container) Shutdown() error { func (c *Container) Shutdown() error {
if err := c.Tasks.Close(); err != nil {
return err
}
if err := c.ORM.Close(); err != nil { if err := c.ORM.Close(); err != nil {
return err return err
} }
@ -177,7 +175,9 @@ func (c *Container) initMail() {
// initTasks initializes the task client // initTasks initializes the task client
func (c *Container) initTasks() { func (c *Container) initTasks() {
var err error var err error
c.Tasks, err = NewTaskClient(c.Config) // You could use a separate database for tasks, if you'd like. but using one
// makes transaction support easier
c.Tasks, err = NewTaskClient(c.Config.Tasks, c.Database)
if err != nil { if err != nil {
panic(fmt.Sprintf("failed to create task client: %v", err)) panic(fmt.Sprintf("failed to create task client: %v", err))
} }

View file

@ -1,10 +1,12 @@
package services package services
import ( import (
"bytes"
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/gob"
"strings" "strings"
"sync"
"time" "time"
"github.com/maragudk/goqite" "github.com/maragudk/goqite"
@ -20,41 +22,162 @@ type (
TaskClient struct { TaskClient struct {
queue *goqite.Queue queue *goqite.Queue
runner *jobs.Runner runner *jobs.Runner
db *sql.DB buffers sync.Pool
} }
// Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber.
// See pkg/tasks for an example of how this can be used with a queue.
Task interface { Task interface {
Name() string Name() string
} }
// TaskOp handles task creation operations // TaskSaveOp handles task save operations
TaskOp struct { TaskSaveOp struct {
client *TaskClient client *TaskClient
task Task task Task
//payload any tx *sql.Tx
//periodic *string
//queue *string
//maxRetries *int
//timeout *time.Duration
//deadline *time.Time
at *time.Time at *time.Time
wait *time.Duration wait *time.Duration
//retain *time.Duration
} }
// Queue is a queue that a Task can be pushed to for execution.
// While this can be implemented directly, it's recommended to use NewQueue() which uses generics in
// order to provide type-safe queues and queue subscriber callbacks for task execution.
Queue interface { Queue interface {
// Name returns the name of the task this queue processes
Name() string Name() string
// Receive receives the Task payload to be processed
Receive(ctx context.Context, payload []byte) error Receive(ctx context.Context, payload []byte) error
} }
queue[T any] struct { // queue provides a type-safe implementation of Queue
queue[T Task] struct {
name string name string
subscriber QueueSubscriber[T] subscriber QueueSubscriber[T]
} }
QueueSubscriber[T any] func(context.Context, T) error // QueueSubscriber is a generic subscriber callback for a given queue to process Tasks
QueueSubscriber[T Task] func(context.Context, T) error
) )
// NewTaskClient creates a new task client
func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
// Install the schema
if err := goqite.Setup(context.Background(), db); err != nil {
// An error is returned if we already ran this and there's no better way to check.
// You can and probably should handle this via migrations
if !strings.Contains(err.Error(), "already exists") {
return nil, err
}
}
t := &TaskClient{
queue: goqite.New(goqite.NewOpts{
DB: db,
Name: "tasks",
MaxReceive: cfg.MaxRetries,
}),
buffers: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
},
}
t.runner = jobs.NewRunner(jobs.NewRunnerOpts{
Limit: cfg.Goroutines,
Log: log.Default(),
PollInterval: cfg.PollInterval,
Queue: t.queue,
})
return t, nil
}
// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
// This must be running in order to execute queued tasked.
// To stop the runner, cancel the context.
// This is a blocking call.
func (t *TaskClient) StartRunner(ctx context.Context) {
t.runner.Start(ctx)
}
// Register registers a queue so tasks can be added to it and processed
func (t *TaskClient) Register(queue Queue) {
t.runner.Register(queue.Name(), queue.Receive)
}
// New starts a task save operation
func (t *TaskClient) New(task Task) *TaskSaveOp {
return &TaskSaveOp{
client: t,
task: task,
}
}
// At sets the exact date and time the task should be executed
func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp {
t.Wait(time.Until(processAt))
return t
}
// Wait instructs the task to wait a given duration before it is executed
func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp {
t.wait = &duration
return t
}
// Tx will include the task as part of a given database transaction
func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp {
t.tx = tx
return t
}
// Save saves the task, so it can be queued for execution
func (t *TaskSaveOp) Save() error {
type message struct {
Name string
Message []byte
}
// Encode the task
taskBuf := t.client.buffers.Get().(*bytes.Buffer)
if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil {
return err
}
// Wrap and encode the message
// This is needed as a workaround because goqite doesn't support delays using the jobs package,
// so we format the message the way it expects but use the queue to supply the delay
msgBuf := t.client.buffers.Get().(*bytes.Buffer)
wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()}
if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil {
return err
}
msg := goqite.Message{
Body: msgBuf.Bytes(),
}
if t.wait != nil {
msg.Delay = *t.wait
}
// Put the buffers back in the pool for re-use
taskBuf.Reset()
msgBuf.Reset()
t.client.buffers.Put(taskBuf)
t.client.buffers.Put(msgBuf)
if t.tx == nil {
return t.client.queue.Send(context.Background(), msg)
} else {
return t.client.queue.SendTx(context.Background(), t.tx, msg)
}
}
// NewQueue queues a new type-safe Queue of a given Task type
func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue { func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue {
var task T var task T
@ -72,113 +195,10 @@ func (q *queue[T]) Name() string {
func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
var obj T var obj T
err := json.Unmarshal(payload, &obj) err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj)
if err != nil { if err != nil {
return err return err
} }
return q.subscriber(ctx, obj) return q.subscriber(ctx, obj)
} }
// NewTaskClient creates a new task client
func NewTaskClient(cfg *config.Config) (*TaskClient, error) {
var connection string
switch cfg.App.Environment {
case config.EnvTest:
connection = cfg.Tasks.TestConnection
default:
connection = cfg.Tasks.Connection
}
db, err := openDB(cfg.Tasks.Driver, connection)
if err != nil {
return nil, err
}
// TODO is this correct?
db.SetMaxOpenConns(cfg.Tasks.Goroutines)
db.SetMaxIdleConns(cfg.Tasks.Goroutines)
// Install the schema
if err := goqite.Setup(context.Background(), db); err != nil {
// An error is returned if we already ran this and there's no better way to check.
// You can and probably should handle this via migrations
if !strings.Contains(err.Error(), "already exists") {
return nil, err
}
}
t := &TaskClient{
queue: goqite.New(goqite.NewOpts{
DB: db,
Name: "tasks",
MaxReceive: cfg.Tasks.MaxRetries,
}),
db: db,
}
t.runner = jobs.NewRunner(jobs.NewRunnerOpts{
Limit: cfg.Tasks.Goroutines,
Log: log.Default(),
PollInterval: cfg.Tasks.PollInterval,
Queue: t.queue,
})
return t, nil
}
// Close closes the connection to the task service
func (t *TaskClient) Close() error {
// TODO close the runner
return t.db.Close()
}
// StartRunner starts the scheduler service which adds scheduled tasks to the queue
// This must be running in order to queue tasks set for periodic execution
func (t *TaskClient) StartRunner(ctx context.Context) {
t.runner.Start(ctx)
}
func (t *TaskClient) Register(queue Queue) {
t.runner.Register(queue.Name(), queue.Receive)
}
// New starts a task creation operation
func (t *TaskClient) New(task Task) *TaskOp {
return &TaskOp{
client: t,
task: task,
}
}
// At sets the exact date and time the task should be executed
func (t *TaskOp) At(processAt time.Time) *TaskOp {
t.Wait(time.Until(processAt))
return t
}
// Wait instructs the task to wait a given duration before it is executed
func (t *TaskOp) Wait(duration time.Duration) *TaskOp {
t.wait = &duration
return t
}
// Save saves the task so it can be executed
func (t *TaskOp) Save() error {
// Build the payload
payload, err := json.Marshal(t.task)
if err != nil {
return err
}
msg := goqite.Message{
Body: payload,
}
if t.wait != nil {
msg.Delay = *t.wait
}
//return t.client.queue.Send(context.Background(), msg)
return jobs.Create(context.Background(), t.client.queue, t.task.Name(), payload)
}

View file

@ -1,28 +1,69 @@
package services package services
//func TestTaskClient_New(t *testing.T) { import (
// now := time.Now() "context"
// tk := c.Tasks. "database/sql"
// New("task1"). "github.com/stretchr/testify/assert"
// Payload("payload"). "github.com/stretchr/testify/require"
// Queue("queue"). "testing"
// Periodic("@every 5s"). "time"
// MaxRetries(5). )
// Timeout(5 * time.Second).
// Deadline(now). type testTask struct {
// At(now). Val int
// Wait(6 * time.Second). }
// Retain(7 * time.Second)
// func (t testTask) Name() string {
// assert.Equal(t, "task1", tk.typ) return "test_task"
// assert.Equal(t, "payload", tk.payload.(string)) }
// assert.Equal(t, "queue", *tk.queue)
// assert.Equal(t, "@every 5s", *tk.periodic) func TestTaskClient_New(t *testing.T) {
// assert.Equal(t, 5, *tk.maxRetries) var subCalled bool
// assert.Equal(t, 5*time.Second, *tk.timeout)
// assert.Equal(t, now, *tk.deadline) queue := NewQueue[testTask](func(ctx context.Context, task testTask) error {
// assert.Equal(t, now, *tk.at) subCalled = true
// assert.Equal(t, 6*time.Second, *tk.wait) assert.Equal(t, 123, task.Val)
// assert.Equal(t, 7*time.Second, *tk.retain) return nil
// assert.NoError(t, tk.Save()) })
//} c.Tasks.Register(queue)
task := testTask{Val: 123}
tx := &sql.Tx{}
op := c.Tasks.
New(task).
Wait(5 * time.Second).
Tx(tx)
// Check that the task op was built correctly
assert.Equal(t, task, op.task)
assert.Equal(t, tx, op.tx)
assert.Equal(t, 5*time.Second, *op.wait)
// Remove the transaction and delay so we can process the task immediately
op.tx, op.wait = nil, nil
err := op.Save()
require.NoError(t, err)
// Start the runner
ctx, cancel := context.WithCancel(context.Background())
go c.Tasks.StartRunner(ctx)
defer cancel()
// Check for up to 5 seconds if the task executed
start := time.Now()
waitLoop:
for {
switch {
case subCalled:
break waitLoop
case time.Since(start) > (5 * time.Second):
break waitLoop
default:
time.Sleep(10 * time.Millisecond)
}
}
assert.True(t, subCalled)
}

View file

@ -1,9 +1,36 @@
package tasks package tasks
import (
"context"
"github.com/mikestefanello/pagoda/pkg/log"
"github.com/mikestefanello/pagoda/pkg/services"
)
// ExampleTask is an example implementation of services.Task
// This represents the task that can be queued for execution via the task client and should contain everything
// that your queue subscriber needs to process the task.
type ExampleTask struct { type ExampleTask struct {
Message string Message string
} }
// Name satisfies the services.Task interface by proviing a unique name for this Task type
func (t ExampleTask) Name() string { func (t ExampleTask) Name() string {
return "example_task" return "example_task"
} }
// NewExampleTaskQueue provides a Queue that can process ExampleTask tasks
// The service container is provided so the subscriber can have access to the app dependencies.
// All queues must be registered in the Register() function.
// Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution.
func NewExampleTaskQueue(c *services.Container) services.Queue {
return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
log.Default().Info("Example task received",
"message", task.Message,
)
log.Default().Info("This can access the container for dependencies",
"echo", c.Web.Reverse("home"),
)
return nil
})
}

10
pkg/tasks/register.go Normal file
View file

@ -0,0 +1,10 @@
package tasks
import (
"github.com/mikestefanello/pagoda/pkg/services"
)
// Register registers all task queues with the task client
func Register(c *services.Container) {
c.Tasks.Register(NewExampleTaskQueue(c))
}

View file

@ -28,6 +28,8 @@
<li>{{link (url "home") "Dashboard" .Path}}</li> <li>{{link (url "home") "Dashboard" .Path}}</li>
<li>{{link (url "about") "About" .Path}}</li> <li>{{link (url "about") "About" .Path}}</li>
<li>{{link (url "contact") "Contact" .Path}}</li> <li>{{link (url "contact") "Contact" .Path}}</li>
<li>{{link (url "cache") "Cache" .Path}}</li>
<li>{{link (url "task") "Task" .Path}}</li>
</ul> </ul>
<p class="menu-label">Account</p> <p class="menu-label">Account</p>

View file

@ -19,7 +19,7 @@
<p>Warning</p> <p>Warning</p>
</div> </div>
<div class="message-body"> <div class="message-body">
This route has caching enabled so hot-reloading in the local environment will not work. Check the Redis cache for a key matching the URL path. This route has caching enabled so hot-reloading in the local environment will not work.
</div> </div>
</article> </article>
{{- end}} {{- end}}

View file

@ -0,0 +1,36 @@
{{define "content"}}
<form id="task" method="post" hx-post="{{url "cache.submit"}}">
<article class="message">
<div class="message-header">
<p>Test the cache</p>
</div>
<div class="message-body">
This route handler shows how the default in-memory cache works. Try updating the value using the form below and see how it persists after you reload the page.
HTMX makes it easy to re-render the cached value after the form is submitted.
</div>
</article>
<label for="value" class="label">Value in cache: </label>
{{if .Data}}
<span class="tag is-success">{{.Data}}</span>
{{- else}}
<i>(empty)</i>
{{- end}}
<br/><br/>
<div class="field">
<label for="value" class="label">Value</label>
<div class="control">
<input id="value" name="value" class="input" value="{{.Form.Value}}"/>
</div>
</div>
<div class="field is-grouped">
<div class="control">
<button class="button is-link">Update cache</button>
</div>
</div>
{{template "csrf" .}}
</form>
{{end}}

View file

@ -0,0 +1,43 @@
{{define "content"}}
{{- if not (eq .HTMX.Request.Target "task")}}
<article class="message is-link">
<div class="message-body">
<p>Submitting this form will create an <i>ExampleTask</i> in the task queue. After the specified delay, the message will be logged by the queue processor.</p>
<p>See pkg/tasks and the README for more information.</p>
</div>
</article>
{{- end}}
{{template "form" .}}
{{end}}
{{define "form"}}
<form id="task" method="post" hx-post="{{url "task.submit"}}">
{{template "messages" .}}
<div class="field">
<label for="delay" class="label">Delay (in seconds)</label>
<div class="control">
<input type="number" id="delay" name="delay" class="input {{.Form.GetFieldStatusClass "Delay"}}" value="{{.Form.Delay}}"/>
</div>
<p class="help">How long to wait until the task is executed</p>
{{template "field-errors" (.Form.GetFieldErrors "Delay")}}
</div>
<div class="field">
<label for="message" class="label">Message</label>
<div class="control">
<textarea id="message" name="message" class="textarea {{.Form.GetFieldStatusClass "Message"}}">{{.Form.Message}}</textarea>
</div>
<p class="help">The message the task will output to the log</p>
{{template "field-errors" (.Form.GetFieldErrors "Message")}}
</div>
<div class="field is-grouped">
<div class="control">
<button class="button is-link">Add task to queue</button>
</div>
</div>
{{template "csrf" .}}
</form>
{{end}}

View file

@ -22,6 +22,7 @@ const (
const ( const (
PageAbout Page = "about" PageAbout Page = "about"
PageCache Page = "cache"
PageContact Page = "contact" PageContact Page = "contact"
PageError Page = "error" PageError Page = "error"
PageForgotPassword Page = "forgot-password" PageForgotPassword Page = "forgot-password"
@ -30,6 +31,7 @@ const (
PageRegister Page = "register" PageRegister Page = "register"
PageResetPassword Page = "reset-password" PageResetPassword Page = "reset-password"
PageSearch Page = "search" PageSearch Page = "search"
PageTask Page = "task"
) )
//go:embed * //go:embed *
@ -41,7 +43,7 @@ func Get() embed.FS {
} }
// GetOS returns a file system containing all templates which will load the files directly from the operating system. // GetOS returns a file system containing all templates which will load the files directly from the operating system.
// This should only be used for local development in order to faciliate live reloading. // This should only be used for local development in order to facilitate live reloading.
func GetOS() fs.FS { func GetOS() fs.FS {
// Gets the complete templates directory path // Gets the complete templates directory path
// This is needed in case this is called from a package outside of main, such as within tests // This is needed in case this is called from a package outside of main, such as within tests