核心模块展开
Data Aggregation 模块深度解析
一、模块定位与核心功能
1. 系统角色
Data Aggregation(数据聚合)模块是Fugu系统的”数据中枢”,负责实时采集、清洗和整合视频流传输全链路的关键数据。如图6所示,它作为系统基础组件,为上层预测和控制模块提供数据支撑:
2. 核心职责
- 实时数据采集:从TCP/IP协议栈到应用层的全栈监控
- 数据预处理:异常检测、时间对齐、特征编码
- 数据分发:同时支持实时流处理和离线分析
- 质量控制:确保数据完整性、一致性和时效性
二、信号聚合输入(采集的信号类型)
1. 网络传输层信号
| 信号类型 | 数据来源 | 采集频率 | 用途 |
|---|---|---|---|
| TCP RTT | Linux内核tcp_info |
每ACK包 | 网络延迟评估 |
| 拥塞窗口 | tcpi_snd_cwnd |
每数据包 | 发送容量监控 |
| 未确认数据 | tcpi_unacked |
每ACK包 | 网络负载评估 |
| 传输速率 | tcpi_delivery_rate |
每RTT | 吞吐量估计 |
| 丢包重传 | tcpi_lost |
每RTO | 网络健康度 |
2. 视频应用层信号
| 信号类型 | 采集点 | 示例值 | 用途 |
|---|---|---|---|
| 视频块大小 | 编码器输出 | 1.8MB | 传输需求评估 |
| SSIM质量 | 编码质量分析 | 0.92 | 用户体验量化 |
| 编码参数 | 编码配置 | CRF=23 | 质量基准 |
| 块传输时间 | 发送-ACK时差 | 1.2s | 性能分析 |
3. 客户端状态信号
| 信号类型 | 采集方式 | 精度 | 用途 |
|---|---|---|---|
| 缓冲区水平 | JS定时上报 | 0.1s | 卡顿风险预警 |
| 卡顿事件 | 播放器事件 | ms级 | QoE评估 |
| 用户操作 | 交互监听 | - | 行为分析 |
三、输出信号(处理后的数据)
1. 实时特征流
message RealTimeFeatures {
// 网络特征
float smoothed_rtt = 1; // 平滑RTT(EWMA)
float bandwidth_est = 2; // 带宽估计(Mbps)
int32 packet_loss_rate = 3; // 丢包率(‰)
// 视频特征
float chunk_size_avg = 4; // 滑动窗口平均块大小
float ssim_variance = 5; // 质量波动
// 客户端状态
float buffer_level = 6; // 当前缓冲区(秒)
bool is_stalling = 7; // 是否正在卡顿
}
2. 聚合统计数据
| 指标类型 | 计算方式 | 输出频率 | 用途 |
|---|---|---|---|
| 会话QoE | SSIM - 4×卡顿时间 | 会话结束 | 整体评估 |
| 网络质量 | 吞吐量/RTT比值 | 每分钟 | 路径评分 |
| 编码效率 | 大小/SSIM比 | 每10块 | 参数优化 |
四、模块间接口规范
1. 与TTP预测器的接口
数据流向:DataAgg → TTP
传输协议:gRPC流
消息格式:
message TTPInput {
repeated float last_5_rtts = 1; // 最近5个RTT
float current_cwnd = 2; // 当前拥塞窗口
float chunk_size = 3; // 待预测块大小
float buffer_level = 4; // 当前缓冲区
}
QPS限制:≤1000请求/秒
超时设置:200ms
2. 与MPC控制器的接口
数据流向:DataAgg → MPC
传输协议:ZeroMQ PUB/SUB
消息格式:
{
"session_id": "abcd1234",
"timestamp": 1630000000.123,
"network": {
"throughput": 5.2, // Mbps
"rtt": 0.045 // seconds
},
"video": {
"last_ssim": 0.92,
"last_size": 1854321 // bytes
}
}
传输频率:每视频块(约2秒)
数据延迟:<50ms(P99)
3. 与离线存储的接口
数据流向:DataAgg → HBase
存储格式:
CREATE TABLE raw_metrics (
rowkey STRING PRIMARY KEY, # session_id|timestamp
net_stats BINARY, # Protobuf序列化
video_metrics BINARY,
client_metrics BINARY
) COMPRESSION='SNAPPY'
写入批次:每100条或1秒
数据保留:30天滚动
五、关键数据流图示
[TCP Kernel] → [Raw Metrics] → [实时清洗] → [特征提取]
↑____________↓ ↓
[Player Client] [TTP/MPC]
↓
[离线存储]
六、设计思考总结
1. 信号选择原则
- 必要性:只采集影响QoE的关键指标
- 可观测性:优先选择内核直接暴露的指标
- 成本效益:平衡数据价值与采集开销
2. 接口设计哲学
- 松耦合:模块间仅通过定义良好的接口交互
- 可扩展:字段采用protobuf可选设计
- 容错性:所有接口具备降级方案
3. 演进方向
- 智能采样:动态调整采集频率
- 边缘计算:在客户端预聚合
- 联邦聚合:跨会话隐私保护汇总
该模块通过精心选择的信号聚合和高效的接口设计,为Fugu系统提供了高质量的数据基础,平衡了实时性、准确性和系统开销。
Puffer Video Server 模块
一、核心输入输出
输入信号(来自两个模块):
- 来自MPC控制器:当前应发送的视频块规格(码率/分辨率)优先级标记(如紧急补发)特殊指令(如插入关键帧)
- 来自DataAgg:实时网络状态(当前RTT、拥塞窗口等)客户端缓冲区状态历史传输性能数据
输出信号(到两个模块):
- 反馈给DataAgg:实际发送的视频块信息(大小/码率)精确传输耗时(从发送到ACK)TCP层状态快照(发送时的cwnd/rtt)
- 服务客户端:视频数据块(HTTP分片或WebSocket流)播放清单(如HLS的m3u8文件)控制指令(如码率切换通知)
二、模块连接图示
[MPC控制器] → 指令 → [Puffer Server] → 视频流 → [客户端]
↑_________↓
[DataAgg] ←───┘
传输报告
三、典型数据示例
1. 从MPC接收的指令
{
"action": "send_chunk",
"chunk_id": 1002,
"bitrate": 2500,
"resolution": "1080p",
"is_keyframe": false,
"priority": "normal"
}
2. 向DataAgg发送的报告
{
"timestamp": 1630000000.123,
"chunk_id": 1002,
"size_bytes": 1854321,
"transmission_ms": 1200,
"tcp_state": {
"cwnd": 10240,
"rtt": 45
}
}
四、关键特性总结
- 简单职责:听MPC的话(执行码率决策)向DataAgg说实话(如实反馈传输情况)给客户端喂饭(稳定发送视频数据)
- 不负责:不做决策(MPC负责)不存历史(DataAgg负责)不处理异常(由控制环处理)
- 核心价值:精准执行控制指令提供低延迟传输保证数据完整性
TTP (Transmission Time Predictor) 模块解析
一、核心输入输出
输入信号(来自两个模块):
- 来自DataAgg:实时TCP状态(RTT、拥塞窗口、未确认包数)历史传输时间序列(最近8个块的传输耗时)待发送块的大小
- 来自MPC控制器:预测请求触发信号预测时间范围要求(默认5步)
输出信号(到MPC控制器):
- 传输时间概率分布(21个时间区间的概率值)
- 预测置信度评分(0-1范围)
- 异常状态标记(如预测不可靠警告)
二、模块连接图示
[DataAgg] → 网络状态 → [TTP] → 预测结果 → [MPC]
↑____________↓
[历史传输数据]
三、典型数据示例
1. 从DataAgg接收的输入
{
"tcp_state": {
"rtt": 45,
"min_rtt": 30,
"cwnd": 10240,
"in_flight": 7680,
"delivery_rate": 5200000
},
"chunk_size": 1854321,
"history": [1200, 950, 1100, 1350, 900, 1150, 1250, 1000]
}
2. 向MPC输出的预测
{
"prob_distribution": [
0.01, 0.05, 0.15, 0.20, 0.18,
0.12, 0.08, 0.06, 0.04, 0.03,
0.02, 0.01, 0.01, 0.01, 0.01,
0.01, 0.00, 0.00, 0.00, 0.00,
0.01
],
"time_bins": [
"[0,0.25)", "[0.25,0.75)", "[0.75,1.25)",
"[1.25,1.75)", "[1.75,2.25)", ..., "[9.75,∞)"
]
}
四、关键特性总结
- 简单职责:吃进网络状态数据吐出传输时间概率不参与决策(MPC负责)
- 核心能力:处理原始TCP指标(不依赖人工特征)输出完整概率分布(非单点预测)考虑块大小影响(关键创新)
- 不负责:不存储长期历史(DataAgg负责)不做决策优化(MPC负责)不处理执行异常(Server负责)
五、网络架构设计
1. 整体结构
TTP采用前馈神经网络架构,包含3个全连接隐藏层:
输入层(12维) → 隐藏层1(64神经元) → 隐藏层2(64神经元) → 输出层(21维)
↓ ↓
ReLU激活 Softmax归一化
2. 输入层设计
12维输入特征:
- TCP即时状态(5维):当前RTT / 最小RTT拥塞窗口大小(cwnd)在途数据量(in_flight)传输速率(delivery_rate)
- 历史传输(5维):过去4个块的传输时间(滑动窗口)传输时间移动平均
- 块特征(2维):当前块大小(MB)大小变化率(Δsize)
3. 输出层设计
21维概率分布:
- 对应21个时间区间(从[0,0.25s)到[9.75s,∞))
- 使用Softmax确保∑prob=1
- 特殊处理尾部区间(避免概率消失)
4. 训练策略详解
1. 损失函数
采用加权交叉熵:
L=−∑b=121wb⋅yblog(pb)
其中权重w_b:
- 对关键区间(1-5s)赋予更高权重
- 长尾区间(>5s)适当降权
2. 数据准备
预处理流程:
- 标准化:
x_norm = (x - μ) / σ # 各特征独立标准化 - 增强:随机丢包模拟(0-5%)时延抖动注入(±20%)块大小扰动(±10%)
训练集构成:
- 70%正常网络条件
- 20%中度拥塞
- 10%极端情况
3. 优化配置
| 参数 | 设置值 | 说明 |
|---|---|---|
| 优化器 | AdamW | 带权重衰减 |
| 初始LR | 3e-4 | 余弦退火 |
| Batch Size | 512 | 梯度累积 |
| 训练轮次 | 50 | 早停机制 |
MPC模块解析
一、输入输出速查表
| 模块 | 输入 | 输出 |
|---|---|---|
| MPC控制器 | 1. 当前缓冲区水平(秒) 2. 上一个视频块信息(大小/质量) 3. TTP预测的传输时间概率分布 4. 可用视频块的SSIM和大小列表 | 最优视频块选择(包括码率和对应SSIM) |
二、算法三步走
1. 状态采集(输入处理)
inputs = {
'buffer': 5.2, # 当前缓冲区秒数(如5.2秒)
'last_chunk': { # 上一个发送的视频块
'size': 1.8e6, # 字节数
'ssim': 0.92 # 质量评分
},
'prediction': [0.1, 0.3, 0.4, ...], # TTP输出的21-bin概率
'options': [ # 可选视频块列表
{'size': 2.1e6, 'ssim': 0.95},
{'size': 1.5e6, 'ssim': 0.90},
...
]
}
2. 核心决策流程
伪代码实现:
def MPC_decision(inputs):
best_qoe = -inf
best_chunk = None
# 遍历所有可选视频块
for chunk in inputs['options']:
total_qoe = 0
# 考虑所有可能的传输时间
for t, prob in zip(time_bins, inputs['prediction']):
# 计算即时QoE
instant_qoe = chunk['ssim']
- |chunk['ssim'] - inputs['last_chunk']['ssim']|
- 100 * max(0, t - inputs['buffer'])
# 计算未来价值(简化版)
future_value = estimate_future_value(
buffer = inputs['buffer'] - t + chunk_duration,
last_chunk = chunk
)
# 加权累加
total_qoe += prob * (instant_qoe + future_value)
# 保留最优解
if total_qoe > best_qoe:
best_qoe = total_qoe
best_chunk = chunk
return best_chunk
3. 输出执行
选择具有最高期望QoE的视频块,立即发送给客户端
三、关键设计精简说明
- 预测时域:仅预测未来5步(约10秒)但只执行第1步决策,然后重新规划
- 状态离散化:
# 缓冲区离散化(0.5秒步长) buffer_bins = [0, 0.5, 1.0, ..., 15.0] # SSIM离散化(0.05步长) ssim_bins = [0.70, 0.75, ..., 1.00] - 紧急处理:当缓冲区<2秒时:
if current_buffer < 2.0: return min_size_chunk # 强制最低码率
四、与TTP的交互图示
[TCP状态] → TTP → [传输时间概率] → MPC → [视频块选择]
↑____________反馈数据____________↓