Added tests for the task client and runner.
This commit is contained in:
parent
b68f9bdf3c
commit
5707343d57
6 changed files with 71 additions and 53 deletions
7
.github/workflows/test.yml
vendored
7
.github/workflows/test.yml
vendored
|
|
@ -27,10 +27,5 @@ jobs:
|
||||||
restore-keys: |
|
restore-keys: |
|
||||||
${{ runner.os }}-go-
|
${{ runner.os }}-go-
|
||||||
|
|
||||||
- name: Start containers
|
|
||||||
run: |
|
|
||||||
docker-compose up -d
|
|
||||||
sleep 3
|
|
||||||
|
|
||||||
- name: Test
|
- name: Test
|
||||||
run: go test -p 1 ./...
|
run: go test ./...
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ func main() {
|
||||||
|
|
||||||
// Start the task runner to execute queued tasks
|
// Start the task runner to execute queued tasks
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
c.Tasks.StartRunner(ctx)
|
go c.Tasks.StartRunner(ctx)
|
||||||
|
|
||||||
// Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds.
|
// Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds.
|
||||||
quit := make(chan os.Signal, 1)
|
quit := make(chan os.Signal, 1)
|
||||||
|
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
version: "3"
|
|
||||||
|
|
||||||
services:
|
|
||||||
cache:
|
|
||||||
image: "redis:alpine"
|
|
||||||
container_name: pagoda_cache
|
|
||||||
ports:
|
|
||||||
- "127.0.0.1:6379:6379"
|
|
||||||
db:
|
|
||||||
# PG 16 is currently not supported https://github.com/ent/ent/issues/3750
|
|
||||||
image: postgres:15-alpine
|
|
||||||
container_name: pagoda_db
|
|
||||||
ports:
|
|
||||||
- "127.0.0.1:5432:5432"
|
|
||||||
environment:
|
|
||||||
- POSTGRES_USER=admin
|
|
||||||
- POSTGRES_PASSWORD=admin
|
|
||||||
- POSTGRES_DB=app
|
|
||||||
|
|
@ -98,6 +98,7 @@ func NewTaskClient(cfg config.TasksConfig, db *sql.DB) (*TaskClient, error) {
|
||||||
// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
|
// StartRunner starts the scheduler service which adds scheduled tasks to the queue.
|
||||||
// This must be running in order to execute queued tasked.
|
// This must be running in order to execute queued tasked.
|
||||||
// To stop the runner, cancel the context.
|
// To stop the runner, cancel the context.
|
||||||
|
// This is a blocking call.
|
||||||
func (t *TaskClient) StartRunner(ctx context.Context) {
|
func (t *TaskClient) StartRunner(ctx context.Context) {
|
||||||
t.runner.Start(ctx)
|
t.runner.Start(ctx)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,29 +1,69 @@
|
||||||
package services
|
package services
|
||||||
|
|
||||||
// TODO
|
import (
|
||||||
//func TestTaskClient_New(t *testing.T) {
|
"context"
|
||||||
// now := time.Now()
|
"database/sql"
|
||||||
// tk := c.Tasks.
|
"github.com/stretchr/testify/assert"
|
||||||
// New("task1").
|
"github.com/stretchr/testify/require"
|
||||||
// Payload("payload").
|
"testing"
|
||||||
// Queue("queue").
|
"time"
|
||||||
// Periodic("@every 5s").
|
)
|
||||||
// MaxRetries(5).
|
|
||||||
// Timeout(5 * time.Second).
|
type testTask struct {
|
||||||
// Deadline(now).
|
Val int
|
||||||
// At(now).
|
}
|
||||||
// Wait(6 * time.Second).
|
|
||||||
// Retain(7 * time.Second)
|
func (t testTask) Name() string {
|
||||||
//
|
return "test_task"
|
||||||
// assert.Equal(t, "task1", tk.typ)
|
}
|
||||||
// assert.Equal(t, "payload", tk.payload.(string))
|
|
||||||
// assert.Equal(t, "queue", *tk.queue)
|
func TestTaskClient_New(t *testing.T) {
|
||||||
// assert.Equal(t, "@every 5s", *tk.periodic)
|
var subCalled bool
|
||||||
// assert.Equal(t, 5, *tk.maxRetries)
|
|
||||||
// assert.Equal(t, 5*time.Second, *tk.timeout)
|
queue := NewQueue[testTask](func(ctx context.Context, task testTask) error {
|
||||||
// assert.Equal(t, now, *tk.deadline)
|
subCalled = true
|
||||||
// assert.Equal(t, now, *tk.at)
|
assert.Equal(t, 123, task.Val)
|
||||||
// assert.Equal(t, 6*time.Second, *tk.wait)
|
return nil
|
||||||
// assert.Equal(t, 7*time.Second, *tk.retain)
|
})
|
||||||
// assert.NoError(t, tk.Save())
|
c.Tasks.Register(queue)
|
||||||
//}
|
|
||||||
|
task := testTask{Val: 123}
|
||||||
|
|
||||||
|
tx := &sql.Tx{}
|
||||||
|
|
||||||
|
op := c.Tasks.
|
||||||
|
New(task).
|
||||||
|
Wait(5 * time.Second).
|
||||||
|
Tx(tx)
|
||||||
|
|
||||||
|
// Check that the task op was built correctly
|
||||||
|
assert.Equal(t, task, op.task)
|
||||||
|
assert.Equal(t, tx, op.tx)
|
||||||
|
assert.Equal(t, 5*time.Second, *op.wait)
|
||||||
|
|
||||||
|
// Remove the transaction and delay so we can process the task immediately
|
||||||
|
op.tx, op.wait = nil, nil
|
||||||
|
err := op.Save()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Start the runner
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go c.Tasks.StartRunner(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Check for up to 5 seconds if the task executed
|
||||||
|
start := time.Now()
|
||||||
|
waitLoop:
|
||||||
|
for {
|
||||||
|
switch {
|
||||||
|
case subCalled:
|
||||||
|
break waitLoop
|
||||||
|
case time.Since(start) > (5 * time.Second):
|
||||||
|
break waitLoop
|
||||||
|
default:
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.True(t, subCalled)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@
|
||||||
<p>Warning</p>
|
<p>Warning</p>
|
||||||
</div>
|
</div>
|
||||||
<div class="message-body">
|
<div class="message-body">
|
||||||
This route has caching enabled so hot-reloading in the local environment will not work. Check the Redis cache for a key matching the URL path.
|
This route has caching enabled so hot-reloading in the local environment will not work.
|
||||||
</div>
|
</div>
|
||||||
</article>
|
</article>
|
||||||
{{- end}}
|
{{- end}}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue