3
0

upd(sam): configs refactor

This commit is contained in:
Tit Petric 2018-09-11 17:50:29 +02:00
parent dfd9985552
commit 3e3af76b65
11 changed files with 140 additions and 96 deletions

View File

@ -1,67 +1,49 @@
package sam
import (
"github.com/namsral/flag"
"github.com/pkg/errors"
"github.com/crusttech/crust/sam/websocket"
"github.com/crusttech/crust/config"
"github.com/crusttech/crust/sam/repository"
)
type (
configuration struct {
http struct {
addr string
logging bool
pretty bool
tracing bool
metrics bool
}
db struct {
dsn string
profiler string
}
websocket websocket.Configuration
appFlags struct {
http *config.HTTP
db *config.Database
repository *repository.Flags
}
)
var config *configuration
var flags *appFlags
func (c *configuration) Validate() error {
func (c *appFlags) Validate() error {
if c == nil {
return errors.New("SAM config is not initialized, need to call Flags()")
return errors.New("SAM flags are not initialized, need to call Flags()")
}
if c.http.addr == "" {
return errors.New("No HTTP Addr is set, can't listen for HTTP")
if err := c.http.Validate(); err != nil {
return err
}
if c.db.dsn == "" {
return errors.New("No DB DSN is set, can't connect to database")
if err := c.db.Validate(); err != nil {
return err
}
if err := c.websocket.Validate(); err != nil {
if err := c.repository.Validate(); err != nil {
return err
}
return nil
}
func Flags(prefix ...string) {
if config != nil {
if flags != nil {
return
}
if len(prefix) == 0 {
panic("sam.Flags() needs prefix on first call")
}
config = new(configuration)
(&config.websocket).Init()
p := func(s string) string {
return prefix[0] + "-" + s
flags = &appFlags{
new(config.HTTP).Init(prefix...),
new(config.Database).Init(prefix...),
new(repository.Flags).Init(prefix...),
}
flag.StringVar(&config.http.addr, p("http-addr"), ":3000", "Listen address for HTTP server")
flag.BoolVar(&config.http.logging, p("http-log"), true, "Enable/disable HTTP request log")
flag.BoolVar(&config.http.pretty, p("http-pretty-json"), false, "Prettify returned JSON output")
flag.BoolVar(&config.http.tracing, p("http-error-tracing"), false, "Return error stack frame")
flag.BoolVar(&config.http.metrics, p("http-metrics"), false, "Provide metrics export for prometheus")
flag.StringVar(&config.db.dsn, p("db-dsn"), "crust:crust@tcp(db1:3306)/crust?collation=utf8mb4_general_ci", "DSN for database connection")
flag.StringVar(&config.db.profiler, p("db-profiler"), "", "Profiler for DB queries (none, stdout)")
}

View File

@ -1,5 +1,9 @@
package repository
import (
"github.com/pkg/errors"
)
type (
repositoryError string
)
@ -7,6 +11,7 @@ type (
const (
ErrDatabaseError = repositoryError("DatabaseError")
ErrNotImplemented = repositoryError("NotImplemented")
ErrConfigError = repositoryError("ConfigError")
)
func (e repositoryError) Error() string {
@ -16,3 +21,7 @@ func (e repositoryError) Error() string {
func (e repositoryError) String() string {
return "crust.sam.repository." + string(e)
}
func (e repositoryError) New() error {
return errors.WithStack(e)
}

38
sam/repository/flags.go Normal file
View File

@ -0,0 +1,38 @@
package repository
import (
"github.com/crusttech/crust/config"
)
type (
Flags struct {
PubSub *config.PubSub
Websocket *config.Websocket
}
)
var flags *Flags
func (f *Flags) Validate() error {
if flags == nil {
return ErrConfigError.New()
}
if err := f.PubSub.Validate(); err != nil {
return err
}
if err := f.Websocket.Validate(); err != nil {
return err
}
return nil
}
func (f *Flags) Init(prefix ...string) *Flags {
if flags != nil {
return flags
}
flags = &Flags{
new(config.PubSub).Init(prefix...),
new(config.Websocket).Init(prefix...),
}
return flags
}

View File

@ -7,15 +7,17 @@ import (
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/crusttech/crust/config"
)
func mountRoutes(r chi.Router, opts *configuration, mounts ...func(r chi.Router)) {
func mountRoutes(r chi.Router, opts *config.HTTP, mounts ...func(r chi.Router)) {
r.Use(handleCORS)
if opts.http.logging {
if opts.Logging {
r.Use(middleware.Logger)
}
if opts.http.metrics {
if opts.Metrics {
r.Use(metrics{}.Middleware("sam"))
}
@ -24,14 +26,14 @@ func mountRoutes(r chi.Router, opts *configuration, mounts ...func(r chi.Router)
}
}
func mountSystemRoutes(r chi.Router, opts *configuration) {
if opts.http.metrics {
func mountSystemRoutes(r chi.Router, opts *config.HTTP) {
if opts.Metrics {
r.Handle("/metrics", metrics{}.Handler())
}
r.Mount("/debug", middleware.Profiler())
}
func printRoutes(r chi.Router, opts *configuration) {
func printRoutes(r chi.Router, opts *config.HTTP) {
var printRoutes func(chi.Routes, string, string)
printRoutes = func(r chi.Routes, indent string, prefix string) {
routes := r.Routes()

View File

@ -6,6 +6,7 @@ import (
"github.com/crusttech/crust/auth"
"github.com/crusttech/crust/sam/repository"
"github.com/crusttech/crust/sam/types"
"github.com/crusttech/crust/store"
"github.com/titpetric/factory"
"io"
"net/http"
@ -38,14 +39,7 @@ type (
repository.Attachment
}
attachmentStore interface {
Original(id uint64, ext string) string
Preview(id uint64, ext string) string
Save(filename string, contents io.Reader) error
Remove(filename string) error
Open(filename string) (io.Reader, error)
}
attachmentStore store.Store
)
func Attachment(store attachmentStore) *attachment {

View File

@ -2,9 +2,12 @@ package service
import (
"context"
"time"
"github.com/gomodule/redigo/redis"
"github.com/pkg/errors"
"time"
"github.com/crusttech/crust/config"
)
type PubSub struct {
@ -15,12 +18,15 @@ type PubSub struct {
healthCheckInterval time.Duration
}
func (ps PubSub) New(redisServerAddr string, ctx context.Context) *PubSub {
func (ps PubSub) New(config *config.PubSub, ctx context.Context) (*PubSub, error) {
if err := config.Validate(); err != nil {
return nil, err
}
return &PubSub{
ctx: ctx,
redisServerAddr: redisServerAddr,
redisServerAddr: config.RedisAddr,
healthCheckInterval: time.Minute,
}
}, nil
}
func (ps *PubSub) With(ctx context.Context) *PubSub {

View File

@ -23,18 +23,18 @@ import (
func Init() error {
// validate configuration
if err := config.Validate(); err != nil {
if err := flags.Validate(); err != nil {
return err
}
// start/configure database connection
factory.Database.Add("default", config.db.dsn)
factory.Database.Add("default", flags.db.DSN)
db, err := factory.Database.Get()
if err != nil {
return err
}
// @todo: profiling as an external service?
switch config.db.profiler {
switch flags.db.Profiler {
case "stdout":
db.Profiler = &factory.Database.ProfilerStdout
default:
@ -43,8 +43,8 @@ func Init() error {
// configure resputil options
resputil.SetConfig(resputil.Options{
Pretty: config.http.pretty,
Trace: config.http.tracing,
Pretty: flags.http.Pretty,
Trace: flags.http.Tracing,
Logger: func(err error) {
// @todo: error logging
},
@ -56,10 +56,10 @@ func Init() error {
func Start() error {
deadline := sigctx.New()
log.Println("Starting http server on address " + config.http.addr)
listener, err := net.Listen("tcp", config.http.addr)
log.Println("Starting http server on address " + flags.http.Addr)
listener, err := net.Listen("tcp", flags.http.Addr)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("Can't listen on addr %s", config.http.addr))
return errors.Wrap(err, fmt.Sprintf("Can't listen on addr %s", flags.http.Addr))
}
// JWT Auth
@ -74,11 +74,11 @@ func Start() error {
// Only protect application routes with JWT
r.Group(func(r chi.Router) {
r.Use(jwtAuth.Verifier(), jwtAuth.Authenticator())
mountRoutes(r, config, rest.MountRoutes(jwtAuth), websocket.MountRoutes(ctx.AsContext(deadline), config.websocket))
mountRoutes(r, flags.http, rest.MountRoutes(jwtAuth), websocket.MountRoutes(ctx.AsContext(deadline), flags.repository))
})
printRoutes(r, config)
mountSystemRoutes(r, config)
printRoutes(r, flags.http)
mountSystemRoutes(r, flags.http)
go http.Serve(listener, r)
<-deadline.Done()

View File

@ -2,10 +2,14 @@ package websocket
import (
"context"
"log"
"time"
"github.com/titpetric/factory"
"github.com/crusttech/crust/sam/repository"
"github.com/crusttech/crust/sam/service"
"github.com/crusttech/crust/sam/types"
"github.com/titpetric/factory"
"time"
)
type (
@ -57,31 +61,37 @@ func (eq *eventQueue) store(ctx context.Context, qp eventQueuePusher) {
}()
}
func (eq *eventQueue) feedSessions(ctx context.Context, config Configuration, qp eventQueuePuller, store eventQueueWalker) error {
func (eq *eventQueue) feedSessions(ctx context.Context, config *repository.Flags, qp eventQueuePuller, store eventQueueWalker) error {
newMessageEvent := make(chan struct{}, eventQueueBacklog)
done := make(chan error, 1)
// feed events from redis into newMessageEvent channel
if config.pubSubMode == "redis" && config.pubSubRedis != "" {
onConnect := func() error {
return nil
if config.PubSub.Mode == "redis" {
pubsub, err := service.PubSub{}.New(config.PubSub, ctx)
if err != nil {
log.Printf("PubSub: Error when starting in mode=redis, %+v\n", err)
log.Println("PubSub: Reverting to polling mode")
config.PubSub.Mode = "poll"
} else {
go func() {
onConnect := func() error {
return nil
}
onMessage := func(message string, payload []byte) error {
newMessageEvent <- struct{}{}
return nil
}
done <- pubsub.Subscribe(onConnect, onMessage, "events")
}()
}
onMessage := func(message string, payload []byte) error {
newMessageEvent <- struct{}{}
return nil
}
pubsub := service.PubSub{}.New(config.pubSubRedis, ctx)
go func() {
done <- pubsub.Subscribe(onConnect, onMessage, "events")
}()
}
if config.pubSubMode == "poll" {
if config.PubSub.Mode == "poll" {
polling := func() error {
for {
select {
case <-ctx.Done():
case <-time.After(config.pubSubInterval):
case <-time.After(config.PubSub.PollingInterval):
newMessageEvent <- struct{}{}
}
}

View File

@ -7,7 +7,7 @@ import (
"github.com/go-chi/chi"
)
func MountRoutes(ctx context.Context, config Configuration) func(chi.Router) {
func MountRoutes(ctx context.Context, config *repository.Flags) func(chi.Router) {
return func(r chi.Router) {
var (
// @todo move this 1 level up & join with rest init functions

View File

@ -8,6 +8,7 @@ import (
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/crusttech/crust/sam/repository"
"github.com/crusttech/crust/sam/service"
"github.com/crusttech/crust/sam/types"
"github.com/crusttech/crust/sam/websocket/outgoing"
@ -27,13 +28,13 @@ type (
remoteAddr string
config Configuration
config *repository.Flags
user *types.User
}
)
func (Session) New(ctx context.Context, config Configuration, conn *websocket.Conn) *Session {
func (Session) New(ctx context.Context, config *repository.Flags, conn *websocket.Conn) *Session {
return &Session{
conn: conn,
ctx: ctx,
@ -87,9 +88,9 @@ func (sess *Session) readLoop() error {
sess.Close()
}()
sess.conn.SetReadDeadline(time.Now().Add(sess.config.pingTimeout))
sess.conn.SetReadDeadline(time.Now().Add(sess.config.Websocket.PingTimeout))
sess.conn.SetPongHandler(func(string) error {
sess.conn.SetReadDeadline(time.Now().Add(sess.config.pingTimeout))
sess.conn.SetReadDeadline(time.Now().Add(sess.config.Websocket.PingTimeout))
return nil
})
sess.remoteAddr = sess.conn.RemoteAddr().String()
@ -108,7 +109,7 @@ func (sess *Session) readLoop() error {
}
func (sess *Session) writeLoop() error {
ticker := time.NewTicker(sess.config.pingPeriod)
ticker := time.NewTicker(sess.config.Websocket.PingPeriod)
defer func() {
ticker.Stop()
@ -116,7 +117,7 @@ func (sess *Session) writeLoop() error {
}()
write := func(msg []byte) error {
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout))
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.Websocket.Timeout))
if msg != nil {
return sess.conn.WriteMessage(websocket.TextMessage, msg)
}
@ -124,7 +125,7 @@ func (sess *Session) writeLoop() error {
}
ping := func() error {
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout))
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.Websocket.Timeout))
return sess.conn.WriteMessage(websocket.PingMessage, nil)
}

View File

@ -1,15 +1,17 @@
package websocket
import (
"context"
"log"
"net/http"
"context"
"github.com/crusttech/crust/auth"
"github.com/crusttech/crust/sam/types"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/titpetric/factory/resputil"
"log"
"github.com/crusttech/crust/auth"
"github.com/crusttech/crust/sam/repository"
"github.com/crusttech/crust/sam/types"
)
type (
@ -17,7 +19,7 @@ type (
svc struct {
userFinder wsUserFinder
}
config Configuration
config *repository.Flags
}
wsUserFinder interface {
@ -25,7 +27,7 @@ type (
}
)
func (Websocket) New(svcUser wsUserFinder, config Configuration) *Websocket {
func (Websocket) New(svcUser wsUserFinder, config *repository.Flags) *Websocket {
ws := &Websocket{
config: config,
}