ForkJoinPool 相关

package fighting.threads;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * ForkJoinPool学习
 * 不同之处:
 * 1、没有一个公共的任务队列,而是每个工作线程都有自己两个任务队列(外部和内部产生的子任务)
 * 2、目的是避免处理大量短平快任务时,对公共队列任务的争抢引发性能问题。即每个任务执行都很快,但多并发情况下公共任务队列push和pop的加锁成为了瓶颈
 *
 * 两个作用:
 * 1、分而治之,允许任务执行过程中,再次提交子任务,从而缩短时间
 * 2、工作窃取,线程池中已完成的工作线程,将主动获取所有线程队列中已提交未完成的任务或子任务
 *
 * 对应的类:
 * ForkJoinPool:线程池
 * ForkJoinPool.common:全局公用的线程池
 * ForkJoinTask:有返回值的任务
 * ForkJoinAction:没有返回值任务
 *
 * 相关类:
 * Executors.newWorkStealingPool()
 * stream.parallelStream()
 */
public class ForkJoinPoolTest {

    /**
     * 测试入口
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(10);
        ForkJoinTask<Integer> result = pool.submit(new CalcIntegerTask(0, 100000));
        System.out.println(result.get());
        pool.shutdown();

        // 通过executors创建线程池
        Executors.newWorkStealingPool().submit(() -> System.out.println("a runnable task"));

        // parallelStream(pærəlel),默认使用 ForkJoinPool.common
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        numbers.parallelStream().forEach(System.out::println);
    }

}

/**
 * 数字计算任务
 */
class CalcIntegerTask extends RecursiveTask<Integer> {

    private Integer start;

    private Integer end;

    public CalcIntegerTask(Integer start, Integer end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start <= 100) {
            // 数据量少时直接计算
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 数据量大时,分而治之。拆分子任务提交
            int middle = (end - start) / 2 + start;
            CalcIntegerTask leftTask = new CalcIntegerTask(start, middle);
            CalcIntegerTask rightTask = new CalcIntegerTask(middle + 1, end);

            // 子任务需要调用 fork 方法
            leftTask.fork();
            rightTask.fork();

            // 使用 join 方法等待并获取值
            sum = leftTask.join() + rightTask.join();
        }

        return sum;
    }
}


发布于 2020/06/15 浏览