Default to SQLite rather than Postgres & Redis (#72)

* Initial rough draft switch to sqlite.

* Rewrote cache implemenation.

* Provide typed tasks.

* Task cleanup.

* Use same db for tasks.

* Provide task queue registration and service container injection.

* Added optional delay to tasks. Pool buffers when encoding.

* Added tests for the task client and runner.

* Added handler examples for caching and tasks.

* Cleanup and documentation.

* Use make in workflow.

* Updated documentation.

* Updated documentation.
This commit is contained in:
Mike Stefanello 2024-06-22 10:34:26 -04:00 committed by GitHub
parent 5e9e502b42
commit a096abd195
29 changed files with 956 additions and 910 deletions

View file

@ -103,6 +103,8 @@ func (f *Submission) setErrorMessages(err error) {
message = "Enter a valid email address."
case "eqfield":
message = "Does not match."
case "gte":
message = fmt.Sprintf("Must be greater than or equal to %v.", ve.Param())
default:
message = "Invalid value."
}

92
pkg/handlers/cache.go Normal file
View file

@ -0,0 +1,92 @@
package handlers
import (
"errors"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/pkg/form"
"github.com/mikestefanello/pagoda/pkg/page"
"github.com/mikestefanello/pagoda/pkg/services"
"github.com/mikestefanello/pagoda/templates"
"time"
)
const (
routeNameCache = "cache"
routeNameCacheSubmit = "cache.submit"
)
type (
Cache struct {
cache *services.CacheClient
*services.TemplateRenderer
}
cacheForm struct {
Value string `form:"value"`
form.Submission
}
)
func init() {
Register(new(Cache))
}
func (h *Cache) Init(c *services.Container) error {
h.TemplateRenderer = c.TemplateRenderer
h.cache = c.Cache
return nil
}
func (h *Cache) Routes(g *echo.Group) {
g.GET("/cache", h.Page).Name = routeNameCache
g.POST("/cache", h.Submit).Name = routeNameCacheSubmit
}
func (h *Cache) Page(ctx echo.Context) error {
p := page.New(ctx)
p.Layout = templates.LayoutMain
p.Name = templates.PageCache
p.Title = "Set a cache entry"
p.Form = form.Get[cacheForm](ctx)
// Fetch the value from the cache
value, err := h.cache.
Get().
Key("page_cache_example").
Fetch(ctx.Request().Context())
// Store the value in the page, so it can be rendered, if found
switch {
case err == nil:
p.Data = value.(string)
case errors.Is(err, services.ErrCacheMiss):
default:
return fail(err, "failed to fetch from cache")
}
return h.RenderPage(ctx, p)
}
func (h *Cache) Submit(ctx echo.Context) error {
var input cacheForm
if err := form.Submit(ctx, &input); err != nil {
return err
}
// Set the cache
err := h.cache.
Set().
Key("page_cache_example").
Data(input.Value).
Expiration(30 * time.Minute).
Save(ctx.Request().Context())
if err != nil {
return fail(err, "unable to set cache")
}
form.Clear(ctx)
return h.Page(ctx)
}

View file

@ -2,7 +2,6 @@ package handlers
import (
"fmt"
"github.com/go-playground/validator/v10"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/pkg/form"

88
pkg/handlers/task.go Normal file
View file

@ -0,0 +1,88 @@
package handlers
import (
"fmt"
"github.com/mikestefanello/pagoda/pkg/msg"
"time"
"github.com/go-playground/validator/v10"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/pkg/form"
"github.com/mikestefanello/pagoda/pkg/page"
"github.com/mikestefanello/pagoda/pkg/services"
"github.com/mikestefanello/pagoda/pkg/tasks"
"github.com/mikestefanello/pagoda/templates"
)
const (
routeNameTask = "task"
routeNameTaskSubmit = "task.submit"
)
type (
Task struct {
tasks *services.TaskClient
*services.TemplateRenderer
}
taskForm struct {
Delay int `form:"delay" validate:"gte=0"`
Message string `form:"message" validate:"required"`
form.Submission
}
)
func init() {
Register(new(Task))
}
func (h *Task) Init(c *services.Container) error {
h.TemplateRenderer = c.TemplateRenderer
h.tasks = c.Tasks
return nil
}
func (h *Task) Routes(g *echo.Group) {
g.GET("/task", h.Page).Name = routeNameTask
g.POST("/task", h.Submit).Name = routeNameTaskSubmit
}
func (h *Task) Page(ctx echo.Context) error {
p := page.New(ctx)
p.Layout = templates.LayoutMain
p.Name = templates.PageTask
p.Title = "Create a task"
p.Form = form.Get[taskForm](ctx)
return h.RenderPage(ctx, p)
}
func (h *Task) Submit(ctx echo.Context) error {
var input taskForm
err := form.Submit(ctx, &input)
switch err.(type) {
case nil:
case validator.ValidationErrors:
return h.Page(ctx)
default:
return err
}
// Insert the task
err = h.tasks.New(tasks.ExampleTask{
Message: input.Message,
}).
Wait(time.Duration(input.Delay) * time.Second).
Save()
if err != nil {
return fail(err, "unable to create a task")
}
msg.Success(ctx, fmt.Sprintf("The task has been created. Check the logs in %d seconds.", input.Delay))
form.Clear(ctx)
return h.Page(ctx)
}

View file

@ -10,7 +10,6 @@ import (
"github.com/mikestefanello/pagoda/pkg/log"
"github.com/mikestefanello/pagoda/pkg/services"
libstore "github.com/eko/gocache/lib/v4/store"
"github.com/labstack/echo/v4"
)
@ -35,7 +34,7 @@ func ServeCachedPage(t *services.TemplateRenderer) echo.MiddlewareFunc {
if err != nil {
switch {
case errors.Is(err, &libstore.NotFound{}):
case errors.Is(err, services.ErrCacheMiss):
case context.IsCanceledError(err):
return nil
default:

View file

@ -4,28 +4,39 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
libstore "github.com/eko/gocache/lib/v4/store"
redisstore "github.com/eko/gocache/store/redis/v4"
"github.com/mikestefanello/pagoda/config"
"github.com/redis/go-redis/v9"
"github.com/maypok86/otter"
)
type (
// CacheClient is the client that allows you to interact with the cache
CacheClient struct {
// Client stores the client to the underlying cache service
Client *redis.Client
// ErrCacheMiss indicates that the requested key does not exist in the cache
var ErrCacheMiss = errors.New("cache miss")
// cache stores the cache interface
cache *cache.Cache[any]
type (
// CacheStore provides an interface for cache storage
CacheStore interface {
// get attempts to get a cached value
get(context.Context, *CacheGetOp) (any, error)
// set attempts to set an entry in the cache
set(context.Context, *CacheSetOp) error
// flush removes a given key and/or tags from the cache
flush(context.Context, *CacheFlushOp) error
// close shuts down the cache storage
close()
}
// cacheSet handles chaining a set operation
cacheSet struct {
// CacheClient is the client that allows you to interact with the cache
CacheClient struct {
// store holds the Cache storage
store CacheStore
}
// CacheSetOp handles chaining a set operation
CacheSetOp struct {
client *CacheClient
key string
group string
@ -34,76 +45,69 @@ type (
tags []string
}
// cacheGet handles chaining a get operation
cacheGet struct {
client *CacheClient
key string
group string
dataType any
// CacheGetOp handles chaining a get operation
CacheGetOp struct {
client *CacheClient
key string
group string
}
// cacheFlush handles chaining a flush operation
cacheFlush struct {
// CacheFlushOp handles chaining a flush operation
CacheFlushOp struct {
client *CacheClient
key string
group string
tags []string
}
// inMemoryCacheStore is a cache store implementation in memory
inMemoryCacheStore struct {
store *otter.CacheWithVariableTTL[string, any]
tagIndex *tagIndex
}
// tagIndex maintains an index to support cache tags for in-memory cache stores.
// There is a performance and memory impact to using cache tags since set and get operations using tags will require
// locking, and we need to keep track of this index in order to keep everything in sync.
// If using something like Redis for caching, you can leverage sets to store the index.
// Cache tags can be useful and convenient, so you should decide if your app benefits enough from this.
// As it stands here, there is no limiting how much memory this will consume and it will track all keys
// and tags added and removed from the cache. You could store these in the cache itself but allowing these to
// be evicted poses challenges.
tagIndex struct {
sync.Mutex
tags map[string]map[string]struct{} // tag->keys
keys map[string]map[string]struct{} // key->tags
}
)
// NewCacheClient creates a new cache client
func NewCacheClient(cfg *config.Config) (*CacheClient, error) {
// Determine the database based on the environment
db := cfg.Cache.Database
if cfg.App.Environment == config.EnvTest {
db = cfg.Cache.TestDatabase
}
// Connect to the cache
c := &CacheClient{}
c.Client = redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", cfg.Cache.Hostname, cfg.Cache.Port),
Password: cfg.Cache.Password,
DB: db,
})
if _, err := c.Client.Ping(context.Background()).Result(); err != nil {
return c, err
}
// Flush the database if this is the test environment
if cfg.App.Environment == config.EnvTest {
if err := c.Client.FlushDB(context.Background()).Err(); err != nil {
return c, err
}
}
cacheStore := redisstore.NewRedis(c.Client)
c.cache = cache.New[any](cacheStore)
return c, nil
func NewCacheClient(store CacheStore) *CacheClient {
return &CacheClient{store: store}
}
// Close closes the connection to the cache
func (c *CacheClient) Close() error {
return c.Client.Close()
func (c *CacheClient) Close() {
c.store.close()
}
// Set creates a cache set operation
func (c *CacheClient) Set() *cacheSet {
return &cacheSet{
func (c *CacheClient) Set() *CacheSetOp {
return &CacheSetOp{
client: c,
}
}
// Get creates a cache get operation
func (c *CacheClient) Get() *cacheGet {
return &cacheGet{
func (c *CacheClient) Get() *CacheGetOp {
return &CacheGetOp{
client: c,
}
}
// Flush creates a cache flush operation
func (c *CacheClient) Flush() *cacheFlush {
return &cacheFlush{
func (c *CacheClient) Flush() *CacheFlushOp {
return &CacheFlushOp{
client: c,
}
}
@ -117,111 +121,231 @@ func (c *CacheClient) cacheKey(group, key string) string {
}
// Key sets the cache key
func (c *cacheSet) Key(key string) *cacheSet {
func (c *CacheSetOp) Key(key string) *CacheSetOp {
c.key = key
return c
}
// Group sets the cache group
func (c *cacheSet) Group(group string) *cacheSet {
func (c *CacheSetOp) Group(group string) *CacheSetOp {
c.group = group
return c
}
// Data sets the data to cache
func (c *cacheSet) Data(data any) *cacheSet {
func (c *CacheSetOp) Data(data any) *CacheSetOp {
c.data = data
return c
}
// Expiration sets the expiration duration of the cached data
func (c *cacheSet) Expiration(expiration time.Duration) *cacheSet {
func (c *CacheSetOp) Expiration(expiration time.Duration) *CacheSetOp {
c.expiration = expiration
return c
}
// Tags sets the cache tags
func (c *cacheSet) Tags(tags ...string) *cacheSet {
func (c *CacheSetOp) Tags(tags ...string) *CacheSetOp {
c.tags = tags
return c
}
// Save saves the data in the cache
func (c *cacheSet) Save(ctx context.Context) error {
if c.key == "" {
func (c *CacheSetOp) Save(ctx context.Context) error {
switch {
case c.key == "":
return errors.New("no cache key specified")
case c.data == nil:
return errors.New("no cache data specified")
case c.expiration == 0:
return errors.New("no cache expiration specified")
}
opts := []libstore.Option{
libstore.WithExpiration(c.expiration),
libstore.WithTags(c.tags),
}
return marshaler.
New(c.client.cache).
Set(ctx, c.client.cacheKey(c.group, c.key), c.data, opts...)
return c.client.store.set(ctx, c)
}
// Key sets the cache key
func (c *cacheGet) Key(key string) *cacheGet {
func (c *CacheGetOp) Key(key string) *CacheGetOp {
c.key = key
return c
}
// Group sets the cache group
func (c *cacheGet) Group(group string) *cacheGet {
func (c *CacheGetOp) Group(group string) *CacheGetOp {
c.group = group
return c
}
// Type sets the expected Go type of the data being retrieved from the cache
func (c *cacheGet) Type(expectedType any) *cacheGet {
c.dataType = expectedType
return c
}
// Fetch fetches the data from the cache
func (c *cacheGet) Fetch(ctx context.Context) (any, error) {
func (c *CacheGetOp) Fetch(ctx context.Context) (any, error) {
if c.key == "" {
return nil, errors.New("no cache key specified")
}
return marshaler.New(c.client.cache).Get(
ctx,
c.client.cacheKey(c.group, c.key),
c.dataType,
)
return c.client.store.get(ctx, c)
}
// Key sets the cache key
func (c *cacheFlush) Key(key string) *cacheFlush {
func (c *CacheFlushOp) Key(key string) *CacheFlushOp {
c.key = key
return c
}
// Group sets the cache group
func (c *cacheFlush) Group(group string) *cacheFlush {
func (c *CacheFlushOp) Group(group string) *CacheFlushOp {
c.group = group
return c
}
// Tags sets the cache tags
func (c *cacheFlush) Tags(tags ...string) *cacheFlush {
func (c *CacheFlushOp) Tags(tags ...string) *CacheFlushOp {
c.tags = tags
return c
}
// Execute flushes the data from the cache
func (c *cacheFlush) Execute(ctx context.Context) error {
if len(c.tags) > 0 {
if err := c.client.cache.Invalidate(ctx, libstore.WithInvalidateTags(c.tags)); err != nil {
return err
}
func (c *CacheFlushOp) Execute(ctx context.Context) error {
return c.client.store.flush(ctx, c)
}
// newInMemoryCache creates a new in-memory CacheStore
func newInMemoryCache(capacity int) (CacheStore, error) {
s := &inMemoryCacheStore{
tagIndex: newTagIndex(),
}
if c.key != "" {
return c.client.cache.Delete(ctx, c.client.cacheKey(c.group, c.key))
store, err := otter.MustBuilder[string, any](capacity).
WithVariableTTL().
DeletionListener(func(key string, value any, cause otter.DeletionCause) {
s.tagIndex.purgeKeys(key)
}).
Build()
if err != nil {
return nil, err
}
s.store = &store
return s, nil
}
func (s *inMemoryCacheStore) get(_ context.Context, op *CacheGetOp) (any, error) {
v, exists := s.store.Get(op.client.cacheKey(op.group, op.key))
if !exists {
return nil, ErrCacheMiss
}
return v, nil
}
func (s *inMemoryCacheStore) set(_ context.Context, op *CacheSetOp) error {
key := op.client.cacheKey(op.group, op.key)
added := s.store.Set(
key,
op.data,
op.expiration,
)
if len(op.tags) > 0 {
s.tagIndex.setTags(key, op.tags...)
}
if !added {
return errors.New("cache set failed")
}
return nil
}
func (s *inMemoryCacheStore) flush(_ context.Context, op *CacheFlushOp) error {
keys := make([]string, 0)
if key := op.client.cacheKey(op.group, op.key); key != "" {
keys = append(keys, key)
}
if len(op.tags) > 0 {
keys = append(keys, s.tagIndex.purgeTags(op.tags...)...)
}
for _, key := range keys {
s.store.Delete(key)
}
s.tagIndex.purgeKeys(keys...)
return nil
}
func (s *inMemoryCacheStore) close() {
s.store.Close()
}
func newTagIndex() *tagIndex {
return &tagIndex{
tags: make(map[string]map[string]struct{}),
keys: make(map[string]map[string]struct{}),
}
}
func (i *tagIndex) setTags(key string, tags ...string) {
i.Lock()
defer i.Unlock()
if _, exists := i.keys[key]; !exists {
i.keys[key] = make(map[string]struct{})
}
for _, tag := range tags {
if _, exists := i.tags[tag]; !exists {
i.tags[tag] = make(map[string]struct{})
}
i.tags[tag][key] = struct{}{}
i.keys[key][tag] = struct{}{}
}
}
func (i *tagIndex) purgeTags(tags ...string) []string {
i.Lock()
defer i.Unlock()
keys := make([]string, 0)
for _, tag := range tags {
if tagKeys, exists := i.tags[tag]; exists {
delete(i.tags, tag)
for key := range tagKeys {
delete(i.keys[key], tag)
if len(i.keys[key]) == 0 {
delete(i.keys, key)
}
keys = append(keys, key)
}
}
}
return keys
}
func (i *tagIndex) purgeKeys(keys ...string) {
i.Lock()
defer i.Unlock()
for _, key := range keys {
if keyTags, exists := i.keys[key]; exists {
delete(i.keys, key)
for tag := range keyTags {
delete(i.tags[tag], key)
if len(i.tags[tag]) == 0 {
delete(i.tags, tag)
}
}
}
}
}

View file

@ -2,11 +2,9 @@ package services
import (
"context"
"errors"
"testing"
"time"
libstore "github.com/eko/gocache/lib/v4/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -15,6 +13,7 @@ func TestCacheClient(t *testing.T) {
type cacheTest struct {
Value string
}
// Cache some data
data := cacheTest{Value: "abcdef"}
group := "testgroup"
@ -24,6 +23,7 @@ func TestCacheClient(t *testing.T) {
Group(group).
Key(key).
Data(data).
Expiration(500 * time.Millisecond).
Save(context.Background())
require.NoError(t, err)
@ -32,20 +32,18 @@ func TestCacheClient(t *testing.T) {
Get().
Group(group).
Key(key).
Type(new(cacheTest)).
Fetch(context.Background())
require.NoError(t, err)
cast, ok := fromCache.(*cacheTest)
cast, ok := fromCache.(cacheTest)
require.True(t, ok)
assert.Equal(t, data, *cast)
assert.Equal(t, data, cast)
// The same key with the wrong group should fail
_, err = c.Cache.
Get().
Key(key).
Type(new(cacheTest)).
Fetch(context.Background())
assert.Error(t, err)
assert.Equal(t, ErrCacheMiss, err)
// Flush the data
err = c.Cache.
@ -56,29 +54,42 @@ func TestCacheClient(t *testing.T) {
require.NoError(t, err)
// The data should be gone
assertFlushed := func() {
assertFlushed := func(key string) {
// The data should be gone
_, err = c.Cache.
Get().
Group(group).
Key(key).
Type(new(cacheTest)).
Fetch(context.Background())
assert.True(t, errors.Is(err, &libstore.NotFound{}))
assert.Equal(t, ErrCacheMiss, err)
}
assertFlushed()
assertFlushed(key)
// Set with tags
key = "testkey2"
err = c.Cache.
Set().
Group(group).
Key(key).
Data(data).
Tags("tag1").
Tags("tag1", "tag2").
Expiration(time.Hour).
Save(context.Background())
require.NoError(t, err)
// Flush the tag
// Check the tag index
index := c.Cache.store.(*inMemoryCacheStore).tagIndex
gk := c.Cache.cacheKey(group, key)
_, exists := index.tags["tag1"][gk]
assert.True(t, exists)
_, exists = index.tags["tag2"][gk]
assert.True(t, exists)
_, exists = index.keys[gk]["tag1"]
assert.True(t, exists)
_, exists = index.keys[gk]["tag2"]
assert.True(t, exists)
// Flush one of tags
err = c.Cache.
Flush().
Tags("tag1").
@ -86,21 +97,9 @@ func TestCacheClient(t *testing.T) {
require.NoError(t, err)
// The data should be gone
assertFlushed()
assertFlushed(key)
// Set with expiration
err = c.Cache.
Set().
Group(group).
Key(key).
Data(data).
Expiration(time.Millisecond).
Save(context.Background())
require.NoError(t, err)
// Wait for expiration
time.Sleep(time.Millisecond * 2)
// The data should be gone
assertFlushed()
// The index should be empty
assert.Empty(t, index.tags)
assert.Empty(t, index.keys)
}

View file

@ -5,14 +5,13 @@ import (
"database/sql"
"fmt"
"log/slog"
"os"
"strings"
"entgo.io/ent/dialect"
entsql "entgo.io/ent/dialect/sql"
"entgo.io/ent/dialect/sql/schema"
_ "github.com/mattn/go-sqlite3"
"github.com/mikestefanello/pagoda/pkg/funcmap"
// Required by ent
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/labstack/echo/v4"
"github.com/mikestefanello/pagoda/config"
"github.com/mikestefanello/pagoda/ent"
@ -71,20 +70,16 @@ func NewContainer() *Container {
return c
}
// Shutdown shuts the Container down and disconnects all connections
// Shutdown shuts the Container down and disconnects all connections.
// If the task runner was started, cancel the context to shut it down prior to calling this.
func (c *Container) Shutdown() error {
if err := c.Tasks.Close(); err != nil {
return err
}
if err := c.Cache.Close(); err != nil {
return err
}
if err := c.ORM.Close(); err != nil {
return err
}
if err := c.Database.Close(); err != nil {
return err
}
c.Cache.Close()
return nil
}
@ -120,59 +115,41 @@ func (c *Container) initWeb() {
// initCache initializes the cache
func (c *Container) initCache() {
var err error
if c.Cache, err = NewCacheClient(c.Config); err != nil {
store, err := newInMemoryCache(c.Config.Cache.Capacity)
if err != nil {
panic(err)
}
c.Cache = NewCacheClient(store)
}
// initDatabase initializes the database
// If the environment is set to test, the test database will be used and will be dropped, recreated and migrated
func (c *Container) initDatabase() {
var err error
var connection string
getAddr := func(dbName string) string {
return fmt.Sprintf("postgresql://%s:%s@%s:%d/%s",
c.Config.Database.User,
c.Config.Database.Password,
c.Config.Database.Hostname,
c.Config.Database.Port,
dbName,
)
switch c.Config.App.Environment {
case config.EnvTest:
// TODO: Drop/recreate the DB, if this isn't in memory?
connection = c.Config.Database.TestConnection
default:
connection = c.Config.Database.Connection
}
c.Database, err = sql.Open("pgx", getAddr(c.Config.Database.Database))
c.Database, err = openDB(c.Config.Database.Driver, connection)
if err != nil {
panic(fmt.Sprintf("failed to connect to database: %v", err))
}
// Check if this is a test environment
if c.Config.App.Environment == config.EnvTest {
// Drop the test database, ignoring errors in case it doesn't yet exist
_, _ = c.Database.Exec("DROP DATABASE " + c.Config.Database.TestDatabase)
// Create the test database
if _, err = c.Database.Exec("CREATE DATABASE " + c.Config.Database.TestDatabase); err != nil {
panic(fmt.Sprintf("failed to create test database: %v", err))
}
// Connect to the test database
if err = c.Database.Close(); err != nil {
panic(fmt.Sprintf("failed to close database connection: %v", err))
}
c.Database, err = sql.Open("pgx", getAddr(c.Config.Database.TestDatabase))
if err != nil {
panic(fmt.Sprintf("failed to connect to database: %v", err))
}
panic(err)
}
}
// initORM initializes the ORM
func (c *Container) initORM() {
drv := entsql.OpenDB(dialect.Postgres, c.Database)
drv := entsql.OpenDB(c.Config.Database.Driver, c.Database)
c.ORM = ent.NewClient(ent.Driver(drv))
if err := c.ORM.Schema.Create(context.Background(), schema.WithAtlas(true)); err != nil {
panic(fmt.Sprintf("failed to create database schema: %v", err))
// Run the auto migration tool.
if err := c.ORM.Schema.Create(context.Background()); err != nil {
panic(err)
}
}
@ -197,5 +174,30 @@ func (c *Container) initMail() {
// initTasks initializes the task client
func (c *Container) initTasks() {
c.Tasks = NewTaskClient(c.Config)
var err error
// You could use a separate database for tasks, if you'd like. but using one
// makes transaction support easier
c.Tasks, err = NewTaskClient(c.Config.Tasks, c.Database)
if err != nil {
panic(fmt.Sprintf("failed to create task client: %v", err))
}
}
// openDB opens a database connection
func openDB(driver, connection string) (*sql.DB, error) {
// Helper to automatically create the directories that the specified sqlite file
// should reside in, if one
if driver == "sqlite3" {
d := strings.Split(connection, "/")
if len(d) > 1 {
path := strings.Join(d[:len(d)-1], "/")
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
}
}
return sql.Open(driver, connection)
}

View file

@ -1,179 +1,204 @@
package services
import (
"encoding/json"
"fmt"
"bytes"
"context"
"database/sql"
"encoding/gob"
"strings"
"sync"
"time"
"github.com/hibiken/asynq"
"github.com/maragudk/goqite"
"github.com/maragudk/goqite/jobs"
"github.com/mikestefanello/pagoda/config"
"github.com/mikestefanello/pagoda/pkg/log"
)
type (
// TaskClient is that client that allows you to queue or schedule task execution
// TaskClient is that client that allows you to queue or schedule task execution.
// Under the hood we create only a single queue using goqite for all tasks because we do not want more than one
// runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues.
TaskClient struct {
// client stores the asynq client
client *asynq.Client
// scheduler stores the asynq scheduler
scheduler *asynq.Scheduler
queue *goqite.Queue
runner *jobs.Runner
buffers sync.Pool
}
// task handles task creation operations
task struct {
client *TaskClient
typ string
payload any
periodic *string
queue *string
maxRetries *int
timeout *time.Duration
deadline *time.Time
at *time.Time
wait *time.Duration
retain *time.Duration
// Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber.
// See pkg/tasks for an example of how this can be used with a queue.
Task interface {
Name() string
}
// TaskSaveOp handles task save operations
TaskSaveOp struct {
client *TaskClient
task Task
tx *sql.Tx
at *time.Time
wait *time.Duration
}
// Queue is a queue that a Task can be pushed to for execution.
// While this can be implemented directly, it's recommended to use NewQueue() which uses generics in
// order to provide type-safe queues and queue subscriber callbacks for task execution.
Queue interface {
// Name returns the name of the task this queue processes
Name() string
// Receive receives the Task payload to be processed
Receive(ctx context.Context, payload []byte) error
}
// queue provides a type-safe implementation of Queue
queue[T Task] struct {
name string
subscriber QueueSubscriber[T]
}
// QueueSubscriber is a generic subscriber callback for a given queue to process Tasks
QueueSubscriber[T Task] func(context.Context, T) error
)
// NewTaskClient creates a new task client
func NewTaskClient(cfg *config.Config) *TaskClient {
// Determine the database based on the environment
db := cfg.Cache.Database
if cfg.App.Environment == config.EnvTest {
db = cfg.Cache.TestDatabase
func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
// Install the schema
if err := goqite.Setup(context.Background(), db); err != nil {
// An error is returned if we already ran this and there's no better way to check.
// You can and probably should handle this via migrations
if !strings.Contains(err.Error(), "already exists") {
return nil, err
}
}
conn := asynq.RedisClientOpt{
Addr: fmt.Sprintf("%s:%d", cfg.Cache.Hostname, cfg.Cache.Port),
Password: cfg.Cache.Password,
DB: db,
t := &TaskClient{
queue: goqite.New(goqite.NewOpts{
DB: db,
Name: "tasks",
MaxReceive: cfg.MaxRetries,
}),
buffers: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
},
}
return &TaskClient{
client: asynq.NewClient(conn),
scheduler: asynq.NewScheduler(conn, nil),
}
t.runner = jobs.NewRunner(jobs.NewRunnerOpts{
Limit: cfg.Goroutines,
Log: log.Default(),
PollInterval: cfg.PollInterval,
Queue: t.queue,
})
return t, nil
}
// Close closes the connection to the task service
func (t *TaskClient) Close() error {
return t.client.Close()
// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
// This must be running in order to execute queued tasked.
// To stop the runner, cancel the context.
// This is a blocking call.
func (t *TaskClient) StartRunner(ctx context.Context) {
t.runner.Start(ctx)
}
// StartScheduler starts the scheduler service which adds scheduled tasks to the queue
// This must be running in order to queue tasks set for periodic execution
func (t *TaskClient) StartScheduler() error {
return t.scheduler.Run()
// Register registers a queue so tasks can be added to it and processed
func (t *TaskClient) Register(queue Queue) {
t.runner.Register(queue.Name(), queue.Receive)
}
// New starts a task creation operation
func (t *TaskClient) New(typ string) *task {
return &task{
// New starts a task save operation
func (t *TaskClient) New(task Task) *TaskSaveOp {
return &TaskSaveOp{
client: t,
typ: typ,
task: task,
}
}
// Payload sets the task payload data which will be sent to the task handler
func (t *task) Payload(payload any) *task {
t.payload = payload
return t
}
// Periodic sets the task to execute periodically according to a given interval
// The interval can be either in cron form ("*/5 * * * *") or "@every 30s"
func (t *task) Periodic(interval string) *task {
t.periodic = &interval
return t
}
// Queue specifies the name of the queue to add the task to
// The default queue will be used if this is not set
func (t *task) Queue(queue string) *task {
t.queue = &queue
return t
}
// Timeout sets the task timeout, meaning the task must execute within a given duration
func (t *task) Timeout(timeout time.Duration) *task {
t.timeout = &timeout
return t
}
// Deadline sets the task execution deadline to a specific date and time
func (t *task) Deadline(deadline time.Time) *task {
t.deadline = &deadline
return t
}
// At sets the exact date and time the task should be executed
func (t *task) At(processAt time.Time) *task {
t.at = &processAt
func (t *TaskSaveOp) At(processAt time.Time) *TaskSaveOp {
t.Wait(time.Until(processAt))
return t
}
// Wait instructs the task to wait a given duration before it is executed
func (t *task) Wait(duration time.Duration) *task {
func (t *TaskSaveOp) Wait(duration time.Duration) *TaskSaveOp {
t.wait = &duration
return t
}
// Retain instructs the task service to retain the task data for a given duration after execution is complete
func (t *task) Retain(duration time.Duration) *task {
t.retain = &duration
// Tx will include the task as part of a given database transaction
func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp {
t.tx = tx
return t
}
// MaxRetries sets the maximum amount of times to retry executing the task in the event of a failure
func (t *task) MaxRetries(retries int) *task {
t.maxRetries = &retries
return t
}
// Save saves the task so it can be executed
func (t *task) Save() error {
var err error
// Build the payload
var payload []byte
if t.payload != nil {
if payload, err = json.Marshal(t.payload); err != nil {
return err
}
// Save saves the task, so it can be queued for execution
func (t *TaskSaveOp) Save() error {
type message struct {
Name string
Message []byte
}
// Build the task options
opts := make([]asynq.Option, 0)
if t.queue != nil {
opts = append(opts, asynq.Queue(*t.queue))
// Encode the task
taskBuf := t.client.buffers.Get().(*bytes.Buffer)
if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil {
return err
}
if t.maxRetries != nil {
opts = append(opts, asynq.MaxRetry(*t.maxRetries))
// Wrap and encode the message
// This is needed as a workaround because goqite doesn't support delays using the jobs package,
// so we format the message the way it expects but use the queue to supply the delay
msgBuf := t.client.buffers.Get().(*bytes.Buffer)
wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()}
if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil {
return err
}
if t.timeout != nil {
opts = append(opts, asynq.Timeout(*t.timeout))
}
if t.deadline != nil {
opts = append(opts, asynq.Deadline(*t.deadline))
msg := goqite.Message{
Body: msgBuf.Bytes(),
}
if t.wait != nil {
opts = append(opts, asynq.ProcessIn(*t.wait))
}
if t.retain != nil {
opts = append(opts, asynq.Retention(*t.retain))
}
if t.at != nil {
opts = append(opts, asynq.ProcessAt(*t.at))
msg.Delay = *t.wait
}
// Build the task
task := asynq.NewTask(t.typ, payload, opts...)
// Put the buffers back in the pool for re-use
taskBuf.Reset()
msgBuf.Reset()
t.client.buffers.Put(taskBuf)
t.client.buffers.Put(msgBuf)
// Schedule, if needed
if t.periodic != nil {
_, err = t.client.scheduler.Register(*t.periodic, task)
if t.tx == nil {
return t.client.queue.Send(context.Background(), msg)
} else {
_, err = t.client.client.Enqueue(task)
return t.client.queue.SendTx(context.Background(), t.tx, msg)
}
return err
}
// NewQueue queues a new type-safe Queue of a given Task type
func NewQueue[T Task](subscriber QueueSubscriber[T]) Queue {
var task T
q := &queue[T]{
name: task.Name(),
subscriber: subscriber,
}
return q
}
func (q *queue[T]) Name() string {
return q.name
}
func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
var obj T
err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj)
if err != nil {
return err
}
return q.subscriber(ctx, obj)
}

View file

@ -1,35 +1,69 @@
package services
import (
"context"
"database/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTaskClient_New(t *testing.T) {
now := time.Now()
tk := c.Tasks.
New("task1").
Payload("payload").
Queue("queue").
Periodic("@every 5s").
MaxRetries(5).
Timeout(5 * time.Second).
Deadline(now).
At(now).
Wait(6 * time.Second).
Retain(7 * time.Second)
assert.Equal(t, "task1", tk.typ)
assert.Equal(t, "payload", tk.payload.(string))
assert.Equal(t, "queue", *tk.queue)
assert.Equal(t, "@every 5s", *tk.periodic)
assert.Equal(t, 5, *tk.maxRetries)
assert.Equal(t, 5*time.Second, *tk.timeout)
assert.Equal(t, now, *tk.deadline)
assert.Equal(t, now, *tk.at)
assert.Equal(t, 6*time.Second, *tk.wait)
assert.Equal(t, 7*time.Second, *tk.retain)
assert.NoError(t, tk.Save())
type testTask struct {
Val int
}
func (t testTask) Name() string {
return "test_task"
}
func TestTaskClient_New(t *testing.T) {
var subCalled bool
queue := NewQueue[testTask](func(ctx context.Context, task testTask) error {
subCalled = true
assert.Equal(t, 123, task.Val)
return nil
})
c.Tasks.Register(queue)
task := testTask{Val: 123}
tx := &sql.Tx{}
op := c.Tasks.
New(task).
Wait(5 * time.Second).
Tx(tx)
// Check that the task op was built correctly
assert.Equal(t, task, op.task)
assert.Equal(t, tx, op.tx)
assert.Equal(t, 5*time.Second, *op.wait)
// Remove the transaction and delay so we can process the task immediately
op.tx, op.wait = nil, nil
err := op.Save()
require.NoError(t, err)
// Start the runner
ctx, cancel := context.WithCancel(context.Background())
go c.Tasks.StartRunner(ctx)
defer cancel()
// Check for up to 5 seconds if the task executed
start := time.Now()
waitLoop:
for {
switch {
case subCalled:
break waitLoop
case time.Since(start) > (5 * time.Second):
break waitLoop
default:
time.Sleep(10 * time.Millisecond)
}
}
assert.True(t, subCalled)
}

View file

@ -185,7 +185,7 @@ func (t *TemplateRenderer) cachePage(ctx echo.Context, page page.Page, html *byt
// The request URL is used as the cache key so the middleware can serve the
// cached page on matching requests
key := ctx.Request().URL.String()
cp := CachedPage{
cp := &CachedPage{
URL: key,
HTML: html.Bytes(),
Headers: headers,
@ -217,7 +217,6 @@ func (t *TemplateRenderer) GetCachedPage(ctx echo.Context, url string) (*CachedP
Get().
Group(cachedPageGroup).
Key(url).
Type(new(CachedPage)).
Fetch(ctx.Request().Context())
if err != nil {

View file

@ -2,21 +2,35 @@ package tasks
import (
"context"
"log"
"github.com/hibiken/asynq"
"github.com/mikestefanello/pagoda/pkg/log"
"github.com/mikestefanello/pagoda/pkg/services"
)
// TypeExample is the type for the example task.
// This is what is passed in to TaskClient.New() when creating a new task
const TypeExample = "example_task"
// ExampleProcessor processes example tasks
type ExampleProcessor struct {
// ExampleTask is an example implementation of services.Task
// This represents the task that can be queued for execution via the task client and should contain everything
// that your queue subscriber needs to process the task.
type ExampleTask struct {
Message string
}
// ProcessTask handles the processing of the task
func (p *ExampleProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
log.Printf("executing task: %s", t.Type())
return nil
// Name satisfies the services.Task interface by proviing a unique name for this Task type
func (t ExampleTask) Name() string {
return "example_task"
}
// NewExampleTaskQueue provides a Queue that can process ExampleTask tasks
// The service container is provided so the subscriber can have access to the app dependencies.
// All queues must be registered in the Register() function.
// Whenever an ExampleTask is added to the task client, it will be queued and eventually sent here for execution.
func NewExampleTaskQueue(c *services.Container) services.Queue {
return services.NewQueue[ExampleTask](func(ctx context.Context, task ExampleTask) error {
log.Default().Info("Example task received",
"message", task.Message,
)
log.Default().Info("This can access the container for dependencies",
"echo", c.Web.Reverse("home"),
)
return nil
})
}

10
pkg/tasks/register.go Normal file
View file

@ -0,0 +1,10 @@
package tasks
import (
"github.com/mikestefanello/pagoda/pkg/services"
)
// Register registers all task queues with the task client
func Register(c *services.Container) {
c.Tasks.Register(NewExampleTaskQueue(c))
}