ForkJoinPool 相关
package fighting.threads;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
/**
* ForkJoinPool学习
* 不同之处:
* 1、没有一个公共的任务队列,而是每个工作线程都有自己两个任务队列(外部和内部产生的子任务)
* 2、目的是避免处理大量短平快任务时,对公共队列任务的争抢引发性能问题。即每个任务执行都很快,但多并发情况下公共任务队列push和pop的加锁成为了瓶颈
*
* 两个作用:
* 1、分而治之,允许任务执行过程中,再次提交子任务,从而缩短时间
* 2、工作窃取,线程池中已完成的工作线程,将主动获取所有线程队列中已提交未完成的任务或子任务
*
* 对应的类:
* ForkJoinPool:线程池
* ForkJoinPool.common:全局公用的线程池
* ForkJoinTask:有返回值的任务
* ForkJoinAction:没有返回值任务
*
* 相关类:
* Executors.newWorkStealingPool()
* stream.parallelStream()
*/
public class ForkJoinPoolTest {
/**
* 测试入口
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool(10);
ForkJoinTask<Integer> result = pool.submit(new CalcIntegerTask(0, 100000));
System.out.println(result.get());
pool.shutdown();
// 通过executors创建线程池
Executors.newWorkStealingPool().submit(() -> System.out.println("a runnable task"));
// parallelStream(pærəlel),默认使用 ForkJoinPool.common
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(System.out::println);
}
}
/**
* 数字计算任务
*/
class CalcIntegerTask extends RecursiveTask<Integer> {
private Integer start;
private Integer end;
public CalcIntegerTask(Integer start, Integer end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start <= 100) {
// 数据量少时直接计算
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 数据量大时,分而治之。拆分子任务提交
int middle = (end - start) / 2 + start;
CalcIntegerTask leftTask = new CalcIntegerTask(start, middle);
CalcIntegerTask rightTask = new CalcIntegerTask(middle + 1, end);
// 子任务需要调用 fork 方法
leftTask.fork();
rightTask.fork();
// 使用 join 方法等待并获取值
sum = leftTask.join() + rightTask.join();
}
return sum;
}
}
发布于 2020/06/15
浏览
次