diff --git a/internal/exec/README.md b/internal/exec/README.md new file mode 100644 index 00000000..e15a9657 --- /dev/null +++ b/internal/exec/README.md @@ -0,0 +1,12 @@ +## Backchannel + +- You can check audio card names in the **Go2rtc > WebUI > Add** +- You can specify multiple backchannel lines with different codecs + +```yaml +sources: + two_way_audio_win: + - exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav - + - exec:ffplay -nodisp -probesize 32 -f s16le -ar 16000 -#backchannel=1#audio=s16le/16000 + - exec:ffplay -nodisp -probesize 32 -f alaw -ar 8000 -#backchannel=1#audio=alaw/8000 +``` diff --git a/internal/exec/exec.go b/internal/exec/exec.go index 89add393..711be8a2 100644 --- a/internal/exec/exec.go +++ b/internal/exec/exec.go @@ -19,9 +19,9 @@ import ( "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/magic" + "github.com/AlexxIT/go2rtc/pkg/pcm" pkg "github.com/AlexxIT/go2rtc/pkg/rtsp" "github.com/AlexxIT/go2rtc/pkg/shell" - "github.com/AlexxIT/go2rtc/pkg/stdin" "github.com/rs/zerolog" ) @@ -86,7 +86,7 @@ func execHandle(rawURL string) (prod core.Producer, err error) { } if query.Get("backchannel") == "1" { - return stdin.NewClient(cmd) + return pcm.NewBackchannel(cmd, query.Get("audio")) } if path == "" { diff --git a/internal/expr/expr.go b/internal/expr/expr.go index a6d1f972..8fd6c9c2 100644 --- a/internal/expr/expr.go +++ b/internal/expr/expr.go @@ -12,7 +12,7 @@ func Init() { log := app.GetLogger("expr") streams.RedirectFunc("expr", func(url string) (string, error) { - v, err := expr.Run(url[5:]) + v, err := expr.Eval(url[5:], nil) if err != nil { return "", err } diff --git a/internal/streams/play.go b/internal/streams/play.go index d72c5e0c..1f8c4ade 100644 --- a/internal/streams/play.go +++ b/internal/streams/play.go @@ -7,7 +7,7 @@ import ( "github.com/AlexxIT/go2rtc/pkg/core" ) -func (s *Stream) Play(source 
string) error { +func (s *Stream) Play(urlOrProd any) error { s.mu.Lock() for _, producer := range s.producers { if producer.state == stateInternal && producer.conn != nil { @@ -16,12 +16,18 @@ func (s *Stream) Play(source string) error { } s.mu.Unlock() - if source == "" { - return nil - } - + var source string var src core.Producer + switch urlOrProd.(type) { + case string: + if source = urlOrProd.(string); source == "" { + return nil + } + case core.Producer: + src = urlOrProd.(core.Producer) + } + for _, producer := range s.producers { if producer.conn == nil { continue diff --git a/internal/wyoming/README.md b/internal/wyoming/README.md new file mode 100644 index 00000000..f98acb05 --- /dev/null +++ b/internal/wyoming/README.md @@ -0,0 +1,281 @@ +# Wyoming + +This module provides [Wyoming Protocol](https://www.home-assistant.io/integrations/wyoming/) support to create local voice assistants using [Home Assistant](https://www.home-assistant.io/). + +- go2rtc can act as [Wyoming Satellite](https://github.com/rhasspy/wyoming-satellite) +- go2rtc can act as [Wyoming External Microphone](https://github.com/rhasspy/wyoming-mic-external) +- go2rtc can act as [Wyoming External Sound](https://github.com/rhasspy/wyoming-snd-external) +- any supported audio source with PCM codec can be used as audio input +- any supported two-way audio source with PCM codec can be used as audio output +- any desktop/server microphone/speaker can be used as two-way audio source + - any OS is supported via FFmpeg or any similar software + - Linux is supported via the alsa source +- you can change the behavior using the built-in scripting engine + +## Typical Voice Pipeline + +1. Audio stream (MIC) + - any audio source with PCM codec support (including PCMA/PCMU) +2. Voice Activity Detector (VAD) +3. Wake Word (WAKE) + - [OpenWakeWord](https://www.home-assistant.io/voice_control/create_wake_word/) +4. 
Speech-to-Text (STT) + - [Whisper](https://github.com/home-assistant/addons/blob/master/whisper/README.md) + - [Vosk](https://github.com/rhasspy/hassio-addons/blob/master/vosk/README.md) +5. Conversation agent (INTENT) + - [Home Assistant](https://www.home-assistant.io/integrations/conversation/) +6. Text-to-speech (TTS) + - [Google Translate](https://www.home-assistant.io/integrations/google_translate/) + - [Piper](https://github.com/home-assistant/addons/blob/master/piper/README.md) +7. Audio stream (SND) + - any source with two-way audio (backchannel) and PCM codec support (including PCMA/PCMU) + +You can use a large number of different projects for WAKE, STT, INTENT and TTS thanks to Home Assistant. + +And you can use a large number of different technologies for MIC and SND thanks to Go2rtc. + +## Configuration + +You can optionally specify a WAKE service, so go2rtc will start transmitting audio to Home Assistant only after the WAKE word. If the WAKE service cannot be connected to or is not specified, go2rtc will pass all audio to Home Assistant. In this case the WAKE service must be configured in your Voice Assistant pipeline. + +You can optionally specify a VAD threshold, so go2rtc will start transmitting audio to the WAKE service only after some audio noise. + +Your stream must support audio transmission in PCM codec (including PCMA/PCMU). 
+ +```yaml +wyoming: + stream_name_from_streams_section: + listen: :10700 + name: "My Satellite" # optional name + wake_uri: tcp://192.168.1.23:10400 # optional WAKE service + vad_threshold: 1 # optional VAD threshold (from 0.1 to 3.5) +``` + +Home Assistant -> Settings -> Integrations -> Add -> Wyoming Protocol -> Host + Port from `go2rtc.yaml` + +Select one or multiple wake words: +```yaml +wake_uri: tcp://192.168.1.23:10400?name=alexa_v0.1&name=hey_jarvis_v0.1&name=hey_mycroft_v0.1&name=hey_rhasspy_v0.1&name=ok_nabu_v0.1 +``` + +## Events + +You can add wyoming event handling using the [expr](https://github.com/AlexxIT/go2rtc/blob/master/internal/expr/README.md) language. For example, to pronounce TTS on some media player from HA. + +Turn on the logs to see what kinds of events happen. + +This is what the default scripts look like: + +```yaml +wyoming: + script_example: + event: + run-satellite: Detect() + pause-satellite: Stop() + voice-stopped: Pause() + audio-stop: PlayAudio() && WriteEvent("played") && Detect() + error: Detect() + internal-run: WriteEvent("run-pipeline", '{"start_stage":"wake","end_stage":"tts"}') && Stream() + internal-detection: WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream() +``` + +Supported functions and variables: + +- `Detect()` - start the VAD and WAKE word detection process +- `Stream()` - start transmission of audio data to the client (Home Assistant) +- `Stop()` - stop and disconnect stream without disconnecting client (Home Assistant) +- `Pause()` - temporary pause of audio transfer, without disconnecting the stream +- `PlayAudio()` - play the last audio that was sent from client (Home Assistant) +- `WriteEvent(type, data)` - send event to client (Home Assistant) +- `Sleep(duration)` - temporary script pause (ex. `Sleep('1.5s')`) +- `PlayFile(path)` - play audio from `wav` file +- `Type` - type (name) of event +- `Data` - event data in JSON format (ex. 
`{"text":"how are you"}`) +- other functions from the [expr](https://github.com/AlexxIT/go2rtc/blob/master/internal/expr/README.md) module are also available (ex. `fetch`) + +If you write a script for an event - the default action is no longer executed. You need to repeat the necessary steps yourself. + +In addition to the standard events, there are two additional events: + +- `internal-run` - called after `Detect()` when VAD is triggered, but the WAKE service is unavailable +- `internal-detection` - called after `Detect()` when the WAKE word is detected + +**Example 1.** You want to play a sound file when a wake word is detected (only `wav` supported): + +- `PlayFile` and `PlayAudio` functions are executed synchronously, the following steps will be executed only after they are completed + +```yaml +wyoming: + script_example: + event: + internal-detection: PlayFile('/media/beep.wav') && WriteEvent("run-pipeline", '{"start_stage":"asr","end_stage":"tts"}') && Stream() +``` + +**Example 2.** You want to play TTS on a Home Assistant media player: + +Each event has a `Type` and `Data` in JSON format. You can use their values in scripts. 
+ +- in the `synthesize` step, we get the value of the `text` and call the HA REST API +- in the `audio-stop` step we get the duration of the TTS in seconds, wait for this time and start the pipeline again + +```yaml +wyoming: + script_example: + event: + synthesize: | + let text = fromJSON(Data).text; + let token = 'eyJhbGci...'; + fetch('http://localhost:8123/api/services/tts/speak', { + method: 'POST', + headers: {'Authorization': 'Bearer '+token,'Content-Type': 'application/json'}, + body: toJSON({ + entity_id: 'tts.google_translate_com', + media_player_entity_id: 'media_player.google_nest', + message: text, + language: 'en', + }), + }).ok + audio-stop: | + let timestamp = fromJSON(Data).timestamp; + let delay = string(timestamp)+'s'; + Sleep(delay) && WriteEvent("played") && Detect() +``` + +## Config examples + +Satellite on Windows server using FFmpeg and FFplay. + +```yaml +streams: + satellite_win: + - exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav - + - exec:ffplay -hide_banner -nodisp -probesize 32 -f s16le -ar 22050 -#backchannel=1#audio=s16le/22050 + +wyoming: + satellite_win: + listen: :10700 + name: "Windows Satellite" + wake_uri: tcp://192.168.1.23:10400 + vad_threshold: 1 +``` + +Satellite on Dahua camera with two-way audio support. + +```yaml +streams: + dahua_camera: + - rtsp://admin:password@192.168.1.123/cam/realmonitor?channel=1&subtype=1&unicast=true&proto=Onvif + +wyoming: + dahua_camera: + listen: :10700 + name: "Dahua Satellite" + wake_uri: tcp://192.168.1.23:10400 + vad_threshold: 1 +``` + +Satellite on external wyoming Microphone and Sound. 
+ +```yaml +streams: + wyoming_external: + - wyoming://192.168.1.23:10600 # wyoming-mic-external + - wyoming://192.168.1.23:10601?backchannel=1 # wyoming-snd-external + +wyoming: + wyoming_external: + listen: :10700 + name: "Wyoming Satellite" + wake_uri: tcp://192.168.1.23:10400 + vad_threshold: 1 +``` + +## Wyoming External Microphone and Sound + +Advanced users, who want to enjoy the [Wyoming Satellite](https://github.com/rhasspy/wyoming-satellite) project, can use go2rtc as a [Wyoming External Microphone](https://github.com/rhasspy/wyoming-mic-external) or [Wyoming External Sound](https://github.com/rhasspy/wyoming-snd-external). + +**go2rtc.yaml** + +```yaml +streams: + wyoming_mic_external: + - exec:ffmpeg -hide_banner -f dshow -i "audio=Microphone (High Definition Audio Device)" -c pcm_s16le -ar 16000 -ac 1 -f wav - + wyoming_snd_external: + - exec:ffplay -hide_banner -nodisp -probesize 32 -f s16le -ar 22050 -#backchannel=1#audio=s16le/22050 + +wyoming: + wyoming_mic_external: + listen: :10600 + mode: mic + wyoming_snd_external: + listen: :10601 + mode: snd +``` + +**docker-compose.yml** + +```yaml +version: "3.8" +services: + satellite: + build: wyoming-satellite # https://github.com/rhasspy/wyoming-satellite + ports: + - "10700:10700" + command: + - "--name" + - "my satellite" + - "--mic-uri" + - "tcp://192.168.1.23:10600" + - "--snd-uri" + - "tcp://192.168.1.23:10601" + - "--debug" +``` + +## Wyoming External Source + +**go2rtc.yaml** + +```yaml +streams: + wyoming_external: + - wyoming://192.168.1.23:10600 + - wyoming://192.168.1.23:10601?backchannel=1 +``` + +**docker-compose.yml** + +```yaml +version: "3.8" +services: + microphone: + build: wyoming-mic-external # https://github.com/rhasspy/wyoming-mic-external + ports: + - "10600:10600" + devices: + - /dev/snd:/dev/snd + group_add: + - audio + command: + - "--device" + - "sysdefault" + - "--debug" + playback: + build: wyoming-snd-external # https://github.com/rhasspy/wyoming-snd-external + ports: + - 
"10601:10601" + devices: + - /dev/snd:/dev/snd + group_add: + - audio + command: + - "--device" + - "sysdefault" + - "--debug" +``` + +## Debug + +```yaml +log: + wyoming: trace +``` diff --git a/internal/wyoming/wyoming.go b/internal/wyoming/wyoming.go new file mode 100644 index 00000000..065275c3 --- /dev/null +++ b/internal/wyoming/wyoming.go @@ -0,0 +1,106 @@ +package wyoming + +import ( + "net" + + "github.com/AlexxIT/go2rtc/internal/app" + "github.com/AlexxIT/go2rtc/internal/streams" + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/wyoming" + "github.com/rs/zerolog" +) + +func Init() { + streams.HandleFunc("wyoming", wyoming.Dial) + + // server + var cfg struct { + Mod map[string]struct { + Listen string `yaml:"listen"` + Name string `yaml:"name"` + Mode string `yaml:"mode"` + Event map[string]string `yaml:"event"` + WakeURI string `yaml:"wake_uri"` + VADThreshold float32 `yaml:"vad_threshold"` + } `yaml:"wyoming"` + } + app.LoadConfig(&cfg) + + log = app.GetLogger("wyoming") + + for name, cfg := range cfg.Mod { + stream := streams.Get(name) + if stream == nil { + log.Warn().Msgf("[wyoming] missing stream: %s", name) + continue + } + + if cfg.Name == "" { + cfg.Name = name + } + + srv := &wyoming.Server{ + Name: cfg.Name, + Event: cfg.Event, + VADThreshold: int16(1000 * cfg.VADThreshold), // 1.0 => 1000 + WakeURI: cfg.WakeURI, + MicHandler: func(cons core.Consumer) error { + if err := stream.AddConsumer(cons); err != nil { + return err + } + // not best solution + if i, ok := cons.(interface{ OnClose(func()) }); ok { + i.OnClose(func() { + stream.RemoveConsumer(cons) + }) + } + return nil + }, + SndHandler: func(prod core.Producer) error { + return stream.Play(prod) + }, + Trace: func(format string, v ...any) { + log.Trace().Msgf("[wyoming] "+format, v...) + }, + Error: func(format string, v ...any) { + log.Error().Msgf("[wyoming] "+format, v...) 
+ }, + } + go serve(srv, cfg.Mode, cfg.Listen) + } +} + +var log zerolog.Logger + +func serve(srv *wyoming.Server, mode, address string) { + ln, err := net.Listen("tcp", address) + if err != nil { + log.Warn().Err(err).Msgf("[wyoming] listen") + } + + for { + conn, err := ln.Accept() + if err != nil { + return + } + + go handle(srv, mode, conn) + } +} + +func handle(srv *wyoming.Server, mode string, conn net.Conn) { + addr := conn.RemoteAddr() + + log.Trace().Msgf("[wyoming] %s connected", addr) + + switch mode { + case "mic": + srv.HandleMic(conn) + case "snd": + srv.HandleSnd(conn) + default: + srv.Handle(conn) + } + + log.Trace().Msgf("[wyoming] %s disconnected", addr) +} diff --git a/main.go b/main.go index f8aba89e..295de219 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ import ( "github.com/AlexxIT/go2rtc/internal/v4l2" "github.com/AlexxIT/go2rtc/internal/webrtc" "github.com/AlexxIT/go2rtc/internal/webtorrent" + "github.com/AlexxIT/go2rtc/internal/wyoming" "github.com/AlexxIT/go2rtc/pkg/shell" ) @@ -69,6 +70,7 @@ func main() { hass.Init() // hass source, Hass API server onvif.Init() // onvif source, ONVIF API server webtorrent.Init() // webtorrent source, WebTorrent module + wyoming.Init() // 5. 
Other sources diff --git a/pkg/core/codec.go b/pkg/core/codec.go index 708839b3..ba0c656a 100644 --- a/pkg/core/codec.go +++ b/pkg/core/codec.go @@ -249,3 +249,36 @@ func DecodeH264(fmtp string) (profile string, level byte) { } return } + +func ParseCodecString(s string) *Codec { + var codec Codec + + ss := strings.Split(s, "/") + switch strings.ToLower(ss[0]) { + case "pcm_s16be", "s16be", "pcm": + codec.Name = CodecPCM + case "pcm_s16le", "s16le", "pcml": + codec.Name = CodecPCML + case "pcm_alaw", "alaw", "pcma": + codec.Name = CodecPCMA + case "pcm_mulaw", "mulaw", "pcmu": + codec.Name = CodecPCMU + case "aac", "mpeg4-generic": + codec.Name = CodecAAC + case "opus": + codec.Name = CodecOpus + case "flac": + codec.Name = CodecFLAC + default: + return nil + } + + if len(ss) >= 2 { + codec.ClockRate = uint32(Atoi(ss[1])) + } + if len(ss) >= 3 { + codec.Channels = uint8(Atoi(ss[1])) + } + + return &codec +} diff --git a/pkg/expr/expr.go b/pkg/expr/expr.go index 36719100..4a8a663c 100644 --- a/pkg/expr/expr.go +++ b/pkg/expr/expr.go @@ -6,17 +6,24 @@ import ( "io" "net/http" "regexp" + "strings" "github.com/AlexxIT/go2rtc/pkg/tcp" "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" ) -func newRequest(method, url string, headers map[string]any) (*http.Request, error) { +func newRequest(method, url string, headers map[string]any, body string) (*http.Request, error) { + var rd io.Reader + if method == "" { method = "GET" } + if body != "" { + rd = strings.NewReader(body) + } - req, err := http.NewRequest(method, url, nil) + req, err := http.NewRequest(method, url, rd) if err != nil { return nil, err } @@ -55,7 +62,8 @@ var Options = []expr.Option{ options := params[1].(map[string]any) method, _ := options["method"].(string) headers, _ := options["headers"].(map[string]any) - req, err = newRequest(method, url, headers) + body, _ := options["body"].(string) + req, err = newRequest(method, url, headers, body) } else { req, err = http.NewRequest("GET", url, nil) } 
@@ -105,11 +113,19 @@ var Options = []expr.Option{ ), } -func Run(input string) (any, error) { - program, err := expr.Compile(input, Options...) +func Compile(input string) (*vm.Program, error) { + return expr.Compile(input, Options...) +} + +func Eval(input string, env any) (any, error) { + program, err := Compile(input) if err != nil { return nil, err } - return expr.Run(program, nil) + return expr.Run(program, env) +} + +func Run(program *vm.Program, env any) (any, error) { + return vm.Run(program, env) } diff --git a/pkg/expr/expr_test.go b/pkg/expr/expr_test.go index 14e75b2a..096afcdc 100644 --- a/pkg/expr/expr_test.go +++ b/pkg/expr/expr_test.go @@ -7,11 +7,11 @@ import ( ) func TestMatchHost(t *testing.T) { - v, err := Run(` + v, err := Eval(` let url = "rtsp://user:pass@192.168.1.123/cam/realmonitor?..."; let host = match(url, "//[^/]+")[0][2:]; host -`) +`, nil) require.Nil(t, err) require.Equal(t, "user:pass@192.168.1.123", v) } diff --git a/pkg/pcm/backchannel.go b/pkg/pcm/backchannel.go new file mode 100644 index 00000000..99b6e3aa --- /dev/null +++ b/pkg/pcm/backchannel.go @@ -0,0 +1,69 @@ +package pcm + +import ( + "errors" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/shell" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + cmd *shell.Command +} + +func NewBackchannel(cmd *shell.Command, audio string) (core.Producer, error) { + var codec *core.Codec + + if audio == "" { + // default codec + codec = &core.Codec{Name: core.CodecPCML, ClockRate: 16000} + } else if codec = core.ParseCodecString(audio); codec == nil { + return nil, errors.New("pcm: unsupported audio format: " + audio) + } + + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{codec}, + }, + } + + return &Backchannel{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "pcm", + Protocol: "pipe", + Medias: medias, + Transport: cmd, + }, + cmd: cmd, + }, nil +} + 
+func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + wr, err := c.cmd.StdinPipe() + if err != nil { + return err + } + + sender := core.NewSender(media, track.Codec) + sender.Handler = func(packet *rtp.Packet) { + if n, err := wr.Write(packet.Payload); err != nil { + c.Send += n + } + } + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Backchannel) Start() error { + return c.cmd.Run() +} diff --git a/pkg/pcm/handlers.go b/pkg/pcm/handlers.go index 39075199..18a96468 100644 --- a/pkg/pcm/handlers.go +++ b/pkg/pcm/handlers.go @@ -2,6 +2,7 @@ package pcm import ( "sync" + "time" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" @@ -82,18 +83,27 @@ func TranscodeHandler(dst, src *core.Codec, handler core.HandlerFunc) core.Handl } } -func BytesPerFrame(codec *core.Codec) byte { - channels := byte(codec.Channels) - if channels == 0 { - channels = 1 - } - +func BytesPerSample(codec *core.Codec) int { switch codec.Name { case core.CodecPCML, core.CodecPCM: - return 2 * channels + return 2 case core.CodecPCMU, core.CodecPCMA: - return channels + return 1 } - return 0 } + +func BytesPerFrame(codec *core.Codec) int { + if codec.Channels <= 1 { + return BytesPerSample(codec) + } + return int(codec.Channels) * BytesPerSample(codec) +} + +func FramesPerDuration(codec *core.Codec, duration time.Duration) int { + return int(time.Duration(codec.ClockRate) * duration / time.Second) +} + +func BytesPerDuration(codec *core.Codec, duration time.Duration) int { + return BytesPerFrame(codec) * FramesPerDuration(codec, duration) +} diff --git a/pkg/pcm/pcm.go b/pkg/pcm/pcm.go index 6872c503..5395621e 100644 --- a/pkg/pcm/pcm.go +++ b/pkg/pcm/pcm.go @@ -1,13 +1,25 @@ package pcm -import "github.com/AlexxIT/go2rtc/pkg/core" +import ( + "math" + + 
"github.com/AlexxIT/go2rtc/pkg/core" +) + +func ceil(x float32) int { + d, fract := math.Modf(float64(x)) + if fract == 0.0 { + return int(d) + } + return int(d) + 1 +} func Downsample(k float32) func([]int16) []int16 { var sampleN, sampleSum float32 return func(src []int16) (dst []int16) { var i int - dst = make([]int16, int((float32(len(src))+sampleN)/k)) + dst = make([]int16, ceil((float32(len(src))+sampleN)/k)) for _, sample := range src { sampleSum += float32(sample) sampleN++ @@ -28,7 +40,7 @@ func Upsample(k float32) func([]int16) []int16 { return func(src []int16) (dst []int16) { var i int - dst = make([]int16, int(k*float32(len(src)))) + dst = make([]int16, ceil(k*float32(len(src)))) for _, sample := range src { sampleN += k for sampleN > 0 { @@ -185,3 +197,24 @@ func Transcode(dst, src *core.Codec) func([]byte) []byte { return writer(samples) } } + +func ConsumerCodecs() []*core.Codec { + return []*core.Codec{ + {Name: core.CodecPCML}, + {Name: core.CodecPCM}, + {Name: core.CodecPCMA}, + {Name: core.CodecPCMU}, + } +} + +func ProducerCodecs() []*core.Codec { + return []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 16000}, + {Name: core.CodecPCM, ClockRate: 16000}, + {Name: core.CodecPCML, ClockRate: 8000}, + {Name: core.CodecPCM, ClockRate: 8000}, + {Name: core.CodecPCMA, ClockRate: 8000}, + {Name: core.CodecPCMU, ClockRate: 8000}, + {Name: core.CodecPCML, ClockRate: 22050}, // wyoming-snd-external + } +} diff --git a/pkg/pcm/producer_sync.go b/pkg/pcm/producer_sync.go new file mode 100644 index 00000000..fedef268 --- /dev/null +++ b/pkg/pcm/producer_sync.go @@ -0,0 +1,96 @@ +package pcm + +import ( + "io" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type ProducerSync struct { + core.Connection + src *core.Codec + rd io.Reader + onClose func() +} + +func OpenSync(codec *core.Codec, rd io.Reader) *ProducerSync { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: 
// PeaksRMS returns the mean absolute value of the local peaks (points where
// the signal changes direction) of b, which holds 16-bit little-endian
// samples. It returns 0 when no peak is found, including for empty input.
//
// A trailing odd byte is ignored, because it is not a complete sample.
// The result is clamped to the int16 range.
func PeaksRMS(b []byte) int16 {
	// RMS of sine wave = peak / sqrt2
	// https://en.wikipedia.org/wiki/Root_mean_square
	// https://www.youtube.com/watch?v=MUDkL4KZi0I
	const maxInt16 = 1<<15 - 1

	// 64-bit accumulators, so long buffers can't overflow the sum
	var peaks, peaksSum int64
	var prevSample int16
	var prevUp bool

	// process complete samples only, so b[i+1] is always in range
	n := len(b) &^ 1
	for i := 0; i < n; i += 2 {
		sample := int16(b[i+1])<<8 | int16(b[i])
		up := sample >= prevSample

		// the first sample has no previous direction to compare with
		if i >= 2 && up != prevUp {
			if prevSample >= 0 {
				peaksSum += int64(prevSample)
			} else {
				peaksSum -= int64(prevSample)
			}
			peaks++
		}

		prevSample = sample
		prevUp = up
	}

	if peaks == 0 {
		return 0
	}

	mean := peaksSum / peaks
	if mean > maxInt16 {
		// abs(-32768) doesn't fit int16
		mean = maxInt16
	}
	return int16(mean)
}
- Direction: core.DirectionSendonly, - Codecs: []*core.Codec{ - {Name: core.CodecPCMA, ClockRate: 8000}, - {Name: core.CodecPCM}, - }, - }, - }, - } - - return c, nil -} diff --git a/pkg/wav/backchannel.go b/pkg/wav/backchannel.go new file mode 100644 index 00000000..f9697ee4 --- /dev/null +++ b/pkg/wav/backchannel.go @@ -0,0 +1,67 @@ +package wav + +import ( + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/shell" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + cmd *shell.Command +} + +func NewBackchannel(cmd *shell.Command) (core.Producer, error) { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + //{Name: core.CodecPCML}, + {Name: core.CodecPCMA}, + {Name: core.CodecPCMU}, + }, + }, + } + + return &Backchannel{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "wav", + Protocol: "pipe", + Medias: medias, + Transport: cmd, + }, + cmd: cmd, + }, nil +} + +func (c *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (c *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + wr, err := c.cmd.StdinPipe() + if err != nil { + return err + } + + b := Header(track.Codec) + if _, err = wr.Write(b); err != nil { + return err + } + + sender := core.NewSender(media, track.Codec) + sender.Handler = func(packet *rtp.Packet) { + if n, err := wr.Write(packet.Payload); err != nil { + c.Send += n + } + } + sender.HandleRTP(track) + c.Senders = append(c.Senders, sender) + return nil +} + +func (c *Backchannel) Start() error { + return c.cmd.Run() +} diff --git a/pkg/wav/producer.go b/pkg/wav/producer.go index b9b3a878..60bdeaa1 100644 --- a/pkg/wav/producer.go +++ b/pkg/wav/producer.go @@ -2,7 +2,6 @@ package wav import ( "bufio" - "encoding/binary" "errors" "io" @@ -17,39 +16,11 @@ func Open(r io.Reader) (*Producer, error) { // 
https://www.mmsp.ece.mcgill.ca/Documents/AudioFormats/WAVE/WAVE.html rd := bufio.NewReaderSize(r, core.BufferSize) - // skip Master RIFF chunk - if _, err := rd.Discard(12); err != nil { + codec, err := ReadHeader(r) + if err != nil { return nil, err } - codec := &core.Codec{} - - for { - chunkID, data, err := readChunk(rd) - if err != nil { - return nil, err - } - - if chunkID == "data" { - break - } - - if chunkID == "fmt " { - // https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt - switch data[0] { - case 1: - codec.Name = core.CodecPCML - case 6: - codec.Name = core.CodecPCMA - case 7: - codec.Name = core.CodecPCMU - } - - codec.Channels = data[2] - codec.ClockRate = binary.LittleEndian.Uint32(data[4:]) - } - } - if codec.Name == "" { return nil, errors.New("waw: unsupported codec") } @@ -110,18 +81,3 @@ func (c *Producer) Start() error { ts += PacketSize } } - -func readChunk(r io.Reader) (chunkID string, data []byte, err error) { - b := make([]byte, 8) - if _, err = io.ReadFull(r, b); err != nil { - return - } - - if chunkID = string(b[:4]); chunkID != "data" { - size := binary.LittleEndian.Uint32(b[4:]) - data = make([]byte, size) - _, err = io.ReadFull(r, data) - } - - return -} diff --git a/pkg/wav/wav.go b/pkg/wav/wav.go new file mode 100644 index 00000000..9fe857d4 --- /dev/null +++ b/pkg/wav/wav.go @@ -0,0 +1,103 @@ +package wav + +import ( + "encoding/binary" + "io" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func Header(codec *core.Codec) []byte { + var fmt, size, extra byte + + switch codec.Name { + case core.CodecPCML: + fmt = 1 + size = 2 + case core.CodecPCMA: + fmt = 6 + size = 1 + extra = 2 + case core.CodecPCMU: + fmt = 7 + size = 1 + extra = 2 + default: + return nil + } + + channels := byte(codec.Channels) + if channels == 0 { + channels = 1 + } + + b := make([]byte, 0, 46) // cap with extra + b = append(b, "RIFF\xFF\xFF\xFF\xFFWAVEfmt "...) 
+ + b = append(b, 0x10+extra, 0, 0, 0) + b = append(b, fmt, 0) + b = append(b, channels, 0) + b = binary.LittleEndian.AppendUint32(b, codec.ClockRate) + b = binary.LittleEndian.AppendUint32(b, uint32(size*channels)*codec.ClockRate) + b = append(b, size*channels, 0) + b = append(b, size*8, 0) + if extra > 0 { + b = append(b, 0, 0) // ExtraParamSize (if PCM, then doesn't exist) + } + + b = append(b, "data\xFF\xFF\xFF\xFF"...) + + return b +} + +func ReadHeader(r io.Reader) (*core.Codec, error) { + // skip Master RIFF chunk + if _, err := io.ReadFull(r, make([]byte, 12)); err != nil { + return nil, err + } + + var codec core.Codec + + for { + chunkID, data, err := readChunk(r) + if err != nil { + return nil, err + } + + if chunkID == "data" { + break + } + + if chunkID == "fmt " { + // https://audiocoding.cc/articles/2008-05-22-wav-file-structure/wav_formats.txt + switch data[0] { + case 1: + codec.Name = core.CodecPCML + case 6: + codec.Name = core.CodecPCMA + case 7: + codec.Name = core.CodecPCMU + } + + codec.Channels = data[2] + codec.ClockRate = binary.LittleEndian.Uint32(data[4:]) + } + } + + return &codec, nil +} + +func readChunk(r io.Reader) (chunkID string, data []byte, err error) { + b := make([]byte, 8) + if _, err = io.ReadFull(r, b); err != nil { + return + } + + if chunkID = string(b[:4]); chunkID != "data" { + size := binary.LittleEndian.Uint32(b[4:]) + data = make([]byte, size) + _, err = io.ReadFull(r, data) + } + + return +} diff --git a/pkg/wyoming/README.md b/pkg/wyoming/README.md new file mode 100644 index 00000000..ff17d079 --- /dev/null +++ b/pkg/wyoming/README.md @@ -0,0 +1,14 @@ +## Default wake words + +- alexa_v0.1 +- hey_jarvis_v0.1 +- hey_mycroft_v0.1 +- hey_rhasspy_v0.1 +- ok_nabu_v0.1 + +## Useful Links + +- https://github.com/rhasspy/wyoming-satellite +- https://github.com/rhasspy/wyoming-openwakeword +- https://github.com/fwartner/home-assistant-wakewords-collection +- 
https://github.com/esphome/micro-wake-word-models/tree/main?tab=readme-ov-file diff --git a/pkg/wyoming/api.go b/pkg/wyoming/api.go new file mode 100644 index 00000000..ce297a22 --- /dev/null +++ b/pkg/wyoming/api.go @@ -0,0 +1,99 @@ +package wyoming + +import ( + "bufio" + "encoding/json" + "io" + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +type API struct { + conn net.Conn + rd *bufio.Reader +} + +func DialAPI(address string) (*API, error) { + conn, err := net.DialTimeout("tcp", address, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + return NewAPI(conn), nil +} + +const Version = "1.5.4" + +func NewAPI(conn net.Conn) *API { + return &API{conn: conn, rd: bufio.NewReader(conn)} +} + +func (w *API) WriteEvent(evt *Event) (err error) { + hdr := EventHeader{ + Type: evt.Type, + Version: Version, + DataLength: len(evt.Data), + PayloadLength: len(evt.Payload), + } + + buf, err := json.Marshal(hdr) + if err != nil { + return err + } + + buf = append(buf, '\n') + buf = append(buf, evt.Data...) + buf = append(buf, evt.Payload...) 
+ + _, err = w.conn.Write(buf) + return err +} + +func (w *API) ReadEvent() (*Event, error) { + data, err := w.rd.ReadBytes('\n') + if err != nil { + return nil, err + } + + var hdr EventHeader + if err = json.Unmarshal(data, &hdr); err != nil { + return nil, err + } + + evt := Event{Type: hdr.Type} + + if hdr.DataLength > 0 { + data = make([]byte, hdr.DataLength) + if _, err = io.ReadFull(w.rd, data); err != nil { + return nil, err + } + evt.Data = string(data) + } + + if hdr.PayloadLength > 0 { + evt.Payload = make([]byte, hdr.PayloadLength) + if _, err = io.ReadFull(w.rd, evt.Payload); err != nil { + return nil, err + } + } + + return &evt, nil +} + +func (w *API) Close() error { + return w.conn.Close() +} + +type Event struct { + Type string + Data string + Payload []byte +} + +type EventHeader struct { + Type string `json:"type"` + Version string `json:"version"` + DataLength int `json:"data_length,omitempty"` + PayloadLength int `json:"payload_length,omitempty"` +} diff --git a/pkg/wyoming/backchannel.go b/pkg/wyoming/backchannel.go new file mode 100644 index 00000000..e0569fe1 --- /dev/null +++ b/pkg/wyoming/backchannel.go @@ -0,0 +1,63 @@ +package wyoming + +import ( + "fmt" + "net" + "time" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type Backchannel struct { + core.Connection + api *API +} + +func newBackchannel(conn net.Conn) *Backchannel { + return &Backchannel{ + core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Medias: []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 22050}, + }, + }, + }, + Transport: conn, + }, + NewAPI(conn), + } +} + +func (b *Backchannel) GetTrack(media *core.Media, codec *core.Codec) (*core.Receiver, error) { + return nil, core.ErrCantGetTrack +} + +func (b *Backchannel) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + sender := core.NewSender(media, codec) + 
sender.Handler = func(pkt *rtp.Packet) { + ts := time.Now().Nanosecond() + evt := &Event{ + Type: "audio-chunk", + Data: fmt.Sprintf(`{"rate":22050,"width":2,"channels":1,"timestamp":%d}`, ts), + Payload: pkt.Payload, + } + _ = b.api.WriteEvent(evt) + } + sender.HandleRTP(track) + b.Senders = append(b.Senders, sender) + return nil +} + +func (b *Backchannel) Start() error { + for { + if _, err := b.api.ReadEvent(); err != nil { + return err + } + } +} diff --git a/pkg/wyoming/expr.go b/pkg/wyoming/expr.go new file mode 100644 index 00000000..f2f58933 --- /dev/null +++ b/pkg/wyoming/expr.go @@ -0,0 +1,138 @@ +package wyoming + +import ( + "bytes" + "fmt" + "os" + "time" + + "github.com/AlexxIT/go2rtc/pkg/expr" + "github.com/AlexxIT/go2rtc/pkg/wav" +) + +type env struct { + *satellite + Type string + Data string +} + +func (s *satellite) handleEvent(evt *Event) { + switch evt.Type { + case "describe": + // {"asr": [], "tts": [], "handle": [], "intent": [], "wake": [], "satellite": {"name": "my satellite", "attribution": {"name": "", "url": ""}, "installed": true, "description": "my satellite", "version": "1.4.1", "area": null, "snd_format": null}} + data := fmt.Sprintf(`{"satellite":{"name":%q,"attribution":{"name":"go2rtc","url":"https://github.com/AlexxIT/go2rtc"},"installed":true}}`, s.srv.Name) + s.WriteEvent("info", data) + case "run-satellite": + s.Detect() + case "pause-satellite": + s.Stop() + case "detect": // WAKE_WORD_START {"names": null} + case "detection": // WAKE_WORD_END {"name": "ok_nabu_v0.1", "timestamp": 17580, "speaker": null} + case "transcribe": // STT_START {"language": "en"} + case "voice-started": // STT_VAD_START {"timestamp": 1160} + case "voice-stopped": // STT_VAD_END {"timestamp": 2470} + s.Pause() + case "transcript": // STT_END {"text": "how are you"} + case "synthesize": // TTS_START {"text": "Sorry, I couldn't understand that", "voice": {"language": "en"}} + case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, 
"timestamp": 0} + case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + case "audio-stop": // {"timestamp": 2.880000000000002} + // run async because PlayAudio takes some time + go func() { + s.PlayAudio() + s.WriteEvent("played") + s.Detect() + }() + case "error": + s.Detect() + case "internal-run": + s.WriteEvent("run-pipeline", `{"start_stage":"wake","end_stage":"tts"}`) + s.Stream() + case "internal-detection": + s.WriteEvent("run-pipeline", `{"start_stage":"asr","end_stage":"tts"}`) + s.Stream() + } +} + +func (s *satellite) handleScript(evt *Event) { + var script string + if s.srv.Event != nil { + script = s.srv.Event[evt.Type] + } + + s.srv.Trace("event=%s data=%s payload size=%d", evt.Type, evt.Data, len(evt.Payload)) + + if script == "" { + s.handleEvent(evt) + return + } + + // run async because script can have sleeps + go func() { + e := &env{satellite: s, Type: evt.Type, Data: evt.Data} + if res, err := expr.Eval(script, e); err != nil { + s.srv.Trace("event=%s expr error=%s", evt.Type, err) + s.handleEvent(evt) + } else { + s.srv.Trace("event=%s expr result=%v", evt.Type, res) + } + }() +} + +func (s *satellite) Detect() bool { + return s.setMicState(stateWaitVAD) +} + +func (s *satellite) Stream() bool { + return s.setMicState(stateActive) +} + +func (s *satellite) Pause() bool { + return s.setMicState(stateIdle) +} + +func (s *satellite) Stop() bool { + s.micStop() + return true +} + +func (s *satellite) WriteEvent(args ...string) bool { + if len(args) == 0 { + return false + } + evt := &Event{Type: args[0]} + if len(args) > 1 { + evt.Data = args[1] + } + if err := s.api.WriteEvent(evt); err != nil { + return false + } + return true +} + +func (s *satellite) PlayAudio() bool { + return s.playAudio(sndCodec, bytes.NewReader(s.sndAudio)) +} + +func (s *satellite) PlayFile(path string) bool { + f, err := os.Open(path) + if err != nil { + return false + } + + codec, err := wav.ReadHeader(f) + if err != nil { + return false + 
} + + return s.playAudio(codec, f) +} + +func (e *env) Sleep(s string) bool { + d, err := time.ParseDuration(s) + if err != nil { + return false + } + time.Sleep(d) + return true +} diff --git a/pkg/wyoming/mic.go b/pkg/wyoming/mic.go new file mode 100644 index 00000000..4fb03b44 --- /dev/null +++ b/pkg/wyoming/mic.go @@ -0,0 +1,35 @@ +package wyoming + +import ( + "fmt" + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func (s *Server) HandleMic(conn net.Conn) { + defer conn.Close() + + var closed core.Waiter + var timestamp int + + api := NewAPI(conn) + mic := newMicConsumer(func(chunk []byte) { + data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, timestamp) + evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk} + if err := api.WriteEvent(evt); err != nil { + closed.Done(nil) + } + + timestamp += len(chunk) / 2 + }) + mic.RemoteAddr = api.conn.RemoteAddr().String() + + if err := s.MicHandler(mic); err != nil { + s.Error("mic error: %s", err) + return + } + + _ = closed.Wait() + _ = mic.Stop() +} diff --git a/pkg/wyoming/producer.go b/pkg/wyoming/producer.go new file mode 100644 index 00000000..09451333 --- /dev/null +++ b/pkg/wyoming/producer.go @@ -0,0 +1,65 @@ +package wyoming + +import ( + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/pion/rtp" +) + +type Producer struct { + core.Connection + api *API +} + +func newProducer(conn net.Conn) *Producer { + return &Producer{ + core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Medias: []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionRecvonly, + Codecs: []*core.Codec{ + {Name: core.CodecPCML, ClockRate: 16000}, + }, + }, + }, + Transport: conn, + }, + NewAPI(conn), + } +} + +func (p *Producer) Start() error { + var seq uint16 + var ts uint32 + + for { + evt, err := p.api.ReadEvent() + if err != nil { + return err + } + + if evt.Type != "audio-chunk" { + continue + } + + p.Recv += len(evt.Payload) + + pkt := &core.Packet{ + 
Header: rtp.Header{ + Version: 2, + Marker: true, + SequenceNumber: seq, + Timestamp: ts, + }, + Payload: evt.Payload, + } + p.Receivers[0].WriteRTP(pkt) + + seq++ + ts += uint32(len(evt.Payload) / 2) + } +} diff --git a/pkg/wyoming/satellite.go b/pkg/wyoming/satellite.go new file mode 100644 index 00000000..0c0e6f30 --- /dev/null +++ b/pkg/wyoming/satellite.go @@ -0,0 +1,275 @@ +package wyoming + +import ( + "context" + "fmt" + "io" + "net" + "sync" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" + "github.com/AlexxIT/go2rtc/pkg/pcm/s16le" + "github.com/pion/rtp" +) + +type Server struct { + Name string + Event map[string]string + + VADThreshold int16 + WakeURI string + + MicHandler func(cons core.Consumer) error + SndHandler func(prod core.Producer) error + + Trace func(format string, v ...any) + Error func(format string, v ...any) +} + +func (s *Server) Serve(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + return err + } + + go s.Handle(conn) + } +} + +func (s *Server) Handle(conn net.Conn) { + api := NewAPI(conn) + sat := newSatellite(api, s) + defer sat.Close() + + for { + evt, err := api.ReadEvent() + if err != nil { + return + } + + switch evt.Type { + case "ping": // {"text": null} + _ = api.WriteEvent(&Event{Type: "pong", Data: evt.Data}) + case "audio-start": // TTS_END {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + sat.sndAudio = sat.sndAudio[:0] + case "audio-chunk": // {"rate": 22050, "width": 2, "channels": 1, "timestamp": 0} + sat.sndAudio = append(sat.sndAudio, evt.Payload...) 
+ default: + sat.handleScript(evt) + } + } +} + +// states like http.ConnState +const ( + stateError = -2 + stateClosed = -1 + stateNew = 0 + stateIdle = 1 + stateWaitVAD = 2 // aka wait VAD + stateWaitWakeWord = 3 + stateActive = 4 +) + +type satellite struct { + api *API + srv *Server + + micState int8 + micTS int + micMu sync.Mutex + sndAudio []byte + + mic *micConsumer + wake *WakeWord +} + +func newSatellite(api *API, srv *Server) *satellite { + sat := &satellite{api: api, srv: srv} + return sat +} + +func (s *satellite) Close() error { + s.Stop() + return s.api.Close() +} + +const wakeTimeout = 5 * 2 * 16000 // 5 seconds + +func (s *satellite) setMicState(state int8) bool { + s.micMu.Lock() + defer s.micMu.Unlock() + + if s.micState == stateNew { + s.mic = newMicConsumer(s.onMicChunk) + s.mic.RemoteAddr = s.api.conn.RemoteAddr().String() + if err := s.srv.MicHandler(s.mic); err != nil { + s.micState = stateError + s.srv.Error("can't get mic: %s", err) // %w is only understood by fmt.Errorf; a printf-style logger would print %!w(...) + _ = s.api.Close() + } else { + s.micState = stateIdle + } + } + + if s.micState < stateIdle { + return false + } + + s.micState = state + s.micTS = 0 + return true +} + +func (s *satellite) micStop() { + s.micMu.Lock() + + s.micState = stateClosed + if s.mic != nil { + _ = s.mic.Stop() + s.mic = nil + } + if s.wake != nil { + _ = s.wake.Close() + s.wake = nil + } + + s.micMu.Unlock() +} + +func (s *satellite) onMicChunk(chunk []byte) { + s.micMu.Lock() + defer s.micMu.Unlock() + + if s.micState == stateIdle { + return + } + + if s.micState == stateWaitVAD { + // tests show that values over 1000 are most likely speech + if s.srv.VADThreshold == 0 || s16le.PeaksRMS(chunk) > s.srv.VADThreshold { + if s.wake == nil && s.srv.WakeURI != "" { + s.wake, _ = DialWakeWord(s.srv.WakeURI) + } + if s.wake == nil { + // some problems with wake word - redirect to HA + s.micState = stateIdle + go s.handleScript(&Event{Type: "internal-run"}) + } else { + s.micState = stateWaitWakeWord + } + s.micTS = 0 + } + } + + if
s.micState == stateWaitWakeWord { + if s.wake.Detection != "" { + // check if wake word detected + s.micState = stateIdle + go s.handleScript(&Event{Type: "internal-detection", Data: `{"name":"` + s.wake.Detection + `"}`}) + } else if err := s.wake.WriteChunk(chunk); err != nil { + // wake word service failed + s.micState = stateWaitVAD + _ = s.wake.Close() + s.wake = nil + } else if s.micTS > wakeTimeout { + // wake word detection timeout + s.micState = stateWaitVAD + } + } else if s.wake != nil { + _ = s.wake.Close() + s.wake = nil + } + + if s.micState == stateActive { + data := fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, s.micTS) + evt := &Event{Type: "audio-chunk", Data: data, Payload: chunk} + _ = s.api.WriteEvent(evt) + } + + s.micTS += len(chunk) / 2 +} + +func (s *satellite) playAudio(codec *core.Codec, rd io.Reader) bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + prod := pcm.OpenSync(codec, rd) + prod.OnClose(cancel) + + if err := s.srv.SndHandler(prod); err != nil { + return false + } else { + <-ctx.Done() + return true + } +} + +type micConsumer struct { + core.Connection + onData func(chunk []byte) + onClose func() +} + +func newMicConsumer(onData func(chunk []byte)) *micConsumer { + medias := []*core.Media{ + { + Kind: core.KindAudio, + Direction: core.DirectionSendonly, + Codecs: pcm.ConsumerCodecs(), + }, + } + + return &micConsumer{ + Connection: core.Connection{ + ID: core.NewID(), + FormatName: "wyoming", + Protocol: "tcp", + Medias: medias, + }, + onData: onData, + } +} + +func (c *micConsumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error { + src := track.Codec + dst := &core.Codec{ + Name: core.CodecPCML, + ClockRate: 16000, + Channels: 1, + } + sender := core.NewSender(media, dst) + sender.Handler = pcm.TranscodeHandler(dst, src, + repack(func(packet *core.Packet) { + c.onData(packet.Payload) + }), + ) + sender.HandleRTP(track) + c.Senders = 
append(c.Senders, sender) + return nil +} + +func (c *micConsumer) Stop() error { + if c.onClose != nil { + c.onClose() + } + return c.Connection.Stop() +} + +func repack(handler core.HandlerFunc) core.HandlerFunc { + const PacketSize = 2 * 16000 / 50 // 20ms + + var buf []byte + + return func(pkt *rtp.Packet) { + buf = append(buf, pkt.Payload...) + + for len(buf) >= PacketSize { + pkt = &core.Packet{Payload: buf[:PacketSize]} + buf = buf[PacketSize:] + handler(pkt) + } + } +} diff --git a/pkg/wyoming/snd.go b/pkg/wyoming/snd.go new file mode 100644 index 00000000..e26ca7ea --- /dev/null +++ b/pkg/wyoming/snd.go @@ -0,0 +1,40 @@ +package wyoming + +import ( + "bytes" + "net" + + "github.com/AlexxIT/go2rtc/pkg/core" + "github.com/AlexxIT/go2rtc/pkg/pcm" +) + +func (s *Server) HandleSnd(conn net.Conn) { + defer conn.Close() + + var snd []byte + + api := NewAPI(conn) + for { + evt, err := api.ReadEvent() + if err != nil { + return + } + + s.Trace("event: %s data: %s payload: %d", evt.Type, evt.Data, len(evt.Payload)) + + switch evt.Type { + case "audio-start": + snd = snd[:0] + case "audio-chunk": + snd = append(snd, evt.Payload...) 
+ case "audio-stop": + prod := pcm.OpenSync(sndCodec, bytes.NewReader(snd)) + if err = s.SndHandler(prod); err != nil { + s.Error("snd error: %s", err) + return + } + } + } +} + +var sndCodec = &core.Codec{Name: core.CodecPCML, ClockRate: 22050} diff --git a/pkg/wyoming/wakeword.go b/pkg/wyoming/wakeword.go new file mode 100644 index 00000000..4c728f20 --- /dev/null +++ b/pkg/wyoming/wakeword.go @@ -0,0 +1,120 @@ +package wyoming + +import ( + "encoding/json" + "fmt" + "net/url" +) + +type WakeWord struct { + *API + names []string + send int + + Detection string +} + +func DialWakeWord(rawURL string) (*WakeWord, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + api, err := DialAPI(u.Host) + if err != nil { + return nil, err + } + + names := u.Query()["name"] + if len(names) == 0 { + names = []string{"ok_nabu_v0.1"} + } + + wake := &WakeWord{API: api, names: names} + if err = wake.Start(); err != nil { + _ = wake.Close() + return nil, err + } + + go wake.handle() + return wake, nil +} + +func (w *WakeWord) handle() { + defer w.Close() + + for { + evt, err := w.ReadEvent() + if err != nil { + return + } + + if evt.Type == "detection" { + var data struct { + Name string `json:"name"` + } + if err = json.Unmarshal([]byte(evt.Data), &data); err != nil { + return + } + w.Detection = data.Name + } + } +} + +//func (w *WakeWord) Describe() error { +// if err := w.WriteEvent(&Event{Type: "describe"}); err != nil { +// return err +// } +// +// evt, err := w.ReadEvent() +// if err != nil { +// return err +// } +// +// var info struct { +// Wake []struct { +// Models []struct { +// Name string `json:"name"` +// } `json:"models"` +// } `json:"wake"` +// } +// if err = json.Unmarshal(evt.Data, &info); err != nil { +// return err +// } +// +// return nil +//} + +func (w *WakeWord) Start() error { + msg := struct { + Names []string `json:"names"` + }{ + Names: w.names, + } + data, err := json.Marshal(msg) + if err != nil { + return err + } + evt := 
&Event{Type: "detect", Data: string(data)} + if err := w.WriteEvent(evt); err != nil { + return err + } + + evt = &Event{Type: "audio-start", Data: audioData(0)} + return w.WriteEvent(evt) +} + +func (w *WakeWord) Close() error { + return w.conn.Close() +} + +func (w *WakeWord) WriteChunk(payload []byte) error { + evt := &Event{Type: "audio-chunk", Data: audioData(w.send), Payload: payload} + w.send += len(payload) + return w.WriteEvent(evt) +} + +func audioData(send int) string { + // timestamp in ms = send / 2 * 1000 / 16000 = send / 32 + return fmt.Sprintf(`{"rate":16000,"width":2,"channels":1,"timestamp":%d}`, send/32) +} diff --git a/pkg/wyoming/wyoming.go b/pkg/wyoming/wyoming.go new file mode 100644 index 00000000..0c8eebae --- /dev/null +++ b/pkg/wyoming/wyoming.go @@ -0,0 +1,26 @@ +package wyoming + +import ( + "net" + "net/url" + + "github.com/AlexxIT/go2rtc/pkg/core" +) + +func Dial(rawURL string) (core.Producer, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + conn, err := net.DialTimeout("tcp", u.Host, core.ConnDialTimeout) + if err != nil { + return nil, err + } + + if u.Query().Get("backchannel") != "1" { + return newProducer(conn), nil + } else { + return newBackchannel(conn), nil + } +}