小米训练营-大作业

[!NOTE]

背景

BMS系统是智能化管理及维护各个电池单元,防止电池出现过充电和过放电、延长电池的使用寿命、监控电池状态的系统。在BMS系统中存在大量电池各种信号的规则管理以及监控,良好的是处理信号,并且根据规则,生成相关预警信息,能够极大提升用户体验。为此需要大家完成一套支持规则配置、信号预警的系统,来解决电池各种突发情况和提升用户体验。

1. 技术方案

1.1 系统设计

​ 技术栈:Java17 + SpringBoot2.6.13 + SpringCloudAlibaba3.1.9 + RocketMQ5.3.2 + Nacos2.5.1 + caffeine2.9.3 + Redis2.6.9 + MySQL5.7

whiteboard_exported_image (2)

1.2 数据库表设计

​ 共由两个数据库:signal库和warn

  • signal库只用来存放信号信息、并根据carIdsignal_info_0signal_info_1表。

  • warn库用来存放车辆、规则、预警信息,分为car_info表、rule_info表、warn_info表。

    create table signal_info_0 / signal_info_1
    (
    id bigint auto_increment comment '主键自增id'
    primary key,
    car_id bigint not null comment '车辆id',
    warn_id tinyint null comment '报警编号',
    Mx decimal(6, 3) null comment '最高电压',
    Mi decimal(6, 3) null comment '最小电压',
    Ix decimal(6, 3) null comment '最高电流',
    Ii decimal(6, 3) null comment '最小电流',
    create_time datetime not null comment '创建时间'
    );

    create table car_info
    (
    id bigint auto_increment comment '主键自增id'
    primary key,
    vid varchar(16) not null comment '车辆识别码',
    battery_type tinyint not null comment '电池类型:1-三元电池,2-铁锂电池',
    total_mileage decimal(10, 2) default 0.00 not null comment '总里程',
    battery_health decimal(5, 2) default 100.00 not null comment '电池健康度',
    create_time datetime not null comment '创建时间',
    update_time datetime not null comment '更新时间',
    constraint vid
    unique (vid)
    );

    create table rule_info
    (
    id bigint auto_increment comment '主键自增id'
    primary key,
    warn_id tinyint not null comment '规则编号',
    warn_name varchar(16) not null comment '报警名称',
    battery_type tinyint not null comment '电池类型:1-三元电池,2-铁锂电池',
    difference char(4) not null comment '电压/电流',
    warn_rule varchar(255) not null comment '预警规则',
    create_time datetime not null comment '创建时间',
    update_time datetime not null comment '更新时间'
    );

    create table warn_info
    (
    id bigint auto_increment comment '主键自增id'
    primary key,
    car_id bigint not null comment '车辆id',
    battery_type tinyint not null comment '电池类型:1-三元电池,2-铁锂电池',
    warn_name varchar(16) not null comment '报警名称',
    warn_level tinyint not null comment '报警等级',
    create_time datetime not null comment '创建时间'
    );

1.3 接口设计

核心功能共有四个接口:

  1. signal微服务的uploadSignal上传信号接口:
    • POST/api/upload/signal
  2. signal微服务的getSignal获取信号接口:
    • GET/api/upload/signal/{carId}
  3. warn微服务的warnSignal预警信号接口:
    • POST/api/warn/signal
  4. warn微服务的warnSignal预警信号接口(供signal微服务调用):
    • GET/api/warn/rules

1.4 单元测试

​ 使用Apifox对接口进行测试:

image-20250605213559831

2. 代码

2.1 代码地址

Gitee小米训练营-大作业:xiaomi-bms

2.2 核心代码

GetSignalServiceImpl类:

@Slf4j
@Service
@RequiredArgsConstructor
public class GetSignalServiceImpl implements GetSignalService {

private final RedisTemplate<String, Object> redisTemplate;
private final WarnClient warnClient;

@Override
public SignalVO getSignal(Long carId) {
List<SignalDTO> signals = new ArrayList<>();
String key = "SignalOfCarId:" + carId + ":";
try {
Set<Integer> warnIds = warnClient.getRules().stream().map(Rule::getWarnId).collect(Collectors.toSet());
for (Integer warnId : warnIds) {
Object obj = redisTemplate.opsForValue().get(key + warnId);
if (obj instanceof Map) {
signals.add(new JacksonObjectMapper().convertValue(obj, SignalDTO.class));
}
}
SignalVO signalVO = new SignalVO();
signalVO.setSignals(signals);
return signalVO;
} catch (Exception e) {
log.error("从Redis中获取对象失败");
return null;
}
}
}

