From b68f9bdf3c7c6d28b8d00066448dce0753e50040 Mon Sep 17 00:00:00 2001 From: mikestefanello Date: Thu, 20 Jun 2024 22:11:53 -0400 Subject: [PATCH] Added optional delay to tasks. Pool buffers when encoding. --- Makefile | 59 +------------------------------ cmd/web/main.go | 2 +- config/config.go | 10 ++---- pkg/handlers/contact.go | 3 +- pkg/services/cache.go | 35 +++++++++--------- pkg/services/cache_test.go | 45 ++++++++++++------------ pkg/services/tasks.go | 72 ++++++++++++++++++++++++-------------- pkg/services/tasks_test.go | 1 + 8 files changed, 96 insertions(+), 131 deletions(-) diff --git a/Makefile b/Makefile index 02044ef..d19a1e1 100644 --- a/Makefile +++ b/Makefile @@ -1,32 +1,3 @@ -# Determine if you have docker-compose or docker compose installed locally -# If this does not work on your system, just set the name of the executable you have installed -DCO_BIN := $(shell { command -v docker-compose || command -v docker compose; } 2>/dev/null) - -# Connect to the primary database -.PHONY: db -db: - docker exec -it pagoda_db psql postgresql://admin:admin@localhost:5432/app - -# Connect to the test database (you must run tests first before running this) -.PHONY: db-test -db-test: - docker exec -it pagoda_db psql postgresql://admin:admin@localhost:5432/app_test - -# Connect to the primary cache -.PHONY: cache -cache: - docker exec -it pagoda_cache redis-cli - -# Clear the primary cache -.PHONY: cache-clear -cache-clear: - docker exec -it pagoda_cache redis-cli flushall - - # Connect to the test cache -.PHONY: cache-test -cache-test: - docker exec -it pagoda_cache redis-cli -n 1 - # Install Ent code-generation module .PHONY: ent-install ent-install: @@ -42,28 +13,6 @@ ent-gen: ent-new: go run entgo.io/ent/cmd/ent new $(name) -# Start the Docker containers -.PHONY: up -up: - $(DCO_BIN) up -d - sleep 3 - -# Stop the Docker containers -.PHONY: stop -stop: - $(DCO_BIN) stop - -# Drop the Docker containers to wipe all data -.PHONY: down -down: - $(DCO_BIN) down - -# Rebuild Docker containers to wipe all data -.PHONY: reset -reset: - $(DCO_BIN) down - make up - # Run the application .PHONY: run run: @@ -73,13 +22,7 @@ run: # Run all tests .PHONY: test test: - go test -count=1 -p 1 ./... - -# Run the worker -.PHONY: worker -worker: - clear - go run cmd/worker/main.go + go test ./... # Check for direct dependency updates .PHONY: check-updates diff --git a/cmd/web/main.go b/cmd/web/main.go index 1335023..6b73f5d 100644 --- a/cmd/web/main.go +++ b/cmd/web/main.go @@ -59,7 +59,7 @@ func main() { // Register all task queues tasks.Register(c) - // Start the task runner to executed queued tasks + // Start the task runner to execute queued tasks ctx, cancel := context.WithCancel(context.Background()) c.Tasks.StartRunner(ctx) diff --git a/config/config.go b/config/config.go index 7cb69f7..b868259 100644 --- a/config/config.go +++ b/config/config.go @@ -106,13 +106,9 @@ type ( // TasksConfig stores the tasks configuration TasksConfig struct { - // TODO remove separate DB? - Driver string - Connection string - TestConnection string - PollInterval time.Duration - MaxRetries int - Goroutines int + PollInterval time.Duration + MaxRetries int + Goroutines int } // MailConfig stores the mail configuration diff --git a/pkg/handlers/contact.go b/pkg/handlers/contact.go index 1a865d7..0273364 100644 --- a/pkg/handlers/contact.go +++ b/pkg/handlers/contact.go @@ -72,10 +72,11 @@ func (h *Contact) Submit(ctx echo.Context) error { return err } + // TODO create a new page for this err = h.tasks.New(tasks.ExampleTask{ Message: input.Message, }). - Wait(30 * time.Second). + Wait(10 * time.Second). Save() if err != nil { return err diff --git a/pkg/services/cache.go b/pkg/services/cache.go index bb17f12..331ef75 100644 --- a/pkg/services/cache.go +++ b/pkg/services/cache.go @@ -71,8 +71,9 @@ type ( // locking, and we need to keep track of this index in order to keep everything in sync. // If using something like Redis for caching, you can leverage sets to store the index. // Cache tags can be useful and convenient, so you should decide if your app benefits enough from this. - // As it stands there, there is no limiting how much memory this will consume and it will track all keys - // and tags added and removed from the cache. + // As it stands here, there is no limiting how much memory this will consume and it will track all keys + // and tags added and removed from the cache. You could store these in the cache itself but allowing these to + // be evicted poses challenges. tagIndex struct { sync.Mutex tags map[string]map[string]struct{} // tag->keys @@ -314,16 +315,17 @@ func (i *tagIndex) purgeTags(tags ...string) []string { keys := make([]string, 0) for _, tag := range tags { - tagKeys := i.tags[tag] - delete(i.tags, tag) + if tagKeys, exists := i.tags[tag]; exists { + delete(i.tags, tag) - for key := range tagKeys { - delete(i.keys[key], tag) - if len(i.keys[key]) == 0 { - delete(i.keys, key) + for key := range tagKeys { + delete(i.keys[key], tag) + if len(i.keys[key]) == 0 { + delete(i.keys, key) + } + + keys = append(keys, key) } - - keys = append(keys, key) } } @@ -335,13 +337,14 @@ func (i *tagIndex) purgeKeys(keys ...string) { defer i.Unlock() for _, key := range keys { - keyTags := i.keys[key] - delete(i.keys, key) + if keyTags, exists := i.keys[key]; exists { + delete(i.keys, key) - for tag := range keyTags { - delete(i.tags[tag], key) - if len(i.tags[tag]) == 0 { - delete(i.tags, tag) + for tag := range keyTags { + delete(i.tags[tag], key) + if len(i.tags[tag]) == 0 { + delete(i.tags, tag) + } } } } diff --git a/pkg/services/cache_test.go b/pkg/services/cache_test.go index a152abc..fdaa308 100644 --- a/pkg/services/cache_test.go +++ b/pkg/services/cache_test.go @@ -13,6 +13,7 @@ func TestCacheClient(t *testing.T) { type cacheTest struct { Value string } + // Cache some data data := cacheTest{Value: "abcdef"} group := "testgroup" @@ -22,7 +23,7 @@ func TestCacheClient(t *testing.T) { Group(group). Key(key). Data(data). - Expiration(time.Hour). + Expiration(500 * time.Millisecond). Save(context.Background()) require.NoError(t, err) @@ -53,7 +54,7 @@ func TestCacheClient(t *testing.T) { require.NoError(t, err) // The data should be gone - assertFlushed := func() { + assertFlushed := func(key string) { // The data should be gone _, err = c.Cache. Get(). @@ -62,20 +63,33 @@ func TestCacheClient(t *testing.T) { Fetch(context.Background()) assert.Equal(t, ErrCacheMiss, err) } - assertFlushed() + assertFlushed(key) // Set with tags + key = "testkey2" err = c.Cache. Set(). Group(group). Key(key). Data(data). - Tags("tag1"). + Tags("tag1", "tag2"). Expiration(time.Hour). Save(context.Background()) require.NoError(t, err) - // Flush the tag + // Check the tag index + index := c.Cache.store.(*inMemoryCacheStore).tagIndex + gk := c.Cache.cacheKey(group, key) + _, exists := index.tags["tag1"][gk] + assert.True(t, exists) + _, exists = index.tags["tag2"][gk] + assert.True(t, exists) + _, exists = index.keys[gk]["tag1"] + assert.True(t, exists) + _, exists = index.keys[gk]["tag2"] + assert.True(t, exists) + + // Flush one of tags err = c.Cache. Flush(). Tags("tag1"). @@ -83,22 +97,9 @@ func TestCacheClient(t *testing.T) { require.NoError(t, err) // The data should be gone - assertFlushed() + assertFlushed(key) - // Set with expiration - err = c.Cache. - Set(). - Group(group). - Key(key). - Data(data). - Expiration(time.Millisecond). - Save(context.Background()) - require.NoError(t, err) - - // Wait for expiration - // TODO why does this need to wait so long? - time.Sleep(time.Millisecond * 500) - - // The data should be gone - assertFlushed() + // The index should be empty + assert.Empty(t, index.tags) + assert.Empty(t, index.keys) } diff --git a/pkg/services/tasks.go b/pkg/services/tasks.go index d18dc22..cb963f4 100644 --- a/pkg/services/tasks.go +++ b/pkg/services/tasks.go @@ -1,10 +1,12 @@ package services import ( + "bytes" "context" "database/sql" - "encoding/json" + "encoding/gob" "strings" + "sync" "time" "github.com/maragudk/goqite" @@ -18,8 +20,9 @@ type ( // Under the hood we create only a single queue using goqite for all tasks because we do not want more than one // runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues. TaskClient struct { - queue *goqite.Queue - runner *jobs.Runner + queue *goqite.Queue + runner *jobs.Runner + buffers sync.Pool } // Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber. @@ -75,6 +78,11 @@ func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) { Name: "tasks", MaxReceive: cfg.MaxRetries, }), + buffers: sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, + }, } t.runner = jobs.NewRunner(jobs.NewRunnerOpts{ @@ -87,12 +95,6 @@ func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) { return t, nil } -//// Close closes the connection to the task service -//func (t *TaskClient) Close() error { -// // TODO close the runner -// return t.db.Close() -//} - // StartRunner starts the scheduler service which adds scheduled tasks to the queue. // This must be running in order to execute queued tasked. // To stop the runner, cancel the context. @@ -131,28 +133,46 @@ func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp { return t } -// Save saves the task so it can be queued for execution +// Save saves the task, so it can be queued for execution func (t *TaskSaveOp) Save() error { - // Build the payload - // TODO use gob? - payload, err := json.Marshal(t.task) - if err != nil { + type message struct { + Name string + Message []byte + } + + // Encode the task + taskBuf := t.client.buffers.Get().(*bytes.Buffer) + if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil { return err } - //msg := goqite.Message{ - // Body: payload, - //} - // - //if t.wait != nil { - // msg.Delay = *t.wait - //} - // TODO support delay - //return t.client.queue.Send(context.Background(), msg) + // Wrap and encode the message + // This is needed as a workaround because goqite doesn't support delays using the jobs package, + // so we format the message the way it expects but use the queue to supply the delay + msgBuf := t.client.buffers.Get().(*bytes.Buffer) + wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()} + if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil { + return err + } + + msg := goqite.Message{ + Body: msgBuf.Bytes(), + } + + if t.wait != nil { + msg.Delay = *t.wait + } + + // Put the buffers back in the pool for re-use + taskBuf.Reset() + msgBuf.Reset() + t.client.buffers.Put(taskBuf) + t.client.buffers.Put(msgBuf) + if t.tx == nil { - return jobs.Create(context.Background(), t.client.queue, t.task.Name(), payload) + return t.client.queue.Send(context.Background(), msg) } else { - return jobs.CreateTx(context.Background(), t.tx, t.client.queue, t.task.Name(), payload) + return t.client.queue.SendTx(context.Background(), t.tx, msg) } } @@ -174,7 +194,7 @@ func (q *queue[T]) Name() string { func (q *queue[T]) Receive(ctx context.Context, payload []byte) error { var obj T - err := json.Unmarshal(payload, &obj) + err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj) if err != nil { return err } diff --git a/pkg/services/tasks_test.go b/pkg/services/tasks_test.go index 64b5fbf..4008f4a 100644 --- a/pkg/services/tasks_test.go +++ b/pkg/services/tasks_test.go @@ -1,5 +1,6 @@ package services +// TODO //func TestTaskClient_New(t *testing.T) { // now := time.Now() // tk := c.Tasks.