From 024e6ce89018f31f52d0690ffcb089047ce10aa2 Mon Sep 17 00:00:00 2001 From: Denis Arh Date: Mon, 7 Sep 2020 16:43:56 +0200 Subject: [PATCH] Rewired db connection to store/rdbms --- Makefile | 4 +- app/boot_levels.go | 24 ++-- app/store.go | 9 ++ pkg/db/connector.go | 132 ------------------ pkg/db/healthcheck.go | 23 --- pkg/db/logger.go | 36 ----- pkg/options/db.go | 12 +- store/Makefile | 21 ++- store/README.adoc | 22 +++ store/connect.go | 36 +++++ store/mysql/mysql.go | 16 ++- store/pgsql/pgsql.go | 16 ++- store/rdbms/rdbms.go | 208 +++++++++++++-------------- store/rdbms/rdbms_config.go | 271 ++++++++++++++++++++++++++++++++++++ store/sqlite/sqlite.go | 15 +- store/tests/main_test.go | 8 +- 16 files changed, 511 insertions(+), 342 deletions(-) create mode 100644 app/store.go delete mode 100644 pkg/db/connector.go delete mode 100644 pkg/db/healthcheck.go delete mode 100644 pkg/db/logger.go create mode 100644 store/connect.go create mode 100644 store/rdbms/rdbms_config.go diff --git a/Makefile b/Makefile index d3fd6b278..a0358a510 100644 --- a/Makefile +++ b/Makefile @@ -141,10 +141,10 @@ watch.test.%: $(NODEMON) # codegen: $(PROTOGEN) codegen: $(CODEGEN) - $(CODEGEN) + @ $(CODEGEN) -v watch.codegen: $(CODEGEN) - $(CODEGEN) -w -v + @ $(CODEGEN) -w -v ######################################################################################################################## # Quality Assurance diff --git a/app/boot_levels.go b/app/boot_levels.go index 39fd77a26..ec7bcf7d1 100644 --- a/app/boot_levels.go +++ b/app/boot_levels.go @@ -11,7 +11,6 @@ import ( "github.com/cortezaproject/corteza-server/pkg/actionlog" "github.com/cortezaproject/corteza-server/pkg/auth" "github.com/cortezaproject/corteza-server/pkg/corredor" - "github.com/cortezaproject/corteza-server/pkg/db" "github.com/cortezaproject/corteza-server/pkg/eventbus" "github.com/cortezaproject/corteza-server/pkg/healthcheck" "github.com/cortezaproject/corteza-server/pkg/http" @@ -23,7 +22,7 @@ import ( "github.com/cortezaproject/corteza-server/provision/compose" "github.com/cortezaproject/corteza-server/provision/messaging" "github.com/cortezaproject/corteza-server/provision/system" - "github.com/cortezaproject/corteza-server/store/mysql" + "github.com/cortezaproject/corteza-server/store" "github.com/cortezaproject/corteza-server/system/auth/external" sysService "github.com/cortezaproject/corteza-server/system/service" sysEvent "github.com/cortezaproject/corteza-server/system/service/event" @@ -60,7 +59,6 @@ func (app *CortezaApp) Setup() (err error) { hcd.Add(scheduler.Healthcheck, "Scheduler") hcd.Add(mail.Healthcheck, "Mail") hcd.Add(corredor.Healthcheck, "Corredor") - hcd.Add(db.Healthcheck(), "Database") if err = sentry.Init(app.Opt.Sentry); err != nil { return fmt.Errorf("could not initialize Sentry: %w", err) @@ -108,12 +106,15 @@ func (app *CortezaApp) InitStore(ctx context.Context) (err error) { return err } - defer sentry.Recover() + // Do not re-initialize store + // This will make integration test setup a bit more painless + if app.Store == nil { + defer sentry.Recover() - // @todo this should be configurable - app.Store, err = mysql.New(ctx, app.Opt.DB.DSN) - if err != nil { - return err + app.Store, err = store.Connect(ctx, app.Opt.DB.DSN) + if err != nil { + return err + } } if upgradableStore, ok := app.Store.(storeUpgrader); !ok { @@ -138,13 +139,6 @@ func (app *CortezaApp) InitStore(ctx context.Context) (err error) { } } - // deprecated connector - // current state of Corteza (repos...) still requires it - _, err = db.TryToConnect(ctx, app.Log, app.Opt.DB) - if err != nil { - return fmt.Errorf("could not connect to database: %w", err) - } - app.lvl = bootLevelStoreInitialized return nil } diff --git a/app/store.go b/app/store.go new file mode 100644 index 000000000..643d4474c --- /dev/null +++ b/app/store.go @@ -0,0 +1,9 @@ +package app + +// Registers all supported store backends +// +// SQLite is intentionally ignored here +import ( + _ "github.com/cortezaproject/corteza-server/store/mysql" + _ "github.com/cortezaproject/corteza-server/store/pgsql" +) diff --git a/pkg/db/connector.go b/pkg/db/connector.go deleted file mode 100644 index 307e6f036..000000000 --- a/pkg/db/connector.go +++ /dev/null @@ -1,132 +0,0 @@ -package db - -import ( - "context" - "regexp" - "strings" - "time" - - "github.com/pkg/errors" - "github.com/titpetric/factory" - "github.com/titpetric/factory/logger" - "go.uber.org/zap" - - "github.com/cortezaproject/corteza-server/pkg/options" - "github.com/cortezaproject/corteza-server/pkg/sentry" -) - -var ( - dsnMasker = regexp.MustCompile("(.)(?:.*)(.):(.)(?:.*)(.)@") -) - -func TryToConnect(ctx context.Context, log *zap.Logger, opt options.DBOpt) (db *factory.DB, err error) { - if opt.DSN == "" { - err = errors.Errorf("invalid or empty DSN: %q", opt.DSN) - return - } - - name := "default" - - // Trim off the schema, as titpetric/factory does not support it - if strings.HasPrefix(opt.DSN, "mysql://") { - opt.DSN = opt.DSN[8:] - } - - factory.Database.Add(name, factory.DatabaseCredential{DSN: opt.DSN, DriverName: "mysql"}) - - var ( - connErrCh = make(chan error, 1) - ) - - // We'll not add this to the general log because we do not want to carry it with us for every query. - dsnField := zap.String("dsn", dsnMasker.ReplaceAllString(opt.DSN, "$1****$2:$3****$4@")) - - // This logger is also used inside profiler - log = log.Named("database").With(zap.String("name", name)) - - defer close(connErrCh) - - log.Debug("connecting to the database", - dsnField, - zap.Int("tries", opt.MaxTries), - zap.Duration("delay", opt.Delay), - zap.Duration("timeout", opt.Timeout)) - - go func() { - defer sentry.Recover() - - var ( - try = 0 - ) - - for { - try++ - - if opt.MaxTries <= try { - err = errors.Errorf("could not connect to %q, in %d tries", name, try) - return - } - - db, err = factory.Database.Get(name) - if err != nil { - log.Warn( - "could not connect to the database", - zap.Error(err), - zap.Int("try", try), - dsnField, - zap.Float64("delay", opt.Delay.Seconds()), - ) - - select { - case <-ctx.Done(): - // Forced break - break - case <-time.After(opt.Delay): - // Wait before next try - continue - } - } - - // hardcoded values for POC - // @todo make this configurable, same as other options (DB_*) - db.SetConnMaxLifetime(10 * time.Minute) - db.SetMaxOpenConns(256) - db.SetMaxIdleConns(32) - - log.Debug("connected to the database", dsnField) - - // Connected - break - - } - - connErrCh <- err - }() - - select { - case err = <-connErrCh: - break - case <-time.After(opt.Timeout): - // Wait before next try - return nil, errors.Errorf("db init for %q timedout", name) - case <-ctx.Done(): - return nil, errors.Errorf("db connection for %q cancelled", name) - } - - if opt.Logger { - db.SetLogger( - NewZapLogger( - // Skip 3 levels in call stack to get to the actual function used - log.WithOptions(zap.AddCallerSkip(3)), - ), - ) - } else { - db.SetLogger(logger.Silent{}) - } - - if err != nil { - return nil, err - } - - return db, nil -} diff --git a/pkg/db/healthcheck.go b/pkg/db/healthcheck.go deleted file mode 100644 index c1861e8fa..000000000 --- a/pkg/db/healthcheck.go +++ /dev/null @@ -1,23 +0,0 @@ -package db - -import ( - "context" - "github.com/titpetric/factory" -) - -func Healthcheck() func(ctx context.Context) error { - const name = "default" - return func(ctx context.Context) error { - db, err := factory.Database.Get(name) - if err != nil { - return err - } - - err = db.PingContext(ctx) - if err != nil { - return err - } - - return nil - } -} diff --git a/pkg/db/logger.go b/pkg/db/logger.go deleted file mode 100644 index 04a89dfeb..000000000 --- a/pkg/db/logger.go +++ /dev/null @@ -1,36 +0,0 @@ -package db - -import ( - "context" - - dbLogger "github.com/titpetric/factory/logger" - "go.uber.org/zap" - - "github.com/cortezaproject/corteza-server/pkg/logger" -) - -type ( - zapLogger struct { - logger *zap.Logger - } -) - -func NewZapLogger(logger *zap.Logger) *zapLogger { - return &zapLogger{ - logger: logger, - } -} - -func (z *zapLogger) Log(ctx context.Context, msg string, fields ...dbLogger.Field) { - // @todo when factory.DatabaseProfilerContext gets access to context from - // db functions, try to extract RequestID with middleware.GetReqID() - - zapFields := []zap.Field{} - for _, v := range fields { - zapFields = append(zapFields, zap.Any(v.Name(), v.Value())) - } - - logger. - AddRequestID(ctx, z.logger). - Debug(msg, zapFields...) -} diff --git a/pkg/options/db.go b/pkg/options/db.go index 12ac0d66a..8afa7adc2 100644 --- a/pkg/options/db.go +++ b/pkg/options/db.go @@ -7,11 +7,7 @@ import ( type ( DBOpt struct { - DSN string `env:"DB_DSN"` - Logger bool `env:"DB_LOGGER"` - MaxTries int `env:"DB_MAX_TRIES"` - Delay time.Duration `env:"DB_CONN_ERR_DELAY"` - Timeout time.Duration `env:"DB_CONN_TIMEOUT"` + DSN string `env:"DB_DSN"` } ) @@ -20,11 +16,7 @@ func DB(pfix string) (o *DBOpt) { const maxTries = 100 o = &DBOpt{ - DSN: "mysql://corteza:corteza@tcp(db:3306)/corteza?collation=utf8mb4_general_ci", - Logger: false, - MaxTries: maxTries, - Delay: delay, - Timeout: maxTries * delay, + DSN: "mysql://corteza:corteza@tcp(db:3306)/corteza?collation=utf8mb4_general_ci", } fill(o) diff --git a/store/Makefile b/store/Makefile index 40fec9acf..c333419de 100644 --- a/store/Makefile +++ b/store/Makefile @@ -7,7 +7,22 @@ COVER_MODE ?= count COVER_PROFILE ?= .cover.out COVER_FLAGS ?= -covermode=$(COVER_MODE) -coverprofile=$(COVER_PROFILE) +# Run go test cmd with flags, eg: +# $> TEST_FLAGS="-v" make test +# $> TEST_FLAGS="-v -run 'Test_Store/.+/ComposeCharts'" make test +TEST_FLAGS ?= -cover: - $(GOTEST) -coverprofile=$(COVER_PROFILE) -coverpkg=./... ./tests/... - $(GOTOOL) cover -html=$(COVER_PROFILE) +test.store: + $(GOTEST) $(TEST_FLAGS) ./tests/... + +test.cover.store: + @ $(GOTEST) $(TEST_FLAGS) -coverprofile=$(COVER_PROFILE) -coverpkg=./... ./tests/... + @ $(GOTOOL) cover -html=$(COVER_PROFILE) + @ $(GOTOOL) cover + @ rm $(COVER_PROFILE) + +watch.codegen: + @ make -C ../ $@ + +codegen: + @ make -C ../ $@ diff --git a/store/README.adoc b/store/README.adoc index f55400dac..76f3b75f3 100644 --- a/store/README.adoc +++ b/store/README.adoc @@ -3,6 +3,14 @@ Provides unified storage for Corteza with composable and interchangable backends. Backends can be any RDBMS, Key-Value, NoSQL, FS, Memory. +== Development + +==== +@todo definition (YAML) structure +@todo how to add definitions +@todo how to add stores +==== + == FAQ: Why are we changing create/update function signature (input struct is no longer returned)?:: @@ -19,6 +27,20 @@ Separation of concerns + consistency with store backends that do not support db tags + de-cluttering types* namespace +== Testing + +Running store tests: +==== +make test.store +==== + +Running store tests with code coverage: +==== +make test.store +==== + +See Makefile for details + == Known issues SQLite, transactions & locking:: diff --git a/store/connect.go b/store/connect.go new file mode 100644 index 000000000..7adb8c490 --- /dev/null +++ b/store/connect.go @@ -0,0 +1,36 @@ +package store + +import ( + "context" + "fmt" + "strings" +) + +type ( + ConnectorFn func(ctx context.Context, dsn string) (s Storable, err error) +) + +var ( + registered = make(map[string]ConnectorFn) +) + +func Connect(ctx context.Context, dsn string) (s Storable, err error) { + var storeType = strings.SplitN(dsn, "://", 2)[0] + if storeType == "" { + // Backward compatibility + storeType = "mysql" + } + + if conn, ok := registered[storeType]; ok { + return conn(ctx, dsn) + } else { + return nil, fmt.Errorf("unknown store type used: %q (check your storage configuration)", storeType) + } +} + +// Register add on ore more store types and their connector fn +func Register(fn ConnectorFn, tt ...string) { + for _, t := range tt { + registered[t] = fn + } +} diff --git a/store/mysql/mysql.go b/store/mysql/mysql.go index 43439715a..c75eca4e3 100644 --- a/store/mysql/mysql.go +++ b/store/mysql/mysql.go @@ -18,8 +18,17 @@ type ( } ) -func New(ctx context.Context, dsn string) (s *Store, err error) { - var cfg *rdbms.Config +func init() { + store.Register(Connect, "mysql") +} + +func Connect(ctx context.Context, dsn string) (store.Storable, error) { + var ( + err error + cfg *rdbms.Config + + s = new(Store) + ) if cfg, err = ProcDataSourceName(dsn); err != nil { return nil, err @@ -31,8 +40,7 @@ func New(ctx context.Context, dsn string) (s *Store, err error) { cfg.UpsertBuilder = UpsertBuilder cfg.CastModuleFieldToColumnType = fieldToColumnTypeCaster - s = new(Store) - if s.Store, err = rdbms.New(ctx, cfg); err != nil { + if s.Store, err = rdbms.Connect(ctx, cfg); err != nil { return nil, err } diff --git a/store/pgsql/pgsql.go b/store/pgsql/pgsql.go index 3df91a1de..8b92307e5 100644 --- a/store/pgsql/pgsql.go +++ b/store/pgsql/pgsql.go @@ -18,8 +18,17 @@ type ( } ) -func New(ctx context.Context, dsn string) (s *Store, err error) { - var cfg *rdbms.Config +func init() { + store.Register(Connect, "postgresql", "postgres", "pgsql") +} + +func Connect(ctx context.Context, dsn string) (store.Storable, error) { + var ( + err error + cfg *rdbms.Config + + s = new(Store) + ) if cfg, err = ProcDataSourceName(dsn); err != nil { return nil, err @@ -30,8 +39,7 @@ func New(ctx context.Context, dsn string) (s *Store, err error) { cfg.SqlFunctionHandler = sqlFunctionHandler cfg.CastModuleFieldToColumnType = fieldToColumnTypeCaster - s = new(Store) - if s.Store, err = rdbms.New(ctx, cfg); err != nil { + if s.Store, err = rdbms.Connect(ctx, cfg); err != nil { return nil, err } diff --git a/store/rdbms/rdbms.go b/store/rdbms/rdbms.go index df3de3d10..5c73b820c 100644 --- a/store/rdbms/rdbms.go +++ b/store/rdbms/rdbms.go @@ -6,11 +6,15 @@ import ( "fmt" "github.com/Masterminds/squirrel" "github.com/cortezaproject/corteza-server/pkg/filter" + "github.com/cortezaproject/corteza-server/pkg/healthcheck" + "github.com/cortezaproject/corteza-server/pkg/logger" "github.com/cortezaproject/corteza-server/pkg/ql" + "github.com/cortezaproject/corteza-server/pkg/sentry" "github.com/cortezaproject/corteza-server/pkg/slice" "github.com/cortezaproject/corteza-server/store" "github.com/cortezaproject/corteza-server/store/rdbms/ddl" "github.com/jmoiron/sqlx" + "go.uber.org/zap" "strings" "time" ) @@ -26,80 +30,12 @@ import ( // type ( - txRetryOnErrHandler func(int, error) bool - columnPreprocFn func(string, string) string - valuePreprocFn func(interface{}, string) interface{} - errorHandler func(error) error - triggerKey string - schemaUpgradeGenerator interface { TableExists(string) bool CreateTable(t *ddl.Table) string CreateIndexes(ii ...*ddl.Index) []string } - rowScanner interface { - Scan(...interface{}) error - } - - TriggerHandlers map[triggerKey]interface{} - - Config struct { - DriverName string - DataSourceName string - DBName string - - PlaceholderFormat squirrel.PlaceholderFormat - - // These 3 are passed directly to connection - MaxOpenConns int - ConnMaxLifetime time.Duration - MaxIdleConns int - - // Disable transactions - TxDisabled bool - - // How many times should we retry failed transaction? - TxMaxRetries int - - // TxRetryErrHandler should return true if transaction should be retried - // - // Because retry algorithm varies between concrete rdbms implementations - // - // Handler must return true if failed transaction should be replied - // and false if we're safe to terminate it - TxRetryErrHandler txRetryOnErrHandler - - ColumnPreprocessors map[string]columnPreprocFn - ValuePreprocessors map[string]valuePreprocFn - - ErrorHandler errorHandler - - // Implementations can override internal RDBMS row scanners - RowScanners map[string]interface{} - - // Different store backend implementation might handle upsert differently... - UpsertBuilder func(*Config, string, store.Payload, ...string) (squirrel.InsertBuilder, error) - - // TriggerHandlers handle various exceptions that can not be handled generally within RDBMS package. - // see triggerKey type and defined constants to see where the hooks are and how can they be called - TriggerHandlers TriggerHandlers - - // UniqueConstraintCheck flag controls if unique constraints should be explicitly checked within - // store or is this handled inside the storage - // - // - UniqueConstraintCheck bool - - // FunctionHandler takes care of translation & transformation of (sql) functions - // and their parameters - // - // Functions are used in filters and aggregations - SqlFunctionHandler func(f ql.Function) (ql.ASTNode, error) - - CastModuleFieldToColumnType func(field ModuleFieldTypeDetector, ident ql.Ident) (ql.Ident, error) - } - Store struct { config *Config @@ -131,7 +67,7 @@ type ( ) const ( - // This is the absolute maximum retries we'll allow + // TxRetryHardLimit is the absolute maximum retries we'll allow TxRetryHardLimit = 100 DefaultSliceCapacity = 1000 @@ -140,42 +76,17 @@ const ( MaxRefetches = 100 ) -func New(ctx context.Context, cfg *Config) (*Store, error) { - var s = &Store{ +func Connect(ctx context.Context, cfg *Config) (s *Store, err error) { + if err = cfg.ParseExtra(); err != nil { + return nil, err + } + + cfg.SetDefaults() + s = &Store{ config: cfg, } - if s.config.PlaceholderFormat == nil { - s.config.PlaceholderFormat = squirrel.Question - } - - if s.config.TxMaxRetries == 0 { - s.config.TxMaxRetries = TxRetryHardLimit - } - - if s.config.TxRetryErrHandler == nil { - // Default transaction retry handler - s.config.TxRetryErrHandler = TxNoRetry - } - - if s.config.ErrorHandler == nil { - s.config.ErrorHandler = ErrHandlerFallthrough - } - - if s.config.UpsertBuilder == nil { - s.config.UpsertBuilder = UpsertBuilder - } - - if s.config.MaxIdleConns == 0 { - // Same as default in the db/sql - s.config.MaxIdleConns = 2 - } - - if s.config.TriggerHandlers == nil { - s.config.TriggerHandlers = TriggerHandlers{} - } - - if err := s.Connect(ctx); err != nil { + if err = s.Connect(ctx); err != nil { return nil, err } @@ -191,20 +102,111 @@ func (s *Store) withTx(tx dbLayer) *Store { } } +// Temporary solution for logging +func (s *Store) log(ctx context.Context) *zap.Logger { + // @todo extract logger from context + return logger.Default(). + Named("store.rdbms"). + WithOptions(zap.AddStacktrace(zap.FatalLevel)) +} + func (s *Store) Connect(ctx context.Context) error { - db, err := sqlx.ConnectContext(ctx, s.config.DriverName, s.config.DataSourceName) + s.log(ctx).Debug("opening connection", zap.String("driver", s.config.DriverName), zap.String("dsn", s.config.MaskedDSN())) + db, err := sqlx.Open(s.config.DriverName, s.config.DataSourceName) + healthcheck.Defaults().Add(dbHealthcheck(db), "Store/RDBMS/"+s.config.DriverName) if err != nil { return err } + s.log(ctx).Debug( + "setting connection parameters", + zap.Int("MaxOpenConns", s.config.MaxOpenConns), + zap.Duration("MaxLifetime", s.config.ConnMaxLifetime), + zap.Int("MaxIdleConns", s.config.MaxIdleConns), + ) + db.SetMaxOpenConns(s.config.MaxOpenConns) db.SetConnMaxLifetime(s.config.ConnMaxLifetime) db.SetMaxIdleConns(s.config.MaxIdleConns) + if err = s.tryToConnect(ctx, db); err != nil { + return err + } + s.db = db return err } +func (s Store) tryToConnect(ctx context.Context, db *sqlx.DB) error { + var ( + connErrCh = make(chan error, 1) + patience = time.Now().Add(s.config.ConnTryPatience) + ) + + //defer close(connErrCh) + + go func() { + defer sentry.Recover() + + var ( + err error + try = 0 + ) + + for { + try++ + + if s.config.ConnTryMax <= try { + connErrCh <- fmt.Errorf("could not connect in %d tries", try) + return + } + + if err = db.PingContext(ctx); err != nil { + + if time.Now().After(patience) { + // don't make too much fuss + // if we're in patience mode + s.log(ctx).Warn( + "could not connect to the database", + zap.Error(err), + zap.Int("try", try), + zap.Float64("delay", s.config.ConnTryBackoffDelay.Seconds()), + ) + } + + select { + case <-ctx.Done(): + // Forced break + break + case <-time.After(s.config.ConnTryBackoffDelay): + // Wait before next try + continue + } + } + + s.log(ctx).Debug("connected to the database") + break + } + + connErrCh <- err + }() + + to := s.config.ConnTryTimeout * time.Duration(s.config.ConnTryMax*2) + select { + case err := <-connErrCh: + return err + case <-time.After(to): + // Wait before next try + return fmt.Errorf("timedout after %ds", to.Seconds()) + case <-ctx.Done(): + return fmt.Errorf("connection cancelled") + } +} + +func dbHealthcheck(db *sqlx.DB) func(ctx context.Context) error { + return db.PingContext +} + func (s Store) Query(ctx context.Context, q squirrel.Sqlizer) (*sql.Rows, error) { query, args, err := q.ToSql() if err != nil { diff --git a/store/rdbms/rdbms_config.go b/store/rdbms/rdbms_config.go new file mode 100644 index 000000000..a40f685b5 --- /dev/null +++ b/store/rdbms/rdbms_config.go @@ -0,0 +1,271 @@ +package rdbms + +import ( + "github.com/Masterminds/squirrel" + "github.com/cortezaproject/corteza-server/pkg/ql" + "github.com/cortezaproject/corteza-server/store" + "net/url" + "regexp" + "strconv" + "strings" + "time" +) + +// persistance layer +// +// all functions go under one struct +// why? because it will be easier to initialize and pass around +// +// each domain will be in it's own file +// +// connection logic will be built in the persistence layer (making pkg/db obsolete) +// + +type ( + txRetryOnErrHandler func(int, error) bool + columnPreprocFn func(string, string) string + valuePreprocFn func(interface{}, string) interface{} + errorHandler func(error) error + triggerKey string + + rowScanner interface { + Scan(...interface{}) error + } + + TriggerHandlers map[triggerKey]interface{} + + Config struct { + DriverName string + DataSourceName string + DBName string + + // MaxOpenConns sets maximum number of open connections to the database + // defaults to same value as set in the db/sql + MaxOpenConns int + + // ConnMaxLifetime sets the maximum amount of time a connection may be reused + // defaults to same value as set in the db/sql + ConnMaxLifetime time.Duration + + // MaxIdleConns sets the maximum number of connections in the idle connection pool + // defaults to same value as set in the db/sql + MaxIdleConns int + + // ConnTryPatience sets time window in which we do not complaining about failed connection tries + ConnTryPatience time.Duration + + // ConnTryBackoffDelay sets backoff delay after failed try + ConnTryBackoffDelay time.Duration + + // ConnTryTimeout sets timeout per try + ConnTryTimeout time.Duration + + // ConnTryMax maximum number of retrys for getting the connection + ConnTryMax int + + // PlaceholderFormat used by squirrel query generator + PlaceholderFormat squirrel.PlaceholderFormat + + // Disable transactions + TxDisabled bool + + // How many times should we retry failed transaction? + TxMaxRetries int + + // TxRetryErrHandler should return true if transaction should be retried + // + // Because retry algorithm varies between concrete rdbms implementations + // + // Handler must return true if failed transaction should be replied + // and false if we're safe to terminate it + TxRetryErrHandler txRetryOnErrHandler + + ColumnPreprocessors map[string]columnPreprocFn + ValuePreprocessors map[string]valuePreprocFn + + ErrorHandler errorHandler + + // Implementations can override internal RDBMS row scanners + RowScanners map[string]interface{} + + // Different store backend implementation might handle upsert differently... + UpsertBuilder func(*Config, string, store.Payload, ...string) (squirrel.InsertBuilder, error) + + // TriggerHandlers handle various exceptions that can not be handled generally within RDBMS package. + // see triggerKey type and defined constants to see where the hooks are and how can they be called + TriggerHandlers TriggerHandlers + + // UniqueConstraintCheck flag controls if unique constraints should be explicitly checked within + // store or is this handled inside the storage + // + // + UniqueConstraintCheck bool + + // FunctionHandler takes care of translation & transformation of (sql) functions + // and their parameters + // + // Functions are used in filters and aggregations + SqlFunctionHandler func(f ql.Function) (ql.ASTNode, error) + + CastModuleFieldToColumnType func(field ModuleFieldTypeDetector, ident ql.Ident) (ql.Ident, error) + } +) + +var ( + dsnMasker = regexp.MustCompile("(.)(?:.*)(.):(.)(?:.*)(.)@") +) + +// MaskedDSN replaces username & password from DSN string so it's usable for logging +func (c *Config) MaskedDSN() string { + return dsnMasker.ReplaceAllString(c.DataSourceName, "$1****$2:$3****$4@") +} + +func (c *Config) SetDefaults() { + if c.PlaceholderFormat == nil { + c.PlaceholderFormat = squirrel.Question + } + + if c.TxMaxRetries == 0 { + c.TxMaxRetries = TxRetryHardLimit + } + + if c.TxRetryErrHandler == nil { + // Default transaction retry handler + c.TxRetryErrHandler = TxNoRetry + } + + if c.ErrorHandler == nil { + c.ErrorHandler = ErrHandlerFallthrough + } + + if c.UpsertBuilder == nil { + c.UpsertBuilder = UpsertBuilder + } + + // ** ** ** ** ** ** ** ** ** ** ** ** ** ** + + if c.MaxIdleConns == 0 { + // Same as default in the db/sql + c.MaxIdleConns = 32 + } + + if c.MaxOpenConns == 0 { + // Same as default in the db/sql + c.MaxOpenConns = 256 + } + + if c.ConnMaxLifetime == 0 { + // Same as default in the db/sql + c.ConnMaxLifetime = 10 * time.Minute + } + + // ** ** ** ** ** ** ** ** ** ** ** ** ** ** + + if c.ConnTryPatience == 0 { + c.ConnTryPatience = 1 * time.Minute + } + + if c.ConnTryBackoffDelay == 0 { + c.ConnTryBackoffDelay = 10 * time.Second + } + + if c.ConnTryTimeout == 0 { + c.ConnTryTimeout = 30 * time.Second + } + + if c.ConnTryMax == 0 { + c.ConnTryMax = 99 + } + + if c.TriggerHandlers == nil { + c.TriggerHandlers = TriggerHandlers{} + } +} + +// ParseExtra parses extra params (params starting with *) +// from DSN's querystring (after ?) +func (c *Config) ParseExtra() (err error) { + // Make sure we only got qs + const q = "?" + var ( + dsn = c.DataSourceName + qs string + ) + + if pos := strings.LastIndex(dsn, q); pos == -1 { + return nil + } else { + // Trim qs from DSN, we'll re-attach the remaining params + c.DataSourceName, qs = dsn[:pos], dsn[pos+1:] + } + + var vv url.Values + if vv, err = url.ParseQuery(qs); err != nil { + return err + } + + var ( + val string + + parseInt = func(s string) (int, error) { + if tmp, err := strconv.ParseInt(s, 10, 32); err != nil { + return 0, err + } else { + return int(tmp), nil + } + + } + ) + + for key := range vv { + val = vv.Get(key) + switch key { + case "*connTryPatience": + delete(vv, key) + if c.ConnTryPatience, err = time.ParseDuration(val); err != nil { + return + } + + case "*connTryBackoffDelay": + delete(vv, key) + if c.ConnTryBackoffDelay, err = time.ParseDuration(val); err != nil { + return + } + + case "*connTryTimeout": + delete(vv, key) + if c.ConnTryTimeout, err = time.ParseDuration(val); err != nil { + return + } + + case "*connMaxTries": + delete(vv, key) + if c.ConnTryMax, err = parseInt(val); err != nil { + return + } + + case "*connMaxOpen": + delete(vv, key) + if c.MaxOpenConns, err = parseInt(val); err != nil { + return + } + + case "*connMaxLifetime": + delete(vv, key) + if c.ConnMaxLifetime, err = time.ParseDuration(val); err != nil { + return + } + + case "*connMaxIdle": + delete(vv, key) + if c.MaxIdleConns, err = parseInt(val); err != nil { + return + } + } + } + + // Encode QS back to DSN + c.DataSourceName += q + vv.Encode() + + return nil +} diff --git a/store/sqlite/sqlite.go b/store/sqlite/sqlite.go index 8bf3c54d7..96d64ccc8 100644 --- a/store/sqlite/sqlite.go +++ b/store/sqlite/sqlite.go @@ -18,9 +18,13 @@ type ( } ) -func New(ctx context.Context, dsn string) (s *Store, err error) { - var cfg *rdbms.Config +func Connect(ctx context.Context, dsn string) (store.Storable, error) { + var ( + err error + cfg *rdbms.Config + s = new(Store) + ) if cfg, err = ProcDataSourceName(dsn); err != nil { return nil, err } @@ -35,16 +39,15 @@ func New(ctx context.Context, dsn string) (s *Store, err error) { cfg.SqlFunctionHandler = sqlFunctionHandler cfg.CastModuleFieldToColumnType = fieldToColumnTypeCaster - s = new(Store) - if s.Store, err = rdbms.New(ctx, cfg); err != nil { + if s.Store, err = rdbms.Connect(ctx, cfg); err != nil { return nil, err } return s, nil } -func NewInMemory(ctx context.Context) (s *Store, err error) { - return New(ctx, "sqlite3://file::memory:?cache=shared&mode=memory") +func ConnectInMemory(ctx context.Context) (s store.Storable, err error) { + return Connect(ctx, "sqlite3://file::memory:?cache=shared&mode=memory") } func (s *Store) Upgrade(ctx context.Context, log *zap.Logger) (err error) { diff --git a/store/tests/main_test.go b/store/tests/main_test.go index 79b8d2af5..5ec914a5b 100644 --- a/store/tests/main_test.go +++ b/store/tests/main_test.go @@ -30,7 +30,7 @@ func Test_Store(t *testing.T) { suite struct { name string dsnEnvKey string - init func(ctx context.Context, dsn string) (store.Storable, error) + init store.ConnectorFn } upgrader interface { @@ -45,12 +45,12 @@ func Test_Store(t *testing.T) { { name: "MySQL", dsnEnvKey: "RDBMS_MYSQL_DSN", - init: func(ctx context.Context, dsn string) (store.Storable, error) { return mysql.New(ctx, dsn) }, + init: mysql.Connect, }, { name: "PostgreSQL", dsnEnvKey: "RDBMS_PGSQL_DSN", - init: func(ctx context.Context, dsn string) (store.Storable, error) { return pgsql.New(ctx, dsn) }, + init: pgsql.Connect, }, { name: "CockroachDB", @@ -60,7 +60,7 @@ func Test_Store(t *testing.T) { { name: "SQLite", dsnEnvKey: "RDBMS_SQLITE_DSN", - init: func(ctx context.Context, dsn string) (store.Storable, error) { return sqlite.New(ctx, dsn) }, + init: sqlite.Connect, }, { name: "InMemory",