UploadSignalServiceImpl类:

@Service
@RequiredArgsConstructor
public class UploadSignalServiceImpl implements UploadSignalService {

private final RedisTemplate<String, Object> redisTemplate;
private final SignalProducer signalProducer;
private final WarnClient warnClient;

@Override
public void uploadAndSave(SignalDTO signalDTO) {
signalDTO.setCreateTime(LocalDateTime.now());
String key = "SignalOfCarId:" + signalDTO.getCarId().toString() + ":";
//保存进入redis中
if (signalDTO.getWarnId() == null) {
Set<Integer> warnIds = warnClient.getRules().stream().map(Rule::getWarnId).collect(Collectors.toSet());
for (Integer warnId : warnIds) {
signalDTO.setWarnId(warnId);
redisTemplate.opsForValue().set(key + warnId, signalDTO);
}
signalDTO.setWarnId(null);
} else {
redisTemplate.opsForValue().set(key + signalDTO.getWarnId(), signalDTO);
}
//异步存入数据库
signalProducer.sendSaveMessage(signalDTO);
}
}

WarnSignalServiceImpl类:

@Service
@RequiredArgsConstructor
public class WarnSignalServiceImpl implements WarnSignalService {
private final SignalClient signalClient;
private final CarMapper carMapper;
private final RuleService ruleService;
private final StringRedisTemplate stringRedisTemplate;
private final WarnProducer warnProducer;

@Override
public WarnVO warnSignal(List<SignalDTO> SignalDTOList) {
List<WarnDTO> warns = new ArrayList<>();
for (SignalDTO signalDTO : SignalDTOList) {
//1. 调用upload微服务的uploadSignal方法
signalClient.uploadSignal(signalDTO);
//2. 调用upload服务的getSignal方法
List<SignalDTO> signals = signalClient.getSignal(signalDTO.getCarId()).getData().getSignals();
for (SignalDTO signal : signals) {
WarnDTO warnDTO = checkSignal(signal);
if (warnDTO != null) {
warns.add(warnDTO);
}
}
}
return !warns.isEmpty() ? new WarnVO(warns) : null;
}

public WarnDTO checkSignal(SignalDTO signal) {
// 通过caffeine生成Rule列表
List<Rule> rules = ruleService.getAllRulesWithCache();

//1. 查询当前信号的车辆信息
String key = "BatteryTypeOfCarId:" + signal.getCarId();
if (!stringRedisTemplate.hasKey(key)) {
stringRedisTemplate.opsForValue().set(key, carMapper.selectById(signal.getCarId()).getBatteryType().toString());
}
Integer batteryType = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(key)));
// Car car = carMapper.selectById(signal.getCarId());

//2. 根据车辆电池类型和warnId在caffeine报警规则
Rule rule = null;
for (Rule value : rules)
if (Objects.equals(value.getWarnId(), signal.getWarnId()) && Objects.equals(value.getBatteryType(), batteryType)) {
rule = value;
break;
}
// Rule rule = ruleMapper.selectByBatteryAndWarnId(car.getBatteryType(), signal.getWarnId());

