[Concurrency Implementation Principles] 0702 ForkJoinPool Structure

    Tech 2025-10-05

    I. Analysis (based on JDK 1.8)

    1. What does ForkJoinPool do during initialization, and what does its workflow look like?

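    The default parallelism comes from Runtime.getRuntime().availableProcessors(), which returns the number of processors available to the Java virtual machine.
    Worker threads are created by defaultForkJoinWorkerThreadFactory unless another factory is passed to the constructor.
    The constructor validates its arguments with checkParallelism(), checkFactory(), and checkPermission().
    A static initializer sets up the field offsets used for CAS, etc.
    forkJoinPool.submit(task); // submit a task
    forkJoinPool.shutdown();   // shut the pool down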
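The workflow above (create the pool, submit a task, shut down) can be sketched as follows. This is a minimal illustration, not code from the JDK: the `RangeSum` task, its `THRESHOLD` value, and the `sumRange` helper are hypothetical names introduced only for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolLifecycle {

    // Hypothetical task: sums the integers in [from, to] by splitting the range.
    static class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // illustrative cutoff, not a tuned value
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from < THRESHOLD) {          // small enough: sum sequentially
                long sum = 0;
                for (int i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid + 1, to);
            left.fork();                          // schedule the left half asynchronously
            long rightSum = right.compute();      // compute the right half in this thread
            return left.join() + rightSum;        // wait for the left half and combine
        }
    }

    // Creates a pool with the default parallelism, submits one task, and shuts down.
    static long sumRange(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            ForkJoinTask<Long> submitted = pool.submit(new RangeSum(from, to));
            return submitted.join();              // block until the result is available
        } finally {
            pool.shutdown();                      // no new tasks are accepted after this
        }
    }

    public static void main(String[] args) {
        System.out.println(sumRange(1, 100_000)); // prints 5000050000
    }
}
```

`submit` returns the same `ForkJoinTask` it was given, so the caller can `join()` on it directly without handling checked exceptions.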

    2. Source code walkthrough

    (1)
    /**
     * Creates a new ForkJoinWorkerThread. This factory is used unless
     * overridden in ForkJoinPool constructors.
     */
    public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

    (2)
    static final int MAX_CAP = 0x7fff; // max #workers - 1

    (3)
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

    (4)
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

    (5)
    private static int checkParallelism(int parallelism) {
        if (parallelism <= 0 || parallelism > MAX_CAP)
            throw new IllegalArgumentException();
        return parallelism;
    }

    (6)
    private static ForkJoinWorkerThreadFactory checkFactory(ForkJoinWorkerThreadFactory factory) {
        if (factory == null)
            throw new NullPointerException();
        return factory;
    }

    (7)
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager(); // null by default
        if (security != null)
            security.checkPermission(modifyThreadPermission);
    }

    (8)
    // initialize field offsets for CAS etc

    (9)
    forkJoinPool.submit(task); // submit a task

    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

    (10)
    forkJoinPool.shutdown();

    public void shutdown() {
        checkPermission();
        tryTerminate(false, true);
    }
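The argument checks in (4) to (6) can be observed from the outside through the public four-argument constructor. The sketch below is only an illustration; the `ForkJoinPoolArgs` class and its `tryCreate` helper are hypothetical names, and it assumes no SecurityManager is installed.

```java
import java.util.concurrent.ForkJoinPool;

public class ForkJoinPoolArgs {

    // Hypothetical helper: returns the simple name of the exception thrown by the
    // public 4-argument constructor, or "none" if construction succeeds.
    static String tryCreate(int parallelism,
                            ForkJoinPool.ForkJoinWorkerThreadFactory factory,
                            boolean asyncMode) {
        try {
            ForkJoinPool pool = new ForkJoinPool(parallelism, factory, null, asyncMode);
            pool.shutdown();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool.ForkJoinWorkerThreadFactory f =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory;

        System.out.println(tryCreate(4, f, false));      // none
        System.out.println(tryCreate(0, f, false));      // IllegalArgumentException (parallelism <= 0)
        System.out.println(tryCreate(0x8000, f, false)); // IllegalArgumentException (parallelism > MAX_CAP)
        System.out.println(tryCreate(4, null, false));   // NullPointerException (factory == null)

        // asyncMode = true selects FIFO_QUEUE instead of LIFO_QUEUE for local queues.
        ForkJoinPool asyncPool = new ForkJoinPool(2, f, null, true);
        System.out.println(asyncPool.getParallelism() + " " + asyncPool.getAsyncMode()); // 2 true
        asyncPool.shutdown();
    }
}
```

Constructing a pool does not start any worker threads by itself, which is why creating and immediately shutting down a pool in `tryCreate` is cheap.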

    3. Related fields

    // Instance fields
    volatile long ctl;                   // main pool control
    volatile int runState;               // lockable status
    final int config;                    // parallelism, mode
    int indexSeed;                       // to generate worker index
    volatile WorkQueue[] workQueues;     // main registry
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final String workerNamePrefix;       // to create worker name string
    volatile AtomicLong stealCounter;    // also used as sync monitor

    private static final sun.misc.Unsafe U;
    private static final int ABASE;
    private static final int ASHIFT;
    private static final long CTL;
    private static final long RUNSTATE;
    private static final long STEALCOUNTER;
    private static final long PARKBLOCKER;
    private static final long QTOP;      // offset of WorkQueue.top, the queue's top index
    private static final long QLOCK;
    private static final long QSCANSTATE;
    private static final long QPARKER;
    private static final long QCURRENTSTEAL;
    private static final long QCURRENTJOIN;
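The comments on `config` ("parallelism, mode") and `ctl` ("main pool control") hint that each field packs several values into one word. The sketch below shows how that packing works; the masks and shifts are written from memory of the JDK 8 source, so treat them as assumptions and check them against your own JDK. The class `PoolWordPacking` and its methods are hypothetical names for illustration only.

```java
public class PoolWordPacking {

    // Constants as recalled from the JDK 8 ForkJoinPool source (assumption).
    static final int  SMASK      = 0xffff;              // low 16 bits: parallelism
    static final int  LIFO_QUEUE = 0;
    static final int  FIFO_QUEUE = 1 << 16;             // mode bit for asyncMode
    static final int  AC_SHIFT   = 48;                  // active count lives in bits 48..63
    static final int  TC_SHIFT   = 32;                  // total count lives in bits 32..47
    static final long AC_MASK    = 0xffffL << AC_SHIFT;
    static final long TC_MASK    = 0xffffL << TC_SHIFT;

    // config = parallelism in the low half, queue mode in the high half.
    static int packConfig(int parallelism, boolean asyncMode) {
        return (parallelism & SMASK) | (asyncMode ? FIFO_QUEUE : LIFO_QUEUE);
    }

    // Initial ctl: both counts start at -parallelism, so they reach zero
    // exactly when the target number of workers exists / is active.
    static long initialCtl(int parallelism) {
        long np = (long) (-parallelism);
        return ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    static int parallelismOf(int config) {
        return config & SMASK;
    }

    static boolean isAsync(int config) {
        return (config & FIFO_QUEUE) != 0;
    }

    // Number of active workers: stored offset plus parallelism.
    static int activeCount(long ctl, int config) {
        return parallelismOf(config) + (int) (ctl >> AC_SHIFT);
    }

    // Number of existing workers: stored offset plus parallelism.
    static int totalCount(long ctl, int config) {
        return parallelismOf(config) + (short) (ctl >>> TC_SHIFT);
    }

    public static void main(String[] args) {
        int config = packConfig(4, true);
        long ctl = initialCtl(4);
        System.out.println(Integer.toHexString(config)); // 10004
        System.out.println(Long.toHexString(ctl));       // fffcfffc00000000
        System.out.println(activeCount(ctl, config));    // 0
        System.out.println(totalCount(ctl, config));     // 0
    }
}
```

Packing both counters into a single `long` is what lets the pool update them together with one CAS on `ctl`, using the `CTL` field offset listed above.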

    Diagrams
    1. ForkJoinPool
    2. RecursiveTask
    3. RecursiveAction
    4. ForkJoinPool
    5. WorkQueue
    6. Comparison
    7. Data structure
    8. The growArray method
    9. The pop method
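As a rough companion to the WorkQueue, growArray, and pop diagrams, here is a deliberately simplified, single-threaded sketch of an array-backed work queue with `base` and `top` indices. It is not the JDK implementation: the real WorkQueue uses Unsafe/CAS operations and a much larger initial capacity, and `SimpleWorkQueue` and its tiny starting size are hypothetical choices made so the growth step is easy to see.

```java
public class SimpleWorkQueue<T> {

    // Capacity must stay a power of two so that (index & mask) wraps correctly.
    private Object[] array = new Object[4];
    private int base; // index of the next slot to poll (steal) from
    private int top;  // index of the next slot to push into

    // Owner side: add a task at the top, growing the array once it is full.
    public void push(T task) {
        array[top & (array.length - 1)] = task;
        top++;
        if (top - base >= array.length) {
            growArray();
        }
    }

    // Doubles the capacity and re-places live elements under the new mask.
    private void growArray() {
        Object[] old = array;
        Object[] grown = new Object[old.length << 1];
        int oldMask = old.length - 1;
        int newMask = grown.length - 1;
        for (int i = base; i != top; i++) {
            grown[i & newMask] = old[i & oldMask];
        }
        array = grown;
    }

    // Owner side: take the most recently pushed task (LIFO).
    @SuppressWarnings("unchecked")
    public T pop() {
        if (top == base) {
            return null; // empty
        }
        int i = (top - 1) & (array.length - 1);
        T task = (T) array[i];
        array[i] = null;
        top--;
        return task;
    }

    // Stealer side: take the oldest task (FIFO).
    @SuppressWarnings("unchecked")
    public T poll() {
        if (top == base) {
            return null; // empty
        }
        int i = base & (array.length - 1);
        T task = (T) array[i];
        array[i] = null;
        base++;
        return task;
    }

    public int size() {
        return top - base;
    }

    public int capacity() {
        return array.length;
    }

    public static void main(String[] args) {
        SimpleWorkQueue<Integer> q = new SimpleWorkQueue<>();
        for (int i = 1; i <= 5; i++) {
            q.push(i);
        }
        System.out.println(q.capacity()); // 8 (grew from 4 after the fourth push)
        System.out.println(q.pop());      // 5 (newest)
        System.out.println(q.poll());     // 1 (oldest)
        System.out.println(q.size());     // 3
    }
}
```

The owner working at `top` while other workers take from `base` is the usual shape of a work-stealing deque: the two ends only contend when the queue is nearly empty.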
