3
0

import some websocket groundwork

This commit is contained in:
Tit Petric
2018-07-19 20:19:57 +02:00
parent a7957fa804
commit 60b7f712b6
12 changed files with 446 additions and 0 deletions

15
sam/websocket/auth.go Normal file
View File

@@ -0,0 +1,15 @@
package websocket
import (
"net/http"
)
var pass = func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
func (*Websocket) Authenticator() func(http.Handler) http.Handler {
return pass
}

26
sam/websocket/flags.go Normal file
View File

@@ -0,0 +1,26 @@
package websocket
import (
"time"
)
type (
configuration struct {
writeTimeout time.Duration
pingTimeout time.Duration
pingPeriod time.Duration
}
)
var config configuration
func (c configuration) validate() error {
return nil
}
// Flags should be called from main to register flags
func Flags() {
config.writeTimeout = 15 * time.Second
config.pingTimeout = 120 * time.Second
config.pingPeriod = (config.pingTimeout * 10) / 9
}

22
sam/websocket/incoming.go Normal file
View File

@@ -0,0 +1,22 @@
package websocket
import (
"encoding/json"
"log"
"github.com/crusttech/crust/sam/websocket/incoming"
"github.com/pkg/errors"
)
func (s *Session) dispatch(raw []byte) error {
log.Printf("%s> %s", s.remoteAddr, string(raw))
msg := incoming.Message{}.New()
if err := json.Unmarshal(raw, msg); err != nil {
return errors.Wrap(err, "Session.incoming: malformed json payload")
}
// @todo: do stuff with msg
return nil
}

View File

@@ -0,0 +1,33 @@
package incoming
import (
"time"
)
type Message struct {
// User login
Login *Login `json:"login"`
// Channel actions
Join *Join `json:"join"`
Leave *Leave `json:"leave"`
// Get channel message history
History *History `json:"history"`
// Message actions
Create *Create `json:"create"`
Edit *Edit `json:"edit"`
Delete *Delete `json:"delete"`
// Client notifications (message received, message read, typing indicator)
Note *Note `json:"note"`
timestamp time.Time
}
func (Message) New() *Message {
return &Message{
timestamp: time.Now().UTC(),
}
}

View File

@@ -0,0 +1,44 @@
package incoming
type Login struct {
Username string `json:"username,omitempty"`
Password []byte `json:"password"`
}
type Join struct {
Topic string `json:"topic"`
}
type Leave struct {
Topic string `json:"topic"`
}
type History struct {
Topic string `json:"topic"`
// if 0 = last 50 messages, else where message.id < Since
Since uint64 `json:"since,omitempty"`
// @todo: extend API (search,...)
}
type Create struct {
Topic string `json:"topic"`
Content interface{} `json:"content"`
}
type Edit struct {
ID string `json:"id"`
Topic string `json:"topic"`
Content interface{} `json:"content"`
}
type Delete struct {
ID string `json:"id"`
Topic string `json:"topic"`
}
type Note struct {
Topic string `json:"topic"`
Event string `json:"what"`
}

View File

@@ -0,0 +1,28 @@
package outgoing
import (
"time"
"github.com/titpetric/factory"
)
type Message struct {
Error *Error `json:"error,omitempty"`
// @todo: implement outgoing message types
id uint64
timestamp time.Time
}
func (Message) New() *Message {
return &Message{
id: factory.Sonyflake.NextID(),
timestamp: time.Now().UTC(),
}
}
func (m *Message) FromError(err error) *Message {
m.Error = &Error{err.Error()}
return m
}

View File

@@ -0,0 +1,5 @@
package outgoing
type Error struct {
Message string `json:"message"`
}

15
sam/websocket/router.go Normal file
View File

@@ -0,0 +1,15 @@
package websocket
import (
"github.com/go-chi/chi"
)
func MountRoutes(r chi.Router) {
websocket := Websocket{}.New()
r.Group(func(r chi.Router) {
r.Use(websocket.Authenticator())
r.Route("/websocket", func(r chi.Router) {
r.Get("/", websocket.Open)
})
})
}

122
sam/websocket/session.go Normal file
View File

