fix: stabilize desktop WebRTC H264 stream

This commit is contained in:
Haitao Pan 2026-06-07 22:25:42 +08:00
parent 72bc3c0319
commit 0f096a588d
3 changed files with 133 additions and 1 deletions

View File

@ -178,9 +178,10 @@ func (pm *PipelineManager) buildGStreamer(cfg PipelineConfig) (string, []string,
pipelineParts = append(pipelineParts, fmt.Sprintf("ximagesrc display-name=%s", cfg.Display))
pipelineParts = append(pipelineParts, "video/x-raw,framerate=30/1")
pipelineParts = append(pipelineParts, "videoconvert")
pipelineParts = append(pipelineParts, "video/x-raw,format=I420")
// 2. Encoder
encoderStr := "x264enc speed-preset=ultrafast tune=zerolatency bitrate=" + fmt.Sprintf("%d", cfg.Bitrate)
encoderStr := "x264enc speed-preset=ultrafast tune=zerolatency bitrate=" + fmt.Sprintf("%d", cfg.Bitrate) + " byte-stream=true key-int-max=30"
if cfg.UseGPU {
// Detect if nvcodec is present by calling gst-inspect-1.0 or simply attempting it.
// We'll default to nvh264enc.
@ -189,6 +190,7 @@ func (pm *PipelineManager) buildGStreamer(cfg PipelineConfig) (string, []string,
pipelineParts = append(pipelineParts, encoderStr)
// 3. Payload and Sink
pipelineParts = append(pipelineParts, "video/x-h264,profile=baseline")
pipelineParts = append(pipelineParts, "rtph264pay config-interval=1 pt=96")
pipelineParts = append(pipelineParts, fmt.Sprintf("udpsink host=127.0.0.1 port=%d sync=false async=false", cfg.Port))
@ -224,12 +226,16 @@ func (pm *PipelineManager) buildFFmpeg(cfg PipelineConfig) (string, []string, er
"-preset", "llhp", // low latency high quality
"-tune", "zerolatency",
"-g", "30",
"-pix_fmt", "yuv420p",
)
} else {
args = append(args,
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-profile:v", "baseline",
"-pix_fmt", "yuv420p",
"-x264-params", "repeat-headers=1",
"-g", "30",
)
}

View File

@ -0,0 +1,71 @@
package desktop
import (
"strings"
"testing"
)
func TestBuildGStreamerUsesBrowserFriendlyH264(t *testing.T) {
pm := NewPipelineManager()
tool, args, err := pm.buildGStreamer(PipelineConfig{
Display: ":11.0",
Port: 5004,
Bitrate: 2000,
})
if err != nil {
t.Fatalf("buildGStreamer returned error: %v", err)
}
if tool != "gst-launch-1.0" {
t.Fatalf("expected gst-launch-1.0, got %q", tool)
}
joined := strings.Join(args, " ")
for _, expected := range []string{
"video/x-raw,format=I420",
"x264enc",
"tune=zerolatency",
"key-int-max=30",
"byte-stream=true",
"video/x-h264,profile=baseline",
"rtph264pay",
"config-interval=1",
} {
if !strings.Contains(joined, expected) {
t.Fatalf("expected GStreamer args to contain %q, got %s", expected, joined)
}
}
}
func TestBuildFFmpegUsesBrowserFriendlyH264(t *testing.T) {
pm := NewPipelineManager()
tool, args, err := pm.buildFFmpeg(PipelineConfig{
Display: ":11.0",
Port: 5004,
Bitrate: 2000,
FPS: 30,
Width: 1280,
Height: 720,
})
if err != nil {
t.Fatalf("buildFFmpeg returned error: %v", err)
}
if tool != "ffmpeg" {
t.Fatalf("expected ffmpeg, got %q", tool)
}
joined := strings.Join(args, " ")
for _, expected := range []string{
"-c:v libx264",
"-tune zerolatency",
"-profile:v baseline",
"-pix_fmt yuv420p",
"-x264-params repeat-headers=1",
"-g 30",
} {
if !strings.Contains(joined, expected) {
t.Fatalf("expected FFmpeg args to contain %q, got %s", expected, joined)
}
}
}

View File

@ -5,7 +5,10 @@ import (
"fmt"
"log"
"net"
"os"
"sync"
"sync/atomic"
"time"
"github.com/pion/webrtc/v4"
)
@ -17,6 +20,19 @@ type WebRTCServer struct {
inputInjector *XdotoolInjector
mu sync.Mutex
isClosed bool
rtpPackets uint64
rtpBytes uint64
rtpWriteErrors uint64
}
func desktopWebRTCDiagnosticsEnabled() bool {
value := os.Getenv("XWORKMATE_DESKTOP_WEBRTC_DEBUG")
switch value {
case "0", "false", "FALSE", "off", "OFF", "no", "NO":
return false
default:
return true
}
}
func NewWebRTCServer(injector *XdotoolInjector) (*WebRTCServer, error) {
@ -107,6 +123,36 @@ func (w *WebRTCServer) StartRTPReceiver(port int) error {
go func() {
buf := make([]byte, 2048)
log.Printf("WebRTC RTP receiver listening on UDP %s", addr)
statsDone := make(chan struct{})
if desktopWebRTCDiagnosticsEnabled() {
statsTicker := time.NewTicker(5 * time.Second)
defer statsTicker.Stop()
defer close(statsDone)
go func() {
var lastPackets uint64
var lastBytes uint64
for {
select {
case <-statsTicker.C:
packets := atomic.LoadUint64(&w.rtpPackets)
bytes := atomic.LoadUint64(&w.rtpBytes)
errors := atomic.LoadUint64(&w.rtpWriteErrors)
log.Printf(
"WebRTC RTP stats: packets=%d bytes=%d packetDelta=%d byteDelta=%d writeErrors=%d",
packets,
bytes,
packets-lastPackets,
bytes-lastBytes,
errors,
)
lastPackets = packets
lastBytes = bytes
case <-statsDone:
return
}
}
}()
}
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
@ -118,6 +164,8 @@ func (w *WebRTCServer) StartRTPReceiver(port int) error {
}
break
}
atomic.AddUint64(&w.rtpPackets, 1)
atomic.AddUint64(&w.rtpBytes, uint64(n))
// Forward packet directly to WebRTC track (zero-copy)
w.mu.Lock()
@ -126,6 +174,7 @@ func (w *WebRTCServer) StartRTPReceiver(port int) error {
if track != nil {
if _, err := track.Write(buf[:n]); err != nil {
atomic.AddUint64(&w.rtpWriteErrors, 1)
log.Printf("Failed to write RTP packet to track: %v", err)
}
}
@ -214,4 +263,10 @@ func (w *WebRTCServer) Close() {
if pc != nil {
_ = pc.Close()
}
log.Printf(
"WebRTC RTP final stats: packets=%d bytes=%d writeErrors=%d",
atomic.LoadUint64(&w.rtpPackets),
atomic.LoadUint64(&w.rtpBytes),
atomic.LoadUint64(&w.rtpWriteErrors),
)
}