//3. 解析预警规则
if (rule != null) {
BigDecimal subtract;
if ("M".equalsIgnoreCase(rule.getDifference())) {
if (signal.getMx() == null || signal.getMi() == null) subtract = BigDecimal.valueOf(0);
else subtract = signal.getMx().subtract(signal.getMi());
} else if ("I".equalsIgnoreCase(rule.getDifference())) {
if (signal.getIx() == null || signal.getIi() == null) subtract = BigDecimal.valueOf(0);
else subtract = signal.getIx().subtract(signal.getIi());
} else return null;
String[] limit = rule.getWarnRule().split(",");
// 快速判断是否预警
if (subtract.compareTo(new BigDecimal(limit[limit.length - 1])) < 0) return null;
for (int i = 0; i < limit.length; i++) {
if (subtract.compareTo(new BigDecimal(limit[i])) >= 0) {
WarnDTO warnDTO = new WarnDTO();
warnDTO.setCarId(signal.getCarId());
warnDTO.setBatteryType(batteryType);
warnDTO.setWarnName(rule.getWarnName());
warnDTO.setWarnLevel(i);
warnDTO.setCreateTime(signal.getCreateTime());
//TODO 发送消息存入数据库中
warnProducer.sendSaveMessage(warnDTO);
return warnDTO;
}
}
}
return null;
}

}

SignalConsumer0类:

@Service
@Slf4j
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "signalToSave0",
consumerGroup = "signal-consumer"
)
public class SignalConsumer0 implements RocketMQListener<SignalDTO> {

private final SignalMapper signalMapper;

// 缓存消息列表
private final List<Signal> signalList = new ArrayList<>();


@Override
//幂等性处理:如果消息可能重复,建议在插入前检查是否已存在相同的消息。
//性能优化:根据实际需求调整批处理大小(如 100 条)以平衡内存占用和吞吐量
public void onMessage(SignalDTO signalDTO) {
synchronized (signalList) {
try {
Signal signal = new Signal();
BeanUtils.copyProperties(signalDTO, signal);
// 添加消息到缓存
signalList.add(signal);

// 达到100条时批量插入
if (signalList.size() >= 100) {
signalMapper.batchInsertSignals0(signalList);
signalList.clear(); // 清空缓存
}
} catch (Exception e) {
log.error("Failed to process message: {}", e.getMessage(), e);
// 可选:将失败的消息重新放回队列或进行补偿处理
}
}
}
}

WarnAllSignalTask类:

@Service
@Slf4j
@RequiredArgsConstructor
public class WarnAllSignalTask {
private final RedisTemplate<String, Object> redisTemplate;
private final SignalProducer signalProducer;

@Scheduled(cron = "0 0/10 * * * ?")
public void warnAllSignal() {
// 获取所有以 "SignalOfCarId:" 为前缀的键
String keyPattern = "SignalOfCarId:*";
Set<String> keys = scanKeys(keyPattern);

if (!keys.isEmpty()) {
for (String key : keys) {
// 获取每个键对应的值
Object obj = redisTemplate.opsForValue().get(key);
if (obj instanceof Map) {
signalProducer.sendCheckMessage(new JacksonObjectMapper().convertValue(obj, SignalDTO.class));
}
}
} else {
log.error("SignalOfCarId为空");
}
}

/*scanKeys 方法使用了 RedisTemplate 的 execute 方法来执行 Redis 的 SCAN 命令。
SCAN 命令用于逐步迭代 Redis 中的键空间,而不会阻塞 Redis 服务器。
这种方法比直接使用 KEYS 命令更高效,尤其是在处理大量键时。*/
private Set<String> scanKeys(String pattern) {
Set<String> keys = new HashSet<>();
redisTemplate.execute((RedisConnection connection) -> {
try (Cursor<byte[]> cursor = connection.scan(ScanOptions.scanOptions().match(pattern).build())) {
cursor.forEachRemaining(item -> keys.add(new String(item, StandardCharsets.UTF_8)));
}
return null;
});
return keys;
}
}

3. 运行截图

3.1 网关鉴权

请求需要进行鉴权,防止非法上报信号:

image-20250605215328596

3.2 上报信号

结果返回正常:

image-20250605215215312

信号批处理:

image-20250605215248676

信号存放入Redis当中:

image-20250605215623530

3.3 获取信号

获取carId下的所有规则的信号数据:

image-20250605215915301

3.4 预警信息

结果放回需要预警的carId对应的预警信息:

image-20250605220228036

同时更新Redis当中的数据:

image-20250605220405473

并将结果存入warn数据库当中:

image-20250605220442568

3.5 定时任务

使用非阻塞方法扫描Redis,并发送消息给warn微服务:

image-20250605221055818

warn微服务调用checkSignal方法,检查信号是否需要预警:

image-20250605221305057