因为需求变动的比较多,所以文章里面的内容以每一次的需求变动为一小节。文章到最后也没有能够很好的实现需求,主要是因为性能下降的问题。请教一下各位大神有没有比较好的解决方案。
原有接口
原有接口实现的功能大致是,读取前端传递文件的手机号码和一些其他参数,手机号保存到set。调用一个批量执行的工具类(线程池),将set丢进线程池,将set里面的手机号分为几个批次,几个批次同时发送短信。
接口参数
使用form-data格式
这里写几个比较重要的
参数名 | 参数值 | 备注 |
---|---|---|
intervalTime | 0 | 间隔时间(ms) |
file | phone.txt | 上传手机号文件(txt格式) |
其他参数 | 短信账号、模板等。。 |
1.初始需求
需求
初始的需求还比较简单,需求有两个。
一是要求新增查询短信发送任务接口,前端能查询短信发送任务的开始时间、结束时间、任务状态、以及一些其他短信参数。同时要求每条短发发送时要有间隔(间隔时间可选)
二是修改读取文件的类型,原有读取文件为excel格式,现要求读取文件类型为txt格式,同时要防止oom。
解决方案
由于原有的批量发送短信接口并没有记录开始时间、结束时间等信息,于是便添加一个记录时间的操作,并将一个固定字符串拼接开始时间作为Redis的key,将一些查询信息存入Redis中。
至于读取txt文件防止oom这一个问题,采用了将上传的文件写到本地,使用nio包下的file读取本地文件。
@Data
public class SmsInfoDTO{
// 任务名称 (线程名称)
private String taskName;
// 任务状态 0:正在执行;1:已结束;2:被结束
private Integer status;
// 开始时间
private LocalDateTime startTime;
// 结束时间
private LocalDateTime endTime;
// 其他属性...
}
/**
* 批量发送短信任务
* @param intervalTime 间隔时间
* @param file 文件
*/
@override
public Result sendSms(Integer intervalTime, MultipartFile file) throws IOException {
// Redis中key的前缀
String key = "SEND_SMS:";
// 将上传文件保存到本地 即 multipartFile -> file
String fileName = file.getOriginalFilename();
Path path = Paths.get("file/", fileName);
// 写入文件之前 先判断文件夹是否存在 不存在则创建
if(!Files.exists(path.getParent())){
Files.createDirectories(path.getParent());
}
Files.write(path, file.getBytes());
// 删除保存到本地的文件
Files.delete(path);
new Thread(()->{
// 记录开始时间
LocalDateTime startTime = LocalDateTime.now();
// 构建一个实体类,实体类中属性为一些要存储到Redis中的信息
SmsInfoDTO smsInfoDto = new SmsInfoDTO();
// set注入 这里忽略不写 smsInfoDto.set...
//使用Redis工具类将key、value存入redis;记录开始状态
redisTemplate.opsForValue.set(
key + startTime.format(DateTimeFormatter.ofPattern(format)),
JSONObject.toJSONString(smsInfoDto),
RedisConstans.ONE_DAY * 2);
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
// LF 和 CRLF 的区别
// 缓冲区一条电话号码占字节大小 12B unix/linux 13B Windows
// 11B + 2B换行符(/r/n)windows | 1B换行符(/n)linux
// 默认开辟25KB的直接缓存空间 allocateDirect直接缓存空间|allocate非直接缓存空间
ByteBuffer buffer = ByteBuffer.allocateDirect(26000);
int bytesRead = channel.read(buffer);
buffer.flip();
while (bytesRead != -1) {
//将缓冲区中的数据转换为字符串数组
String content = new String(buffer.array(), 0, bytesRead);
String[] phoneList = content.split("\r\n");
//处理读取到的内容
for (String phone : phoneList) {
// 调用线程池
// 调用短信工厂 获取短信发送策略 发送短信 这里忽略 只提供思路
Thread.sleep(intervalTime)
}
//清空缓冲区
buffer.clear();
//继续读取下一个块
bytesRead = channel.read(buffer);
}
channel.close();
// 记录结束时间
LocalDateTime endTime = LocalDateTime.now();
// set方法 这边忽略 smsInfoDto.setEndTime(endTime)....
smsInfoDto.setStatus(1);
// 存入Redis
redisTemplate.opsForValue.set(
key + startTime.format(DateTimeFormatter.ofPattern(format)),
JSONObject.toJSONString(smsInfoDto),
RedisConstans.ONE_DAY * 2);
}).start();
return Result.success();
}
/**
* 查询短信发送任务
*/
@Override
public Result getSmsSendTask() {
// 从Redis中获取所有key
Set<String> values = RedisUtils.keys(key + "*");
List<SmsInfoVO> smsInfoVOList = new ArrayList<>();
//根据key获取value
values.forEach(value -> {
String valueStr = RedisUtils.getString(value);
SmsInfoDTO smsInfoDto = JSONObject.parseObject(valueStr, SmsInfoDTO.class);
SmsInfoVO smsInfoVo = new SmsInfoVO();
BeanUtils.copyProperties(smsInfoDto,smsInfoVo);
smsInfoVOList.add(smsInfoVo);
});
// 返回结果
if(values.isEmpty()){
return Result.success("没有存在的任务");
}
//根据任务开始时间倒序
sendTaskVOList.sort(Comparator.comparing(smsInfoVOList::getStartTime).reversed());
return Result.success(smsInfoVOList);
}
2.第二次需求
需求
在第一次需求的前提下,新增了两个需求。
一是要求在单位时间内能控制短信的发送数量可控,发送短信有序,以便较好的预测短信发送任务的完成时间。(比如说发送18000条短信,间隔时间10ms,那么预计完成时间就应该是3分钟左右。
二是新增了终止任务的需求,要求能够随时终止任务。
解决方案
到这里比较头大,主要是需求没理解清楚(大家听需求的时候如果有不明白的一定要问,不然回座位敲代码也是一头雾水)。
第二个需求要终止线程比较容易,添加两个线程共享变量,发送短信的线程执行的时候对线程共享变量进行判断就可以。
至于第一个需求,如果要有序可控那么只有使用单线程处理短信发送能够做到有序可控。
相比于第一次需求的代码,第二次需求的改动主要在于添加了线程共享变量的判断和删除了原有线程池的使用。
// Redis中key的前缀
private final String key = "SEND_SMS:";
// 线程共享变量 任务名称
private volatile String taskName;
// 线程共享变量 任务停止标识
private volatile boolean flag = false;
// Redis时间格式化
private final String format = "yyyy_MM_dd_HH_mm_ss";
/**
* 批量发送短信任务
* @param intervalTime 间隔时间
* @param file 文件
*/
@override
public Result sendSms(Integer intervalTime, MultipartFile file) throws IOException {
// 线程状态初始化
flag = false;
// 将上传文件保存到本地 即 multipartFile -> file
String fileName = file.getOriginalFilename();
Path path = Paths.get("file/", fileName);
// 写入文件之前 先判断文件夹是否存在 不存在则创建
if(!Files.exists(path.getParent())){
Files.createDirectories(path.getParent());
}
Files.write(path, file.getBytes());
// 删除保存到本地的文件
Files.delete(path);
new Thread(()->{
// 记录开始时间
LocalDateTime startTime = LocalDateTime.now();
// 构建一个实体类,实体类中属性为一些要存储到Redis中的信息
SmsInfoDTO smsInfoDto = new SmsInfoDTO();
// set注入 这里忽略不写 smsInfoDto.set...
smsInfoDto.setStatus(0);
//使用Redis工具类将key、value存入redis;记录开始状态
redisTemplate.opsForValue.set(
key + startTime.format(DateTimeFormatter.ofPattern(format)),
JSONObject.toJSONString(smsInfoDto),
RedisConstans.ONE_DAY * 2);
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
// 缓冲区一条电话号码占字节大小 12B unix/linux 13B Windows
// 11B + 2B换行符(/r/n)windows | 1B换行符(/n)linux
// 默认开辟25KB内存空间
ByteBuffer buffer = ByteBuffer.allocate(26000);
int bytesRead = channel.read(buffer);
buffer.flip();
while (bytesRead != -1) {
//将缓冲区中的数据转换为字符串数组
String content = new String(buffer.array(), 0, bytesRead);
String[] phoneList = content.split("\r\n");
//处理读取到的内容
for (String phone : phoneList) {
if (flag && taskName.equals(Thread.currentThread().getName())){
if (intervalTime == 0){
executorService.shutdown();
}
extracted(smsInfoDto, startTime);
flag = false;
return;
}
// 调用短信工厂 获取短信发送策略 发送短信 这里忽略 只提供思路
Thread.sleep(intervalTime)
}
//清空缓冲区
buffer.clear();
//继续读取下一个块
bytesRead = channel.read(buffer);
}
channel.close();
extracted(smsInfoDto, startTime);
}).start();
return Result.success();
}
/**
* 终止发送任务
* @param startTime 任务开始时间
*/
@Override
public Result stopSmsBatchSend(LocalDateTime startTime) {
log.info("终止操作人:{}", operatorUser);
// 自定义的工具类 获取键值
String valueStr = RedisUtils.getString(key + startTime.format(DateTimeFormatter.ofPattern(format)));
if (valueStr == null){
return Result.failed("该任务不存在!");
}
SmsInfoDTO smsInfoDto = JSONObject.parseObject(valueStr, SmsInfoDTO.class);
if (smsSendTaskDto.getStatus() != 0){
return Result.failed("该任务已经完成或已被终止,不能终止!");
}
// 设置停止标志
flag = true;
// 设置停止任务的线程名称
taskName = smsInfoDto.getTaskName();
extracted(smsInfoDto, startTime);
SmsInfoVO smsInfoVo = new SmsInfoVO();
BeanUtils.copyProperties(smsInfoDto, SmsInfoVO);
return Result.success("终止成功!", smsSendTaskVO);
}
/**
* 任务结束记录
* @param smsInfoDto 实体类
* @param startTime 任务开始时间
*/
private void extracted(SmsInfoDTO smsInfoDto, LocalDateTime startTime) {
//记录结束时间
LocalDateTime endTime = LocalDateTime.now();
smsInfoDto.setEndTime(endTime);
// 判断是否是被终止的
if (flag){
smsSendTaskDto.setStatus(2);
}else {
smsSendTaskDto.setStatus(1);
}
// 存入Redis
RedisUtils.setValue(
key + startTime.format(DateTimeFormatter.ofPattern(format)),
JSONObject.toJSONString(smsInfoDto), RedisConstans.ONE_DAY * 2);
}
3.总结
虽然完成了需求,但是这段代码的性能却不高。在运行了两个小时后,性能下降明显。任务刚开始时代码间的延迟为0,到了后期延迟为60ms。看着延迟不高,但是对于发送几十万条短信,每条短信延迟60ms是致命的。
目前也没有什么思路解决这个问题,领导让另外一个同事看了也说没有什么优化方向,于是就采用了单线程和多线程可选的方式保留。