diff --git a/sam/websocket/session.go b/sam/websocket/session.go index 5605dbd98..123c51bf8 100644 --- a/sam/websocket/session.go +++ b/sam/websocket/session.go @@ -20,9 +20,8 @@ type ( subs *Subscriptions - send chan interface{} - stop chan interface{} - detach chan string + send chan []byte + stop chan []byte remoteAddr string @@ -36,9 +35,8 @@ func (Session) New(ctx context.Context, conn *websocket.Conn) *Session { ctx: ctx, config: config, subs: Subscriptions{}.New(), - send: make(chan interface{}, 512), - stop: make(chan interface{}, 1), - detach: make(chan string, 64), + send: make(chan []byte, 512), + stop: make(chan []byte, 1), } } @@ -76,7 +74,10 @@ func (sess *Session) readLoop() error { if err := sess.dispatch(raw); err != nil { // @todo: log error? - sess.send <- outgoing.NewError(err) + sess.send <- func() []byte { + b, _ := outgoing.NewError(err).EncodeMessage() + return b + }() } } } @@ -89,18 +90,17 @@ func (sess *Session) writeLoop() error { sess.Close() // break readLoop }() - write := func(mt int, msg interface{}) error { + write := func(msg []byte) error { sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout)) - - switch msg := msg.(type) { - case outgoing.MessageEncoder: - return sess.conn.WriteJSON(msg) - case []byte: - return sess.conn.WriteMessage(mt, msg) - default: - return sess.conn.WriteMessage(mt, nil) + if msg != nil { + return sess.conn.WriteMessage(websocket.TextMessage, msg) } + return nil + } + ping := func() error { + sess.conn.SetWriteDeadline(time.Now().Add(sess.config.writeTimeout)) + return sess.conn.WriteMessage(websocket.PingMessage, nil) } for { @@ -111,21 +111,17 @@ func (sess *Session) writeLoop() error { return nil } - if err := write(websocket.TextMessage, msg); err != nil { + if err := write(msg); err != nil { return errors.Wrap(err, "writeLoop send") } + case msg := <-sess.stop: // Shutdown requested, don't care if the message is delivered - if msg != nil { - write(websocket.TextMessage, msg) - } + write(msg) return nil - case topic := <-sess.detach: - sess.subs.Delete(topic) - case <-ticker.C: - if err := write(websocket.PingMessage, nil); err != nil { + if err := ping(); err != nil { return errors.Wrap(err, "writeLoop ping") } }