设计目标
实现一个高性能、高准确性的视频实时在线人数统计系统,支持百万级并发用户
系统架构概览
MermaidOpen SVG
核心数据结构设计
1. Redis Hash 存储在线用户
数据结构说明
使用 Hash 结构存储每个视频的在线用户信息,支持 O(1) 的用户增删和 O(1) 的人数统计
| Key Pattern | Field | Value | 说明 |
|---|---|---|---|
video:online:{videoId} | {userId} | {timestamp} | 存储用户最后活跃时间 |
示例:video:online:10001 | user_12345 | 1706789123 | 用户12345在视频10001的活跃时间 |
2. Redis KV 存储清理标记
| Key Pattern | Value | TTL | 说明 |
|---|---|---|---|
video:clean:{videoId} | 1 | 60s | 清理标记,存在则跳过清理 |
详细流程设计
用户进入房间流程
MermaidOpen SVG
过期清理流程
MermaidOpen SVG
核心代码实现
代码示例
@Service
@Slf4j
public class OnlineCountService {
private static final String ONLINE_KEY_PREFIX = "video:online:";
private static final String CLEAN_KEY_PREFIX = "video:clean:";
private static final int HEARTBEAT_TIMEOUT = 90; // 90秒超时
private static final int CLEAN_INTERVAL = 60; // 60秒清理间隔
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 用户进入房间
*/
public Long enterRoom(String videoId, String userId) {
String key = ONLINE_KEY_PREFIX + videoId;
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
// 原子操作:设置用户在线状态
redisTemplate.opsForHash().put(key, userId, timestamp);
// 返回当前在线人数
return getOnlineCount(videoId);
}
/**
* 获取在线人数(带清理逻辑)
*/
public Long getOnlineCount(String videoId) {
String onlineKey = ONLINE_KEY_PREFIX + videoId;
String cleanKey = CLEAN_KEY_PREFIX + videoId;
// 尝试设置清理标记
Boolean needClean = redisTemplate.opsForValue()
.setIfAbsent(cleanKey, "1", Duration.ofSeconds(CLEAN_INTERVAL));
if (Boolean.TRUE.equals(needClean)) {
// 异步执行清理
CompletableFuture.runAsync(() -> cleanExpiredUsers(videoId));
}
return redisTemplate.opsForHash().size(onlineKey);
}
/**
* 清理过期用户
*/
private void cleanExpiredUsers(String videoId) {
String key = ONLINE_KEY_PREFIX + videoId;
long currentTime = System.currentTimeMillis() / 1000;
// 获取所有在线用户
Map<Object, Object> users = redisTemplate.opsForHash().entries(key);
List<String> expiredUsers = users.entrySet().stream()
.filter(entry -> {
long lastActiveTime = Long.parseLong((String) entry.getValue());
return currentTime - lastActiveTime > HEARTBEAT_TIMEOUT;
})
.map(entry -> (String) entry.getKey())
.collect(Collectors.toList());
// 批量删除过期用户
if (!expiredUsers.isEmpty()) {
redisTemplate.opsForHash().delete(key, expiredUsers.toArray());
log.info("清理视频 {} 的过期用户 {} 个", videoId, expiredUsers.size());
}
}
}性能优化策略
优化要点
1. 批量操作优化
// 使用 Pipeline 批量更新心跳
public void batchHeartbeat(Map<String, List<String>> videoUserMap) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
long timestamp = System.currentTimeMillis() / 1000;
videoUserMap.forEach((videoId, userIds) -> {
byte[] key = (ONLINE_KEY_PREFIX + videoId).getBytes();
userIds.forEach(userId -> {
connection.hSet(key, userId.getBytes(),
String.valueOf(timestamp).getBytes());
});
});
return null;
});
}2. 缓存优化
| 优化项 | 实现方式 | 效果 |
|---|---|---|
| 本地缓存 | Caffeine 缓存1秒 | 减少 90% Redis 请求 |
| 批量心跳 | 客户端聚合30秒发送 | 降低网络开销 |
| 异步清理 | CompletableFuture | 不阻塞主流程 |
监控与告警
监控指标
@Component
public class OnlineCountMetrics {
private final MeterRegistry registry;
// 在线人数指标
@Scheduled(fixedDelay = 5000)
public void recordMetrics() {
videoIds.forEach(videoId -> {
long count = onlineCountService.getOnlineCount(videoId);
registry.gauge("video.online.count",
Tags.of("video_id", videoId), count);
});
}
// 清理耗时监控
public void recordCleanupTime(String videoId, long duration) {
registry.timer("video.cleanup.duration",
Tags.of("video_id", videoId))
.record(duration, TimeUnit.MILLISECONDS);
}
}系统容量评估
容量计算
| 指标 | 计算方式 | 数值 |
|---|---|---|
| 单视频内存占用 | 100万用户 × (8字节userId + 8字节时间戳) | ≈ 16MB |
| QPS承载能力 | Redis单实例 10万QPS | 支持 100万并发 |
| 清理操作耗时 | 100万用户遍历 | < 100ms |
方案优势总结
核心优势
- 高实时性:用户进入/退出立即反映,心跳保证准确性
- 高性能:O(1) 的人数统计,清理操作分摊到各个请求
- 容错性强:即使退出信令丢失,心跳超时也能保证数据准确
- 可扩展:支持 Redis Cluster 横向扩展
