3
0

Rewired db connection to store/rdbms

This commit is contained in:
Denis Arh 2020-09-07 16:43:56 +02:00
parent 75d0853c8d
commit 024e6ce890
16 changed files with 511 additions and 342 deletions

View File

@ -141,10 +141,10 @@ watch.test.%: $(NODEMON)
# codegen: $(PROTOGEN)
codegen: $(CODEGEN)
$(CODEGEN)
@ $(CODEGEN) -v
watch.codegen: $(CODEGEN)
$(CODEGEN) -w -v
@ $(CODEGEN) -w -v
########################################################################################################################
# Quality Assurance

View File

@ -11,7 +11,6 @@ import (
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/corredor"
"github.com/cortezaproject/corteza-server/pkg/db"
"github.com/cortezaproject/corteza-server/pkg/eventbus"
"github.com/cortezaproject/corteza-server/pkg/healthcheck"
"github.com/cortezaproject/corteza-server/pkg/http"
@ -23,7 +22,7 @@ import (
"github.com/cortezaproject/corteza-server/provision/compose"
"github.com/cortezaproject/corteza-server/provision/messaging"
"github.com/cortezaproject/corteza-server/provision/system"
"github.com/cortezaproject/corteza-server/store/mysql"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/system/auth/external"
sysService "github.com/cortezaproject/corteza-server/system/service"
sysEvent "github.com/cortezaproject/corteza-server/system/service/event"
@ -60,7 +59,6 @@ func (app *CortezaApp) Setup() (err error) {
hcd.Add(scheduler.Healthcheck, "Scheduler")
hcd.Add(mail.Healthcheck, "Mail")
hcd.Add(corredor.Healthcheck, "Corredor")
hcd.Add(db.Healthcheck(), "Database")
if err = sentry.Init(app.Opt.Sentry); err != nil {
return fmt.Errorf("could not initialize Sentry: %w", err)
@ -108,12 +106,15 @@ func (app *CortezaApp) InitStore(ctx context.Context) (err error) {
return err
}
defer sentry.Recover()
// Do not re-initialize store
// This will make integration test setup a bit more painless
if app.Store == nil {
defer sentry.Recover()
// @todo this should be configurable
app.Store, err = mysql.New(ctx, app.Opt.DB.DSN)
if err != nil {
return err
app.Store, err = store.Connect(ctx, app.Opt.DB.DSN)
if err != nil {
return err
}
}
if upgradableStore, ok := app.Store.(storeUpgrader); !ok {
@ -138,13 +139,6 @@ func (app *CortezaApp) InitStore(ctx context.Context) (err error) {
}
}
// deprecated connector
// current state of Corteza (repos...) still requires it
_, err = db.TryToConnect(ctx, app.Log, app.Opt.DB)
if err != nil {
return fmt.Errorf("could not connect to database: %w", err)
}
app.lvl = bootLevelStoreInitialized
return nil
}

9
app/store.go Normal file
View File

@ -0,0 +1,9 @@
package app
// Registers all supported store backends
//
// SQLite is intentionally ignored here
import (
_ "github.com/cortezaproject/corteza-server/store/mysql"
_ "github.com/cortezaproject/corteza-server/store/pgsql"
)

View File

@ -1,132 +0,0 @@
package db
import (
"context"
"regexp"
"strings"
"time"
"github.com/pkg/errors"
"github.com/titpetric/factory"
"github.com/titpetric/factory/logger"
"go.uber.org/zap"
"github.com/cortezaproject/corteza-server/pkg/options"
"github.com/cortezaproject/corteza-server/pkg/sentry"
)
var (
dsnMasker = regexp.MustCompile("(.)(?:.*)(.):(.)(?:.*)(.)@")
)
// TryToConnect establishes the (deprecated) factory-managed database
// connection described by opt, retrying with a fixed delay until it succeeds,
// the context is cancelled, opt.MaxTries is exhausted, or opt.Timeout expires.
//
// Returns the connected *factory.DB, or an error describing why connecting
// failed. NOTE(review): `break` inside the goroutine's select exits only the
// select, not the retry loop — on cancellation the caller is unblocked by the
// outer select's ctx.Done case instead.
func TryToConnect(ctx context.Context, log *zap.Logger, opt options.DBOpt) (db *factory.DB, err error) {
	if opt.DSN == "" {
		err = errors.Errorf("invalid or empty DSN: %q", opt.DSN)
		return
	}
	// factory keys connections by name; only one ("default") is ever used here
	name := "default"
	// Trim off the schema, as titpetric/factory does not support it
	if strings.HasPrefix(opt.DSN, "mysql://") {
		opt.DSN = opt.DSN[8:]
	}
	factory.Database.Add(name, factory.DatabaseCredential{DSN: opt.DSN, DriverName: "mysql"})
	var (
		connErrCh = make(chan error, 1)
	)
	// We'll not add this to the general log because we do not want to carry it with us for every query.
	dsnField := zap.String("dsn", dsnMasker.ReplaceAllString(opt.DSN, "$1****$2:$3****$4@"))
	// This logger is also used inside profiler
	log = log.Named("database").With(zap.String("name", name))
	defer close(connErrCh)
	log.Debug("connecting to the database",
		dsnField,
		zap.Int("tries", opt.MaxTries),
		zap.Duration("delay", opt.Delay),
		zap.Duration("timeout", opt.Timeout))
	// retry loop runs in the background; the select below enforces the
	// overall timeout and context cancellation
	go func() {
		defer sentry.Recover()
		var (
			try = 0
		)
		for {
			try++
			if opt.MaxTries <= try {
				err = errors.Errorf("could not connect to %q, in %d tries", name, try)
				return
			}
			db, err = factory.Database.Get(name)
			if err != nil {
				log.Warn(
					"could not connect to the database",
					zap.Error(err),
					zap.Int("try", try),
					dsnField,
					zap.Float64("delay", opt.Delay.Seconds()),
				)
				select {
				case <-ctx.Done():
					// Forced break
					break
				case <-time.After(opt.Delay):
					// Wait before next try
					continue
				}
			}
			// hardcoded values for POC
			// @todo make this configurable, same as other options (DB_*)
			db.SetConnMaxLifetime(10 * time.Minute)
			db.SetMaxOpenConns(256)
			db.SetMaxIdleConns(32)
			log.Debug("connected to the database", dsnField)
			// Connected
			break
		}
		connErrCh <- err
	}()
	select {
	case err = <-connErrCh:
		break
	case <-time.After(opt.Timeout):
		// Wait before next try
		return nil, errors.Errorf("db init for %q timedout", name)
	case <-ctx.Done():
		return nil, errors.Errorf("db connection for %q cancelled", name)
	}
	// optionally wire the query logger onto the established connection
	if opt.Logger {
		db.SetLogger(
			NewZapLogger(
				// Skip 3 levels in call stack to get to the actual function used
				log.WithOptions(zap.AddCallerSkip(3)),
			),
		)
	} else {
		db.SetLogger(logger.Silent{})
	}
	if err != nil {
		return nil, err
	}
	return db, nil
}

View File

@ -1,23 +0,0 @@
package db
import (
"context"
"github.com/titpetric/factory"
)
// Healthcheck returns a probe for the (deprecated) factory-managed "default"
// database connection: it resolves the connection by name and pings it.
func Healthcheck() func(ctx context.Context) error {
	const name = "default"
	return func(ctx context.Context) error {
		// resolve the registered connection
		db, err := factory.Database.Get(name)
		if err != nil {
			return err
		}
		// verify the connection is actually alive
		err = db.PingContext(ctx)
		if err != nil {
			return err
		}
		return nil
	}
}

View File

@ -1,36 +0,0 @@
package db
import (
"context"
dbLogger "github.com/titpetric/factory/logger"
"go.uber.org/zap"
"github.com/cortezaproject/corteza-server/pkg/logger"
)
type (
zapLogger struct {
logger *zap.Logger
}
)
// NewZapLogger wraps the given zap logger into a db-factory compatible logger.
func NewZapLogger(logger *zap.Logger) *zapLogger {
	return &zapLogger{
		logger: logger,
	}
}
// Log implements the factory logger interface: it converts factory log fields
// into zap fields and emits msg at debug level, tagged with the request ID
// when one is present in the context.
func (z *zapLogger) Log(ctx context.Context, msg string, fields ...dbLogger.Field) {
	// @todo when factory.DatabaseProfilerContext gets access to context from
	//       db functions, try to extract RequestID with middleware.GetReqID()
	zapFields := []zap.Field{}
	for _, v := range fields {
		zapFields = append(zapFields, zap.Any(v.Name(), v.Value()))
	}
	logger.
		AddRequestID(ctx, z.logger).
		Debug(msg, zapFields...)
}

View File

@ -7,11 +7,7 @@ import (
type (
DBOpt struct {
DSN string `env:"DB_DSN"`
Logger bool `env:"DB_LOGGER"`
MaxTries int `env:"DB_MAX_TRIES"`
Delay time.Duration `env:"DB_CONN_ERR_DELAY"`
Timeout time.Duration `env:"DB_CONN_TIMEOUT"`
DSN string `env:"DB_DSN"`
}
)
@ -20,11 +16,7 @@ func DB(pfix string) (o *DBOpt) {
const maxTries = 100
o = &DBOpt{
DSN: "mysql://corteza:corteza@tcp(db:3306)/corteza?collation=utf8mb4_general_ci",
Logger: false,
MaxTries: maxTries,
Delay: delay,
Timeout: maxTries * delay,
DSN: "mysql://corteza:corteza@tcp(db:3306)/corteza?collation=utf8mb4_general_ci",
}
fill(o)

View File

@ -7,7 +7,22 @@ COVER_MODE ?= count
COVER_PROFILE ?= .cover.out
COVER_FLAGS ?= -covermode=$(COVER_MODE) -coverprofile=$(COVER_PROFILE)
# Run go test cmd with flags, eg:
# $> TEST_FLAGS="-v" make test
# $> TEST_FLAGS="-v -run 'Test_Store/.+/ComposeCharts'" make test
TEST_FLAGS ?=
cover:
$(GOTEST) -coverprofile=$(COVER_PROFILE) -coverpkg=./... ./tests/...
$(GOTOOL) cover -html=$(COVER_PROFILE)
test.store:
$(GOTEST) $(TEST_FLAGS) ./tests/...
test.cover.store:
@ $(GOTEST) $(TEST_FLAGS) -coverprofile=$(COVER_PROFILE) -coverpkg=./... ./tests/...
@ $(GOTOOL) cover -html=$(COVER_PROFILE)
@ $(GOTOOL) cover
@ rm $(COVER_PROFILE)
watch.codegen:
@ make -C ../ $@
codegen:
@ make -C ../ $@

View File

@ -3,6 +3,14 @@
Provides unified storage for Corteza with composable and interchangeable backends.
Backends can be any RDBMS, Key-Value, NoSQL, FS, Memory.
== Development
====
@todo definition (YAML) structure
@todo how to add definitions
@todo how to add stores
====
== FAQ:
Why are we changing create/update function signature (input struct is no longer returned)?::
@ -19,6 +27,20 @@ Separation of concerns
+ consistency with store backends that do not support db tags
+ de-cluttering types* namespace
== Testing
Running store tests:
====
make test.store
====
Running store tests with code coverage:
====
make test.cover.store
====
See Makefile for details
== Known issues
SQLite, transactions & locking::

36
store/connect.go Normal file
View File

@ -0,0 +1,36 @@
package store
import (
"context"
"fmt"
"strings"
)
type (
ConnectorFn func(ctx context.Context, dsn string) (s Storable, err error)
)
var (
registered = make(map[string]ConnectorFn)
)
// Connect resolves the store backend from the DSN's scheme (the part before
// "://") and dials it through the connector registered for that scheme.
//
// A DSN without a scheme falls back to "mysql" for backward compatibility.
func Connect(ctx context.Context, dsn string) (s Storable, err error) {
	scheme := strings.SplitN(dsn, "://", 2)[0]
	if len(scheme) == 0 {
		// Backward compatibility
		scheme = "mysql"
	}

	conn, ok := registered[scheme]
	if !ok {
		return nil, fmt.Errorf("unknown store type used: %q (check your storage configuration)", scheme)
	}

	return conn(ctx, dsn)
}
// Register adds one or more store types (DSN schemes) and their connector fn
func Register(fn ConnectorFn, tt ...string) {
	for _, t := range tt {
		registered[t] = fn
	}
}

View File

@ -18,8 +18,17 @@ type (
}
)
func New(ctx context.Context, dsn string) (s *Store, err error) {
var cfg *rdbms.Config
func init() {
store.Register(Connect, "mysql")
}
func Connect(ctx context.Context, dsn string) (store.Storable, error) {
var (
err error
cfg *rdbms.Config
s = new(Store)
)
if cfg, err = ProcDataSourceName(dsn); err != nil {
return nil, err
@ -31,8 +40,7 @@ func New(ctx context.Context, dsn string) (s *Store, err error) {
cfg.UpsertBuilder = UpsertBuilder
cfg.CastModuleFieldToColumnType = fieldToColumnTypeCaster
s = new(Store)
if s.Store, err = rdbms.New(ctx, cfg); err != nil {
if s.Store, err = rdbms.Connect(ctx, cfg); err != nil {
return nil, err
}

View File

@ -18,8 +18,17 @@ type (
}
)
func New(ctx context.Context, dsn string) (s *Store, err error) {
var cfg *rdbms.Config
func init() {
store.Register(Connect, "postgresql", "postgres", "pgsql")
}
func Connect(ctx context.Context, dsn string) (store.Storable, error) {
var (
err error
cfg *rdbms.Config
s = new(Store)
)
if cfg, err = ProcDataSourceName(dsn); err != nil {
return nil, err
@ -30,8 +39,7 @@ func New(ctx context.Context, dsn string) (s *Store, err error) {
cfg.SqlFunctionHandler = sqlFunctionHandler
cfg.CastModuleFieldToColumnType = fieldToColumnTypeCaster
s = new(Store)
if s.Store, err = rdbms.New(ctx, cfg); err != nil {
if s.Store, err = rdbms.Connect(ctx, cfg); err != nil {
return nil, err
}

View File

@ -6,11 +6,15 @@ import (
"fmt"
"github.com/Masterminds/squirrel"
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/healthcheck"
"github.com/cortezaproject/corteza-server/pkg/logger"
"github.com/cortezaproject/corteza-server/pkg/ql"
"github.com/cortezaproject/corteza-server/pkg/sentry"
"github.com/cortezaproject/corteza-server/pkg/slice"
"github.com/cortezaproject/corteza-server/store"
"github.com/cortezaproject/corteza-server/store/rdbms/ddl"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"strings"
"time"
)
@ -26,80 +30,12 @@ import (
//
type (
txRetryOnErrHandler func(int, error) bool
columnPreprocFn func(string, string) string
valuePreprocFn func(interface{}, string) interface{}
errorHandler func(error) error
triggerKey string
schemaUpgradeGenerator interface {
TableExists(string) bool
CreateTable(t *ddl.Table) string
CreateIndexes(ii ...*ddl.Index) []string
}
rowScanner interface {
Scan(...interface{}) error
}
TriggerHandlers map[triggerKey]interface{}
Config struct {
DriverName string
DataSourceName string
DBName string
PlaceholderFormat squirrel.PlaceholderFormat
// These 3 are passed directly to connection
MaxOpenConns int
ConnMaxLifetime time.Duration
MaxIdleConns int
// Disable transactions
TxDisabled bool
// How many times should we retry failed transaction?
TxMaxRetries int
// TxRetryErrHandler should return true if transaction should be retried
//
// Because retry algorithm varies between concrete rdbms implementations
//
// Handler must return true if failed transaction should be replied
// and false if we're safe to terminate it
TxRetryErrHandler txRetryOnErrHandler
ColumnPreprocessors map[string]columnPreprocFn
ValuePreprocessors map[string]valuePreprocFn
ErrorHandler errorHandler
// Implementations can override internal RDBMS row scanners
RowScanners map[string]interface{}
// Different store backend implementation might handle upsert differently...
UpsertBuilder func(*Config, string, store.Payload, ...string) (squirrel.InsertBuilder, error)
// TriggerHandlers handle various exceptions that can not be handled generally within RDBMS package.
// see triggerKey type and defined constants to see where the hooks are and how can they be called
TriggerHandlers TriggerHandlers
// UniqueConstraintCheck flag controls if unique constraints should be explicitly checked within
// store or is this handled inside the storage
//
//
UniqueConstraintCheck bool
// FunctionHandler takes care of translation & transformation of (sql) functions
// and their parameters
//
// Functions are used in filters and aggregations
SqlFunctionHandler func(f ql.Function) (ql.ASTNode, error)
CastModuleFieldToColumnType func(field ModuleFieldTypeDetector, ident ql.Ident) (ql.Ident, error)
}
Store struct {
config *Config
@ -131,7 +67,7 @@ type (
)
const (
// This is the absolute maximum retries we'll allow
// TxRetryHardLimit is the absolute maximum retries we'll allow
TxRetryHardLimit = 100
DefaultSliceCapacity = 1000
@ -140,42 +76,17 @@ const (
MaxRefetches = 100
)
func New(ctx context.Context, cfg *Config) (*Store, error) {
var s = &Store{
func Connect(ctx context.Context, cfg *Config) (s *Store, err error) {
if err = cfg.ParseExtra(); err != nil {
return nil, err
}
cfg.SetDefaults()
s = &Store{
config: cfg,
}
if s.config.PlaceholderFormat == nil {
s.config.PlaceholderFormat = squirrel.Question
}
if s.config.TxMaxRetries == 0 {
s.config.TxMaxRetries = TxRetryHardLimit
}
if s.config.TxRetryErrHandler == nil {
// Default transaction retry handler
s.config.TxRetryErrHandler = TxNoRetry
}
if s.config.ErrorHandler == nil {
s.config.ErrorHandler = ErrHandlerFallthrough
}
if s.config.UpsertBuilder == nil {
s.config.UpsertBuilder = UpsertBuilder
}
if s.config.MaxIdleConns == 0 {
// Same as default in the db/sql
s.config.MaxIdleConns = 2
}
if s.config.TriggerHandlers == nil {
s.config.TriggerHandlers = TriggerHandlers{}
}
if err := s.Connect(ctx); err != nil {
if err = s.Connect(ctx); err != nil {
return nil, err
}
@ -191,20 +102,111 @@ func (s *Store) withTx(tx dbLayer) *Store {
}
}
// Temporary solution for logging
//
// log returns a named logger for the rdbms store; stacktraces are suppressed
// below fatal level. The ctx argument is currently unused.
func (s *Store) log(ctx context.Context) *zap.Logger {
	// @todo extract logger from context
	return logger.Default().
		Named("store.rdbms").
		WithOptions(zap.AddStacktrace(zap.FatalLevel))
}
// Connect opens the database handle, registers a liveness healthcheck for it,
// applies connection-pool parameters from the config and blocks until the
// database is actually reachable (see tryToConnect).
func (s *Store) Connect(ctx context.Context) error {
	s.log(ctx).Debug("opening connection", zap.String("driver", s.config.DriverName), zap.String("dsn", s.config.MaskedDSN()))
	db, err := sqlx.Open(s.config.DriverName, s.config.DataSourceName)
	if err != nil {
		return err
	}

	// register the healthcheck only after we know the handle is valid;
	// previously this ran before the error check and could capture a useless handle
	healthcheck.Defaults().Add(dbHealthcheck(db), "Store/RDBMS/"+s.config.DriverName)

	s.log(ctx).Debug(
		"setting connection parameters",
		zap.Int("MaxOpenConns", s.config.MaxOpenConns),
		zap.Duration("MaxLifetime", s.config.ConnMaxLifetime),
		zap.Int("MaxIdleConns", s.config.MaxIdleConns),
	)

	db.SetMaxOpenConns(s.config.MaxOpenConns)
	db.SetConnMaxLifetime(s.config.ConnMaxLifetime)
	db.SetMaxIdleConns(s.config.MaxIdleConns)

	if err = s.tryToConnect(ctx, db); err != nil {
		return err
	}

	s.db = db
	return nil
}
// tryToConnect pings the database until it responds, giving up after
// ConnTryMax attempts, an overall timeout, or context cancellation.
//
// During the ConnTryPatience window failed attempts are not logged, so that
// ordinary startup races (db container still booting) stay quiet.
func (s Store) tryToConnect(ctx context.Context, db *sqlx.DB) error {
	var (
		connErrCh = make(chan error, 1)
		patience  = time.Now().Add(s.config.ConnTryPatience)
	)

	go func() {
		defer sentry.Recover()

		var (
			err error
			try = 0
		)

		for {
			try++

			if s.config.ConnTryMax <= try {
				connErrCh <- fmt.Errorf("could not connect in %d tries", try)
				return
			}

			if err = db.PingContext(ctx); err != nil {
				if time.Now().After(patience) {
					// don't make too much fuss
					// if we're in patience mode
					s.log(ctx).Warn(
						"could not connect to the database",
						zap.Error(err),
						zap.Int("try", try),
						zap.Float64("delay", s.config.ConnTryBackoffDelay.Seconds()),
					)
				}

				select {
				case <-ctx.Done():
					// context cancelled; report and stop retrying
					// (a bare break here would only exit the select and
					// fall through to the "connected" branch below)
					connErrCh <- ctx.Err()
					return
				case <-time.After(s.config.ConnTryBackoffDelay):
					// Wait before next try
					continue
				}
			}

			s.log(ctx).Debug("connected to the database")
			break
		}

		connErrCh <- err
	}()

	// overall budget for all tries
	to := s.config.ConnTryTimeout * time.Duration(s.config.ConnTryMax*2)

	select {
	case err := <-connErrCh:
		return err
	case <-time.After(to):
		// %s on time.Duration prints a human-readable value;
		// the previous %d verb with a float64 produced garbage
		return fmt.Errorf("timed out after %s", to)
	case <-ctx.Done():
		return fmt.Errorf("connection cancelled")
	}
}
// dbHealthcheck produces a healthcheck probe that pings the given database.
func dbHealthcheck(db *sqlx.DB) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		return db.PingContext(ctx)
	}
}
func (s Store) Query(ctx context.Context, q squirrel.Sqlizer) (*sql.Rows, error) {
query, args, err := q.ToSql()
if err != nil {

271
store/rdbms/rdbms_config.go Normal file
View File

@ -0,0 +1,271 @@
package rdbms
import (
"github.com/Masterminds/squirrel"
"github.com/cortezaproject/corteza-server/pkg/ql"
"github.com/cortezaproject/corteza-server/store"
"net/url"
"regexp"
"strconv"
"strings"
"time"
)
// persistence layer
//
// all functions go under one struct
// why? because it will be easier to initialize and pass around
//
// each domain will be in its own file
//
// connection logic will be built in the persistence layer (making pkg/db obsolete)
//
type (
	// txRetryOnErrHandler decides, per try count and error, whether a failed
	// transaction should be retried
	txRetryOnErrHandler func(int, error) bool
	columnPreprocFn     func(string, string) string
	valuePreprocFn      func(interface{}, string) interface{}
	errorHandler        func(error) error

	triggerKey string

	// rowScanner is satisfied by sql.Row / sql.Rows
	rowScanner interface {
		Scan(...interface{}) error
	}

	TriggerHandlers map[triggerKey]interface{}

	// Config carries everything an RDBMS-backed store needs:
	// driver/DSN, pool tuning, connect-retry tuning and the per-backend
	// customization hooks (upsert, triggers, SQL function translation, ...).
	// Zero values are filled in by SetDefaults.
	Config struct {
		DriverName     string
		DataSourceName string
		DBName         string

		// MaxOpenConns sets maximum number of open connections to the database
		// (fallback value applied in SetDefaults)
		MaxOpenConns int

		// ConnMaxLifetime sets the maximum amount of time a connection may be reused
		// (fallback value applied in SetDefaults)
		ConnMaxLifetime time.Duration

		// MaxIdleConns sets the maximum number of connections in the idle connection pool
		// (fallback value applied in SetDefaults)
		MaxIdleConns int

		// ConnTryPatience sets time window in which we do not complain about failed connection tries
		ConnTryPatience time.Duration

		// ConnTryBackoffDelay sets backoff delay after failed try
		ConnTryBackoffDelay time.Duration

		// ConnTryTimeout sets timeout per try
		ConnTryTimeout time.Duration

		// ConnTryMax sets the maximum number of retries for getting the connection
		ConnTryMax int

		// PlaceholderFormat used by squirrel query generator
		PlaceholderFormat squirrel.PlaceholderFormat

		// Disable transactions
		TxDisabled bool

		// How many times should we retry failed transaction?
		TxMaxRetries int

		// TxRetryErrHandler should return true if transaction should be retried
		//
		// Because retry algorithm varies between concrete rdbms implementations
		//
		// Handler must return true if failed transaction should be replied
		// and false if we're safe to terminate it
		TxRetryErrHandler txRetryOnErrHandler

		ColumnPreprocessors map[string]columnPreprocFn
		ValuePreprocessors  map[string]valuePreprocFn

		ErrorHandler errorHandler

		// Implementations can override internal RDBMS row scanners
		RowScanners map[string]interface{}

		// Different store backend implementation might handle upsert differently...
		UpsertBuilder func(*Config, string, store.Payload, ...string) (squirrel.InsertBuilder, error)

		// TriggerHandlers handle various exceptions that can not be handled generally within RDBMS package.
		// see triggerKey type and defined constants to see where the hooks are and how can they be called
		TriggerHandlers TriggerHandlers

		// UniqueConstraintCheck flag controls if unique constraints should be explicitly checked within
		// store or is this handled inside the storage
		UniqueConstraintCheck bool

		// SqlFunctionHandler takes care of translation & transformation of (sql) functions
		// and their parameters
		//
		// Functions are used in filters and aggregations
		SqlFunctionHandler func(f ql.Function) (ql.ASTNode, error)

		CastModuleFieldToColumnType func(field ModuleFieldTypeDetector, ident ql.Ident) (ql.Ident, error)
	}
)
var (
dsnMasker = regexp.MustCompile("(.)(?:.*)(.):(.)(?:.*)(.)@")
)
// MaskedDSN returns the DSN with the username & password obscured so the
// string is safe to emit in logs.
func (c *Config) MaskedDSN() string {
	const mask = "$1****$2:$3****$4@"
	return dsnMasker.ReplaceAllString(c.DataSourceName, mask)
}
// SetDefaults fills in fallback values for every Config field left at its
// zero value. Call it before using the config to open a connection.
func (c *Config) SetDefaults() {
	if c.PlaceholderFormat == nil {
		c.PlaceholderFormat = squirrel.Question
	}
	if c.TxMaxRetries == 0 {
		c.TxMaxRetries = TxRetryHardLimit
	}
	if c.TxRetryErrHandler == nil {
		// Default transaction retry handler
		c.TxRetryErrHandler = TxNoRetry
	}
	if c.ErrorHandler == nil {
		c.ErrorHandler = ErrHandlerFallthrough
	}
	if c.UpsertBuilder == nil {
		c.UpsertBuilder = UpsertBuilder
	}
	// ** ** ** ** ** ** ** ** ** ** ** ** ** **
	if c.MaxIdleConns == 0 {
		// NOTE(review): database/sql's own default is 2 idle connections;
		// 32 is a deliberate raise, not "same as default" as previously claimed
		c.MaxIdleConns = 32
	}
	if c.MaxOpenConns == 0 {
		// NOTE(review): database/sql defaults to unlimited open connections;
		// 256 is a deliberate cap
		c.MaxOpenConns = 256
	}
	if c.ConnMaxLifetime == 0 {
		// NOTE(review): database/sql defaults to unlimited lifetime;
		// 10 minutes is a deliberate recycle interval
		c.ConnMaxLifetime = 10 * time.Minute
	}
	// ** ** ** ** ** ** ** ** ** ** ** ** ** **
	if c.ConnTryPatience == 0 {
		c.ConnTryPatience = 1 * time.Minute
	}
	if c.ConnTryBackoffDelay == 0 {
		c.ConnTryBackoffDelay = 10 * time.Second
	}
	if c.ConnTryTimeout == 0 {
		c.ConnTryTimeout = 30 * time.Second
	}
	if c.ConnTryMax == 0 {
		c.ConnTryMax = 99
	}
	if c.TriggerHandlers == nil {
		c.TriggerHandlers = TriggerHandlers{}
	}
}
// ParseExtra parses extra params (params starting with *)
// from DSN's querystring (after ?)
//
// Recognized "*"-prefixed keys override the corresponding Config fields and
// are removed from the querystring; all remaining params are re-encoded and
// re-attached to the DSN. Note that url.Values.Encode sorts keys, so the
// order of the surviving params may change.
func (c *Config) ParseExtra() (err error) {
	// Make sure we only got qs
	const q = "?"
	var (
		dsn = c.DataSourceName
		qs  string
	)
	// no querystring present; nothing to do
	// (LastIndex: split at the final "?" so earlier "?" chars in the DSN body survive)
	if pos := strings.LastIndex(dsn, q); pos == -1 {
		return nil
	} else {
		// Trim qs from DSN, we'll re-attach the remaining params
		c.DataSourceName, qs = dsn[:pos], dsn[pos+1:]
	}
	var vv url.Values
	if vv, err = url.ParseQuery(qs); err != nil {
		return err
	}
	var (
		val string
		// parseInt keeps results within int32 range before narrowing to int
		parseInt = func(s string) (int, error) {
			if tmp, err := strconv.ParseInt(s, 10, 32); err != nil {
				return 0, err
			} else {
				return int(tmp), nil
			}
		}
	)
	// consume each recognized extra param; unknown keys pass through untouched
	for key := range vv {
		val = vv.Get(key)
		switch key {
		case "*connTryPatience":
			delete(vv, key)
			if c.ConnTryPatience, err = time.ParseDuration(val); err != nil {
				return
			}
		case "*connTryBackoffDelay":
			delete(vv, key)
			if c.ConnTryBackoffDelay, err = time.ParseDuration(val); err != nil {
				return
			}
		case "*connTryTimeout":
			delete(vv, key)
			if c.ConnTryTimeout, err = time.ParseDuration(val); err != nil {
				return
			}
		case "*connMaxTries":
			delete(vv, key)
			if c.ConnTryMax, err = parseInt(val); err != nil {
				return
			}
		case "*connMaxOpen":
			delete(vv, key)
			if c.MaxOpenConns, err = parseInt(val); err != nil {
				return
			}
		case "*connMaxLifetime":
			delete(vv, key)
			if c.ConnMaxLifetime, err = time.ParseDuration(val); err != nil {
				return
			}
		case "*connMaxIdle":
			delete(vv, key)
			if c.MaxIdleConns, err = parseInt(val); err != nil {
				return
			}
		}
	}
	// Encode QS back to DSN
	c.DataSourceName += q + vv.Encode()
	return nil
}

View File

@ -18,9 +18,13 @@ type (
}
)
func New(ctx context.Context, dsn string) (s *Store, err error) {
var cfg *rdbms.Config
func Connect(ctx context.Context, dsn string) (store.Storable, error) {
var (
err error
cfg *rdbms.Config
s = new(Store)
)
if cfg, err = ProcDataSourceName(dsn); err != nil {
return nil, err
}
@ -35,16 +39,15 @@ func New(ctx context.Context, dsn string) (s *Store, err error) {
cfg.SqlFunctionHandler = sqlFunctionHandler
cfg.CastModuleFieldToColumnType = fieldToColumnTypeCaster
s = new(Store)
if s.Store, err = rdbms.New(ctx, cfg); err != nil {
if s.Store, err = rdbms.Connect(ctx, cfg); err != nil {
return nil, err
}
return s, nil
}
func NewInMemory(ctx context.Context) (s *Store, err error) {
return New(ctx, "sqlite3://file::memory:?cache=shared&mode=memory")
func ConnectInMemory(ctx context.Context) (s store.Storable, err error) {
return Connect(ctx, "sqlite3://file::memory:?cache=shared&mode=memory")
}
func (s *Store) Upgrade(ctx context.Context, log *zap.Logger) (err error) {

View File

@ -30,7 +30,7 @@ func Test_Store(t *testing.T) {
suite struct {
name string
dsnEnvKey string
init func(ctx context.Context, dsn string) (store.Storable, error)
init store.ConnectorFn
}
upgrader interface {
@ -45,12 +45,12 @@ func Test_Store(t *testing.T) {
{
name: "MySQL",
dsnEnvKey: "RDBMS_MYSQL_DSN",
init: func(ctx context.Context, dsn string) (store.Storable, error) { return mysql.New(ctx, dsn) },
init: mysql.Connect,
},
{
name: "PostgreSQL",
dsnEnvKey: "RDBMS_PGSQL_DSN",
init: func(ctx context.Context, dsn string) (store.Storable, error) { return pgsql.New(ctx, dsn) },
init: pgsql.Connect,
},
{
name: "CockroachDB",
@ -60,7 +60,7 @@ func Test_Store(t *testing.T) {
{
name: "SQLite",
dsnEnvKey: "RDBMS_SQLITE_DSN",
init: func(ctx context.Context, dsn string) (store.Storable, error) { return sqlite.New(ctx, dsn) },
init: sqlite.Connect,
},
{
name: "InMemory",