Delay API startup. Options: a fixed duration (WAIT_FOR), a list of service hosts (WAIT_FOR_SERVICES=host1:80 host2:3306), or a service URL (WAIT_FOR_SERVICES=https://cortezaproject.org).
329 lines
6.7 KiB
Go
329 lines
6.7 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi"
|
|
"github.com/pkg/errors"
|
|
"github.com/spf13/cobra"
|
|
"github.com/titpetric/factory/resputil"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/cortezaproject/corteza-server/internal/auth"
|
|
"github.com/cortezaproject/corteza-server/internal/version"
|
|
"github.com/cortezaproject/corteza-server/pkg/cli/options"
|
|
)
|
|
|
|
type (
|
|
Server struct {
|
|
name string
|
|
|
|
log *zap.Logger
|
|
|
|
httpOpt *options.HTTPOpt
|
|
monitorOpt *options.MonitorOpt
|
|
|
|
endpoints []func(r chi.Router)
|
|
}
|
|
)
|
|
|
|
// Package-level settings.
//
// NOTE(review): neither variable is read in this file. Presumably Monolith
// marks a single-binary deployment and BaseURL is the path prefix routes are
// served under -- confirm against the code that uses them.
var (
	// Monolith defaults to false.
	Monolith bool

	// BaseURL defaults to "/".
	BaseURL = "/"
)
|
|
|
|
func NewServer(log *zap.Logger) *Server {
|
|
return &Server{
|
|
endpoints: make([]func(r chi.Router), 0),
|
|
log: log.Named("http"),
|
|
}
|
|
}
|
|
|
|
func (s *Server) Command(ctx context.Context, cmdName, prefix string, preRun func(context.Context) error) (cmd *cobra.Command) {
|
|
s.httpOpt = options.HTTP(prefix)
|
|
s.monitorOpt = options.Monitor(prefix)
|
|
|
|
cmd = &cobra.Command{
|
|
Use: cmdName,
|
|
Short: "Start HTTP Server with REST API",
|
|
|
|
// Connect all the wires, prepare services, run watchers, bind endpoints
|
|
PreRunE: func(cmd *cobra.Command, args []string) error {
|
|
s.waitFor(ctx, options.WaitFor(prefix))
|
|
|
|
if s.monitorOpt.Interval > 0 {
|
|
go NewMonitor(int(s.monitorOpt.Interval / time.Second))
|
|
}
|
|
|
|
return preRun(ctx)
|
|
},
|
|
|
|
// Run the server
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
s.Serve(ctx)
|
|
},
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s *Server) MountRoutes(mm ...func(chi.Router)) {
|
|
s.endpoints = append(s.endpoints, mm...)
|
|
}
|
|
|
|
func (s Server) Serve(ctx context.Context) {
|
|
s.log.Info("Starting HTTP server with REST API", zap.String("address", s.httpOpt.Addr))
|
|
|
|
// configure resputil options
|
|
resputil.SetConfig(resputil.Options{
|
|
Trace: s.httpOpt.Tracing,
|
|
Logger: func(err error) {
|
|
// @todo: error logging
|
|
},
|
|
})
|
|
|
|
listener, err := net.Listen("tcp", s.httpOpt.Addr)
|
|
if err != nil {
|
|
s.log.Error("Can not start server", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
router := chi.NewRouter()
|
|
|
|
// Base middleware, CORS, RealIP, RequestID, context-logger
|
|
router.Use(Base(s.log)...)
|
|
|
|
// Logging request if enabled
|
|
if s.httpOpt.LogRequest {
|
|
router.Use(LogRequest)
|
|
}
|
|
|
|
// Logging response if enabled
|
|
if s.httpOpt.LogResponse {
|
|
router.Use(LogResponse)
|
|
}
|
|
|
|
// Handle panic (sets 500 Server error headers)
|
|
router.Use(HandlePanic)
|
|
|
|
// Reports error to Sentry if enabled
|
|
if s.httpOpt.EnablePanicReporting {
|
|
router.Use(Sentry())
|
|
}
|
|
|
|
// Metrics tracking middleware
|
|
if s.httpOpt.EnableMetrics {
|
|
router.Use(Middleware(s.httpOpt.MetricsServiceLabel))
|
|
}
|
|
|
|
router.Group(func(r chi.Router) {
|
|
r.Use(
|
|
auth.DefaultJwtHandler.Verifier(),
|
|
auth.DefaultJwtHandler.Authenticator(),
|
|
)
|
|
|
|
for _, mountRoutes := range s.endpoints {
|
|
mountRoutes(r)
|
|
}
|
|
})
|
|
|
|
if s.httpOpt.EnableMetrics {
|
|
Mount(router, s.httpOpt.MetricsUsername, s.httpOpt.MetricsPassword)
|
|
}
|
|
|
|
if s.httpOpt.EnableDebugRoute {
|
|
Debug(router)
|
|
}
|
|
|
|
if s.httpOpt.EnableVersionRoute {
|
|
router.Get("/version", version.HttpHandler)
|
|
}
|
|
|
|
go func() {
|
|
err = http.Serve(listener, router)
|
|
}()
|
|
<-ctx.Done()
|
|
|
|
if err == nil {
|
|
err = ctx.Err()
|
|
if err == context.Canceled {
|
|
err = nil
|
|
}
|
|
}
|
|
|
|
s.log.Info("HTTP server stopped", zap.Error(err))
|
|
|
|
return
|
|
}
|
|
|
|
// waitFor sets up a simple status page, delays execution and probes services
|
|
func (s Server) waitFor(ctx context.Context, opt *options.WaitForOpt) {
|
|
var (
|
|
services = opt.GetServices()
|
|
)
|
|
|
|
if len(services) == 0 && opt.Delay == 0 {
|
|
// Nothing to do here..
|
|
return
|
|
}
|
|
|
|
var (
|
|
log = s.log.Named("wait-for")
|
|
depChan = make(chan struct{})
|
|
wg sync.WaitGroup
|
|
serviceAddr string
|
|
serviceURL *url.URL
|
|
err error
|
|
)
|
|
|
|
// Setup a simple HTTP server that will inform the impatent users
|
|
listener, err := net.Listen("tcp", s.httpOpt.Addr)
|
|
if err != nil {
|
|
s.log.Error("Can not start server", zap.Error(err))
|
|
os.Exit(1)
|
|
}
|
|
defer listener.Close()
|
|
go func() {
|
|
router := chi.NewRouter()
|
|
router.Get("/*", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusPreconditionFailed)
|
|
w.Write([]byte("waiting for services..."))
|
|
})
|
|
_ = http.Serve(listener, router)
|
|
}()
|
|
|
|
if opt.Delay > 0 {
|
|
s.log.Info("delaying", zap.Duration("delay", opt.Delay))
|
|
|
|
// First delay execution
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Debug("canceled")
|
|
return
|
|
case <-time.After(opt.Delay):
|
|
// all good...
|
|
}
|
|
}
|
|
|
|
if len(services) == 0 {
|
|
return
|
|
}
|
|
|
|
log.Info("waiting for services", zap.Strings("services", services))
|
|
// Probe services
|
|
wg.Add(len(services))
|
|
go func() {
|
|
|
|
for _, service := range services {
|
|
slog := log.With(zap.String("service", service))
|
|
|
|
go func(ctx context.Context, service string) {
|
|
defer wg.Done()
|
|
|
|
if serviceAddr, serviceURL, err = s.resolveService(service); err != nil {
|
|
log.Error("could not resolve service", zap.Error(err))
|
|
}
|
|
|
|
for {
|
|
ctx, cancelFn := context.WithTimeout(ctx, opt.ServicesProbeTimeout)
|
|
defer cancelFn()
|
|
|
|
if serviceURL == nil {
|
|
if err = s.probeService(ctx, serviceAddr); err != nil {
|
|
slog.Warn("service probe failed", zap.Error(err))
|
|
time.Sleep(opt.ServicesProbeInterval)
|
|
continue
|
|
}
|
|
} else {
|
|
if err = s.probeServiceURL(ctx, serviceURL); err != nil {
|
|
slog.Warn("service URL probe failed", zap.Error(err))
|
|
time.Sleep(opt.ServicesProbeInterval)
|
|
continue
|
|
}
|
|
}
|
|
|
|
slog.Debug("service ready")
|
|
return
|
|
}
|
|
}(ctx, service)
|
|
}
|
|
wg.Wait()
|
|
close(depChan)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Debug("canceled")
|
|
return
|
|
case <-depChan: // services are ready
|
|
log.Debug("all services ready")
|
|
return
|
|
case <-time.After(opt.ServicesTimeout):
|
|
log.Debug("services not ready")
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func (s Server) resolveService(service string) (addr string, u *url.URL, err error) {
|
|
addr = service
|
|
|
|
if strings.Contains(addr, "://") {
|
|
// Is service an URL?
|
|
u, err = url.Parse(addr)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
addr = u.Host
|
|
|
|
if u.Port() == "" {
|
|
if u.Scheme == "https" {
|
|
addr += ":443"
|
|
}
|
|
}
|
|
}
|
|
|
|
// Default to port 80
|
|
if !strings.Contains(addr, ":") {
|
|
addr += ":80"
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (s Server) probeService(ctx context.Context, addr string) (err error) {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dialer := net.Dialer{}
|
|
_, err = dialer.DialContext(ctx, "tcp", addr)
|
|
return
|
|
}
|
|
|
|
func (s Server) probeServiceURL(ctx context.Context, u *url.URL) error {
|
|
req, err := http.NewRequest("GET", u.String(), nil)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to assemble service request")
|
|
}
|
|
|
|
rsp, err := http.DefaultClient.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return errors.Wrap(err, "service URL request failed")
|
|
}
|
|
|
|
defer rsp.Body.Close()
|
|
if rsp.StatusCode == http.StatusOK {
|
|
return nil
|
|
}
|
|
|
|
return errors.Errorf("service responded with unexpected status '%s'", rsp.Status)
|
|
}
|