1、工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。
队列: 基于一个普通的数组实现
2、队列的三个操作
Worker线程自己,在队列头部,通过对queueTop指针执行加、减操作,实现入队或出队,是单线程的。 其他Worker线程,在队列尾部,通过对queueBase进行累加,实现出队操作,窃取,是多线程的,需要通过CAS操作。 queue top不是volatile的,queue base 是volatile类型。 volatile int base; // index of next slot for poll int top; // index of next slot for push 扩容源码(1)扩容之后的新数组还是空的时候,array。而queueTop、queueBase的值是不变的,其他窃取线程若此时来窃取任务,取到的将全是null,即取不到任务。不过,虽然此时窃取不到,可以阻塞一会儿,待扩容完成就可以窃取到了,不会影响整个算法的正确性。
(2)在把旧数组的元素复制过来之前,先通过CAS操作把旧数组中的该元素置为null。只有CAS成功置为null了,才能赋值到新数组。这样可以避免同1个元素在旧数组、新数组中各有1份。1个窃取线程还在读旧数组,另1个窃取线程读取新数组,导致同1个元素被2个线程重复窃取。
/** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. */ final ForkJoinTask<?>[] growArray() { ForkJoinTask<?>[] oldA = array; int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size]; if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; do { // emulate poll from old array, push to new array ForkJoinTask<?> x; int oldj = ((b & oldMask) << ASHIFT) + ABASE; int j = ((b & mask) << ASHIFT) + ABASE; x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj); if (x != null && U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); } while (++b != t); } return a; } poll 源码 /** * Takes next task, if one exists, in FIFO order. */ final ForkJoinTask<?> poll() { ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t; while ((b = base) - top < 0 && (a = array) != null) { int j = (((a.length - 1) & b) << ASHIFT) + ABASE; t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); if (base == b) { if (t != null) { if (U.compareAndSwapObject(a, j, t, null)) { base = b + 1; return t; } } else if (b + 1 == top) // now empty break; } } return null; }3、整个队列是环形的,是一个数组实现的RingBuffer。
并且queueBase会一直累加,不会减小;queueTop会累加、减小。 当queueTop-queueBase=queue.length-1的时候,队列为满,扩容; 当queueTop=queueBase的时候,队列为空,Worker线程即将进入阻塞状态。