Cleanup, instrumentation
This commit is contained in:
@@ -5,5 +5,5 @@ package app
|
||||
// SQLite is intentionally ignored here
|
||||
import (
|
||||
_ "github.com/cortezaproject/corteza-server/store/mysql"
|
||||
_ "github.com/cortezaproject/corteza-server/store/pgsql"
|
||||
_ "github.com/cortezaproject/corteza-server/store/postgres"
|
||||
)
|
||||
|
||||
@@ -85,7 +85,6 @@ func (imp *Importer) Cast(def interface{}) (err error) {
|
||||
var nsHandle string
|
||||
// Solving a special case where namespace is defined as string
|
||||
// and we're treating value as namespace's handle
|
||||
println("provisioning namespaces")
|
||||
deinterfacer.KVsetString(&nsHandle, "namespace", def)
|
||||
if nsHandle != "" {
|
||||
delete(def.(map[interface{}]interface{}), "namespace")
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"github.com/cortezaproject/corteza-server/pkg/id"
|
||||
"github.com/cortezaproject/corteza-server/pkg/permissions"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite3"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"testing"
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
func TestCharts(t *testing.T) {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
s, err = sqlite.ConnectInMemory(ctx)
|
||||
s, err = sqlite3.ConnectInMemory(ctx)
|
||||
|
||||
namespaceID = id.Next()
|
||||
ns *types.Namespace
|
||||
|
||||
@@ -12,19 +12,9 @@ package store
|
||||
// the code is regenerated.
|
||||
//
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type (
|
||||
Transactioner interface {
|
||||
Tx(context.Context, func(context.Context, Storer) error) error
|
||||
}
|
||||
|
||||
// Sortable interface combines interfaces of all supported store interfaces
|
||||
Storer interface {
|
||||
Transactioner
|
||||
|
||||
storerGenerated interface {
|
||||
{{ range .Definitions -}}
|
||||
{{ export .Types.Plural }}
|
||||
{{ end }}
|
||||
|
||||
@@ -2,7 +2,6 @@ package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/go-chi/chi/middleware"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@@ -11,19 +10,35 @@ type (
|
||||
ctxLogKey struct{}
|
||||
)
|
||||
|
||||
func ContextWithValue(ctx context.Context, log *zap.Logger) context.Context {
|
||||
return context.WithValue(ctx, ctxLogKey{}, log)
|
||||
// ContextWithValue allows us to pack custom logger to context and pass that to the
|
||||
func ContextWithValue(ctx context.Context, logger *zap.Logger) context.Context {
|
||||
return context.WithValue(ctx, ctxLogKey{}, logger)
|
||||
}
|
||||
|
||||
func ContextValue(ctx context.Context) *zap.Logger {
|
||||
return ctx.Value(ctxLogKey{}).(*zap.Logger)
|
||||
}
|
||||
|
||||
// NamedDefault returns default logger with requestID (from context) and extended name
|
||||
func AddRequestID(ctx context.Context, log *zap.Logger) *zap.Logger {
|
||||
if reqID := middleware.GetReqID(ctx); reqID != "" {
|
||||
log = log.With(zap.String("requestID", reqID))
|
||||
// ContextValue retrieves logger from given context or falls back to
|
||||
// any of the logger passed to it. If no loggers are found it uses default logger from pkg/logger
|
||||
func ContextValue(ctx context.Context, fallbacks ...*zap.Logger) *zap.Logger {
|
||||
if ctx != nil {
|
||||
if ctxLogger := ctx.Value(ctxLogKey{}); ctxLogger != nil {
|
||||
// This will panic if we somehow manage to set
|
||||
return ctxLogger.(*zap.Logger)
|
||||
}
|
||||
}
|
||||
|
||||
return log
|
||||
for _, l := range fallbacks {
|
||||
if l != nil {
|
||||
return l
|
||||
}
|
||||
}
|
||||
|
||||
return Default()
|
||||
}
|
||||
|
||||
// AddRequestID sets requestID field from context to logger and returns it
|
||||
func AddRequestID(ctx context.Context, logger *zap.Logger) *zap.Logger {
|
||||
if reqID := middleware.GetReqID(ctx); reqID != "" {
|
||||
logger = logger.With(zap.String("requestID", reqID))
|
||||
}
|
||||
|
||||
return logger
|
||||
}
|
||||
|
||||
@@ -35,19 +35,9 @@ package store
|
||||
// the code is regenerated.
|
||||
//
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type (
|
||||
Transactioner interface {
|
||||
Tx(context.Context, func(context.Context, Storer) error) error
|
||||
}
|
||||
|
||||
// Sortable interface combines interfaces of all supported store interfaces
|
||||
Storer interface {
|
||||
Transactioner
|
||||
|
||||
storerGenerated interface {
|
||||
Actionlogs
|
||||
Applications
|
||||
Attachments
|
||||
|
||||
25
store/interfaces.go
Normal file
25
store/interfaces.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// Storer interface combines interfaces of all supported store interfaces
|
||||
Storer interface {
|
||||
// SetLogger sets new logging facility
|
||||
//
|
||||
// Store facility should fallback to logger.Default when no logging facility is set
|
||||
//
|
||||
// Intentionally closely coupled with Zap logger since this is not some public lib
|
||||
// and it's highly unlikely we'll support different/multiple logging "backend"
|
||||
SetLogger(*zap.Logger)
|
||||
|
||||
// Tx is a transaction handler
|
||||
Tx(context.Context, func(context.Context, Storer) error) error
|
||||
|
||||
// All generated store interfaces
|
||||
storerGenerated
|
||||
}
|
||||
)
|
||||
@@ -2,12 +2,15 @@ package mysql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/Masterminds/squirrel"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/store/rdbms"
|
||||
"github.com/cortezaproject/corteza-server/store/rdbms/instrumentation"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/ngrok/sqlmw"
|
||||
"go.uber.org/zap"
|
||||
"strings"
|
||||
)
|
||||
@@ -19,7 +22,8 @@ type (
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register(Connect, "mysql")
|
||||
store.Register(Connect, "mysql", "mysql+debug")
|
||||
sql.Register("mysql+debug", sqlmw.Driver(new(mysql.MySQLDriver), instrumentation.Debug()))
|
||||
}
|
||||
|
||||
func Connect(ctx context.Context, dsn string) (store.Storer, error) {
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
package pgsql
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"github.com/Masterminds/squirrel"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/store/rdbms"
|
||||
"github.com/cortezaproject/corteza-server/store/rdbms/instrumentation"
|
||||
"github.com/lib/pq"
|
||||
"github.com/ngrok/sqlmw"
|
||||
"go.uber.org/zap"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -19,7 +22,9 @@ type (
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register(Connect, "postgresql", "postgres", "pgsql")
|
||||
store.Register(Connect, "postgres", "postgres+debug")
|
||||
sql.Register("postgres+debug", sqlmw.Driver(new(pq.Driver), instrumentation.Debug()))
|
||||
|
||||
}
|
||||
|
||||
func Connect(ctx context.Context, dsn string) (store.Storer, error) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package pgsql
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -1,4 +1,4 @@
|
||||
package pgsql
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -1,4 +1,4 @@
|
||||
package pgsql
|
||||
package postgres
|
||||
|
||||
// PostgreSQL specific prefixes, sql
|
||||
// templates, functions and other helpers
|
||||
23
store/rdbms/instrumentation/README.adoc
Normal file
23
store/rdbms/instrumentation/README.adoc
Normal file
@@ -0,0 +1,23 @@
|
||||
= RDBMS Store instrumentation & debug
|
||||
|
||||
This package provides wrap around `database/sql/driver` that logs and measures db activities that pass through store.
|
||||
|
||||
== Current capabilities
|
||||
|
||||
=== Debug wrapper
|
||||
|
||||
Logging of all activities (connections, queries, execs, transactions) and activity duration measures.
|
||||
This feature should be used in development env and disabled in production.
|
||||
|
||||
.To enable debugging, use extra * parameter for DSN:
|
||||
====
|
||||
DB_DSN="postgres+debug://crust:crust@localhost:5432/crust?sslmode=disable&"
|
||||
====
|
||||
|
||||
In previous versions similar functionality was enabled by setting `DB_LOGGER` environmental variable.
|
||||
This is no longer supported because of migration to "store" architecture and ability to selectively enable this
|
||||
only for specific connections.
|
||||
|
||||
== Plans
|
||||
|
||||
This package will support proper instrumentation, tracing and metrics via OpenTracing/OpenTelemetry.
|
||||
264
store/rdbms/instrumentation/debug.go
Normal file
264
store/rdbms/instrumentation/debug.go
Normal file
@@ -0,0 +1,264 @@
|
||||
package instrumentation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/cortezaproject/corteza-server/pkg/logger"
|
||||
"github.com/ngrok/sqlmw"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
// Debug instrumentation wrapper for sql driver
|
||||
//
|
||||
// All calls (with context) are wrapped (using github.com/ngrok/sqlmw package)
|
||||
// and that allows us to log all traffic going in and out of the database
|
||||
debug struct {
|
||||
sqlmw.NullInterceptor
|
||||
fallback *zap.Logger
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
now = func() time.Time {
|
||||
return time.Now()
|
||||
}
|
||||
)
|
||||
|
||||
func Debug() *debug {
|
||||
return &debug{fallback: zap.NewNop()}
|
||||
}
|
||||
|
||||
// Returns logger from context with additional options
|
||||
//
|
||||
// fn tries checks context or uses fallback logger
|
||||
func (ld debug) log(ctx context.Context) *zap.Logger {
|
||||
return logger.
|
||||
ContextValue(ctx, logger.Default(), ld.fallback).
|
||||
WithOptions(
|
||||
// get all the way back to RDBMS layer
|
||||
// @todo not sure that all functions below
|
||||
// have the same stack trace, might
|
||||
// need adjustment on individual fn bellow
|
||||
zap.AddCallerSkip(9),
|
||||
// never add stacktrace to these logs
|
||||
zap.AddStacktrace(zap.FatalLevel),
|
||||
)
|
||||
}
|
||||
|
||||
func (debug) argToZapFields(args []driver.NamedValue) []zap.Field {
|
||||
var (
|
||||
name string
|
||||
out = make([]zap.Field, len(args))
|
||||
)
|
||||
for i := range args {
|
||||
name = "arg."
|
||||
if args[i].Name != "" {
|
||||
name += args[i].Name
|
||||
} else {
|
||||
name += fmt.Sprintf("%d", args[i].Ordinal)
|
||||
}
|
||||
|
||||
out[i] = zap.Any(name, args[i].Value)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (ld debug) ConnBeginTx(ctx context.Context, conn driver.ConnBeginTx, txOpts driver.TxOptions) (driver.Tx, error) {
|
||||
var (
|
||||
startedAt = now()
|
||||
tx, err = conn.BeginTx(ctx, txOpts)
|
||||
log = ld.log(ctx).With(zap.Duration("duration", time.Since(startedAt)))
|
||||
message = "conn/begin"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return tx, nil
|
||||
}
|
||||
|
||||
func (ld debug) ConnPrepareContext(ctx context.Context, conn driver.ConnPrepareContext, query string) (driver.Stmt, error) {
|
||||
var (
|
||||
startedAt = now()
|
||||
statement, err = conn.PrepareContext(ctx, query)
|
||||
log = ld.log(ctx).With(
|
||||
zap.Duration("duration", time.Since(startedAt)),
|
||||
zap.Any("query", query),
|
||||
)
|
||||
message = "conn/prepare"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return statement, nil
|
||||
}
|
||||
|
||||
func (ld debug) ConnPing(ctx context.Context, conn driver.Pinger) error {
|
||||
var (
|
||||
startedAt = now()
|
||||
err = conn.Ping(ctx)
|
||||
log = ld.log(ctx).With(zap.Duration("duration", time.Since(startedAt)))
|
||||
message = "conn/ping"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ld debug) ConnExecContext(ctx context.Context, conn driver.ExecerContext, query string, args []driver.NamedValue) (driver.Result, error) {
|
||||
var (
|
||||
startedAt = now()
|
||||
result, err = conn.ExecContext(ctx, query, args)
|
||||
log = ld.log(ctx).With(
|
||||
zap.Duration("duration", time.Since(startedAt)),
|
||||
zap.Any("query", query),
|
||||
).With(ld.argToZapFields(args)...)
|
||||
message = "conn/exec"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, driver.ErrSkip) {
|
||||
// Not actually an error; just informing caller
|
||||
// that it should take an alternative path
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Error(message, zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (ld debug) ConnQueryContext(ctx context.Context, conn driver.QueryerContext, query string, args []driver.NamedValue) (driver.Rows, error) {
|
||||
var (
|
||||
startedAt = now()
|
||||
rows, err = conn.QueryContext(ctx, query, args)
|
||||
log = ld.log(ctx).With(
|
||||
zap.String("query", query),
|
||||
zap.Duration("duration", time.Since(startedAt)),
|
||||
).With(ld.argToZapFields(args)...)
|
||||
message = "statement/query"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, driver.ErrSkip) {
|
||||
// Not actually an error; just informing caller
|
||||
// that it should take an alternative path
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Error(message, zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
func (ld debug) StmtExecContext(ctx context.Context, stmt driver.StmtExecContext, _ string, args []driver.NamedValue) (driver.Result, error) {
|
||||
var (
|
||||
startedAt = now()
|
||||
result, err = stmt.ExecContext(ctx, args)
|
||||
log = ld.log(ctx).With(
|
||||
zap.Duration("duration", time.Since(startedAt)),
|
||||
).With(ld.argToZapFields(args)...)
|
||||
message = "statement/exec"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (ld debug) StmtQueryContext(ctx context.Context, stmt driver.StmtQueryContext, _ string, args []driver.NamedValue) (driver.Rows, error) {
|
||||
var (
|
||||
startedAt = now()
|
||||
rows, err = stmt.QueryContext(ctx, args)
|
||||
log = ld.log(ctx).With(
|
||||
zap.Duration("duration", time.Since(startedAt)),
|
||||
).With(ld.argToZapFields(args)...)
|
||||
message = "statement/query"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
func (ld debug) StmtClose(ctx context.Context, stmt driver.Stmt) error {
|
||||
var (
|
||||
startedAt = now()
|
||||
err = stmt.Close()
|
||||
log = ld.log(ctx).With(zap.Duration("duration", time.Since(startedAt)))
|
||||
message = "statement/close"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ld debug) TxCommit(ctx context.Context, tx driver.Tx) error {
|
||||
var (
|
||||
startedAt = now()
|
||||
err = tx.Commit()
|
||||
log = ld.log(ctx).With(zap.Duration("duration", time.Since(startedAt)))
|
||||
message = "tx/commit"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug(message)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ld debug) TxRollback(ctx context.Context, tx driver.Tx) error {
|
||||
var (
|
||||
startedAt = now()
|
||||
err = tx.Rollback()
|
||||
log = ld.log(ctx).With(zap.Duration("duration", time.Since(startedAt)))
|
||||
message = "tx/rollback"
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
log.Error(message, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
log.Warn(message)
|
||||
return nil
|
||||
}
|
||||
22
store/rdbms/log.go
Normal file
22
store/rdbms/log.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package rdbms
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/cortezaproject/corteza-server/pkg/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// log() returns named logger with caller skip and stacktrace set to Fatal
|
||||
//
|
||||
// It check the given context logger
|
||||
func (s Store) log(ctx context.Context) *zap.Logger {
|
||||
return logger.ContextValue(ctx, s.logger).
|
||||
Named("store.rdbms").
|
||||
WithOptions(zap.AddCallerSkip(2)).
|
||||
// Really not interested in call stack
|
||||
WithOptions(zap.AddStacktrace(zap.FatalLevel))
|
||||
}
|
||||
|
||||
func (s Store) SetLogger(logger *zap.Logger) {
|
||||
s.logger = logger
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/Masterminds/squirrel"
|
||||
"github.com/cortezaproject/corteza-server/pkg/filter"
|
||||
"github.com/cortezaproject/corteza-server/pkg/healthcheck"
|
||||
"github.com/cortezaproject/corteza-server/pkg/logger"
|
||||
"github.com/cortezaproject/corteza-server/pkg/ql"
|
||||
"github.com/cortezaproject/corteza-server/pkg/sentry"
|
||||
"github.com/cortezaproject/corteza-server/pkg/slice"
|
||||
@@ -19,16 +18,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// persistance layer
|
||||
//
|
||||
// all functions go under one struct
|
||||
// why? because it will be easier to initialize and pass around
|
||||
//
|
||||
// each domain will be in it's own file
|
||||
//
|
||||
// connection logic will be built in the persistence layer (making pkg/db obsolete)
|
||||
//
|
||||
|
||||
type (
|
||||
schemaUpgradeGenerator interface {
|
||||
TableExists(string) bool
|
||||
@@ -36,6 +25,7 @@ type (
|
||||
CreateIndexes(ii ...*ddl.Index) []string
|
||||
}
|
||||
|
||||
// Store - Corteza RDBMS persistence layer
|
||||
Store struct {
|
||||
config *Config
|
||||
|
||||
@@ -44,6 +34,9 @@ type (
|
||||
sug schemaUpgradeGenerator
|
||||
|
||||
db dbLayer
|
||||
|
||||
// Logger for connection
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
dbLayer interface {
|
||||
@@ -66,12 +59,6 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
var log = logger.MakeDebugLogger().
|
||||
//return logger.Default().
|
||||
Named("store.rdbms").
|
||||
WithOptions(zap.AddCallerSkip(2)).
|
||||
WithOptions(zap.AddStacktrace(zap.FatalLevel))
|
||||
|
||||
const (
|
||||
// TxRetryHardLimit is the absolute maximum retries we'll allow
|
||||
TxRetryHardLimit = 100
|
||||
@@ -83,20 +70,18 @@ const (
|
||||
)
|
||||
|
||||
func Connect(ctx context.Context, cfg *Config) (s *Store, err error) {
|
||||
if err = cfg.ParseExtra(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, func() error {
|
||||
if err = cfg.ParseExtra(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.SetDefaults()
|
||||
s = &Store{
|
||||
config: cfg,
|
||||
}
|
||||
cfg.SetDefaults()
|
||||
s = &Store{
|
||||
config: cfg,
|
||||
}
|
||||
|
||||
if err = s.Connect(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
return s.Connect(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
// WithTx spins up new store instance with transaction
|
||||
@@ -108,20 +93,17 @@ func (s *Store) withTx(tx dbLayer) *Store {
|
||||
}
|
||||
}
|
||||
|
||||
// Temporary solution for logging
|
||||
func (s *Store) log(ctx context.Context) *zap.Logger {
|
||||
// @todo extract logger from context
|
||||
return log
|
||||
}
|
||||
|
||||
func (s *Store) Connect(ctx context.Context) error {
|
||||
s.log(ctx).Debug("opening connection", zap.String("driver", s.config.DriverName), zap.String("dsn", s.config.MaskedDSN()))
|
||||
db, err := sqlx.Open(s.config.DriverName, s.config.DataSourceName)
|
||||
healthcheck.Defaults().Add(dbHealthcheck(db), "Store/RDBMS/"+s.config.DriverName)
|
||||
|
||||
db, err := sql.Open(s.config.DriverName, s.config.DataSourceName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
healthcheck.Defaults().Add(dbHealthcheck(db), "Store/RDBMS/"+s.config.DriverName)
|
||||
|
||||
dbx := sqlx.NewDb(db, s.config.DriverName)
|
||||
s.log(ctx).Debug(
|
||||
"setting connection parameters",
|
||||
zap.Int("MaxOpenConns", s.config.MaxOpenConns),
|
||||
@@ -129,15 +111,15 @@ func (s *Store) Connect(ctx context.Context) error {
|
||||
zap.Int("MaxIdleConns", s.config.MaxIdleConns),
|
||||
)
|
||||
|
||||
db.SetMaxOpenConns(s.config.MaxOpenConns)
|
||||
db.SetConnMaxLifetime(s.config.ConnMaxLifetime)
|
||||
db.SetMaxIdleConns(s.config.MaxIdleConns)
|
||||
dbx.SetMaxOpenConns(s.config.MaxOpenConns)
|
||||
dbx.SetConnMaxLifetime(s.config.ConnMaxLifetime)
|
||||
dbx.SetMaxIdleConns(s.config.MaxIdleConns)
|
||||
|
||||
if err = s.tryToConnect(ctx, db); err != nil {
|
||||
if err = s.tryToConnect(ctx, dbx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.db = db
|
||||
s.db = dbx
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -147,8 +129,6 @@ func (s Store) tryToConnect(ctx context.Context, db *sqlx.DB) error {
|
||||
patience = time.Now().Add(s.config.ConnTryPatience)
|
||||
)
|
||||
|
||||
//defer close(connErrCh)
|
||||
|
||||
go func() {
|
||||
defer sentry.Recover()
|
||||
|
||||
@@ -207,13 +187,12 @@ func (s Store) tryToConnect(ctx context.Context, db *sqlx.DB) error {
|
||||
}
|
||||
}
|
||||
|
||||
func dbHealthcheck(db *sqlx.DB) func(ctx context.Context) error {
|
||||
func dbHealthcheck(db *sql.DB) func(ctx context.Context) error {
|
||||
return db.PingContext
|
||||
}
|
||||
|
||||
func (s Store) Query(ctx context.Context, q squirrel.Sqlizer) (*sql.Rows, error) {
|
||||
var (
|
||||
start = time.Now()
|
||||
query, args, err = q.ToSql()
|
||||
)
|
||||
|
||||
@@ -221,15 +200,12 @@ func (s Store) Query(ctx context.Context, q squirrel.Sqlizer) (*sql.Rows, error)
|
||||
return nil, fmt.Errorf("could not build query: %w", err)
|
||||
}
|
||||
|
||||
s.log(ctx).Debug(query, zap.Any("args", args), zap.Duration("duration", time.Now().Sub(start)))
|
||||
|
||||
return s.db.QueryContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
// QueryRow returns row instead of filling in the passed struct
|
||||
func (s Store) QueryRow(ctx context.Context, q squirrel.SelectBuilder) (*sql.Row, error) {
|
||||
var (
|
||||
start = time.Now()
|
||||
query, args, err = q.ToSql()
|
||||
)
|
||||
|
||||
@@ -237,14 +213,11 @@ func (s Store) QueryRow(ctx context.Context, q squirrel.SelectBuilder) (*sql.Row
|
||||
return nil, fmt.Errorf("could not build query: %w", err)
|
||||
}
|
||||
|
||||
s.log(ctx).Debug(query, zap.Any("args", args), zap.Duration("duration", time.Now().Sub(start)))
|
||||
|
||||
return s.db.QueryRowContext(ctx, query, args...), nil
|
||||
}
|
||||
|
||||
func (s Store) Exec(ctx context.Context, sqlizer squirrel.Sqlizer) error {
|
||||
var (
|
||||
start = time.Now()
|
||||
query, args, err = sqlizer.ToSql()
|
||||
)
|
||||
|
||||
@@ -252,8 +225,6 @@ func (s Store) Exec(ctx context.Context, sqlizer squirrel.Sqlizer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.log(ctx).Debug(query, zap.Any("args", args), zap.Duration("duration", time.Now().Sub(start)))
|
||||
|
||||
_, err = s.db.ExecContext(ctx, query, args...)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package rdbms
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/Masterminds/squirrel"
|
||||
"github.com/cortezaproject/corteza-server/pkg/ql"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
@@ -39,6 +40,9 @@ type (
|
||||
DataSourceName string
|
||||
DBName string
|
||||
|
||||
// Forces debug mode on RDBMS driver
|
||||
Debug bool
|
||||
|
||||
// MaxOpenConns sets maximum number of open connections to the database
|
||||
// defaults to same value as set in the db/sql
|
||||
MaxOpenConns int
|
||||
@@ -162,7 +166,7 @@ func (c *Config) SetDefaults() {
|
||||
// ** ** ** ** ** ** ** ** ** ** ** ** ** **
|
||||
|
||||
if c.ConnTryPatience == 0 {
|
||||
c.ConnTryPatience = 1 * time.Minute
|
||||
//c.ConnTryPatience = 1 * time.Minute
|
||||
}
|
||||
|
||||
if c.ConnTryBackoffDelay == 0 {
|
||||
@@ -217,51 +221,47 @@ func (c *Config) ParseExtra() (err error) {
|
||||
}
|
||||
)
|
||||
|
||||
const storePrefixChar = "*"
|
||||
|
||||
for key := range vv {
|
||||
val = vv.Get(key)
|
||||
|
||||
if storePrefixChar != key[:1] {
|
||||
// skip non-store specific config
|
||||
continue
|
||||
}
|
||||
|
||||
switch key {
|
||||
case "*connTryPatience":
|
||||
delete(vv, key)
|
||||
if c.ConnTryPatience, err = time.ParseDuration(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.ConnTryPatience, err = time.ParseDuration(val)
|
||||
|
||||
case "*connTryBackoffDelay":
|
||||
delete(vv, key)
|
||||
if c.ConnTryBackoffDelay, err = time.ParseDuration(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.ConnTryBackoffDelay, err = time.ParseDuration(val)
|
||||
|
||||
case "*connTryTimeout":
|
||||
delete(vv, key)
|
||||
if c.ConnTryTimeout, err = time.ParseDuration(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.ConnTryTimeout, err = time.ParseDuration(val)
|
||||
|
||||
case "*connMaxTries":
|
||||
delete(vv, key)
|
||||
if c.ConnTryMax, err = parseInt(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.ConnTryMax, err = parseInt(val)
|
||||
|
||||
case "*connMaxOpen":
|
||||
delete(vv, key)
|
||||
if c.MaxOpenConns, err = parseInt(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.MaxOpenConns, err = parseInt(val)
|
||||
|
||||
case "*connMaxLifetime":
|
||||
delete(vv, key)
|
||||
if c.ConnMaxLifetime, err = time.ParseDuration(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.ConnMaxLifetime, err = time.ParseDuration(val)
|
||||
|
||||
case "*connMaxIdle":
|
||||
delete(vv, key)
|
||||
if c.MaxIdleConns, err = parseInt(val); err != nil {
|
||||
return
|
||||
}
|
||||
c.MaxIdleConns, err = parseInt(val)
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown key %q", key)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid store configuration for key %q: %w", key, err)
|
||||
}
|
||||
|
||||
delete(vv, key)
|
||||
}
|
||||
|
||||
// Encode QS back to DSN
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package sqlite
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -1,4 +1,4 @@
|
||||
package sqlite
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -1,13 +1,16 @@
|
||||
package sqlite
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/Masterminds/squirrel"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/store/rdbms"
|
||||
"github.com/cortezaproject/corteza-server/store/rdbms/instrumentation"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
"github.com/ngrok/sqlmw"
|
||||
"go.uber.org/zap"
|
||||
"strings"
|
||||
)
|
||||
@@ -19,7 +22,8 @@ type (
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register(Connect, "sqlite3", "sqlite")
|
||||
store.Register(Connect, "sqlite3", "sqlite3+debug")
|
||||
sql.Register("sqlite3+debug", sqlmw.Driver(new(sqlite3.SQLiteDriver), instrumentation.Debug()))
|
||||
}
|
||||
|
||||
func Connect(ctx context.Context, dsn string) (store.Storer, error) {
|
||||
@@ -1,4 +1,4 @@
|
||||
package sqlite
|
||||
package sqlite3
|
||||
|
||||
// PostgreSQL specific prefixes, sql
|
||||
// templates, functions and other helpers
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"context"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/store/mysql"
|
||||
"github.com/cortezaproject/corteza-server/store/pgsql"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite"
|
||||
"github.com/cortezaproject/corteza-server/store/postgres"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite3"
|
||||
"github.com/cortezaproject/corteza-server/tests/helpers"
|
||||
_ "github.com/joho/godotenv/autoload"
|
||||
"go.uber.org/zap"
|
||||
@@ -50,7 +50,7 @@ func Test_Store(t *testing.T) {
|
||||
{
|
||||
name: "PostgreSQL",
|
||||
dsnEnvKey: "RDBMS_PGSQL_DSN",
|
||||
init: pgsql.Connect,
|
||||
init: postgres.Connect,
|
||||
},
|
||||
{
|
||||
name: "CockroachDB",
|
||||
@@ -60,7 +60,7 @@ func Test_Store(t *testing.T) {
|
||||
{
|
||||
name: "SQLite",
|
||||
dsnEnvKey: "RDBMS_SQLITE_DSN",
|
||||
init: sqlite.Connect,
|
||||
init: sqlite3.Connect,
|
||||
},
|
||||
{
|
||||
name: "InMemory",
|
||||
|
||||
@@ -3,17 +3,17 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/cortezaproject/corteza-server/pkg/eventbus"
|
||||
"github.com/cortezaproject/corteza-server/pkg/id"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite"
|
||||
"github.com/cortezaproject/corteza-server/store"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite3"
|
||||
"github.com/cortezaproject/corteza-server/system/types"
|
||||
"github.com/markbates/goth"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cortezaproject/corteza-server/pkg/eventbus"
|
||||
"github.com/cortezaproject/corteza-server/system/types"
|
||||
)
|
||||
|
||||
// Mock auth service with nil for current time, dummy provider validator and mock db
|
||||
@@ -21,7 +21,7 @@ func makeMockAuthService() *auth {
|
||||
var (
|
||||
ctx = context.Background()
|
||||
|
||||
mem, err = sqlite.ConnectInMemory(ctx)
|
||||
mem, err = sqlite3.ConnectInMemory(ctx)
|
||||
|
||||
svc = &auth{
|
||||
providerValidator: func(s string) error {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
// Explicitly register SQLite (not done in the app as for testing only)
|
||||
_ "github.com/cortezaproject/corteza-server/store/sqlite"
|
||||
_ "github.com/cortezaproject/corteza-server/store/sqlite3"
|
||||
)
|
||||
|
||||
func NewIntegrationTestApp(ctx context.Context, initTestServices func(*app.CortezaApp) error) *app.CortezaApp {
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/cortezaproject/corteza-server/pkg/permissions"
|
||||
"github.com/cortezaproject/corteza-server/pkg/rand"
|
||||
"github.com/cortezaproject/corteza-server/pkg/store/plain"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite"
|
||||
"github.com/cortezaproject/corteza-server/store/sqlite3"
|
||||
"github.com/cortezaproject/corteza-server/system/rest"
|
||||
"github.com/cortezaproject/corteza-server/system/service"
|
||||
"github.com/cortezaproject/corteza-server/system/types"
|
||||
@@ -70,7 +70,7 @@ func InitTestApp() {
|
||||
return err
|
||||
}
|
||||
|
||||
service.DefaultNgStore, err = sqlite.ConnectInMemory(ctx)
|
||||
service.DefaultNgStore, err = sqlite3.ConnectInMemory(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user