需求说明
在涉及大量数据处理时,我们经常希望能通过批处理的方式来减少比如数据库IO等提升处理效率以及提升tps.
在批次处理时我们会有如下面的需求场景:
- 队列存储请求,队列请求数量达到100或者等待数量满足100的时间超过200ms时,进行请求的批处理.
- 基于上述批次处理,返回每个请求对应的结果(线程间需要通信),例如秒杀我们进行库存扣减的合并处理的同时,也需要返回每个请求的处理结果
下面,针对上面场景可以看下下面的代码demo和案例
需求1. 超过2秒或者数量超过3个,则进行批次处理.(超时 || 超量 >> 批次处理)
/**
* 超时 || 超量
* waitTimeOut || queueSizeOver
*/
@Slf4j
static class BatchQueueDemo {
/**
* 缓冲队列
*/
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2000);
/**
* 批次处理大小
*/
private static int batchSize = 3;
/**
* 获取请求处理超时时间
*/
private static int timeOutMillSeconds = 2000;
public static void main(String[] args) {
ThreadPoolExecutor producerPool = new ThreadPoolExecutor(100, 100, 180L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), new CustomizableThreadFactory("data-pool-"), new ThreadPoolExecutor.DiscardPolicy());
// 1个异步线程处理消费任务
new Thread(() -> {
while (true) {
try {
List<Integer> list = new ArrayList<>(3);
// waitTime >=timeOutSeconds秒 || listSize >= batchSize >> 执行
Queues.drain(queue, list, batchSize, timeOutMillSeconds, TimeUnit.MILLISECONDS);
if (list.size() >= batchSize) {
log.info("====超量消费数据,listSize->{}", list.size());
} else if (!list.isEmpty()) {
log.info("+++++++++超时消费数据,listSize->{}", list.size());
} else {
log.info(">>>>>>超时无数据");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "消费线程").start();
// 模拟请求
Random random = new Random();
for (int i = 0; i < 10000; i++) {
producerPool.execute(() -> {
try {
// 注意: 这里使用put进行阻塞还是offer不阻塞根据业务而定
// 如超过队列容量的请求可以直接返回系统繁忙或者如未抢到则直接用offer
queue.put(1);
log.info("生产数据,队列大小->{}", queue.size());
} catch (Exception e) {
e.printStackTrace();
}
});
// 请求时间随机(这里为了模拟3种场景定的时间较长)
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行结果
需求2. 基于上述批次处理,返回每个请求对应的结果(线程间需要通信)-秒杀的合并扣减库存案例
/**
* 基于上述批次处理,返回每个请求对应的结果(线程间需要通信)
* 例如秒杀场景的合并处理
*/
@Slf4j
static class SecondKillDemo {
private static AtomicInteger stock = new AtomicInteger(5);
/**
* 缓冲队列
*/
private static BlockingQueue<SecondKillHandleInfo> queue = new ArrayBlockingQueue<>(2000);
/**
* 批次处理大小
*/
private static int batchSize = 3;
/**
* 获取请求处理超时时间
*/
private static int timeOutMillSeconds = 200;
public static void main(String[] args) {
ThreadPoolExecutor producerPool = new ThreadPoolExecutor(100, 100, 180L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), new CustomizableThreadFactory("data-pool-"), new ThreadPoolExecutor.DiscardPolicy());
// 1个异步线程处理消费任务
new Thread(SecondKillDemo::batchHandle, "秒杀批次处理线程").start();
// 模拟秒杀请求
for (int i = 0; i < 30; i++) {
SecondKillRequest secondKillRequest = new SecondKillRequest("user" + i, 1);
producerPool.execute(() -> secondKill(secondKillRequest));
}
}
/**
* 秒杀请求
*/
private static void secondKill(SecondKillRequest secondKillRequest) {
try {
Result result = new Result();
CountDownLatch countDownLatch = new CountDownLatch(1);
SecondKillHandleInfo secondKillHandleInfo = new SecondKillHandleInfo(secondKillRequest, result, countDownLatch);
// 注意: 这里使用put进行阻塞还是offer不阻塞根据业务而定
// 如超过队列容量的请求可以直接返回系统繁忙或者如未抢到则直接用offer
boolean offer = queue.offer(secondKillHandleInfo);
if (!offer) { // 队列满,直接返回抢购失败
result.setCode("-1");
result.setMsg("抢购失败");
log.info("秒杀结果,userId->{},result->{}", secondKillRequest.getUserId(), JSON.toJSONString(result));
}
// 等待合并线程处理完成再
countDownLatch.await(timeOutMillSeconds, TimeUnit.MILLISECONDS);
log.info("秒杀结果,userId->{},result->{}", secondKillRequest.getUserId(), JSON.toJSONString(result));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 批次处理请求
* 说明: 这里不处理线程安全之类的,核心在于批次处理队列的应用以及线程间的通信
*/
private static void batchHandle() {
while (true) {
try {
List<SecondKillHandleInfo> list = new ArrayList<>(3);
// waitTime >=timeOutSeconds秒 || listSize >= batchSize >> 执行
Queues.drain(queue, list, batchSize, timeOutMillSeconds, TimeUnit.MILLISECONDS);
if (CollectionUtils.isNotEmpty(list)) {
if (stock.get() <= 0) { // 没有库存时,所有请求返回库存不足
for (SecondKillHandleInfo secondKillHandleInfo : list) {
Result result = secondKillHandleInfo.result;
result.setCode("-1");
result.setMsg("库存不足");
secondKillHandleInfo.countDownLatch.countDown();
}
}
int sum = list.stream().map(SecondKillHandleInfo::getSecondKillRequest).mapToInt(SecondKillRequest::getCount).sum();
if (stock.get() >= sum) { // 合并处理,库存足够
int currentStock = stock.addAndGet(-sum);
log.info("合并处理,currentStock->{}", currentStock);
for (SecondKillHandleInfo secondKillHandleInfo : list) {
Result result = secondKillHandleInfo.result;
result.setCode("0");
result.setMsg("=====抢购成功");
// TODO 业务逻辑
secondKillHandleInfo.countDownLatch.countDown();
}
} else { // 需要单独一个个处理
for (SecondKillHandleInfo secondKillHandleInfo : list) {
Integer count = secondKillHandleInfo.secondKillRequest.count;
if (stock.get() >= count) { // 库存足够
int currentStock = stock.addAndGet(-count);
log.info("单独处理,currentStock->{}", currentStock);
Result result = secondKillHandleInfo.result;
result.setCode("0");
result.setMsg("=====抢购成功");
// TODO 业务逻辑
} else { // 库存不足
Result result = secondKillHandleInfo.result;
result.setCode("-1");
result.setMsg("库存不足");
}
secondKillHandleInfo.countDownLatch.countDown();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class Result {
private String code;
private String msg;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class SecondKillRequest {
private String userId;
private Integer count;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
static class SecondKillHandleInfo {
private SecondKillRequest secondKillRequest;
private Result result;
private CountDownLatch countDownLatch;
}