Rewrote cache implemenation.
This commit is contained in:
parent
ab55705b9f
commit
3f46617f80
7 changed files with 285 additions and 148 deletions
|
|
@ -89,12 +89,8 @@ type (
|
||||||
|
|
||||||
// CacheConfig stores the cache configuration
|
// CacheConfig stores the cache configuration
|
||||||
CacheConfig struct {
|
CacheConfig struct {
|
||||||
Hostname string
|
Capacity int
|
||||||
Port uint16
|
Expiration struct {
|
||||||
Password string
|
|
||||||
Database int
|
|
||||||
TestDatabase int
|
|
||||||
Expiration struct {
|
|
||||||
StaticFile time.Duration
|
StaticFile time.Duration
|
||||||
Page time.Duration
|
Page time.Duration
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,11 +21,7 @@ app:
|
||||||
emailVerificationTokenExpiration: "12h"
|
emailVerificationTokenExpiration: "12h"
|
||||||
|
|
||||||
cache:
|
cache:
|
||||||
hostname: "localhost"
|
capacity: 100000
|
||||||
port: 6379
|
|
||||||
password: ""
|
|
||||||
database: 0
|
|
||||||
testDatabase: 1
|
|
||||||
expiration:
|
expiration:
|
||||||
staticFile: "4380h"
|
staticFile: "4380h"
|
||||||
page: "24h"
|
page: "24h"
|
||||||
|
|
|
||||||
|
|
@ -59,13 +59,7 @@ func LogRequest() echo.MiddlewareFunc {
|
||||||
"latency", stop.Sub(start).String(),
|
"latency", stop.Sub(start).String(),
|
||||||
)
|
)
|
||||||
|
|
||||||
msg := fmt.Sprintf("%s %s", req.Method, func() string {
|
msg := fmt.Sprintf("%s %s", req.Method, req.URL.RequestURI())
|
||||||
p := req.URL.Path
|
|
||||||
if p == "" {
|
|
||||||
p = "/"
|
|
||||||
}
|
|
||||||
return p
|
|
||||||
}())
|
|
||||||
|
|
||||||
if res.Status >= 500 {
|
if res.Status >= 500 {
|
||||||
sub.Error(msg)
|
sub.Error(msg)
|
||||||
|
|
|
||||||
|
|
@ -4,21 +4,39 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/maypok86/otter"
|
"github.com/maypok86/otter"
|
||||||
"github.com/mikestefanello/pagoda/config"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrCacheMiss indicates that the requested key does not exist in the cache
|
||||||
|
var ErrCacheMiss = errors.New("cache miss")
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// CacheClient is the client that allows you to interact with the cache
|
// CacheStore provides an interface for cache storage
|
||||||
CacheClient struct {
|
CacheStore interface {
|
||||||
// cache stores the cache interface
|
// get attempts to get a cached value
|
||||||
cache *otter.CacheWithVariableTTL[string, any]
|
get(context.Context, *CacheGetOp) (any, error)
|
||||||
|
|
||||||
|
// set attempts to set an entry in the cache
|
||||||
|
set(context.Context, *CacheSetOp) error
|
||||||
|
|
||||||
|
// flush removes a given key and/or tags from the cache
|
||||||
|
flush(context.Context, *CacheFlushOp) error
|
||||||
|
|
||||||
|
// close shuts down the cache storage
|
||||||
|
close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacheSet handles chaining a set operation
|
// CacheClient is the client that allows you to interact with the cache
|
||||||
cacheSet struct {
|
CacheClient struct {
|
||||||
|
// store holds the Cache storage
|
||||||
|
store CacheStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// CacheSetOp handles chaining a set operation
|
||||||
|
CacheSetOp struct {
|
||||||
client *CacheClient
|
client *CacheClient
|
||||||
key string
|
key string
|
||||||
group string
|
group string
|
||||||
|
|
@ -27,60 +45,68 @@ type (
|
||||||
tags []string
|
tags []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacheGet handles chaining a get operation
|
// CacheGetOp handles chaining a get operation
|
||||||
cacheGet struct {
|
CacheGetOp struct {
|
||||||
client *CacheClient
|
client *CacheClient
|
||||||
key string
|
key string
|
||||||
group string
|
group string
|
||||||
}
|
}
|
||||||
|
|
||||||
// cacheFlush handles chaining a flush operation
|
// CacheFlushOp handles chaining a flush operation
|
||||||
cacheFlush struct {
|
CacheFlushOp struct {
|
||||||
client *CacheClient
|
client *CacheClient
|
||||||
key string
|
key string
|
||||||
group string
|
group string
|
||||||
tags []string
|
tags []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inMemoryCacheStore is a cache store implementation in memory
|
||||||
|
inMemoryCacheStore struct {
|
||||||
|
store *otter.CacheWithVariableTTL[string, any]
|
||||||
|
tagIndex *tagIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagIndex maintains an index to support cache tags for in-memory cache stores.
|
||||||
|
// There is a performance and memory impact to using cache tags since set and get operations using tags will require
|
||||||
|
// locking, and we need to keep track of this index in order to keep everything in sync.
|
||||||
|
// If using something like Redis for caching, you can leverage sets to store the index.
|
||||||
|
// Cache tags can be useful and convenient, so you should decide if your app benefits enough from this.
|
||||||
|
// As it stands there, there is no limiting how much memory this will consume and it will track all keys
|
||||||
|
// and tags added and removed from the cache.
|
||||||
|
tagIndex struct {
|
||||||
|
sync.Mutex
|
||||||
|
tags map[string]map[string]struct{} // tag->keys
|
||||||
|
keys map[string]map[string]struct{} // key->tags
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewCacheClient creates a new cache client
|
// NewCacheClient creates a new cache client
|
||||||
func NewCacheClient(cfg *config.Config) (*CacheClient, error) {
|
func NewCacheClient(store CacheStore) *CacheClient {
|
||||||
cache, err := otter.MustBuilder[string, any](10000).
|
return &CacheClient{store: store}
|
||||||
WithVariableTTL().
|
|
||||||
DeletionListener(func(key string, value any, cause otter.DeletionCause) {
|
|
||||||
// todo
|
|
||||||
}).
|
|
||||||
Build()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &CacheClient{cache: &cache}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the connection to the cache
|
// Close closes the connection to the cache
|
||||||
func (c *CacheClient) Close() {
|
func (c *CacheClient) Close() {
|
||||||
c.cache.Close()
|
c.store.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set creates a cache set operation
|
// Set creates a cache set operation
|
||||||
func (c *CacheClient) Set() *cacheSet {
|
func (c *CacheClient) Set() *CacheSetOp {
|
||||||
return &cacheSet{
|
return &CacheSetOp{
|
||||||
client: c,
|
client: c,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get creates a cache get operation
|
// Get creates a cache get operation
|
||||||
func (c *CacheClient) Get() *cacheGet {
|
func (c *CacheClient) Get() *CacheGetOp {
|
||||||
return &cacheGet{
|
return &CacheGetOp{
|
||||||
client: c,
|
client: c,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush creates a cache flush operation
|
// Flush creates a cache flush operation
|
||||||
func (c *CacheClient) Flush() *cacheFlush {
|
func (c *CacheClient) Flush() *CacheFlushOp {
|
||||||
return &cacheFlush{
|
return &CacheFlushOp{
|
||||||
client: c,
|
client: c,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -94,74 +120,117 @@ func (c *CacheClient) cacheKey(group, key string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key sets the cache key
|
// Key sets the cache key
|
||||||
func (c *cacheSet) Key(key string) *cacheSet {
|
func (c *CacheSetOp) Key(key string) *CacheSetOp {
|
||||||
c.key = key
|
c.key = key
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group sets the cache group
|
// Group sets the cache group
|
||||||
func (c *cacheSet) Group(group string) *cacheSet {
|
func (c *CacheSetOp) Group(group string) *CacheSetOp {
|
||||||
c.group = group
|
c.group = group
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Data sets the data to cache
|
// Data sets the data to cache
|
||||||
func (c *cacheSet) Data(data any) *cacheSet {
|
func (c *CacheSetOp) Data(data any) *CacheSetOp {
|
||||||
c.data = data
|
c.data = data
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Expiration sets the expiration duration of the cached data
|
// Expiration sets the expiration duration of the cached data
|
||||||
func (c *cacheSet) Expiration(expiration time.Duration) *cacheSet {
|
func (c *CacheSetOp) Expiration(expiration time.Duration) *CacheSetOp {
|
||||||
c.expiration = expiration
|
c.expiration = expiration
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tags sets the cache tags
|
// Tags sets the cache tags
|
||||||
func (c *cacheSet) Tags(tags ...string) *cacheSet {
|
func (c *CacheSetOp) Tags(tags ...string) *CacheSetOp {
|
||||||
c.tags = tags
|
c.tags = tags
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save saves the data in the cache
|
// Save saves the data in the cache
|
||||||
func (c *cacheSet) Save(ctx context.Context) error {
|
func (c *CacheSetOp) Save(ctx context.Context) error {
|
||||||
if c.key == "" {
|
switch {
|
||||||
|
case c.key == "":
|
||||||
return errors.New("no cache key specified")
|
return errors.New("no cache key specified")
|
||||||
}
|
case c.data == nil:
|
||||||
|
|
||||||
if c.data == nil {
|
|
||||||
return errors.New("no cache data specified")
|
return errors.New("no cache data specified")
|
||||||
|
case c.expiration == 0:
|
||||||
|
return errors.New("no cache expiration specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
c.client.cache.Set(
|
return c.client.store.set(ctx, c)
|
||||||
c.client.cacheKey(c.group, c.key),
|
|
||||||
c.data,
|
|
||||||
c.expiration,
|
|
||||||
)
|
|
||||||
|
|
||||||
// TODO tags
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Key sets the cache key
|
// Key sets the cache key
|
||||||
func (c *cacheGet) Key(key string) *cacheGet {
|
func (c *CacheGetOp) Key(key string) *CacheGetOp {
|
||||||
c.key = key
|
c.key = key
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group sets the cache group
|
// Group sets the cache group
|
||||||
func (c *cacheGet) Group(group string) *cacheGet {
|
func (c *CacheGetOp) Group(group string) *CacheGetOp {
|
||||||
c.group = group
|
c.group = group
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch fetches the data from the cache
|
// Fetch fetches the data from the cache
|
||||||
func (c *cacheGet) Fetch(ctx context.Context) (any, error) {
|
func (c *CacheGetOp) Fetch(ctx context.Context) (any, error) {
|
||||||
if c.key == "" {
|
if c.key == "" {
|
||||||
return nil, errors.New("no cache key specified")
|
return nil, errors.New("no cache key specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
v, exists := c.client.cache.Get(c.client.cacheKey(c.group, c.key))
|
return c.client.store.get(ctx, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key sets the cache key
|
||||||
|
func (c *CacheFlushOp) Key(key string) *CacheFlushOp {
|
||||||
|
c.key = key
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group sets the cache group
|
||||||
|
func (c *CacheFlushOp) Group(group string) *CacheFlushOp {
|
||||||
|
c.group = group
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tags sets the cache tags
|
||||||
|
func (c *CacheFlushOp) Tags(tags ...string) *CacheFlushOp {
|
||||||
|
c.tags = tags
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute flushes the data from the cache
|
||||||
|
func (c *CacheFlushOp) Execute(ctx context.Context) error {
|
||||||
|
return c.client.store.flush(ctx, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newInMemoryCache creates a new in-memory CacheStore
|
||||||
|
func newInMemoryCache(capacity int) (CacheStore, error) {
|
||||||
|
s := &inMemoryCacheStore{
|
||||||
|
tagIndex: newTagIndex(),
|
||||||
|
}
|
||||||
|
|
||||||
|
store, err := otter.MustBuilder[string, any](capacity).
|
||||||
|
WithVariableTTL().
|
||||||
|
DeletionListener(func(key string, value any, cause otter.DeletionCause) {
|
||||||
|
s.tagIndex.purgeKeys(key)
|
||||||
|
}).
|
||||||
|
Build()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.store = &store
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *inMemoryCacheStore) get(_ context.Context, op *CacheGetOp) (any, error) {
|
||||||
|
v, exists := s.store.Get(op.client.cacheKey(op.group, op.key))
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, ErrCacheMiss
|
return nil, ErrCacheMiss
|
||||||
|
|
@ -170,29 +239,110 @@ func (c *cacheGet) Fetch(ctx context.Context) (any, error) {
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrCacheMiss = errors.New("cache miss")
|
func (s *inMemoryCacheStore) set(_ context.Context, op *CacheSetOp) error {
|
||||||
|
key := op.client.cacheKey(op.group, op.key)
|
||||||
|
|
||||||
// Key sets the cache key
|
added := s.store.Set(
|
||||||
func (c *cacheFlush) Key(key string) *cacheFlush {
|
key,
|
||||||
c.key = key
|
op.data,
|
||||||
return c
|
op.expiration,
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(op.tags) > 0 {
|
||||||
|
s.tagIndex.setTags(key, op.tags...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !added {
|
||||||
|
return errors.New("cache set failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group sets the cache group
|
func (s *inMemoryCacheStore) flush(_ context.Context, op *CacheFlushOp) error {
|
||||||
func (c *cacheFlush) Group(group string) *cacheFlush {
|
keys := make([]string, 0)
|
||||||
c.group = group
|
|
||||||
return c
|
if key := op.client.cacheKey(op.group, op.key); key != "" {
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(op.tags) > 0 {
|
||||||
|
keys = append(keys, s.tagIndex.purgeTags(op.tags...)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
s.store.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.tagIndex.purgeKeys(keys...)
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tags sets the cache tags
|
func (s *inMemoryCacheStore) close() {
|
||||||
func (c *cacheFlush) Tags(tags ...string) *cacheFlush {
|
s.store.Close()
|
||||||
c.tags = tags
|
|
||||||
return c
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute flushes the data from the cache
|
func newTagIndex() *tagIndex {
|
||||||
func (c *cacheFlush) Execute(ctx context.Context) {
|
return &tagIndex{
|
||||||
// TODO tags
|
tags: make(map[string]map[string]struct{}),
|
||||||
|
keys: make(map[string]map[string]struct{}),
|
||||||
c.client.cache.Delete(c.client.cacheKey(c.group, c.key))
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *tagIndex) setTags(key string, tags ...string) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
|
||||||
|
if _, exists := i.keys[key]; !exists {
|
||||||
|
i.keys[key] = make(map[string]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tag := range tags {
|
||||||
|
if _, exists := i.tags[tag]; !exists {
|
||||||
|
i.tags[tag] = make(map[string]struct{})
|
||||||
|
}
|
||||||
|
i.tags[tag][key] = struct{}{}
|
||||||
|
i.keys[key][tag] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *tagIndex) purgeTags(tags ...string) []string {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
|
||||||
|
keys := make([]string, 0)
|
||||||
|
|
||||||
|
for _, tag := range tags {
|
||||||
|
tagKeys := i.tags[tag]
|
||||||
|
delete(i.tags, tag)
|
||||||
|
|
||||||
|
for key := range tagKeys {
|
||||||
|
delete(i.keys[key], tag)
|
||||||
|
if len(i.keys[key]) == 0 {
|
||||||
|
delete(i.keys, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *tagIndex) purgeKeys(keys ...string) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
keyTags := i.keys[key]
|
||||||
|
delete(i.keys, key)
|
||||||
|
|
||||||
|
for tag := range keyTags {
|
||||||
|
delete(i.tags[tag], key)
|
||||||
|
if len(i.tags[tag]) == 0 {
|
||||||
|
delete(i.tags, tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ func TestCacheClient(t *testing.T) {
|
||||||
Group(group).
|
Group(group).
|
||||||
Key(key).
|
Key(key).
|
||||||
Data(data).
|
Data(data).
|
||||||
|
Expiration(time.Hour).
|
||||||
Save(context.Background())
|
Save(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
@ -32,23 +33,24 @@ func TestCacheClient(t *testing.T) {
|
||||||
Key(key).
|
Key(key).
|
||||||
Fetch(context.Background())
|
Fetch(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
cast, ok := fromCache.(*cacheTest)
|
cast, ok := fromCache.(cacheTest)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
assert.Equal(t, data, *cast)
|
assert.Equal(t, data, cast)
|
||||||
|
|
||||||
// The same key with the wrong group should fail
|
// The same key with the wrong group should fail
|
||||||
_, err = c.Cache.
|
_, err = c.Cache.
|
||||||
Get().
|
Get().
|
||||||
Key(key).
|
Key(key).
|
||||||
Fetch(context.Background())
|
Fetch(context.Background())
|
||||||
assert.Error(t, err)
|
assert.Equal(t, ErrCacheMiss, err)
|
||||||
|
|
||||||
// Flush the data
|
// Flush the data
|
||||||
c.Cache.
|
err = c.Cache.
|
||||||
Flush().
|
Flush().
|
||||||
Group(group).
|
Group(group).
|
||||||
Key(key).
|
Key(key).
|
||||||
Execute(context.Background())
|
Execute(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// The data should be gone
|
// The data should be gone
|
||||||
assertFlushed := func() {
|
assertFlushed := func() {
|
||||||
|
|
@ -69,14 +71,16 @@ func TestCacheClient(t *testing.T) {
|
||||||
Key(key).
|
Key(key).
|
||||||
Data(data).
|
Data(data).
|
||||||
Tags("tag1").
|
Tags("tag1").
|
||||||
|
Expiration(time.Hour).
|
||||||
Save(context.Background())
|
Save(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Flush the tag
|
// Flush the tag
|
||||||
c.Cache.
|
err = c.Cache.
|
||||||
Flush().
|
Flush().
|
||||||
Tags("tag1").
|
Tags("tag1").
|
||||||
Execute(context.Background())
|
Execute(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// The data should be gone
|
// The data should be gone
|
||||||
assertFlushed()
|
assertFlushed()
|
||||||
|
|
@ -92,7 +96,8 @@ func TestCacheClient(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Wait for expiration
|
// Wait for expiration
|
||||||
time.Sleep(time.Millisecond * 2)
|
// TODO why does this need to wait so long?
|
||||||
|
time.Sleep(time.Millisecond * 500)
|
||||||
|
|
||||||
// The data should be gone
|
// The data should be gone
|
||||||
assertFlushed()
|
assertFlushed()
|
||||||
|
|
|
||||||
|
|
@ -117,10 +117,12 @@ func (c *Container) initWeb() {
|
||||||
|
|
||||||
// initCache initializes the cache
|
// initCache initializes the cache
|
||||||
func (c *Container) initCache() {
|
func (c *Container) initCache() {
|
||||||
var err error
|
store, err := newInMemoryCache(c.Config.Cache.Capacity)
|
||||||
if c.Cache, err = NewCacheClient(c.Config); err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.Cache = NewCacheClient(store)
|
||||||
}
|
}
|
||||||
|
|
||||||
// initDatabase initializes the database
|
// initDatabase initializes the database
|
||||||
|
|
@ -142,24 +144,6 @@ func (c *Container) initDatabase() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func openDB(driver, connection string) (*sql.DB, error) {
|
|
||||||
// Helper to automatically create the directories that the specific sqlite file
|
|
||||||
// should reside in
|
|
||||||
if driver == "sqlite3" {
|
|
||||||
d := strings.Split(connection, "/")
|
|
||||||
|
|
||||||
if len(d) > 1 {
|
|
||||||
path := strings.Join(d[:len(d)-1], "/")
|
|
||||||
|
|
||||||
if err := os.MkdirAll(path, 0755); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return sql.Open(driver, connection)
|
|
||||||
}
|
|
||||||
|
|
||||||
// initORM initializes the ORM
|
// initORM initializes the ORM
|
||||||
func (c *Container) initORM() {
|
func (c *Container) initORM() {
|
||||||
drv := entsql.OpenDB(c.Config.Database.Driver, c.Database)
|
drv := entsql.OpenDB(c.Config.Database.Driver, c.Database)
|
||||||
|
|
@ -198,3 +182,22 @@ func (c *Container) initTasks() {
|
||||||
panic(fmt.Sprintf("failed to create task client: %v", err))
|
panic(fmt.Sprintf("failed to create task client: %v", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// openDB opens a database connection
|
||||||
|
func openDB(driver, connection string) (*sql.DB, error) {
|
||||||
|
// Helper to automatically create the directories that the specified sqlite file
|
||||||
|
// should reside in, if one
|
||||||
|
if driver == "sqlite3" {
|
||||||
|
d := strings.Split(connection, "/")
|
||||||
|
|
||||||
|
if len(d) > 1 {
|
||||||
|
path := strings.Join(d[:len(d)-1], "/")
|
||||||
|
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sql.Open(driver, connection)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,35 +1,28 @@
|
||||||
package services
|
package services
|
||||||
|
|
||||||
import (
|
//func TestTaskClient_New(t *testing.T) {
|
||||||
"testing"
|
// now := time.Now()
|
||||||
"time"
|
// tk := c.Tasks.
|
||||||
|
// New("task1").
|
||||||
"github.com/stretchr/testify/assert"
|
// Payload("payload").
|
||||||
)
|
// Queue("queue").
|
||||||
|
// Periodic("@every 5s").
|
||||||
func TestTaskClient_New(t *testing.T) {
|
// MaxRetries(5).
|
||||||
now := time.Now()
|
// Timeout(5 * time.Second).
|
||||||
tk := c.Tasks.
|
// Deadline(now).
|
||||||
New("task1").
|
// At(now).
|
||||||
Payload("payload").
|
// Wait(6 * time.Second).
|
||||||
Queue("queue").
|
// Retain(7 * time.Second)
|
||||||
Periodic("@every 5s").
|
//
|
||||||
MaxRetries(5).
|
// assert.Equal(t, "task1", tk.typ)
|
||||||
Timeout(5 * time.Second).
|
// assert.Equal(t, "payload", tk.payload.(string))
|
||||||
Deadline(now).
|
// assert.Equal(t, "queue", *tk.queue)
|
||||||
At(now).
|
// assert.Equal(t, "@every 5s", *tk.periodic)
|
||||||
Wait(6 * time.Second).
|
// assert.Equal(t, 5, *tk.maxRetries)
|
||||||
Retain(7 * time.Second)
|
// assert.Equal(t, 5*time.Second, *tk.timeout)
|
||||||
|
// assert.Equal(t, now, *tk.deadline)
|
||||||
assert.Equal(t, "task1", tk.typ)
|
// assert.Equal(t, now, *tk.at)
|
||||||
assert.Equal(t, "payload", tk.payload.(string))
|
// assert.Equal(t, 6*time.Second, *tk.wait)
|
||||||
assert.Equal(t, "queue", *tk.queue)
|
// assert.Equal(t, 7*time.Second, *tk.retain)
|
||||||
assert.Equal(t, "@every 5s", *tk.periodic)
|
// assert.NoError(t, tk.Save())
|
||||||
assert.Equal(t, 5, *tk.maxRetries)
|
//}
|
||||||
assert.Equal(t, 5*time.Second, *tk.timeout)
|
|
||||||
assert.Equal(t, now, *tk.deadline)
|
|
||||||
assert.Equal(t, now, *tk.at)
|
|
||||||
assert.Equal(t, 6*time.Second, *tk.wait)
|
|
||||||
assert.Equal(t, 7*time.Second, *tk.retain)
|
|
||||||
assert.NoError(t, tk.Save())
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue