diff --git a/internal/desktop/pipeline.go b/internal/desktop/pipeline.go index 97d24e4..a50448b 100644 --- a/internal/desktop/pipeline.go +++ b/internal/desktop/pipeline.go @@ -178,9 +178,10 @@ func (pm *PipelineManager) buildGStreamer(cfg PipelineConfig) (string, []string, pipelineParts = append(pipelineParts, fmt.Sprintf("ximagesrc display-name=%s", cfg.Display)) pipelineParts = append(pipelineParts, "video/x-raw,framerate=30/1") pipelineParts = append(pipelineParts, "videoconvert") + pipelineParts = append(pipelineParts, "video/x-raw,format=I420") // 2. Encoder - encoderStr := "x264enc speed-preset=ultrafast tune=zerolatency bitrate=" + fmt.Sprintf("%d", cfg.Bitrate) + encoderStr := "x264enc speed-preset=ultrafast tune=zerolatency bitrate=" + fmt.Sprintf("%d", cfg.Bitrate) + " byte-stream=true key-int-max=30" if cfg.UseGPU { // Detect if nvcodec is present by calling gst-inspect-1.0 or simply attempting it. // We'll default to nvh264enc. @@ -189,6 +190,7 @@ func (pm *PipelineManager) buildGStreamer(cfg PipelineConfig) (string, []string, pipelineParts = append(pipelineParts, encoderStr) // 3. Payload and Sink + pipelineParts = append(pipelineParts, "video/x-h264,profile=baseline") pipelineParts = append(pipelineParts, "rtph264pay config-interval=1 pt=96") pipelineParts = append(pipelineParts, fmt.Sprintf("udpsink host=127.0.0.1 port=%d sync=false async=false", cfg.Port)) @@ -224,12 +226,16 @@ func (pm *PipelineManager) buildFFmpeg(cfg PipelineConfig) (string, []string, er "-preset", "llhp", // low latency high quality "-tune", "zerolatency", "-g", "30", + "-pix_fmt", "yuv420p", ) } else { args = append(args, "-c:v", "libx264", "-preset", "ultrafast", "-tune", "zerolatency", + "-profile:v", "baseline", + "-pix_fmt", "yuv420p", + "-x264-params", "repeat-headers=1", "-g", "30", ) } diff --git a/internal/desktop/pipeline_test.go b/internal/desktop/pipeline_test.go new file mode 100644 index 0000000..410d633 --- /dev/null +++ b/internal/desktop/pipeline_test.go @@ -0,0 +1,71 @@ +package desktop + +import ( + "strings" + "testing" +) + +func TestBuildGStreamerUsesBrowserFriendlyH264(t *testing.T) { + pm := NewPipelineManager() + + tool, args, err := pm.buildGStreamer(PipelineConfig{ + Display: ":11.0", + Port: 5004, + Bitrate: 2000, + }) + if err != nil { + t.Fatalf("buildGStreamer returned error: %v", err) + } + + if tool != "gst-launch-1.0" { + t.Fatalf("expected gst-launch-1.0, got %q", tool) + } + joined := strings.Join(args, " ") + for _, expected := range []string{ + "video/x-raw,format=I420", + "x264enc", + "tune=zerolatency", + "key-int-max=30", + "byte-stream=true", + "video/x-h264,profile=baseline", + "rtph264pay", + "config-interval=1", + } { + if !strings.Contains(joined, expected) { + t.Fatalf("expected GStreamer args to contain %q, got %s", expected, joined) + } + } +} + +func TestBuildFFmpegUsesBrowserFriendlyH264(t *testing.T) { + pm := NewPipelineManager() + + tool, args, err := pm.buildFFmpeg(PipelineConfig{ + Display: ":11.0", + Port: 5004, + Bitrate: 2000, + FPS: 30, + Width: 1280, + Height: 720, + }) + if err != nil { + t.Fatalf("buildFFmpeg returned error: %v", err) + } + + if tool != "ffmpeg" { + t.Fatalf("expected ffmpeg, got %q", tool) + } + joined := strings.Join(args, " ") + for _, expected := range []string{ + "-c:v libx264", + "-tune zerolatency", + "-profile:v baseline", + "-pix_fmt yuv420p", + "-x264-params repeat-headers=1", + "-g 30", + } { + if !strings.Contains(joined, expected) { + t.Fatalf("expected FFmpeg args to contain %q, got %s", expected, joined) + } + } +} diff --git a/internal/desktop/webrtc.go b/internal/desktop/webrtc.go index d3b3ac3..9ad8909 100644 --- a/internal/desktop/webrtc.go +++ b/internal/desktop/webrtc.go @@ -5,7 +5,10 @@ import ( "fmt" "log" "net" + "os" "sync" + "sync/atomic" + "time" "github.com/pion/webrtc/v4" ) @@ -17,6 +20,19 @@ type WebRTCServer struct { inputInjector *XdotoolInjector mu sync.Mutex isClosed bool + rtpPackets uint64 + rtpBytes uint64 + rtpWriteErrors uint64 +} + +func desktopWebRTCDiagnosticsEnabled() bool { + value := os.Getenv("XWORKMATE_DESKTOP_WEBRTC_DEBUG") + switch value { + case "0", "false", "FALSE", "off", "OFF", "no", "NO": + return false + default: + return true + } } func NewWebRTCServer(injector *XdotoolInjector) (*WebRTCServer, error) { @@ -107,6 +123,36 @@ func (w *WebRTCServer) StartRTPReceiver(port int) error { go func() { buf := make([]byte, 2048) log.Printf("WebRTC RTP receiver listening on UDP %s", addr) + statsDone := make(chan struct{}) + if desktopWebRTCDiagnosticsEnabled() { + statsTicker := time.NewTicker(5 * time.Second) + defer statsTicker.Stop() + defer close(statsDone) + go func() { + var lastPackets uint64 + var lastBytes uint64 + for { + select { + case <-statsTicker.C: + packets := atomic.LoadUint64(&w.rtpPackets) + bytes := atomic.LoadUint64(&w.rtpBytes) + errors := atomic.LoadUint64(&w.rtpWriteErrors) + log.Printf( + "WebRTC RTP stats: packets=%d bytes=%d packetDelta=%d byteDelta=%d writeErrors=%d", + packets, + bytes, + packets-lastPackets, + bytes-lastBytes, + errors, + ) + lastPackets = packets + lastBytes = bytes + case <-statsDone: + return + } + } + }() + } for { n, _, err := conn.ReadFrom(buf) if err != nil { @@ -118,6 +164,8 @@ func (w *WebRTCServer) StartRTPReceiver(port int) error { } break } + atomic.AddUint64(&w.rtpPackets, 1) + atomic.AddUint64(&w.rtpBytes, uint64(n)) // Forward packet directly to WebRTC track (zero-copy) w.mu.Lock() @@ -126,6 +174,7 @@ func (w *WebRTCServer) StartRTPReceiver(port int) error { if track != nil { if _, err := track.Write(buf[:n]); err != nil { + atomic.AddUint64(&w.rtpWriteErrors, 1) log.Printf("Failed to write RTP packet to track: %v", err) } } @@ -214,4 +263,10 @@ func (w *WebRTCServer) Close() { if pc != nil { _ = pc.Close() } + log.Printf( + "WebRTC RTP final stats: packets=%d bytes=%d writeErrors=%d", + atomic.LoadUint64(&w.rtpPackets), + atomic.LoadUint64(&w.rtpBytes), + atomic.LoadUint64(&w.rtpWriteErrors), + ) }