From d0c3cb066ce3762dfdb84c406a1e598ed62c6e6c Mon Sep 17 00:00:00 2001 From: Alex X Date: Mon, 21 Apr 2025 20:08:16 +0300 Subject: [PATCH 01/15] Rewrite exec backchannel --- internal/exec/exec.go | 4 +-- pkg/core/codec.go | 33 +++++++++++++++++++ pkg/pcm/backchannel.go | 69 ++++++++++++++++++++++++++++++++++++++++ pkg/stdin/backchannel.go | 59 ---------------------------------- pkg/stdin/client.go | 33 ------------------- 5 files changed, 104 insertions(+), 94 deletions(-) create mode 100644 pkg/pcm/backchannel.go delete mode 100644 pkg/stdin/backchannel.go delete mode 100644 pkg/stdin/client.go diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 89add393..711be8a2 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -19,9 +19,9 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/magic" + "github.com/AlexxIT/go2rtc/pkg/pcm" pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" - "github.com/AlexxIT/go2rtc/pkg/stdin" "github.com/rs/zerolog" ) @@ -86,7 +86,7 @@ func execHandle(rawURL string) (prod core.Producer, err error) { } if query.Get("backchannel") == "1" { - return stdin.NewClient(cmd) + return pcm.NewBackchannel(cmd, query.Get("audio")) } if path == "" { diff --git a/pkg/core/codec.go b/pkg/core/codec.go index 708839b3..c7791df9 100644 --- a/pkg/core/codec.go +++ b/pkg/core/codec.go @@ -249,3 +249,36 @@ func DecodeH264(fmtp string) (profile string, level byte) { } return } + +func ParseCodecString(s string) *Codec { + var codec Codec + + ss := strings.Split(s, "/") + switch strings.ToLower(ss[0]) { + case "pcm_s16be", "s16be", "pcm": + codec.Name = CodecPCM + case "pcm_s16le", "s16le", "pcml": + codec.Name = CodecPCML + case "pcm_alaw", "alaw", "pcma": + codec.Name = CodecPCMA + case "pcm_mulaw", "mulaw", "pcmu": + codec.Name = CodecPCMU + case "aac", "mpeg4-generic": + codec.Name = CodecAAC + case "opus": + codec.Name = CodecOpus + case "flac": + codec.Name = CodecFLAC + default: + return nil + } + + if len(ss) >= 2 { + codec.ClockRate = uint32(Atoi(ss[1])) + } + if len(ss) >= 3 { + codec.Channels = uint16(Atoi(ss[1])) + } + + return &codec +} diff --git a/pkg/pcm/backchannel.go b/pkg/pcm/backchannel.go new file mode 100644 index 00000000..99b6e3aa --- /dev/null +++ b/pkg/pcm/backchannel.go @@ -0,0 +1,69 @@ +package pcm + +import ( + "errors" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/shell" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + cmd *shell.Command +} + +func NewBackchannel(cmd *shell.Command, audio string) (core.Producer, error) { + var codec *core.Codec + + if audio == "" { + // default codec + codec = &core.Codec{Name: core.CodecPCML, ClockRate: 16000} + } else if codec = core.ParseCodecString(audio); codec == nil { + return nil, errors.New("pcm: unsupported audio format: " + audio) + } + + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{codec}, + }, + } + + return &Backchannel{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "pcm", + Protocol: "pipe", + Medias: medias, + Transport: cmd, + }, + cmd: cmd, + }, nil +} + +func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + wr, err := c.cmd.StdinPipe() + if err != nil { + return err + } + + sender := core.NewSender(media, track.Codec) + sender.Handler = func(packet *rtp.Packet) { + if n, err := wr.Write(packet.Payload); err != nil { + c.Send += n + } + } + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Backchannel) Start() error { + return c.cmd.Run() +} diff --git a/pkg/stdin/backchannel.go b/pkg/stdin/backchannel.go deleted file mode 100644 index b154a291..00000000 --- a/pkg/stdin/backchannel.go +++ /dev/null @@ -1,59 +0,0 @@ -package stdin - -import ( - "encoding/json" - - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/pion/rtp" -) - -func (c *Client) GetMedias() []*core.Media { - return c.medias -} - -func (c *Client) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { - return nil, core.ErrCantGetTrack -} - -func (c *Client) AddTrack(media *core.Media, _ *core.Codec, track *core.Receiver) error { - if c.sender == nil { - stdin, err := c.cmd.StdinPipe() - if err != nil { - return err - } - - c.sender = core.NewSender(media, track.Codec) - c.sender.Handler = func(packet *rtp.Packet) { - _, _ = stdin.Write(packet.Payload) - c.send += len(packet.Payload) - } - } - - c.sender.HandleRTP(track) - return nil -} - -func (c *Client) Start() (err error) { - return c.cmd.Run() -} - -func (c *Client) Stop() (err error) { - if c.sender != nil { - c.sender.Close() - } - return c.cmd.Close() -} - -func (c *Client) MarshalJSON() ([]byte, error) { - info := &core.Connection{ - ID: core.ID(c), - FormatName: "exec", - Protocol: "pipe", - Medias: c.medias, - Send: c.send, - } - if c.sender != nil { - info.Senders = []*core.Sender{c.sender} - } - return json.Marshal(info) -} diff --git a/pkg/stdin/client.go b/pkg/stdin/client.go deleted file mode 100644 index a77d4459..00000000 --- a/pkg/stdin/client.go +++ /dev/null @@ -1,33 +0,0 @@ -package stdin - -import ( - "github.com/AlexxIT/go2rtc/pkg/core" - "github.com/AlexxIT/go2rtc/pkg/shell" -) - -// Deprecated: should be rewritten to core.Connection -type Client struct { - cmd *shell.Command - - medias []*core.Media - sender *core.Sender - send int -} - -func NewClient(cmd *shell.Command) (*Client, error) { - c := &Client{ - cmd: cmd, - medias: []*core.Media{ - { - Kind: core.KindAudio, - Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecPCMA, ClockRate: 8000}, - {Name: core.CodecPCM}, - }, - }, - }, - } - - return c, nil -} From 7fe23c7bc5b480cadb24a4f3ec131ef3478ff423 Mon Sep 17 00:00:00 2001 From: Alex X Date: Mon, 21 Apr 2025 20:09:04 +0300 Subject: [PATCH 02/15] Add wav backchannel (not used yet) --- pkg/wav/backchannel.go | 67 ++++++++++++++++++++++++++++++++++++++++++ pkg/wav/wav.go | 50 +++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 pkg/wav/backchannel.go create mode 100644 pkg/wav/wav.go diff --git a/pkg/wav/backchannel.go b/pkg/wav/backchannel.go new file mode 100644 index 00000000..f9697ee4 --- /dev/null +++ b/pkg/wav/backchannel.go @@ -0,0 +1,67 @@ +package wav + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/shell" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + cmd *shell.Command +} + +func NewBackchannel(cmd *shell.Command) (core.Producer, error) { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + //{Name: core.CodecPCML}, + {Name: core.CodecPCMA}, + {Name: core.CodecPCMU}, + }, + }, + } + + return &Backchannel{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "wav", + Protocol: "pipe", + Medias: medias, + Transport: cmd, + }, + cmd: cmd, + }, nil +} + +func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + wr, err := c.cmd.StdinPipe() + if err != nil { + return err + } + + b := Header(track.Codec) + if _, err = wr.Write(b); err != nil { + return err + } + + sender := core.NewSender(media, track.Codec) + sender.Handler = func(packet *rtp.Packet) { + if n, err := wr.Write(packet.Payload); err != nil { + c.Send += n + } + } + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Backchannel) Start() error { + return c.cmd.Run() +} diff --git a/pkg/wav/wav.go b/pkg/wav/wav.go new file mode 100644 index 00000000..bf48fdf9 --- /dev/null +++ b/pkg/wav/wav.go @@ -0,0 +1,50 @@ +package wav + +import ( + "encoding/binary" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func Header(codec *core.Codec) []byte { + var fmt, size, extra byte + + switch codec.Name { + case core.CodecPCML: + fmt = 1 + size = 2 + case core.CodecPCMA: + fmt = 6 + size = 1 + extra = 2 + case core.CodecPCMU: + fmt = 7 + size = 1 + extra = 2 + default: + return nil + } + + channels := byte(codec.Channels) + if channels == 0 { + channels = 1 + } + + b := make([]byte, 0, 46) // cap with extra + b = append(b, "RIFF\xFF\xFF\xFF\xFFWAVEfmt "...) + + b = append(b, 0x10+extra, 0, 0, 0) + b = append(b, fmt, 0) + b = append(b, channels, 0) + b = binary.LittleEndian.AppendUint32(b, codec.ClockRate) + b = binary.LittleEndian.AppendUint32(b, uint32(size*channels)*codec.ClockRate) + b = append(b, size*channels, 0) + b = append(b, size*8, 0) + if extra > 0 { + b = append(b, 0, 0) // ExtraParamSize (if PCM, then doesn't exist) + } + + b = append(b, "data\xFF\xFF\xFF\xFF"...) + + return b +} From 902af5e5d7752e6494abf156af9680b47b28262a Mon Sep 17 00:00:00 2001 From: Alex X Date: Mon, 21 Apr 2025 20:30:38 +0300 Subject: [PATCH 03/15] Add wyoming module --- internal/streams/play.go | 16 +- internal/wyoming/wyoming.go | 70 +++++++ main.go | 2 + pkg/core/codec.go | 2 +- pkg/pcm/pcm.go | 20 ++ pkg/pcm/s16le/s16le.go | 42 ++++ pkg/wyoming/api.go | 98 +++++++++ pkg/wyoming/backchannel.go | 42 ++++ pkg/wyoming/producer.go | 43 ++++ pkg/wyoming/satellite.go | 395 ++++++++++++++++++++++++++++++++++++ pkg/wyoming/wakeword.go | 120 +++++++++++ pkg/wyoming/wyoming.go | 42 ++++ 12 files changed, 886 insertions(+), 6 deletions(-) create mode 100644 internal/wyoming/wyoming.go create mode 100644 pkg/pcm/s16le/s16le.go create mode 100644 pkg/wyoming/api.go create mode 100644 pkg/wyoming/backchannel.go create mode 100644 pkg/wyoming/producer.go create mode 100644 pkg/wyoming/satellite.go create mode 100644 pkg/wyoming/wakeword.go create mode 100644 pkg/wyoming/wyoming.go diff --git a/internal/streams/play.go b/internal/streams/play.go index d72c5e0c..1f8c4ade 100644 --- a/internal/streams/play.go +++ b/internal/streams/play.go @@ -7,7 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" ) -func (s *Stream) Play(source string) error { +func (s *Stream) Play(urlOrProd any) error { s.mu.Lock() for _, producer := range s.producers { if producer.state == stateInternal && producer.conn != nil { @@ -16,12 +16,18 @@ func (s *Stream) Play(source string) error { } s.mu.Unlock() - if source == "" { - return nil - } - + var source string var src core.Producer + switch urlOrProd.(type) { + case string: + if source = urlOrProd.(string); source == "" { + return nil + } + case core.Producer: + src = urlOrProd.(core.Producer) + } + for _, producer := range s.producers { if producer.conn == nil { continue diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go new file mode 100644 index 00000000..41ae27c3 --- /dev/null +++ b/internal/wyoming/wyoming.go @@ -0,0 +1,70 @@ +package wyoming + +import ( + "net" + + "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/wyoming" + "github.com/rs/zerolog" +) + +func Init() { + streams.HandleFunc("wyoming", wyoming.Dial) + + // server + var cfg struct { + Mod map[string]struct { + Listen string `yaml:"listen"` + Name string `yaml:"name"` + WakeURI string `yaml:"wake_uri"` + VADThreshold float32 `yaml:"vad_threshold"` + } `yaml:"wyoming"` + } + app.LoadConfig(&cfg) + + log = app.GetLogger("wyoming") + + for name, cfg := range cfg.Mod { + stream := streams.Get(name) + if stream == nil { + log.Warn().Msgf("[wyoming] missing stream: %s", name) + continue + } + + ln, err := net.Listen("tcp", cfg.Listen) + if err != nil { + log.Warn().Msgf("[wyoming] listen error: %s", err) + continue + } + + if cfg.Name == "" { + cfg.Name = name + } + + srv := wyoming.Server{ + Name: cfg.Name, + VADThreshold: int16(1000 * cfg.VADThreshold), // 1.0 => 1000 + WakeURI: cfg.WakeURI, + MicHandler: func(cons core.Consumer) error { + if err := stream.AddConsumer(cons); err != nil { + return err + } + // not best solution + if i, ok := cons.(interface{ OnClose(func()) }); ok { + i.OnClose(func() { + stream.RemoveConsumer(cons) + }) + } + return nil + }, + SndHandler: func(prod core.Producer) error { + return stream.Play(prod) + }, + } + go srv.Serve(ln) + } +} + +var log zerolog.Logger diff --git a/main.go b/main.go index f8aba89e..295de219 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/v4l2" "github.com/AlexxIT/go2rtc/internal/webrtc" "github.com/AlexxIT/go2rtc/internal/webtorrent" + "github.com/AlexxIT/go2rtc/internal/wyoming" "github.com/AlexxIT/go2rtc/pkg/shell" ) @@ -69,6 +70,7 @@ func main() { hass.Init() // hass source, Hass API server onvif.Init() // onvif source, ONVIF API server webtorrent.Init() // webtorrent source, WebTorrent module + wyoming.Init() // 5. Other sources diff --git a/pkg/core/codec.go b/pkg/core/codec.go index c7791df9..ba0c656a 100644 --- a/pkg/core/codec.go +++ b/pkg/core/codec.go @@ -277,7 +277,7 @@ func ParseCodecString(s string) *Codec { codec.ClockRate = uint32(Atoi(ss[1])) } if len(ss) >= 3 { - codec.Channels = uint16(Atoi(ss[1])) + codec.Channels = uint8(Atoi(ss[1])) } return &codec diff --git a/pkg/pcm/pcm.go b/pkg/pcm/pcm.go index 6872c503..bf54a6cf 100644 --- a/pkg/pcm/pcm.go +++ b/pkg/pcm/pcm.go @@ -185,3 +185,23 @@ func Transcode(dst, src *core.Codec) func([]byte) []byte { return writer(samples) } } + +func ConsumerCodecs() []*core.Codec { + return []*core.Codec{ + {Name: core.CodecPCML}, + {Name: core.CodecPCM}, + {Name: core.CodecPCMA}, + {Name: core.CodecPCMU}, + } +} + +func ProducerCodecs() []*core.Codec { + return []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 16000}, + {Name: core.CodecPCM, ClockRate: 16000}, + {Name: core.CodecPCML, ClockRate: 8000}, + {Name: core.CodecPCM, ClockRate: 8000}, + {Name: core.CodecPCMA, ClockRate: 8000}, + {Name: core.CodecPCMU, ClockRate: 8000}, + } +} diff --git a/pkg/pcm/s16le/s16le.go b/pkg/pcm/s16le/s16le.go new file mode 100644 index 00000000..acd2d4fc --- /dev/null +++ b/pkg/pcm/s16le/s16le.go @@ -0,0 +1,42 @@ +package s16le + +func PeaksRMS(b []byte) int16 { + // RMS of sine wave = peak / sqrt2 + // https://en.wikipedia.org/wiki/Root_mean_square + // https://www.youtube.com/watch?v=MUDkL4KZi0I + var peaks int32 + var peaksSum int32 + var prevSample int16 + var prevUp bool + + var i int + for n := len(b); i < n; { + lo := b[i] + i++ + hi := b[i] + i++ + + sample := int16(hi)<<8 | int16(lo) + up := sample >= prevSample + + if i >= 4 { + if up != prevUp { + if prevSample >= 0 { + peaksSum += int32(prevSample) + } else { + peaksSum -= int32(prevSample) + } + peaks++ + } + } + + prevSample = sample + prevUp = up + } + + if peaks == 0 { + return 0 + } + + return int16(peaksSum / peaks) +} diff --git a/pkg/wyoming/api.go b/pkg/wyoming/api.go new file mode 100644 index 00000000..59de747c --- /dev/null +++ b/pkg/wyoming/api.go @@ -0,0 +1,98 @@ +package wyoming + +import ( + "bufio" + "encoding/json" + "io" + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type API struct { + conn net.Conn + rd *bufio.Reader +} + +func DialAPI(address string) (*API, error) { + conn, err := net.DialTimeout("tcp", address, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + return NewAPI(conn), nil +} + +const Version = "1.5.4" + +func NewAPI(conn net.Conn) *API { + return &API{conn: conn, rd: bufio.NewReader(conn)} +} + +func (w *API) WriteEvent(evt *Event) (err error) { + hdr := EventHeader{ + Type: evt.Type, + Version: Version, + DataLength: len(evt.Data), + PayloadLength: len(evt.Payload), + } + + buf, err := json.Marshal(hdr) + if err != nil { + return err + } + + buf = append(buf, '\n') + buf = append(buf, evt.Data...) + buf = append(buf, evt.Payload...) + + _, err = w.conn.Write(buf) + return err +} + +func (w *API) ReadEvent() (*Event, error) { + data, err := w.rd.ReadBytes('\n') + if err != nil { + return nil, err + } + + var hdr EventHeader + if err = json.Unmarshal(data, &hdr); err != nil { + return nil, err + } + + evt := Event{Type: hdr.Type} + + if hdr.DataLength > 0 { + evt.Data = make([]byte, hdr.DataLength) + if _, err = io.ReadFull(w.rd, evt.Data); err != nil { + return nil, err + } + } + + if hdr.PayloadLength > 0 { + evt.Payload = make([]byte, hdr.PayloadLength) + if _, err = io.ReadFull(w.rd, evt.Payload); err != nil { + return nil, err + } + } + + return &evt, nil +} + +func (w *API) Close() error { + return w.conn.Close() +} + +type Event struct { + Type string + Data []byte + Payload []byte +} + +type EventHeader struct { + Type string `json:"type"` + Version string `json:"version"` + DataLength int `json:"data_length,omitempty"` + PayloadLength int `json:"payload_length,omitempty"` +} diff --git a/pkg/wyoming/backchannel.go b/pkg/wyoming/backchannel.go new file mode 100644 index 00000000..8760789e --- /dev/null +++ b/pkg/wyoming/backchannel.go @@ -0,0 +1,42 @@ +package wyoming + +import ( + "fmt" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + api *API +} + +func (b *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, codec) + sender.Handler = func(pkt *rtp.Packet) { + ts := time.Now().Nanosecond() + evt := &Event{ + Type: "audio-chunk", + Data: []byte(fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, ts)), + Payload: pkt.Payload, + } + _ = b.api.WriteEvent(evt) + } + sender.HandleRTP(track) + b.Senders = append(b.Senders, sender) + return nil +} + +func (b *Backchannel) Start() error { + for { + if _, err := b.api.ReadEvent(); err != nil { + return err + } + } +} diff --git a/pkg/wyoming/producer.go b/pkg/wyoming/producer.go new file mode 100644 index 00000000..9cd6abb6 --- /dev/null +++ b/pkg/wyoming/producer.go @@ -0,0 +1,43 @@ +package wyoming + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type Producer struct { + core.Connection + api *API +} + +func (p *Producer) Start() error { + var seq uint16 + var ts uint32 + + for { + evt, err := p.api.ReadEvent() + if err != nil { + return err + } + + if evt.Type != "audio-chunk" { + continue + } + + p.Recv += len(evt.Payload) + + pkt := &core.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + SequenceNumber: seq, + Timestamp: ts, + }, + Payload: evt.Payload, + } + p.Receivers[0].WriteRTP(pkt) + + seq++ + ts += uint32(len(evt.Payload) / 2) + } +} diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go new file mode 100644 index 00000000..14bded32 --- /dev/null +++ b/pkg/wyoming/satellite.go @@ -0,0 +1,395 @@ +package wyoming + +import ( + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" + "github.com/AlexxIT/go2rtc/pkg/pcm/s16le" + "github.com/pion/rtp" +) + +type Server struct { + Name string + + VADThreshold int16 + WakeURI string + + MicHandler func(cons core.Consumer) error + SndHandler func(prod core.Producer) error +} + +func (s *Server) Serve(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + return err + } + + go s.Handle(conn) + } +} + +func (s *Server) Handle(conn net.Conn) error { + api := NewAPI(conn) + sat := newSatellite(api, s) + defer sat.Close() + + //log.Debug().Msgf("[wyoming] new client: %s", conn.RemoteAddr()) + + var snd []byte + + for { + evt, err := api.ReadEvent() + if err != nil { + return err + } + + //log.Printf("%s %s %d", evt.Type, evt.Data, len(evt.Payload)) + + switch evt.Type { + case "ping": // {"text": null} + _ = api.WriteEvent(&Event{Type: "pong", Data: evt.Data}) + case "describe": + // {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}} + data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.Name) + _ = api.WriteEvent(&Event{Type: "info", Data: []byte(data)}) + case "run-satellite": + if err = sat.run(); err != nil { + return err + } + case "pause-satellite": + sat.pause() + case "detect": // WAKE_WORD_START {"names": null} + case "detection": // WAKE_WORD_END {"name": "ok_nabu_v0.1", "timestamp": 17580, "speaker": null} + case "transcribe": // STT_START {"language": "en"} + case "voice-started": // STT_VAD_START {"timestamp": 1160} + case "voice-stopped": // STT_VAD_END {"timestamp": 2470} + sat.idle() + case "transcript": // STT_END {"text": "how are you"} + case "synthesize": // TTS_START {"text": "Sorry, I couldn't understand that", "voice": {"language": "en"}} + case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + snd = snd[:0] + case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + snd = append(snd, evt.Payload...) + case "audio-stop": // {"timestamp": 2.880000000000002} + sat.respond(snd) + case "error": + sat.start() + } + } +} + +// states like Home Assistant +const ( + stateUnavailable = iota + stateIdle + stateWaitVAD // aka wait VAD + stateWaitWakeWord + stateStreaming +) + +type satellite struct { + api *API + srv *Server + + state uint8 + mu sync.Mutex + + timestamp int + + mic *micConsumer + wake *WakeWord +} + +func newSatellite(api *API, srv *Server) *satellite { + sat := &satellite{api: api, srv: srv} + return sat +} + +func (s *satellite) Close() error { + s.pause() + return s.api.Close() +} + +func (s *satellite) run() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.state != stateUnavailable { + return errors.New("wyoming: wrong satellite state") + } + + s.mic = newMicConsumer(s.onMicChunk) + s.mic.RemoteAddr = s.api.conn.RemoteAddr().String() + + if err := s.srv.MicHandler(s.mic); err != nil { + return err + } + + s.state = stateIdle + go s.start() + + return nil +} + +func (s *satellite) pause() { + s.mu.Lock() + + s.state = stateUnavailable + if s.mic != nil { + if s.mic.onClose != nil { + s.mic.onClose() + } + _ = s.mic.Stop() + s.mic = nil + } + if s.wake != nil { + _ = s.wake.Close() + s.wake = nil + } + + s.mu.Unlock() +} + +func (s *satellite) start() { + s.mu.Lock() + + if s.state != stateUnavailable { + s.state = stateWaitVAD + } + + s.mu.Unlock() +} + +func (s *satellite) idle() { + s.mu.Lock() + + if s.state != stateUnavailable { + s.state = stateIdle + } + + s.mu.Unlock() +} + +const wakeTimeout = 5 * 2 * 16000 // 5 seconds + +func (s *satellite) onMicChunk(chunk []byte) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.state == stateIdle { + return + } + + if s.state == stateWaitVAD { + // tests show that values over 1000 are most likely speech + if s.srv.VADThreshold == 0 || s16le.PeaksRMS(chunk) > s.srv.VADThreshold { + if s.wake == nil && s.srv.WakeURI != "" { + s.wake, _ = DialWakeWord(s.srv.WakeURI) + } + if s.wake == nil { + // some problems with wake word - redirect to HA + evt := &Event{ + Type: "run-pipeline", + Data: []byte(`{"start_stage":"wake","end_stage":"tts","restart_on_end":false}`), + } + if err := s.api.WriteEvent(evt); err != nil { + return + } + s.state = stateStreaming + } else { + s.state = stateWaitWakeWord + } + s.timestamp = 0 + } + } + + if s.state == stateWaitWakeWord { + if s.wake.Detection != "" { + // check if wake word detected + evt := &Event{ + Type: "run-pipeline", + Data: []byte(`{"start_stage":"asr","end_stage":"tts","restart_on_end":false}`), + } + _ = s.api.WriteEvent(evt) + s.state = stateStreaming + s.timestamp = 0 + } else if err := s.wake.WriteChunk(chunk); err != nil { + // wake word service failed + s.state = stateWaitVAD + _ = s.wake.Close() + s.wake = nil + } else if s.timestamp > wakeTimeout { + // wake word detection timeout + s.state = stateWaitVAD + } + } else if s.wake != nil { + _ = s.wake.Close() + s.wake = nil + } + + if s.state == stateStreaming { + data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.timestamp) + evt := &Event{Type: "audio-chunk", Data: []byte(data), Payload: chunk} + _ = s.api.WriteEvent(evt) + } + + s.timestamp += len(chunk) / 2 +} + +func (s *satellite) respond(data []byte) { + prod := newSndProducer(data, func() { + _ = s.api.WriteEvent(&Event{Type: "played"}) + s.start() + }) + if err := s.srv.SndHandler(prod); err != nil { + prod.onClose() + } +} + +type micConsumer struct { + core.Connection + onData func(chunk []byte) + onClose func() +} + +func newMicConsumer(onData func(chunk []byte)) *micConsumer { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: pcm.ConsumerCodecs(), + }, + } + + return &micConsumer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Protocol: "tcp", + Medias: medias, + }, + onData: onData, + } +} + +func (c *micConsumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + src := track.Codec + dst := &core.Codec{ + Name: core.CodecPCML, + ClockRate: 16000, + Channels: 1, + } + sender := core.NewSender(media, dst) + sender.Handler = pcm.TranscodeHandler(dst, src, + repack(func(packet *core.Packet) { + c.onData(packet.Payload) + }), + ) + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +type sndProducer struct { + core.Connection + data []byte + onClose func() +} + +func newSndProducer(data []byte, onClose func()) *sndProducer { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: pcm.ProducerCodecs(), + }, + } + + return &sndProducer{ + core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Protocol: "tcp", + Medias: medias, + }, + data, + onClose, + } +} + +func (s *sndProducer) Start() error { + if len(s.Receivers) == 0 { + return nil + } + + var pts time.Duration + var seq uint16 + + t0 := time.Now() + + src := &core.Codec{Name: core.CodecPCML, ClockRate: 22050} + dst := s.Receivers[0].Codec + f := pcm.Transcode(dst, src) + + bps := uint32(pcm.BytesPerFrame(dst)) + + chunkBytes := int(2 * src.ClockRate / 50) // 20ms + + for { + n := len(s.data) + if n == 0 { + break + } + if chunkBytes > n { + chunkBytes = n + } + + pkt := &core.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + SequenceNumber: seq, + Timestamp: uint32(s.Recv/2) * bps, + }, + Payload: f(s.data[:chunkBytes]), + } + + if d := pts - time.Since(t0); d > 0 { + time.Sleep(d) + } + + s.Receivers[0].WriteRTP(pkt) + + s.Recv += chunkBytes + s.data = s.data[chunkBytes:] + + pts += 10 * time.Millisecond + seq++ + } + + s.onClose() + + return nil +} + +func repack(handler core.HandlerFunc) core.HandlerFunc { + const PacketSize = 2 * 16000 / 50 // 20ms + + var buf []byte + + return func(pkt *rtp.Packet) { + buf = append(buf, pkt.Payload...) + + for len(buf) >= PacketSize { + pkt = &core.Packet{Payload: buf[:PacketSize]} + buf = buf[PacketSize:] + handler(pkt) + } + } +} diff --git a/pkg/wyoming/wakeword.go b/pkg/wyoming/wakeword.go new file mode 100644 index 00000000..3603e22a --- /dev/null +++ b/pkg/wyoming/wakeword.go @@ -0,0 +1,120 @@ +package wyoming + +import ( + "encoding/json" + "fmt" + "net/url" +) + +type WakeWord struct { + *API + names []string + send int + + Detection string +} + +func DialWakeWord(rawURL string) (*WakeWord, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + api, err := DialAPI(u.Host) + if err != nil { + return nil, err + } + + names := u.Query()["name"] + if len(names) == 0 { + names = []string{"ok_nabu_v0.1"} + } + + wake := &WakeWord{API: api, names: names} + if err = wake.Start(); err != nil { + _ = wake.Close() + return nil, err + } + + go wake.handle() + return wake, nil +} + +func (w *WakeWord) handle() { + defer w.Close() + + for { + evt, err := w.ReadEvent() + if err != nil { + return + } + + if evt.Type == "detection" { + var data struct { + Name string `json:"name"` + } + if err = json.Unmarshal(evt.Data, &data); err != nil { + return + } + w.Detection = data.Name + } + } +} + +//func (w *WakeWord) Describe() error { +// if err := w.WriteEvent(&Event{Type: "describe"}); err != nil { +// return err +// } +// +// evt, err := w.ReadEvent() +// if err != nil { +// return err +// } +// +// var info struct { +// Wake []struct { +// Models []struct { +// Name string `json:"name"` +// } `json:"models"` +// } `json:"wake"` +// } +// if err = json.Unmarshal(evt.Data, &info); err != nil { +// return err +// } +// +// return nil +//} + +func (w *WakeWord) Start() error { + msg := struct { + Names []string `json:"names"` + }{ + Names: w.names, + } + data, err := json.Marshal(msg) + if err != nil { + return err + } + evt := &Event{Type: "detect", Data: data} + if err := w.WriteEvent(evt); err != nil { + return err + } + + evt = &Event{Type: "audio-start", Data: audioData(0)} + return w.WriteEvent(evt) +} + +func (w *WakeWord) Close() error { + return w.conn.Close() +} + +func (w *WakeWord) WriteChunk(payload []byte) error { + evt := &Event{Type: "audio-chunk", Data: audioData(w.send), Payload: payload} + w.send += len(payload) + return w.WriteEvent(evt) +} + +func audioData(send int) []byte { + // timestamp in ms = send / 2 * 1000 / 16000 = send / 32 + return []byte(fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, send/32)) +} diff --git a/pkg/wyoming/wyoming.go b/pkg/wyoming/wyoming.go new file mode 100644 index 00000000..96d1dc5e --- /dev/null +++ b/pkg/wyoming/wyoming.go @@ -0,0 +1,42 @@ +package wyoming + +import ( + "net" + "net/url" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func Dial(rawURL string) (core.Producer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + cc := core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Medias: []*core.Media{ + { + Kind: core.KindAudio, + Codecs: []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 16000}, + }, + }, + }, + Transport: conn, + } + + if u.Query().Get("backchannel") != "1" { + cc.Medias[0].Direction = core.DirectionRecvonly + return &Producer{cc, NewAPI(conn)}, nil + } else { + cc.Medias[0].Direction = core.DirectionSendonly + return &Backchannel{cc, NewAPI(conn)}, nil + } +} From df2e98209017ac9d1275b0776f7f356ea006205d Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 10:25:22 +0300 Subject: [PATCH 04/15] Add logs to wyoming module --- internal/wyoming/wyoming.go | 41 +++++++++++++++++++++++++++++-------- pkg/wyoming/satellite.go | 6 +++--- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go index 41ae27c3..af5cd65d 100644 --- a/internal/wyoming/wyoming.go +++ b/internal/wyoming/wyoming.go @@ -33,17 +33,11 @@ func Init() { continue } - ln, err := net.Listen("tcp", cfg.Listen) - if err != nil { - log.Warn().Msgf("[wyoming] listen error: %s", err) - continue - } - if cfg.Name == "" { cfg.Name = name } - srv := wyoming.Server{ + srv := &wyoming.Server{ Name: cfg.Name, VADThreshold: int16(1000 * cfg.VADThreshold), // 1.0 => 1000 WakeURI: cfg.WakeURI, @@ -62,9 +56,40 @@ func Init() { SndHandler: func(prod core.Producer) error { return stream.Play(prod) }, + Trace: func(format string, v ...any) { + log.Trace().Msgf("[wyoming] "+format, v...) + }, } - go srv.Serve(ln) + go serve(srv, cfg.Listen) } } var log zerolog.Logger + +func serve(srv *wyoming.Server, address string) { + ln, err := net.Listen("tcp", address) + if err != nil { + log.Warn().Msgf("[wyoming] listen error: %s", err) + } + + for { + conn, err := ln.Accept() + if err != nil { + return + } + + go handle(srv, conn) + } +} + +func handle(srv *wyoming.Server, conn net.Conn) { + addr := conn.RemoteAddr() + + log.Trace().Msgf("[wyoming] %s connected", addr) + + if err := srv.Handle(conn); err != nil { + log.Error().Msgf("[wyoming] %s error: %s", addr, err) + } + + log.Trace().Msgf("[wyoming] %s disconnected", addr) +} diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index 14bded32..bab3c761 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -21,6 +21,8 @@ type Server struct { MicHandler func(cons core.Consumer) error SndHandler func(prod core.Producer) error + + Trace func(format string, v ...any) } func (s *Server) Serve(l net.Listener) error { @@ -39,8 +41,6 @@ func (s *Server) Handle(conn net.Conn) error { sat := newSatellite(api, s) defer sat.Close() - //log.Debug().Msgf("[wyoming] new client: %s", conn.RemoteAddr()) - var snd []byte for { @@ -49,7 +49,7 @@ func (s *Server) Handle(conn net.Conn) error { return err } - //log.Printf("%s %s %d", evt.Type, evt.Data, len(evt.Payload)) + s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload)) switch evt.Type { case "ping": // {"text": null} From 6df1e68a5fac7be6f1dd91f22cd7a20ac43afa5b Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 10:26:00 +0300 Subject: [PATCH 05/15] Update wyoming producer and backchannel --- pkg/pcm/pcm.go | 1 + pkg/wyoming/backchannel.go | 23 ++++++++++++++++++++++- pkg/wyoming/producer.go | 22 ++++++++++++++++++++++ pkg/wyoming/wyoming.go | 20 ++------------------ 4 files changed, 47 insertions(+), 19 deletions(-) diff --git a/pkg/pcm/pcm.go b/pkg/pcm/pcm.go index bf54a6cf..13405ad4 100644 --- a/pkg/pcm/pcm.go +++ b/pkg/pcm/pcm.go @@ -203,5 +203,6 @@ func ProducerCodecs() []*core.Codec { {Name: core.CodecPCM, ClockRate: 8000}, {Name: core.CodecPCMA, ClockRate: 8000}, {Name: core.CodecPCMU, ClockRate: 8000}, + {Name: core.CodecPCML, ClockRate: 22050}, // wyoming-snd-external } } diff --git a/pkg/wyoming/backchannel.go b/pkg/wyoming/backchannel.go index 8760789e..b4167ff1 100644 --- a/pkg/wyoming/backchannel.go +++ b/pkg/wyoming/backchannel.go @@ -2,6 +2,7 @@ package wyoming import ( "fmt" + "net" "time" "github.com/AlexxIT/go2rtc/pkg/core" @@ -13,6 +14,26 @@ type Backchannel struct { api *API } +func newBackchannel(conn net.Conn) *Backchannel { + return &Backchannel{ + core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Medias: []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 22050}, + }, + }, + }, + Transport: conn, + }, + NewAPI(conn), + } +} + func (b *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { return nil, core.ErrCantGetTrack } @@ -23,7 +44,7 @@ func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core ts := time.Now().Nanosecond() evt := &Event{ Type: "audio-chunk", - Data: []byte(fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, ts)), + Data: []byte(fmt.Sprintf(`{"rate":22050,"width":2,"channels":1,"timestamp":%d}`, ts)), Payload: pkt.Payload, } _ = b.api.WriteEvent(evt) diff --git a/pkg/wyoming/producer.go b/pkg/wyoming/producer.go index 9cd6abb6..09451333 100644 --- a/pkg/wyoming/producer.go +++ b/pkg/wyoming/producer.go @@ -1,6 +1,8 @@ package wyoming import ( + "net" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" ) @@ -10,6 +12,26 @@ type Producer struct { api *API } +func newProducer(conn net.Conn) *Producer { + return &Producer{ + core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Medias: []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 16000}, + }, + }, + }, + Transport: conn, + }, + NewAPI(conn), + } +} + func (p *Producer) Start() error { var seq uint16 var ts uint32 diff --git a/pkg/wyoming/wyoming.go b/pkg/wyoming/wyoming.go index 96d1dc5e..0c8eebae 100644 --- a/pkg/wyoming/wyoming.go +++ b/pkg/wyoming/wyoming.go @@ -18,25 +18,9 @@ func Dial(rawURL string) (core.Producer, error) { return nil, err } - cc := core.Connection{ - ID: core.NewID(), - FormatName: "wyoming", - Medias: []*core.Media{ - { - Kind: core.KindAudio, - Codecs: []*core.Codec{ - {Name: core.CodecPCML, ClockRate: 16000}, - }, - }, - }, - Transport: conn, - } - if u.Query().Get("backchannel") != "1" { - cc.Medias[0].Direction = core.DirectionRecvonly - return &Producer{cc, NewAPI(conn)}, nil + return newProducer(conn), nil } else { - cc.Medias[0].Direction = core.DirectionSendonly - return &Backchannel{cc, NewAPI(conn)}, nil + return newBackchannel(conn), nil } } From 3b7309d9f78649ac34560b1fc7b62bc1508b2028 Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 11:49:08 +0300 Subject: [PATCH 06/15] Add support mic mode for wyoming module --- internal/wyoming/wyoming.go | 20 +++++++++++++++----- pkg/wyoming/mic.go | 35 +++++++++++++++++++++++++++++++++++ pkg/wyoming/satellite.go | 10 +++++++--- 3 files changed, 57 insertions(+), 8 deletions(-) create mode 100644 pkg/wyoming/mic.go diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go index af5cd65d..bdff4b03 100644 --- a/internal/wyoming/wyoming.go +++ b/internal/wyoming/wyoming.go @@ -18,6 +18,7 @@ func Init() { Mod map[string]struct { Listen string `yaml:"listen"` Name string `yaml:"name"` + Mode string `yaml:"mode"` WakeURI string `yaml:"wake_uri"` VADThreshold float32 `yaml:"vad_threshold"` } `yaml:"wyoming"` @@ -60,13 +61,13 @@ func Init() { log.Trace().Msgf("[wyoming] "+format, v...) }, } - go serve(srv, cfg.Listen) + go serve(srv, cfg.Mode, cfg.Listen) } } var log zerolog.Logger -func serve(srv *wyoming.Server, address string) { +func serve(srv *wyoming.Server, mode, address string) { ln, err := net.Listen("tcp", address) if err != nil { log.Warn().Msgf("[wyoming] listen error: %s", err) @@ -78,16 +79,25 @@ func serve(srv *wyoming.Server, address string) { return } - go handle(srv, conn) + go handle(srv, mode, conn) } } -func handle(srv *wyoming.Server, conn net.Conn) { +func handle(srv *wyoming.Server, mode string, conn net.Conn) { addr := conn.RemoteAddr() log.Trace().Msgf("[wyoming] %s connected", addr) - if err := srv.Handle(conn); err != nil { + var err error + + switch mode { + case "mic": + err = srv.HandleMic(conn) + default: + err = srv.Handle(conn) + } + + if err != nil { log.Error().Msgf("[wyoming] %s error: %s", addr, err) } diff --git a/pkg/wyoming/mic.go b/pkg/wyoming/mic.go new file mode 100644 index 00000000..014ba4ea --- /dev/null +++ b/pkg/wyoming/mic.go @@ -0,0 +1,35 @@ +package wyoming + +import ( + "fmt" + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func (s *Server) HandleMic(conn net.Conn) error { + defer conn.Close() + + var closed core.Waiter + var timestamp int + + api := NewAPI(conn) + mic := newMicConsumer(func(chunk []byte) { + data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, timestamp) + evt := &Event{Type: "audio-chunk", Data: []byte(data), Payload: chunk} + if err := api.WriteEvent(evt); err != nil { + closed.Done(nil) + } + + timestamp += len(chunk) / 2 + }) + mic.RemoteAddr = api.conn.RemoteAddr().String() + + if err := s.MicHandler(mic); err != nil { + return err + } + + defer mic.Stop() + + return closed.Wait() +} diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index bab3c761..a787dfde 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -142,9 +142,6 @@ func (s *satellite) pause() { s.state = stateUnavailable if s.mic != nil { - if s.mic.onClose != nil { - s.mic.onClose() - } _ = s.mic.Stop() s.mic = nil } @@ -296,6 +293,13 @@ func (c *micConsumer) AddTrack(media *core.Media, codec *core.Codec, track *core return nil } +func (c *micConsumer) Stop() error { + if c.onClose != nil { + c.onClose() + } + return c.Connection.Stop() +} + type sndProducer struct { core.Connection data []byte From 80f57a02922f6fb635f535eda988978b5a26783a Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 13:16:57 +0300 Subject: [PATCH 07/15] Add support snd mode for wyoming module --- internal/wyoming/wyoming.go | 2 ++ pkg/wyoming/satellite.go | 4 +++- pkg/wyoming/snd.go | 40 +++++++++++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 pkg/wyoming/snd.go diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go index bdff4b03..aa76eab7 100644 --- a/internal/wyoming/wyoming.go +++ b/internal/wyoming/wyoming.go @@ -93,6 +93,8 @@ func handle(srv *wyoming.Server, mode string, conn net.Conn) { switch mode { case "mic": err = srv.HandleMic(conn) + case "snd": + err = srv.HandleSnd(conn) default: err = srv.Handle(conn) } diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index a787dfde..c542e3a9 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -377,7 +377,9 @@ func (s *sndProducer) Start() error { seq++ } - s.onClose() + if s.onClose != nil { + s.onClose() + } return nil } diff --git a/pkg/wyoming/snd.go b/pkg/wyoming/snd.go new file mode 100644 index 00000000..71148efe --- /dev/null +++ b/pkg/wyoming/snd.go @@ -0,0 +1,40 @@ +package wyoming + +import ( + "io" + "net" + "time" +) + +func (s *Server) HandleSnd(conn net.Conn) error { + defer conn.Close() + + var snd []byte + + api := NewAPI(conn) + for { + evt, err := api.ReadEvent() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload)) + + switch evt.Type { + case "audio-start": + snd = snd[:0] + case "audio-chunk": + snd = append(snd, evt.Payload...) + case "audio-stop": + prod := newSndProducer(snd, func() { + time.Sleep(time.Second) // some extra delay before close + }) + if err = s.SndHandler(prod); err != nil { + return err + } + } + } +} From 7cf672da84ec32b15943fe2fd2c4f463001d0f5d Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 14:19:40 +0300 Subject: [PATCH 08/15] Add readme for exec and wyoming modules --- internal/exec/README.md | 12 +++ internal/wyoming/README.md | 197 +++++++++++++++++++++++++++++++++++++ pkg/wyoming/README.md | 14 +++ 3 files changed, 223 insertions(+) create mode 100644 internal/exec/README.md create mode 100644 internal/wyoming/README.md create mode 100644 pkg/wyoming/README.md diff --git a/internal/exec/README.md b/internal/exec/README.md new file mode 100644 index 00000000..e15a9657 --- /dev/null +++ b/internal/exec/README.md @@ -0,0 +1,12 @@ +## Backchannel + +- You can check audio card names in the **Go2rtc > WebUI > Add** +- You can specify multiple backchannel lines with different codecs + +```yaml +sources: + two_way_audio_win: + - exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav - + - exec:ffplay -nodisp -probesize 32 -f s16le -ar 16000 -#backchannel=1#audio=s16le/16000 + - exec:ffplay -nodisp -probesize 32 -f alaw -ar 8000 -#backchannel=1#audio=alaw/8000 +``` diff --git a/internal/wyoming/README.md b/internal/wyoming/README.md new file mode 100644 index 00000000..445af2c8 --- /dev/null +++ b/internal/wyoming/README.md @@ -0,0 +1,197 @@ +# Wyoming + +This module provide [Wyoming Protocol](https://www.home-assistant.io/integrations/wyoming/) support to create local voice assistants using [Home Assistant](https://www.home-assistant.io/). + +- go2rtc can act as [Wyoming Satellite](https://github.com/rhasspy/wyoming-satellite) +- go2rtc can act as [Wyoming External Microphone](https://github.com/rhasspy/wyoming-mic-external) +- go2rtc can act as [Wyoming External Sound](https://github.com/rhasspy/wyoming-snd-external) +- any supported audio source with PCM codec can be used as audio input +- any supported two-way audio source with PCM codec can be used as audio output +- any desktop/server microphone/speaker can be used as two-way audio source + - supported any OS via FFmpeg or any similar software + - supported Linux via alsa source + +## Typical Voice Pipeline + +1. Audio stream (MIC) + - any audio source with PCM codec support (include PCMA/PCMU) +2. Voice Activity Detector (VAD) +3. Wake Word (WAKE) + - [OpenWakeWord](https://www.home-assistant.io/voice_control/create_wake_word/) +4. Speech-to-Text (STT) + - [Whisper](https://github.com/home-assistant/addons/blob/master/whisper/README.md) + - [Vosk](https://github.com/rhasspy/hassio-addons/blob/master/vosk/README.md) +5. Conversation agent (INTENT) + - [Home Assistant](https://www.home-assistant.io/integrations/conversation/) +6. Text-to-speech (TTS) + - [Google Translate](https://www.home-assistant.io/integrations/google_translate/) + - [Piper](https://github.com/home-assistant/addons/blob/master/piper/README.md) +7. Audio stream (SND) + - any source with two-way audio (backchannel) and PCM codec support (include PCMA/PCMU) + +You can use a large number of different projects for WAKE, STT, INTENT and TTS thanks to the Home Assistant. + +And you can use a large number of different technologies for MIC and SND thanks to Go2rtc. + +## Configuration + +You can optionally specify WAKE service. So go2rtc will start transmitting audio to Home Assistant only after WAKE word. If the WAKE service cannot be connected to or not specified - go2rtc will pass all audio to Home Assistant. In this case WAKE service must be configured in your Voice Assistant pipeline. + +You can optionally specify VAD threshold. So go2rtc will start transmitting audio to WAKE service only after some audio noise. + +Your stream must support audio transmission in PCM codec (include PCMA/PCMU). + +```yaml +wyoming: + stream_name_from_streams_section: + listen: :10700 + name: "My Satellite" # optional name + wake_uri: tcp://192.168.1.23:10400 # optional WAKE service + vad_threshold: 1 # optional VAD threshold (from 0.1 to 3.5) +``` + +Home Assistant -> Settings -> Integrations -> Add -> Wyoming Protocol -> Host + Port from `go2rtc.yaml` + +Select one or multiple wake words: +```yaml +wake_uri: tcp://192.168.1.23:10400?name=alexa_v0.1&name=hey_jarvis_v0.1&name=hey_mycroft_v0.1&name=hey_rhasspy_v0.1&name=ok_nabu_v0.1 +``` + +## Config examples + +Satellite on Windows server using FFmpeg and FFplay. + +```yaml +streams: + satellite_win: + - exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav - + - exec:ffplay -hide_banner -nodisp -probesize 32 -f s16le -ar 22050 -#backchannel=1#audio=s16le/22050 + +wyoming: + satellite_win: + listen: :10700 + name: "Windows Satellite" + wake_uri: tcp://192.168.1.23:10400 + vad_threshold: 1 +``` + +Satellite on Dahua camera with two-way audio support. + +```yaml +streams: + dahua_camera: + - rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1&unicast=true&proto=Onvif + +wyoming: + dahua_camera: + listen: :10700 + name: "Dahua Satellite" + wake_uri: tcp://192.168.1.23:10400 + vad_threshold: 1 +``` + +Satellite on Dahua camera with two-way audio support. + +```yaml +streams: + wyoming_external: + - wyoming://192.168.1.23:10600 # wyoming-mic-external + - wyoming://192.168.1.23:10601?backchannel=1 # wyoming-snd-external + +wyoming: + wyoming_external: + listen: :10700 + name: "Wyoming Satellite" + wake_uri: tcp://192.168.1.23:10400 + vad_threshold: 1 +``` + +## Wyoming External Microphone and Sound + +Advanced users, who want to enjoy the [Wyoming Satellite](https://github.com/rhasspy/wyoming-satellite) project, can use go2rtc as a [Wyoming External Microphone](https://github.com/rhasspy/wyoming-mic-external) or [Wyoming External Sound](https://github.com/rhasspy/wyoming-snd-external). + +**go2rtc.yaml** + +```yaml +streams: + wyoming_mic_external: + - exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav - + wyoming_snd_external: + - exec:ffplay -hide_banner -nodisp -probesize 32 -f s16le -ar 22050 -#backchannel=1#audio=s16le/22050 + +wyoming: + wyoming_mic_external: + listen: :10600 + mode: mic + wyoming_snd_external: + listen: :10601 + mode: snd +``` + +**docker-compose.yml** + +```yaml +version: "3.8" +services: + satellite: + build: wyoming-satellite # https://github.com/rhasspy/wyoming-satellite + ports: + - "10700:10700" + command: + - "--name" + - "my satellite" + - "--mic-uri" + - "tcp://192.168.1.23:10600" + - "--snd-uri" + - "tcp://192.168.1.23:10601" + - "--debug" +``` + +## Wyoming External Source + +**go2rtc.yaml** + +```yaml +streams: + wyoming_external: + - wyoming://192.168.1.23:10600 + - wyoming://192.168.1.23:10601?backchannel=1 +``` + +**docker-compose.yml** + +```yaml +version: "3.8" +services: + microphone: + build: wyoming-mic-external # https://github.com/rhasspy/wyoming-mic-external + ports: + - "10600:10600" + devices: + - /dev/snd:/dev/snd + group_add: + - audio + command: + - "--device" + - "sysdefault" + - "--debug" + playback: + build: wyoming-snd-external # https://github.com/rhasspy/wyoming-snd-external + ports: + - "10601:10601" + devices: + - /dev/snd:/dev/snd + group_add: + - audio + command: + - "--device" + - "sysdefault" + - "--debug" +``` + +## Debug + +```yaml +log: + wyoming: trace +``` diff --git a/pkg/wyoming/README.md b/pkg/wyoming/README.md new file mode 100644 index 00000000..ff17d079 --- /dev/null +++ b/pkg/wyoming/README.md @@ -0,0 +1,14 @@ +## Default wake words + +- alexa_v0.1 +- hey_jarvis_v0.1 +- hey_mycroft_v0.1 +- hey_rhasspy_v0.1 +- ok_nabu_v0.1 + +## Useful Links + +- https://github.com/rhasspy/wyoming-satellite +- https://github.com/rhasspy/wyoming-openwakeword +- https://github.com/fwartner/home-assistant-wakewords-collection +- https://github.com/esphome/micro-wake-word-models/tree/main?tab=readme-ov-file From 70b4bf779e4ad9e337d752b59d595be11712e16a Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 16:35:44 +0300 Subject: [PATCH 09/15] Change wyoming Event.Data type to string --- pkg/wyoming/api.go | 7 ++++--- pkg/wyoming/backchannel.go | 2 +- pkg/wyoming/mic.go | 2 +- pkg/wyoming/satellite.go | 8 ++++---- pkg/wyoming/wakeword.go | 8 ++++---- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/wyoming/api.go b/pkg/wyoming/api.go index 59de747c..ce297a22 100644 --- a/pkg/wyoming/api.go +++ b/pkg/wyoming/api.go @@ -64,10 +64,11 @@ func (w *API) ReadEvent() (*Event, error) { evt := Event{Type: hdr.Type} if hdr.DataLength > 0 { - evt.Data = make([]byte, hdr.DataLength) - if _, err = io.ReadFull(w.rd, evt.Data); err != nil { + data = make([]byte, hdr.DataLength) + if _, err = io.ReadFull(w.rd, data); err != nil { return nil, err } + evt.Data = string(data) } if hdr.PayloadLength > 0 { @@ -86,7 +87,7 @@ func (w *API) Close() error { type Event struct { Type string - Data []byte + Data string Payload []byte } diff --git a/pkg/wyoming/backchannel.go b/pkg/wyoming/backchannel.go index b4167ff1..e0569fe1 100644 --- a/pkg/wyoming/backchannel.go +++ b/pkg/wyoming/backchannel.go @@ -44,7 +44,7 @@ func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core ts := time.Now().Nanosecond() evt := &Event{ Type: "audio-chunk", - Data: []byte(fmt.Sprintf(`{"rate":22050,"width":2,"channels":1,"timestamp":%d}`, ts)), + Data: fmt.Sprintf(`{"rate":22050,"width":2,"channels":1,"timestamp":%d}`, ts), Payload: pkt.Payload, } _ = b.api.WriteEvent(evt) diff --git a/pkg/wyoming/mic.go b/pkg/wyoming/mic.go index 014ba4ea..325b0c36 100644 --- a/pkg/wyoming/mic.go +++ b/pkg/wyoming/mic.go @@ -16,7 +16,7 @@ func (s *Server) HandleMic(conn net.Conn) error { api := NewAPI(conn) mic := newMicConsumer(func(chunk []byte) { data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, timestamp) - evt := &Event{Type: "audio-chunk", Data: []byte(data), Payload: chunk} + evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk} if err := api.WriteEvent(evt); err != nil { closed.Done(nil) } diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index c542e3a9..0d1ea3e8 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -57,7 +57,7 @@ func (s *Server) Handle(conn net.Conn) error { case "describe": // {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}} data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.Name) - _ = api.WriteEvent(&Event{Type: "info", Data: []byte(data)}) + _ = api.WriteEvent(&Event{Type: "info", Data: data}) case "run-satellite": if err = sat.run(); err != nil { return err @@ -193,7 +193,7 @@ func (s *satellite) onMicChunk(chunk []byte) { // some problems with wake word - redirect to HA evt := &Event{ Type: "run-pipeline", - Data: []byte(`{"start_stage":"wake","end_stage":"tts","restart_on_end":false}`), + Data: `{"start_stage":"wake","end_stage":"tts","restart_on_end":false}`, } if err := s.api.WriteEvent(evt); err != nil { return @@ -211,7 +211,7 @@ func (s *satellite) onMicChunk(chunk []byte) { // check if wake word detected evt := &Event{ Type: "run-pipeline", - Data: []byte(`{"start_stage":"asr","end_stage":"tts","restart_on_end":false}`), + Data: `{"start_stage":"asr","end_stage":"tts","restart_on_end":false}`, } _ = s.api.WriteEvent(evt) s.state = stateStreaming @@ -232,7 +232,7 @@ func (s *satellite) onMicChunk(chunk []byte) { if s.state == stateStreaming { data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.timestamp) - evt := &Event{Type: "audio-chunk", Data: []byte(data), Payload: chunk} + evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk} _ = s.api.WriteEvent(evt) } diff --git a/pkg/wyoming/wakeword.go b/pkg/wyoming/wakeword.go index 3603e22a..4c728f20 100644 --- a/pkg/wyoming/wakeword.go +++ b/pkg/wyoming/wakeword.go @@ -53,7 +53,7 @@ func (w *WakeWord) handle() { var data struct { Name string `json:"name"` } - if err = json.Unmarshal(evt.Data, &data); err != nil { + if err = json.Unmarshal([]byte(evt.Data), &data); err != nil { return } w.Detection = data.Name @@ -95,7 +95,7 @@ func (w *WakeWord) Start() error { if err != nil { return err } - evt := &Event{Type: "detect", Data: data} + evt := &Event{Type: "detect", Data: string(data)} if err := w.WriteEvent(evt); err != nil { return err } @@ -114,7 +114,7 @@ func (w *WakeWord) WriteChunk(payload []byte) error { return w.WriteEvent(evt) } -func audioData(send int) []byte { +func audioData(send int) string { // timestamp in ms = send / 2 * 1000 / 16000 = send / 32 - return []byte(fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, send/32)) + return fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, send/32) } From 545a105ba097da68d2b996e2d172e88d6e8dcf9e Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 16:37:10 +0300 Subject: [PATCH 10/15] Add support body to expr fetch func --- pkg/expr/expr.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/expr/expr.go b/pkg/expr/expr.go index 36719100..e2ed0ca6 100644 --- a/pkg/expr/expr.go +++ b/pkg/expr/expr.go @@ -6,17 +6,23 @@ import ( "io" "net/http" "regexp" + "strings" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/expr-lang/expr" ) -func newRequest(method, url string, headers map[string]any) (*http.Request, error) { +func newRequest(method, url string, headers map[string]any, body string) (*http.Request, error) { + var rd io.Reader + if method == "" { method = "GET" } + if body != "" { + rd = strings.NewReader(body) + } - req, err := http.NewRequest(method, url, nil) + req, err := http.NewRequest(method, url, rd) if err != nil { return nil, err } @@ -55,7 +61,8 @@ var Options = []expr.Option{ options := params[1].(map[string]any) method, _ := options["method"].(string) headers, _ := options["headers"].(map[string]any) - req, err = newRequest(method, url, headers) + body, _ := options["body"].(string) + req, err = newRequest(method, url, headers, body) } else { req, err = http.NewRequest("GET", url, nil) } From 518cae14767d549f47fb3e4b0a7770ceb76681c2 Mon Sep 17 00:00:00 2001 From: Alex X Date: Tue, 22 Apr 2025 16:39:06 +0300 Subject: [PATCH 11/15] Add support events to wyoming server --- internal/expr/expr.go | 2 +- internal/wyoming/wyoming.go | 17 ++-- pkg/expr/expr.go | 15 ++- pkg/expr/expr_test.go | 4 +- pkg/wyoming/expr.go | 131 +++++++++++++++++++++++++ pkg/wyoming/satellite.go | 187 +++++++++++++----------------------- 6 files changed, 224 insertions(+), 132 deletions(-) create mode 100644 pkg/wyoming/expr.go diff --git a/internal/expr/expr.go b/internal/expr/expr.go index a6d1f972..8fd6c9c2 100644 --- a/internal/expr/expr.go +++ b/internal/expr/expr.go @@ -12,7 +12,7 @@ func Init() { log := app.GetLogger("expr") streams.RedirectFunc("expr", func(url string) (string, error) { - v, err := expr.Run(url[5:]) + v, err := expr.Eval(url[5:], nil) if err != nil { return "", err } diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go index aa76eab7..1849da3a 100644 --- a/internal/wyoming/wyoming.go +++ b/internal/wyoming/wyoming.go @@ -16,11 +16,12 @@ func Init() { // server var cfg struct { Mod map[string]struct { - Listen string `yaml:"listen"` - Name string `yaml:"name"` - Mode string `yaml:"mode"` - WakeURI string `yaml:"wake_uri"` - VADThreshold float32 `yaml:"vad_threshold"` + Listen string `yaml:"listen"` + Name string `yaml:"name"` + Mode string `yaml:"mode"` + Event map[string]string `yaml:"event"` + WakeURI string `yaml:"wake_uri"` + VADThreshold float32 `yaml:"vad_threshold"` } `yaml:"wyoming"` } app.LoadConfig(&cfg) @@ -40,6 +41,7 @@ func Init() { srv := &wyoming.Server{ Name: cfg.Name, + Event: cfg.Event, VADThreshold: int16(1000 * cfg.VADThreshold), // 1.0 => 1000 WakeURI: cfg.WakeURI, MicHandler: func(cons core.Consumer) error { @@ -60,6 +62,9 @@ func Init() { Trace: func(format string, v ...any) { log.Trace().Msgf("[wyoming] "+format, v...) }, + Error: func(format string, v ...any) { + log.Error().Msgf("[wyoming] "+format, v...) + }, } go serve(srv, cfg.Mode, cfg.Listen) } @@ -70,7 +75,7 @@ var log zerolog.Logger func serve(srv *wyoming.Server, mode, address string) { ln, err := net.Listen("tcp", address) if err != nil { - log.Warn().Msgf("[wyoming] listen error: %s", err) + log.Warn().Err(err).Msgf("[wyoming] listen") } for { diff --git a/pkg/expr/expr.go b/pkg/expr/expr.go index e2ed0ca6..4a8a663c 100644 --- a/pkg/expr/expr.go +++ b/pkg/expr/expr.go @@ -10,6 +10,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" ) func newRequest(method, url string, headers map[string]any, body string) (*http.Request, error) { @@ -112,11 +113,19 @@ var Options = []expr.Option{ ), } -func Run(input string) (any, error) { - program, err := expr.Compile(input, Options...) +func Compile(input string) (*vm.Program, error) { + return expr.Compile(input, Options...) +} + +func Eval(input string, env any) (any, error) { + program, err := Compile(input) if err != nil { return nil, err } - return expr.Run(program, nil) + return expr.Run(program, env) +} + +func Run(program *vm.Program, env any) (any, error) { + return vm.Run(program, env) } diff --git a/pkg/expr/expr_test.go b/pkg/expr/expr_test.go index 14e75b2a..096afcdc 100644 --- a/pkg/expr/expr_test.go +++ b/pkg/expr/expr_test.go @@ -7,11 +7,11 @@ import ( ) func TestMatchHost(t *testing.T) { - v, err := Run(` + v, err := Eval(` let url = "rtsp://user:pass@192.168.1.123/cam/realmonitor?..."; let host = match(url, "//[^/]+")[0][2:]; host -`) +`, nil) require.Nil(t, err) require.Equal(t, "user:pass@192.168.1.123", v) } diff --git a/pkg/wyoming/expr.go b/pkg/wyoming/expr.go new file mode 100644 index 00000000..1b184cc3 --- /dev/null +++ b/pkg/wyoming/expr.go @@ -0,0 +1,131 @@ +package wyoming + +import ( + "fmt" + "time" + + "github.com/AlexxIT/go2rtc/pkg/expr" + "golang.org/x/net/context" +) + +type env struct { + *satellite + Type string + Data string +} + +func (s *satellite) handleEvent(evt *Event) { + switch evt.Type { + case "describe": + // {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}} + data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.srv.Name) + s.WriteEvent("info", data) + case "run-satellite": + s.Detect() + case "pause-satellite": + s.Stop() + case "detect": // WAKE_WORD_START {"names": null} + case "detection": // WAKE_WORD_END {"name": "ok_nabu_v0.1", "timestamp": 17580, "speaker": null} + case "transcribe": // STT_START {"language": "en"} + case "voice-started": // STT_VAD_START {"timestamp": 1160} + case "voice-stopped": // STT_VAD_END {"timestamp": 2470} + s.Pause() + case "transcript": // STT_END {"text": "how are you"} + case "synthesize": // TTS_START {"text": "Sorry, I couldn't understand that", "voice": {"language": "en"}} + case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + case "audio-stop": // {"timestamp": 2.880000000000002} + // run async because PlayAudio takes some time + go func() { + s.PlayAudio() + s.WriteEvent("played") + s.Detect() + }() + case "error": + s.Detect() + case "internal-run": + s.WriteEvent("run-pipeline", `{"start_stage":"wake","end_stage":"tts"}`) + s.Stream() + case "internal-detection": + s.WriteEvent("run-pipeline", `{"start_stage":"asr","end_stage":"tts"}`) + s.Stream() + } +} + +func (s *satellite) handleScript(evt *Event) { + var script string + if s.srv.Event != nil { + script = s.srv.Event[evt.Type] + } + + s.srv.Trace("event=%s data=%s payload size=%d", evt.Type, evt.Data, len(evt.Payload)) + + if script == "" { + s.handleEvent(evt) + return + } + + // run async because script can have sleeps + go func() { + e := &env{satellite: s, Type: evt.Type, Data: evt.Data} + if res, err := expr.Eval(script, e); err != nil { + s.srv.Trace("event=%s expr error=%s", evt.Type, err) + s.handleEvent(evt) + } else { + s.srv.Trace("event=%s expr result=%v", evt.Type, res) + } + }() +} + +func (s *satellite) Detect() bool { + return s.setMicState(stateWaitVAD) +} + +func (s *satellite) Stream() bool { + return s.setMicState(stateActive) +} + +func (s *satellite) Pause() bool { + return s.setMicState(stateIdle) +} + +func (s *satellite) Stop() bool { + s.micStop() + return true +} + +func (s *satellite) WriteEvent(args ...string) bool { + if len(args) == 0 { + return false + } + evt := &Event{Type: args[0]} + if len(args) > 1 { + evt.Data = args[1] + } + if err := s.api.WriteEvent(evt); err != nil { + return false + } + return true +} + +func (s *satellite) PlayAudio() bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + prod := newSndProducer(s.sndAudio, cancel) + if err := s.srv.SndHandler(prod); err != nil { + return false + } else { + <-ctx.Done() + return true + } +} + +func (e *env) Sleep(s string) bool { + d, err := time.ParseDuration(s) + if err != nil { + return false + } + time.Sleep(d) + return true +} diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index 0d1ea3e8..7bc990d0 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -1,7 +1,6 @@ package wyoming import ( - "errors" "fmt" "net" "sync" @@ -14,7 +13,8 @@ import ( ) type Server struct { - Name string + Name string + Event map[string]string VADThreshold int16 WakeURI string @@ -23,6 +23,7 @@ type Server struct { SndHandler func(prod core.Producer) error Trace func(format string, v ...any) + Error func(format string, v ...any) } func (s *Server) Serve(l net.Listener) error { @@ -41,66 +42,49 @@ func (s *Server) Handle(conn net.Conn) error { sat := newSatellite(api, s) defer sat.Close() - var snd []byte - for { evt, err := api.ReadEvent() if err != nil { return err } - s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload)) - switch evt.Type { case "ping": // {"text": null} _ = api.WriteEvent(&Event{Type: "pong", Data: evt.Data}) - case "describe": - // {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}} - data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.Name) - _ = api.WriteEvent(&Event{Type: "info", Data: data}) - case "run-satellite": - if err = sat.run(); err != nil { - return err - } - case "pause-satellite": - sat.pause() - case "detect": // WAKE_WORD_START {"names": null} - case "detection": // WAKE_WORD_END {"name": "ok_nabu_v0.1", "timestamp": 17580, "speaker": null} - case "transcribe": // STT_START {"language": "en"} - case "voice-started": // STT_VAD_START {"timestamp": 1160} - case "voice-stopped": // STT_VAD_END {"timestamp": 2470} - sat.idle() - case "transcript": // STT_END {"text": "how are you"} - case "synthesize": // TTS_START {"text": "Sorry, I couldn't understand that", "voice": {"language": "en"}} case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} - snd = snd[:0] + sat.sndAudio = sat.sndAudio[:0] case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} - snd = append(snd, evt.Payload...) - case "audio-stop": // {"timestamp": 2.880000000000002} - sat.respond(snd) - case "error": - sat.start() + sat.sndAudio = append(sat.sndAudio, evt.Payload...) + } + + if s.Event == nil || s.Event[evt.Type] == "" { + sat.handleEvent(evt) + } else { + // run async because there may be sleeps + go sat.handleScript(evt) } } } -// states like Home Assistant +// states like http.ConnState const ( - stateUnavailable = iota - stateIdle - stateWaitVAD // aka wait VAD - stateWaitWakeWord - stateStreaming + stateError = -2 + stateClosed = -1 + stateNew = 0 + stateIdle = 1 + stateWaitVAD = 2 // aka wait VAD + stateWaitWakeWord = 3 + stateActive = 4 ) type satellite struct { api *API srv *Server - state uint8 - mu sync.Mutex - - timestamp int + micState int8 + micTS int + micMu sync.Mutex + sndAudio []byte mic *micConsumer wake *WakeWord @@ -112,35 +96,41 @@ func newSatellite(api *API, srv *Server) *satellite { } func (s *satellite) Close() error { - s.pause() + s.Stop() return s.api.Close() } -func (s *satellite) run() error { - s.mu.Lock() - defer s.mu.Unlock() +const wakeTimeout = 5 * 2 * 16000 // 5 seconds - if s.state != stateUnavailable { - return errors.New("wyoming: wrong satellite state") +func (s *satellite) setMicState(state int8) bool { + s.micMu.Lock() + defer s.micMu.Unlock() + + if s.micState == stateNew { + s.mic = newMicConsumer(s.onMicChunk) + s.mic.RemoteAddr = s.api.conn.RemoteAddr().String() + if err := s.srv.MicHandler(s.mic); err != nil { + s.micState = stateError + s.srv.Error("can't get mic: %w", err) + _ = s.api.Close() + } else { + s.micState = stateIdle + } } - s.mic = newMicConsumer(s.onMicChunk) - s.mic.RemoteAddr = s.api.conn.RemoteAddr().String() - - if err := s.srv.MicHandler(s.mic); err != nil { - return err + if s.micState < stateIdle { + return false } - s.state = stateIdle - go s.start() - - return nil + s.micState = state + s.micTS = 0 + return true } -func (s *satellite) pause() { - s.mu.Lock() +func (s *satellite) micStop() { + s.micMu.Lock() - s.state = stateUnavailable + s.micState = stateClosed if s.mic != nil { _ = s.mic.Stop() s.mic = nil @@ -150,40 +140,18 @@ func (s *satellite) pause() { s.wake = nil } - s.mu.Unlock() + s.micMu.Unlock() } -func (s *satellite) start() { - s.mu.Lock() - - if s.state != stateUnavailable { - s.state = stateWaitVAD - } - - s.mu.Unlock() -} - -func (s *satellite) idle() { - s.mu.Lock() - - if s.state != stateUnavailable { - s.state = stateIdle - } - - s.mu.Unlock() -} - -const wakeTimeout = 5 * 2 * 16000 // 5 seconds - func (s *satellite) onMicChunk(chunk []byte) { - s.mu.Lock() - defer s.mu.Unlock() + s.micMu.Lock() + defer s.micMu.Unlock() - if s.state == stateIdle { + if s.micState == stateIdle { return } - if s.state == stateWaitVAD { + if s.micState == stateWaitVAD { // tests show that values over 1000 are most likely speech if s.srv.VADThreshold == 0 || s16le.PeaksRMS(chunk) > s.srv.VADThreshold { if s.wake == nil && s.srv.WakeURI != "" { @@ -191,62 +159,41 @@ func (s *satellite) onMicChunk(chunk []byte) { } if s.wake == nil { // some problems with wake word - redirect to HA - evt := &Event{ - Type: "run-pipeline", - Data: `{"start_stage":"wake","end_stage":"tts","restart_on_end":false}`, - } - if err := s.api.WriteEvent(evt); err != nil { - return - } - s.state = stateStreaming + s.micState = stateIdle + go s.handleScript(&Event{Type: "internal-run"}) } else { - s.state = stateWaitWakeWord + s.micState = stateWaitWakeWord } - s.timestamp = 0 + s.micTS = 0 } } - if s.state == stateWaitWakeWord { + if s.micState == stateWaitWakeWord { if s.wake.Detection != "" { // check if wake word detected - evt := &Event{ - Type: "run-pipeline", - Data: `{"start_stage":"asr","end_stage":"tts","restart_on_end":false}`, - } - _ = s.api.WriteEvent(evt) - s.state = stateStreaming - s.timestamp = 0 + s.micState = stateIdle + go s.handleScript(&Event{Type: "internal-detection", Data: `{"name":"` + s.wake.Detection + `"}`}) } else if err := s.wake.WriteChunk(chunk); err != nil { // wake word service failed - s.state = stateWaitVAD + s.micState = stateWaitVAD _ = s.wake.Close() s.wake = nil - } else if s.timestamp > wakeTimeout { + } else if s.micTS > wakeTimeout { // wake word detection timeout - s.state = stateWaitVAD + s.micState = stateWaitVAD } } else if s.wake != nil { _ = s.wake.Close() s.wake = nil } - if s.state == stateStreaming { - data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.timestamp) + if s.micState == stateActive { + data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.micTS) evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk} _ = s.api.WriteEvent(evt) } - s.timestamp += len(chunk) / 2 -} - -func (s *satellite) respond(data []byte) { - prod := newSndProducer(data, func() { - _ = s.api.WriteEvent(&Event{Type: "played"}) - s.start() - }) - if err := s.srv.SndHandler(prod); err != nil { - prod.onClose() - } + s.micTS += len(chunk) / 2 } type micConsumer struct { @@ -373,7 +320,7 @@ func (s *sndProducer) Start() error { s.Recv += chunkBytes s.data = s.data[chunkBytes:] - pts += 10 * time.Millisecond + pts += 20 * time.Millisecond seq++ } From 890fd78a6afff756dd7d17649d65f1cdb3dd6656 Mon Sep 17 00:00:00 2001 From: Alex X Date: Thu, 24 Apr 2025 18:32:42 +0300 Subject: [PATCH 12/15] Remove errors from wyoming server handlers --- internal/wyoming/wyoming.go | 12 +++--------- pkg/wyoming/mic.go | 10 +++++----- pkg/wyoming/satellite.go | 4 ++-- pkg/wyoming/snd.go | 11 ++++------- 4 files changed, 14 insertions(+), 23 deletions(-) diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go index 1849da3a..065275c3 100644 --- a/internal/wyoming/wyoming.go +++ b/internal/wyoming/wyoming.go @@ -93,19 +93,13 @@ func handle(srv *wyoming.Server, mode string, conn net.Conn) { log.Trace().Msgf("[wyoming] %s connected", addr) - var err error - switch mode { case "mic": - err = srv.HandleMic(conn) + srv.HandleMic(conn) case "snd": - err = srv.HandleSnd(conn) + srv.HandleSnd(conn) default: - err = srv.Handle(conn) - } - - if err != nil { - log.Error().Msgf("[wyoming] %s error: %s", addr, err) + srv.Handle(conn) } log.Trace().Msgf("[wyoming] %s disconnected", addr) diff --git a/pkg/wyoming/mic.go b/pkg/wyoming/mic.go index 325b0c36..4fb03b44 100644 --- a/pkg/wyoming/mic.go +++ b/pkg/wyoming/mic.go @@ -7,7 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" ) -func (s *Server) HandleMic(conn net.Conn) error { +func (s *Server) HandleMic(conn net.Conn) { defer conn.Close() var closed core.Waiter @@ -26,10 +26,10 @@ func (s *Server) HandleMic(conn net.Conn) error { mic.RemoteAddr = api.conn.RemoteAddr().String() if err := s.MicHandler(mic); err != nil { - return err + s.Error("mic error: %s", err) + return } - defer mic.Stop() - - return closed.Wait() + _ = closed.Wait() + _ = mic.Stop() } diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index 7bc990d0..c45bc50e 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -37,7 +37,7 @@ func (s *Server) Serve(l net.Listener) error { } } -func (s *Server) Handle(conn net.Conn) error { +func (s *Server) Handle(conn net.Conn) { api := NewAPI(conn) sat := newSatellite(api, s) defer sat.Close() @@ -45,7 +45,7 @@ func (s *Server) Handle(conn net.Conn) error { for { evt, err := api.ReadEvent() if err != nil { - return err + return } switch evt.Type { diff --git a/pkg/wyoming/snd.go b/pkg/wyoming/snd.go index 71148efe..822c1ed4 100644 --- a/pkg/wyoming/snd.go +++ b/pkg/wyoming/snd.go @@ -1,12 +1,11 @@ package wyoming import ( - "io" "net" "time" ) -func (s *Server) HandleSnd(conn net.Conn) error { +func (s *Server) HandleSnd(conn net.Conn) { defer conn.Close() var snd []byte @@ -15,10 +14,7 @@ func (s *Server) HandleSnd(conn net.Conn) error { for { evt, err := api.ReadEvent() if err != nil { - if err == io.EOF { - return nil - } - return err + return } s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload)) @@ -33,7 +29,8 @@ func (s *Server) HandleSnd(conn net.Conn) error { time.Sleep(time.Second) // some extra delay before close }) if err = s.SndHandler(prod); err != nil { - return err + s.Error("snd error: %s", err) + return } } } From c50e894a42d8fa40583dc5586032dff26866ef2d Mon Sep 17 00:00:00 2001 From: Alex X Date: Thu, 24 Apr 2025 21:23:16 +0300 Subject: [PATCH 13/15] Add PlayFile function to wyoming server --- pkg/pcm/handlers.go | 28 ++++++---- pkg/pcm/pcm.go | 18 +++++-- pkg/pcm/producer_sync.go | 96 +++++++++++++++++++++++++++++++++ pkg/wav/producer.go | 48 +---------------- pkg/wav/wav.go | 53 +++++++++++++++++++ pkg/wyoming/expr.go | 23 +++++--- pkg/wyoming/satellite.go | 111 +++++++-------------------------------- pkg/wyoming/snd.go | 11 ++-- 8 files changed, 226 insertions(+), 162 deletions(-) create mode 100644 pkg/pcm/producer_sync.go diff --git a/pkg/pcm/handlers.go b/pkg/pcm/handlers.go index 39075199..18a96468 100644 --- a/pkg/pcm/handlers.go +++ b/pkg/pcm/handlers.go @@ -2,6 +2,7 @@ package pcm import ( "sync" + "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" @@ -82,18 +83,27 @@ func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.Handl } } -func BytesPerFrame(codec *core.Codec) byte { - channels := byte(codec.Channels) - if channels == 0 { - channels = 1 - } - +func BytesPerSample(codec *core.Codec) int { switch codec.Name { case core.CodecPCML, core.CodecPCM: - return 2 * channels + return 2 case core.CodecPCMU, core.CodecPCMA: - return channels + return 1 } - return 0 } + +func BytesPerFrame(codec *core.Codec) int { + if codec.Channels <= 1 { + return BytesPerSample(codec) + } + return int(codec.Channels) * BytesPerSample(codec) +} + +func FramesPerDuration(codec *core.Codec, duration time.Duration) int { + return int(time.Duration(codec.ClockRate) * duration / time.Second) +} + +func BytesPerDuration(codec *core.Codec, duration time.Duration) int { + return BytesPerFrame(codec) * FramesPerDuration(codec, duration) +} diff --git a/pkg/pcm/pcm.go b/pkg/pcm/pcm.go index 13405ad4..5395621e 100644 --- a/pkg/pcm/pcm.go +++ b/pkg/pcm/pcm.go @@ -1,13 +1,25 @@ package pcm -import "github.com/AlexxIT/go2rtc/pkg/core" +import ( + "math" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func ceil(x float32) int { + d, fract := math.Modf(float64(x)) + if fract == 0.0 { + return int(d) + } + return int(d) + 1 +} func Downsample(k float32) func([]int16) []int16 { var sampleN, sampleSum float32 return func(src []int16) (dst []int16) { var i int - dst = make([]int16, int((float32(len(src))+sampleN)/k)) + dst = make([]int16, ceil((float32(len(src))+sampleN)/k)) for _, sample := range src { sampleSum += float32(sample) sampleN++ @@ -28,7 +40,7 @@ func Upsample(k float32) func([]int16) []int16 { return func(src []int16) (dst []int16) { var i int - dst = make([]int16, int(k*float32(len(src)))) + dst = make([]int16, ceil(k*float32(len(src)))) for _, sample := range src { sampleN += k for sampleN > 0 { diff --git a/pkg/pcm/producer_sync.go b/pkg/pcm/producer_sync.go new file mode 100644 index 00000000..fedef268 --- /dev/null +++ b/pkg/pcm/producer_sync.go @@ -0,0 +1,96 @@ +package pcm + +import ( + "io" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type ProducerSync struct { + core.Connection + src *core.Codec + rd io.Reader + onClose func() +} + +func OpenSync(codec *core.Codec, rd io.Reader) *ProducerSync { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: ProducerCodecs(), + }, + } + + return &ProducerSync{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "pcm", + Medias: medias, + Transport: rd, + }, + src: codec, + rd: rd, + } +} + +func (p *ProducerSync) OnClose(f func()) { + p.onClose = f +} + +func (p *ProducerSync) Start() error { + if len(p.Receivers) == 0 { + return nil + } + + var pktSeq uint16 + var pktTS uint32 // time in frames + var pktTime time.Duration // time in seconds + + t0 := time.Now() + + dst := p.Receivers[0].Codec + transcode := Transcode(dst, p.src) + + const chunkDuration = 20 * time.Millisecond + chunkBytes := BytesPerDuration(p.src, chunkDuration) + chunkFrames := uint32(FramesPerDuration(dst, chunkDuration)) + + for { + buf := make([]byte, chunkBytes) + n, _ := io.ReadFull(p.rd, buf) + + if n == 0 { + break + } + + pkt := &core.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + SequenceNumber: pktSeq, + Timestamp: pktTS, + }, + Payload: transcode(buf[:n]), + } + + if d := pktTime - time.Since(t0); d > 0 { + time.Sleep(d) + } + + p.Receivers[0].WriteRTP(pkt) + p.Recv += n + + pktSeq++ + pktTS += chunkFrames + pktTime += chunkDuration + } + + if p.onClose != nil { + p.onClose() + } + + return nil +} diff --git a/pkg/wav/producer.go b/pkg/wav/producer.go index b9b3a878..60bdeaa1 100644 --- a/pkg/wav/producer.go +++ b/pkg/wav/producer.go @@ -2,7 +2,6 @@ package wav import ( "bufio" - "encoding/binary" "errors" "io" @@ -17,39 +16,11 @@ func Open(r io.Reader) (*Producer, error) { // https://www.mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html rd := bufio.NewReaderSize(r, core.BufferSize) - // skip Master RIFF chunk - if _, err := rd.Discard(12); err != nil { + codec, err := ReadHeader(r) + if err != nil { return nil, err } - codec := &core.Codec{} - - for { - chunkID, data, err := readChunk(rd) - if err != nil { - return nil, err - } - - if chunkID == "data" { - break - } - - if chunkID == "fmt " { - // https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt - switch data[0] { - case 1: - codec.Name = core.CodecPCML - case 6: - codec.Name = core.CodecPCMA - case 7: - codec.Name = core.CodecPCMU - } - - codec.Channels = data[2] - codec.ClockRate = binary.LittleEndian.Uint32(data[4:]) - } - } - if codec.Name == "" { return nil, errors.New("waw: unsupported codec") } @@ -110,18 +81,3 @@ func (c *Producer) Start() error { ts += PacketSize } } - -func readChunk(r io.Reader) (chunkID string, data []byte, err error) { - b := make([]byte, 8) - if _, err = io.ReadFull(r, b); err != nil { - return - } - - if chunkID = string(b[:4]); chunkID != "data" { - size := binary.LittleEndian.Uint32(b[4:]) - data = make([]byte, size) - _, err = io.ReadFull(r, data) - } - - return -} diff --git a/pkg/wav/wav.go b/pkg/wav/wav.go index bf48fdf9..9fe857d4 100644 --- a/pkg/wav/wav.go +++ b/pkg/wav/wav.go @@ -2,6 +2,7 @@ package wav import ( "encoding/binary" + "io" "github.com/AlexxIT/go2rtc/pkg/core" ) @@ -48,3 +49,55 @@ func Header(codec *core.Codec) []byte { return b } + +func ReadHeader(r io.Reader) (*core.Codec, error) { + // skip Master RIFF chunk + if _, err := io.ReadFull(r, make([]byte, 12)); err != nil { + return nil, err + } + + var codec core.Codec + + for { + chunkID, data, err := readChunk(r) + if err != nil { + return nil, err + } + + if chunkID == "data" { + break + } + + if chunkID == "fmt " { + // https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt + switch data[0] { + case 1: + codec.Name = core.CodecPCML + case 6: + codec.Name = core.CodecPCMA + case 7: + codec.Name = core.CodecPCMU + } + + codec.Channels = data[2] + codec.ClockRate = binary.LittleEndian.Uint32(data[4:]) + } + } + + return &codec, nil +} + +func readChunk(r io.Reader) (chunkID string, data []byte, err error) { + b := make([]byte, 8) + if _, err = io.ReadFull(r, b); err != nil { + return + } + + if chunkID = string(b[:4]); chunkID != "data" { + size := binary.LittleEndian.Uint32(b[4:]) + data = make([]byte, size) + _, err = io.ReadFull(r, data) + } + + return +} diff --git a/pkg/wyoming/expr.go b/pkg/wyoming/expr.go index 1b184cc3..f2f58933 100644 --- a/pkg/wyoming/expr.go +++ b/pkg/wyoming/expr.go @@ -1,11 +1,13 @@ package wyoming import ( + "bytes" "fmt" + "os" "time" "github.com/AlexxIT/go2rtc/pkg/expr" - "golang.org/x/net/context" + "github.com/AlexxIT/go2rtc/pkg/wav" ) type env struct { @@ -109,16 +111,21 @@ func (s *satellite) WriteEvent(args ...string) bool { } func (s *satellite) PlayAudio() bool { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + return s.playAudio(sndCodec, bytes.NewReader(s.sndAudio)) +} - prod := newSndProducer(s.sndAudio, cancel) - if err := s.srv.SndHandler(prod); err != nil { +func (s *satellite) PlayFile(path string) bool { + f, err := os.Open(path) + if err != nil { return false - } else { - <-ctx.Done() - return true } + + codec, err := wav.ReadHeader(f) + if err != nil { + return false + } + + return s.playAudio(codec, f) } func (e *env) Sleep(s string) bool { diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go index c45bc50e..0c0e6f30 100644 --- a/pkg/wyoming/satellite.go +++ b/pkg/wyoming/satellite.go @@ -1,10 +1,11 @@ package wyoming import ( + "context" "fmt" + "io" "net" "sync" - "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/pcm" @@ -55,13 +56,8 @@ func (s *Server) Handle(conn net.Conn) { sat.sndAudio = sat.sndAudio[:0] case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} sat.sndAudio = append(sat.sndAudio, evt.Payload...) - } - - if s.Event == nil || s.Event[evt.Type] == "" { - sat.handleEvent(evt) - } else { - // run async because there may be sleeps - go sat.handleScript(evt) + default: + sat.handleScript(evt) } } } @@ -196,6 +192,21 @@ func (s *satellite) onMicChunk(chunk []byte) { s.micTS += len(chunk) / 2 } +func (s *satellite) playAudio(codec *core.Codec, rd io.Reader) bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + prod := pcm.OpenSync(codec, rd) + prod.OnClose(cancel) + + if err := s.srv.SndHandler(prod); err != nil { + return false + } else { + <-ctx.Done() + return true + } +} + type micConsumer struct { core.Connection onData func(chunk []byte) @@ -247,90 +258,6 @@ func (c *micConsumer) Stop() error { return c.Connection.Stop() } -type sndProducer struct { - core.Connection - data []byte - onClose func() -} - -func newSndProducer(data []byte, onClose func()) *sndProducer { - medias := []*core.Media{ - { - Kind: core.KindAudio, - Direction: core.DirectionRecvonly, - Codecs: pcm.ProducerCodecs(), - }, - } - - return &sndProducer{ - core.Connection{ - ID: core.NewID(), - FormatName: "wyoming", - Protocol: "tcp", - Medias: medias, - }, - data, - onClose, - } -} - -func (s *sndProducer) Start() error { - if len(s.Receivers) == 0 { - return nil - } - - var pts time.Duration - var seq uint16 - - t0 := time.Now() - - src := &core.Codec{Name: core.CodecPCML, ClockRate: 22050} - dst := s.Receivers[0].Codec - f := pcm.Transcode(dst, src) - - bps := uint32(pcm.BytesPerFrame(dst)) - - chunkBytes := int(2 * src.ClockRate / 50) // 20ms - - for { - n := len(s.data) - if n == 0 { - break - } - if chunkBytes > n { - chunkBytes = n - } - - pkt := &core.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - SequenceNumber: seq, - Timestamp: uint32(s.Recv/2) * bps, - }, - Payload: f(s.data[:chunkBytes]), - } - - if d := pts - time.Since(t0); d > 0 { - time.Sleep(d) - } - - s.Receivers[0].WriteRTP(pkt) - - s.Recv += chunkBytes - s.data = s.data[chunkBytes:] - - pts += 20 * time.Millisecond - seq++ - } - - if s.onClose != nil { - s.onClose() - } - - return nil -} - func repack(handler core.HandlerFunc) core.HandlerFunc { const PacketSize = 2 * 16000 / 50 // 20ms diff --git a/pkg/wyoming/snd.go b/pkg/wyoming/snd.go index 822c1ed4..e26ca7ea 100644 --- a/pkg/wyoming/snd.go +++ b/pkg/wyoming/snd.go @@ -1,8 +1,11 @@ package wyoming import ( + "bytes" "net" - "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" ) func (s *Server) HandleSnd(conn net.Conn) { @@ -25,9 +28,7 @@ func (s *Server) HandleSnd(conn net.Conn) { case "audio-chunk": snd = append(snd, evt.Payload...) case "audio-stop": - prod := newSndProducer(snd, func() { - time.Sleep(time.Second) // some extra delay before close - }) + prod := pcm.OpenSync(sndCodec, bytes.NewReader(snd)) if err = s.SndHandler(prod); err != nil { s.Error("snd error: %s", err) return @@ -35,3 +36,5 @@ func (s *Server) HandleSnd(conn net.Conn) { } } } + +var sndCodec = &core.Codec{Name: core.CodecPCML, ClockRate: 22050} From fce41f4fc10dab3a08a160b304c360f63e47c9ec Mon Sep 17 00:00:00 2001 From: Alex X Date: Thu, 24 Apr 2025 22:06:36 +0300 Subject: [PATCH 14/15] Update wyoming readme about events --- internal/wyoming/README.md | 70 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/internal/wyoming/README.md b/internal/wyoming/README.md index 445af2c8..45273df8 100644 --- a/internal/wyoming/README.md +++ b/internal/wyoming/README.md @@ -10,6 +10,7 @@ This module provide [Wyoming Protocol](https://www.home-assistant.io/integration - any desktop/server microphone/speaker can be used as two-way audio source - supported any OS via FFmpeg or any similar software - supported Linux via alsa source +- you can change the behavior using the built-in scripting engine ## Typical Voice Pipeline @@ -57,6 +58,75 @@ Select one or multiple wake words: wake_uri: tcp://192.168.1.23:10400?name=alexa_v0.1&name=hey_jarvis_v0.1&name=hey_mycroft_v0.1&name=hey_rhasspy_v0.1&name=ok_nabu_v0.1 ``` +## Events + +You can add wyoming event handling using the [expr](https://github.com/AlexxIT/go2rtc/blob/master/internal/expr/README.md) language. For example, to pronounce TTS on some media player from HA. + +Turn on the logs to see what kind of events happens. + +This is what the default scripts look like: + +```yaml +wyoming: + script_example: + event: + run-satellite: Detect() + pause-satellite: Stop() + voice-stopped: Pause() + audio-stop: PlayAudio() && WriteEvent("played") && Detect() + error: Detect() + internal-run: WriteEvent("run-pipeline", '{"start_stage":"wake","end_stage":"tts"}') && Stream() + internal-detection: WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream() +``` + +If you write a script for an event - the default action is no longer executed. You need to repeat the necessary steps yourself. + +In addition to the standard events, there are two additional events: + +- `internal-run` - called after `Detect()` when VAD detected, but WAKE service unavailable +- `internal-detection` - called after `Detect()` when WAKE word detected + +**Example 1.** You want to play a sound file when a wake word detected (only `wav` supported): + +- `PlayFile` and `PlayAudio` functions are executed synchronously, the following steps will be executed only after they are completed + +```yaml +wyoming: + script_example: + event: + internal-detection: PlayFile('/media/beep.wav') && WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream() +``` + +**Example 2.** You want to play TTS on a Home Assistant media player: + +Each event has a `Type` and `Data` in JSON format. You can use their values in scripts. + +- in the `synthesize` step, we get the value of the `text` and call the HA REST API +- in the `audio-stop` step we get the duration of the TTS in seconds, wait for this time and start the pipeline again + +```yaml +wyoming: + script_example: + event: + synthesize: | + let text = fromJSON(Data).text; + let token = 'eyJhbGci...'; + fetch('http://localhost:8123/api/services/tts/speak', { + method: 'POST', + headers: {'Authorization': 'Bearer '+token,'Content-Type': 'application/json'}, + body: toJSON({ + entity_id: 'tts.google_translate_com', + media_player_entity_id: 'media_player.google_nest', + message: text, + language: 'en', + }), + }).ok + audio-stop: | + let timestamp = fromJSON(Data).timestamp; + let delay = string(timestamp)+'s'; + Sleep(delay) && WriteEvent("played") && Detect() +``` + ## Config examples Satellite on Windows server using FFmpeg and FFplay. From 6d37cceb9153068b37f69ed27a271a2bc549f70d Mon Sep 17 00:00:00 2001 From: Alex X Date: Fri, 25 Apr 2025 14:52:11 +0300 Subject: [PATCH 15/15] Improve readme for wyoming module --- internal/wyoming/README.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/internal/wyoming/README.md b/internal/wyoming/README.md index 45273df8..f98acb05 100644 --- a/internal/wyoming/README.md +++ b/internal/wyoming/README.md @@ -79,6 +79,20 @@ wyoming: internal-detection: WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream() ``` +Supported functions and variables: + +- `Detect()` - start the VAD and WAKE word detection process +- `Stream()` - start transmission of audio data to the client (Home Assistant) +- `Stop()` - stop and disconnect stream without disconnecting client (Home Assistant) +- `Pause()` - temporary pause of audio transfer, without disconnecting the stream +- `PlayAudio()` - playing the last audio that was sent from client (Home Assistant) +- `WriteEvent(type, data)` - send event to client (Home Assistant) +- `Sleep(duration)` - temporary script pause (ex. `Sleep('1.5s')`) +- `PlayFile(path)` - play audio from `wav` file +- `Type` - type (name) of event +- `Data` - event data in JSON format (ex. `{"text":"how are you"}`) +- also available other functions from [expr](https://github.com/AlexxIT/go2rtc/blob/master/internal/expr/README.md) module (ex. `fetch`) + If you write a script for an event - the default action is no longer executed. You need to repeat the necessary steps yourself. In addition to the standard events, there are two additional events: @@ -160,7 +174,7 @@ wyoming: vad_threshold: 1 ``` -Satellite on Dahua camera with two-way audio support. +Satellite on external wyoming Microphone and Sound. ```yaml streams: