Added optional delay to tasks. Pool buffers when encoding.

This commit is contained in:
mikestefanello 2024-06-20 22:11:53 -04:00
parent 912ae2ca6b
commit b68f9bdf3c
8 changed files with 96 additions and 131 deletions

View file

@ -72,10 +72,11 @@ func (h *Contact) Submit(ctx echo.Context) error {
return err
}
// TODO create a new page for this
err = h.tasks.New(tasks.ExampleTask{
Message: input.Message,
}).
Wait(30 * time.Second).
Wait(10 * time.Second).
Save()
if err != nil {
return err

View file

@ -71,8 +71,9 @@ type (
// locking, and we need to keep track of this index in order to keep everything in sync.
// If using something like Redis for caching, you can leverage sets to store the index.
// Cache tags can be useful and convenient, so you should decide if your app benefits enough from this.
// As it stands there, there is no limiting how much memory this will consume and it will track all keys
// and tags added and removed from the cache.
// As it stands here, there is no limiting how much memory this will consume and it will track all keys
// and tags added and removed from the cache. You could store these in the cache itself but allowing these to
// be evicted poses challenges.
tagIndex struct {
sync.Mutex
tags map[string]map[string]struct{} // tag->keys
@ -314,16 +315,17 @@ func (i *tagIndex) purgeTags(tags ...string) []string {
keys := make([]string, 0)
for _, tag := range tags {
tagKeys := i.tags[tag]
delete(i.tags, tag)
if tagKeys, exists := i.tags[tag]; exists {
delete(i.tags, tag)
for key := range tagKeys {
delete(i.keys[key], tag)
if len(i.keys[key]) == 0 {
delete(i.keys, key)
for key := range tagKeys {
delete(i.keys[key], tag)
if len(i.keys[key]) == 0 {
delete(i.keys, key)
}
keys = append(keys, key)
}
keys = append(keys, key)
}
}
@ -335,13 +337,14 @@ func (i *tagIndex) purgeKeys(keys ...string) {
defer i.Unlock()
for _, key := range keys {
keyTags := i.keys[key]
delete(i.keys, key)
if keyTags, exists := i.keys[key]; exists {
delete(i.keys, key)
for tag := range keyTags {
delete(i.tags[tag], key)
if len(i.tags[tag]) == 0 {
delete(i.tags, tag)
for tag := range keyTags {
delete(i.tags[tag], key)
if len(i.tags[tag]) == 0 {
delete(i.tags, tag)
}
}
}
}

View file

@ -13,6 +13,7 @@ func TestCacheClient(t *testing.T) {
type cacheTest struct {
Value string
}
// Cache some data
data := cacheTest{Value: "abcdef"}
group := "testgroup"
@ -22,7 +23,7 @@ func TestCacheClient(t *testing.T) {
Group(group).
Key(key).
Data(data).
Expiration(time.Hour).
Expiration(500 * time.Millisecond).
Save(context.Background())
require.NoError(t, err)
@ -53,7 +54,7 @@ func TestCacheClient(t *testing.T) {
require.NoError(t, err)
// The data should be gone
assertFlushed := func() {
assertFlushed := func(key string) {
// The data should be gone
_, err = c.Cache.
Get().
@ -62,20 +63,33 @@ func TestCacheClient(t *testing.T) {
Fetch(context.Background())
assert.Equal(t, ErrCacheMiss, err)
}
assertFlushed()
assertFlushed(key)
// Set with tags
key = "testkey2"
err = c.Cache.
Set().
Group(group).
Key(key).
Data(data).
Tags("tag1").
Tags("tag1", "tag2").
Expiration(time.Hour).
Save(context.Background())
require.NoError(t, err)
// Flush the tag
// Check the tag index
index := c.Cache.store.(*inMemoryCacheStore).tagIndex
gk := c.Cache.cacheKey(group, key)
_, exists := index.tags["tag1"][gk]
assert.True(t, exists)
_, exists = index.tags["tag2"][gk]
assert.True(t, exists)
_, exists = index.keys[gk]["tag1"]
assert.True(t, exists)
_, exists = index.keys[gk]["tag2"]
assert.True(t, exists)
// Flush one of tags
err = c.Cache.
Flush().
Tags("tag1").
@ -83,22 +97,9 @@ func TestCacheClient(t *testing.T) {
require.NoError(t, err)
// The data should be gone
assertFlushed()
assertFlushed(key)
// Set with expiration
err = c.Cache.
Set().
Group(group).
Key(key).
Data(data).
Expiration(time.Millisecond).
Save(context.Background())
require.NoError(t, err)
// Wait for expiration
// TODO why does this need to wait so long?
time.Sleep(time.Millisecond * 500)
// The data should be gone
assertFlushed()
// The index should be empty
assert.Empty(t, index.tags)
assert.Empty(t, index.keys)
}

View file

@ -1,10 +1,12 @@
package services
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"encoding/gob"
"strings"
"sync"
"time"
"github.com/maragudk/goqite"
@ -18,8 +20,9 @@ type (
// Under the hood we create only a single queue using goqite for all tasks because we do not want more than one
// runner to process the tasks. The TaskClient wrapper provides abstractions for separate, type-safe queues.
TaskClient struct {
queue *goqite.Queue
runner *jobs.Runner
queue *goqite.Queue
runner *jobs.Runner
buffers sync.Pool
}
// Task is a job that can be added to a queue and later passed to and executed by a QueueSubscriber.
@ -75,6 +78,11 @@ func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
Name: "tasks",
MaxReceive: cfg.MaxRetries,
}),
buffers: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
},
}
t.runner = jobs.NewRunner(jobs.NewRunnerOpts{
@ -87,12 +95,6 @@ func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
return t, nil
}
//// Close closes the connection to the task service
//func (t *TaskClient) Close() error {
// // TODO close the runner
// return t.db.Close()
//}
// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
// This must be running in order to execute queued tasked.
// To stop the runner, cancel the context.
@ -131,28 +133,46 @@ func (t *TaskSaveOp) Tx(tx *sql.Tx) *TaskSaveOp {
return t
}
// Save saves the task so it can be queued for execution
// Save saves the task, so it can be queued for execution
func (t *TaskSaveOp) Save() error {
// Build the payload
// TODO use gob?
payload, err := json.Marshal(t.task)
if err != nil {
type message struct {
Name string
Message []byte
}
// Encode the task
taskBuf := t.client.buffers.Get().(*bytes.Buffer)
if err := gob.NewEncoder(taskBuf).Encode(t.task); err != nil {
return err
}
//msg := goqite.Message{
// Body: payload,
//}
//
//if t.wait != nil {
// msg.Delay = *t.wait
//}
// TODO support delay
//return t.client.queue.Send(context.Background(), msg)
// Wrap and encode the message
// This is needed as a workaround because goqite doesn't support delays using the jobs package,
// so we format the message the way it expects but use the queue to supply the delay
msgBuf := t.client.buffers.Get().(*bytes.Buffer)
wrapper := message{Name: t.task.Name(), Message: taskBuf.Bytes()}
if err := gob.NewEncoder(msgBuf).Encode(wrapper); err != nil {
return err
}
msg := goqite.Message{
Body: msgBuf.Bytes(),
}
if t.wait != nil {
msg.Delay = *t.wait
}
// Put the buffers back in the pool for re-use
taskBuf.Reset()
msgBuf.Reset()
t.client.buffers.Put(taskBuf)
t.client.buffers.Put(msgBuf)
if t.tx == nil {
return jobs.Create(context.Background(), t.client.queue, t.task.Name(), payload)
return t.client.queue.Send(context.Background(), msg)
} else {
return jobs.CreateTx(context.Background(), t.tx, t.client.queue, t.task.Name(), payload)
return t.client.queue.SendTx(context.Background(), t.tx, msg)
}
}
@ -174,7 +194,7 @@ func (q *queue[T]) Name() string {
func (q *queue[T]) Receive(ctx context.Context, payload []byte) error {
var obj T
err := json.Unmarshal(payload, &obj)
err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&obj)
if err != nil {
return err
}

View file

@ -1,5 +1,6 @@
package services
// TODO
//func TestTaskClient_New(t *testing.T) {
// now := time.Now()
// tk := c.Tasks.