@@ -0,0 +1,122 @@
package websocket
import (
"log"
"time"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/crusttech/crust/sam/websocket/outgoing"
)
type (
// Session
Session struct {
id uint64
conn *websocket.Conn
subs *Subscriptions
send chan interface{}
stop chan interface{}
detach chan string
remoteAddr string
config configuration
}
)
func (Session) New(conn *websocket.Conn) *Session {
return &Session{
conn: conn,
config: config,
subs: Subscriptions{}.New(),
send: make(chan interface{}, 512),
stop: make(chan interface{}, 1),
detach: make(chan string, 64),
}
}
func (sess *Session) Handle() error {
go sess.readLoop()
return sess.writeLoop()
}
func (sess *Session) Close() {
sess.conn.Close()
}
func (sess *Session) readLoop() error {
defer func() {
log.Println("serveWebsocket - stop")
sess.Close()
}()
sess.conn.SetReadDeadline(time.Now().Add(sess.config.pingTimeout))
sess.conn.SetPongHandler(func(string) error {
sess.conn.SetReadDeadline(time.Now().Add(sess.config.pingTimeout))
return nil
})
sess.remoteAddr = sess.conn.RemoteAddr().String()
for {
_, raw, err := sess.conn.ReadMessage()
if err != nil {
return errors.Wrap(err, "sess.readLoop")
}
if err := sess.dispatch(raw); err != nil {
// @todo: log error?
sess.send <- outgoing.Message{}.New().FromError(err)
}
}
}
func (sess *Session) writeLoop() error {
ticker := time.NewTicker(sess.config.pingPeriod)
defer func() {
ticker.Stop()
sess.Close() // break readLoop
}()
write := func(mt int, msg interface{}) error {
var bits []byte
if msg != nil {
bits = msg.([]byte)
} else {
// PintMessage = empty frame
bits = []byte{}
}
sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout))
return sess.conn.WriteMessage(mt, bits)
}
for {
select {
case msg, ok := <-sess.send:
if !ok {
// channel closed
return nil
}
if err := write(websocket.TextMessage, msg); err != nil {
return errors.Wrap(err, "writeLoop send")
}
case msg := <-sess.stop:
// Shutdown requested, don't care if the message is delivered
if msg != nil {
write(websocket.TextMessage, msg)
}
return nil
case topic := <-sess.detach:
sess.subs.Delete(topic)
case <-ticker.C:
if err := write(websocket.PingMessage, nil); err != nil {
return errors.Wrap(err, "writeLoop ping")
}
}
}
}

44
sam/websocket/store.go Normal file
View File

@@ -0,0 +1,44 @@
package websocket
import (
"github.com/titpetric/factory"
"sync"
)
type (
Store struct {
sync.RWMutex
Sessions map[uint64]*Session
}
)
func (Store) New() *Store {
return &Store{sync.RWMutex{}, make(map[uint64]*Session)}
}
var store *Store
func init() {
store = Store{}.New()
}
func (s *Store) Save(session *Session) *Session {
session.id = factory.Sonyflake.NextID()
s.Lock()
defer s.Unlock()
s.Sessions[session.id] = session
return session
}
func (s *Store) Get(id uint64) *Session {
s.RLock()
defer s.RUnlock()
return s.Sessions[id]
}
func (s *Store) Delete(id uint64) {
s.Lock()
defer s.Unlock()
delete(s.Sessions, id)
}

View File

@@ -0,0 +1,51 @@
package websocket
import (
"sync"
)
type (
// A subscription holds a "channel" that the user is joined to
Subscription struct {
}
// A list of all user-joined channels
Subscriptions struct {
sync.RWMutex
Subscriptions map[string]*Subscription
}
)
func (Subscriptions) New() *Subscriptions {
return &Subscriptions{sync.RWMutex{}, make(map[string]*Subscription)}
}
// @todo: load/save all subscriptions from database
func (s *Subscriptions) Add(name string, sub *Subscription) string {
s.Lock()
defer s.Unlock()
s.Subscriptions[name] = sub
return name
}
func (s *Subscriptions) Get(name string) *Subscription {
s.RLock()
defer s.RUnlock()
return s.Subscriptions[name]
}
func (s *Subscriptions) Delete(name string) {
s.Lock()
defer s.Unlock()
delete(s.Subscriptions, name)
}
func (s *Subscriptions) DeleteAll() {
s.Lock()
defer s.Unlock()
for index, _ := range s.Subscriptions {
delete(s.Subscriptions, index)
}
}

View File

@@ -0,0 +1,41 @@
package websocket
import (
"net/http"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/titpetric/factory/resputil"
)
type (
Websocket struct{}
)
func (Websocket) New() *Websocket {
return &Websocket{}
}
// Handles websocket requests from peers
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Allow connections from any Origin
CheckOrigin: func(r *http.Request) bool { return true },
}
func (Websocket) Open(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if _, ok := err.(websocket.HandshakeError); ok {
resputil.JSON(w, errors.Wrap(err, "ws: need a websocket handshake"))
return
} else if err != nil {
resputil.JSON(w, errors.Wrap(err, "ws: failed to upgrade connection"))
return
}
session := store.Save(Session{}.New(ws))
if err := session.Handle(); err != nil {
// @todo: log error, because at this point we can't really write it to w
}
}