目录
背景
CAS
简介
应用
CountDownLatch
简介
应用
Callable接口
简介
使用
Lock同步锁
简介
使用
volatile关键字
并发容器类
简介
其他容器类
虚假唤醒
Condition的唤醒与等待
读写锁
简介
用法
线程池
简介
用法
ForkJoinPool分支/合并框架
简介
使用
代替
结语
继续赋闲……整理上个月学习JUC线程的笔记。JUC是java.util.concurrent包的简称,示例代码都是在jdk8的环境下运行的
CAS是Compare And Swap的简称,用于保证数据的原子性,这是硬件对于并发操作共享数据的支持
包含三个值:内存值V、预估值A,更新值B
当且仅当V == A时,V = B,否则不进行任何操作
CAS在java中的实现类为原子数据类,可以对值进行原子性修改
原始的修改比如i = i++分为三步:
int temp = i;
i = i + 1;
i = temp;
这在多线程的环境下显然是不安全的
public class Main { public static void main(String[] args) { CASDemo casDemo = new CASDemo(); for (int i = 0; i < 10; i++) { new Thread(casDemo).start(); } } } class CASDemo implements Runnable { private int mNum = 0; @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ":" + getNum()); } catch (InterruptedException e) { e.printStackTrace(); } } public int getNum() { return mNum++; } }输出结果可能会有值重复,但如果换成原子类,就不会有这种情况
public class Main { public static void main(String[] args) { CASDemo casDemo = new CASDemo(); for (int i = 0; i < 10; i++) { new Thread(casDemo).start(); } } } class CASDemo implements Runnable { private AtomicInteger mAtomicNum = new AtomicInteger(0); @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ":" + getAtomicNum()); } catch (InterruptedException e) { e.printStackTrace(); } } public int getAtomicNum() { return mAtomicNum.getAndIncrement(); } }原子变量除了使用CAS算法保证操作的原子性外,还使用了volatile关键字保证内存可见性
CountDownLatch(闭锁):在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行
用法参加以下代码,及其注释
public class Main { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); // 初始化闭锁对象,给出初值 LatchDemo demo = new LatchDemo(latch); long start = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { new Thread(demo).start(); } latch.await(); // 等待闭锁值为0 System.out.println("耗费时间:" + (System.currentTimeMillis() - start)); } } class LatchDemo implements Runnable { private final CountDownLatch mLatch; public LatchDemo(CountDownLatch latch) { mLatch = latch; } @Override public void run() { for (int i = 0; i < 50000; i++) { if (i % 2 == 0) { System.out.println(i); } } synchronized (mLatch) { // 由于闭锁对象被多个线程引用,所以此处加个同步锁 mLatch.countDown(); // 子线程执行完,闭锁值-1 } } }除了实现Runnable接口和继承Thread类,实现Callable接口是实现线程的第三种方式,相比于实现Runnable接口,Callable接口需要给出类型,方法有返回值和异常,且需要FutureTask的支持
先实现Callable接口,指定泛型为String,生成长度为10的随机字母序列
class CallableDemo implements Callable<String> { private static final String sCHARACTERS = "abcdefghijklmnopqrstuvmxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; private final Random mRandom = new Random(); @Override public String call() throws Exception { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 10; i++) { int index = Math.abs(mRandom.nextInt()) % sCHARACTERS.length(); builder.append(sCHARACTERS.charAt(index)); } return builder.toString(); } }再使用FutureTask执行任务并获取结果
public class Main { public static void main(String[] args) throws InterruptedException { try { CallableDemo demo = new CallableDemo(); FutureTask<String> task = new FutureTask<>(demo); // 实例化FutureTask,泛型和Callable的泛型一致 new Thread(task).start(); System.out.println(task.get()); // get()方法会阻塞,直到结果返回。因此FutureTask也可以用于闭锁 } catch (ExecutionException e) { e.printStackTrace(); } } }用FutureTask实现闭锁来统计线程运行时间,也比较容易
try { CallableDemo demo = new CallableDemo(); long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { FutureTask<String> task = new FutureTask<>(demo); new Thread(task).start(); System.out.println(task.get()); } System.out.println("耗费时间:" + (System.currentTimeMillis() - start)); } catch (ExecutionException e) { e.printStackTrace(); }除了同步代码块和同步方法,Lock是第三种解决多线程安全问题的方式,比同步方法和同步代码块更加灵活,需要通过lock()方法上锁,再通过unlock()方法释放锁
直接上代码,unlock()方法要放在try-catch里的finally块里
public class Main { public static void main(String[] args) throws InterruptedException { try { Ticket ticket = new Ticket(); for (int i = 0; i < 3; i++) { new Thread(ticket).start(); } } catch (Exception e) { e.printStackTrace(); } } } class Ticket implements Runnable { private int mTick = 100; private Lock mLock = new ReentrantLock(); @Override public void run() { while (mTick > 0) { mLock.lock(); mTick--; System.out.println(Thread.currentThread().getName() + ":" + mTick); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { mLock.unlock(); } } } }1、简介
volatile用于解决内存可见性问题,内存可见性问题是指:多个线程操作共享数据时,由于访问的都是自己线程的TLAB,从而导致彼此操作不可见的问题。关于TLAB,请参见文章JVM学习笔记之堆中相关章节
解决内存可见性问题可以用synchronized、锁和volatile等方法
2、使用
对于以下代码,主线程的demo.isFlag()一值为false,因为TLAB的存在,导致主线程和子线程中的flag值不能同步修改。
public class Main { public static void main(String[] args) throws InterruptedException { try { ThreadDemo demo = new ThreadDemo(); new Thread(demo).start(); while (true) { if (demo.isFlag()) { System.out.println("........"); break; } } } catch (Exception e) { e.printStackTrace(); } } } class ThreadDemo implements Runnable { private boolean mFlag = false; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } mFlag = true; } public boolean isFlag() { return mFlag; } public void setFlag(boolean flag) { mFlag = flag; } }但加了volatile关键字就不一样了,volitale改变了flag的内存可见性,让子线程改变flag时,直接刷新到内存中;而主线程直接从内存中读取flag,flag的值就改成了true
public class Main { public static void main(String[] args) throws InterruptedException { try { ThreadDemo demo = new ThreadDemo(); new Thread(demo).start(); while (true) { if (demo.isFlag()) { System.out.println("........"); break; } } } catch (Exception e) { e.printStackTrace(); } } } class ThreadDemo implements Runnable { private volatile boolean mFlag = false; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } mFlag = true; } public boolean isFlag() { return mFlag; } public void setFlag(boolean flag) { mFlag = flag; } }但随之而来的问题就是volatile会带来性能的下降,但还是比synchronized要高,不过volatile没有互斥性,且不能保证变量的原子性,仅仅是把变量操作从TLAB改到了主存中去
以ConcurrentHashMap为例,学习一下并发容器类
HashMap线程不安全,HashTable线程安全,但效率低。
ConcurrentHashMap采用锁分段机制,用concurrentLevel度量锁分段,默认值为16。每个段都维护一个哈希映射,是一个独立的锁,因此多个线程访问不同的锁分段时,不仅线程安全,而且效率高。
但jdk1.8开始,ConcurrentHashMap内部也换成了CAS算法
同步TreeMap可以换成ConcurrentSkipListMap,当读数和遍历数远大于更新数时,可以使用CopyOnWriteArrayList替换同步的ArrayList
同步容器类的用法和普通容器类一样,该怎么用还怎么用,此处略过,不过可以参见文章多线程实现数列的累加,里面用到了并发链式队列
虚假唤醒是指,当wait()方法被包裹在if语句里,当被唤醒时,会出现错误的现象,例如以下代码
class Clerk { private int mNum = 0; public synchronized void add() throws InterruptedException { if (mNum >= 1) { System.out.println(Thread.currentThread().getName() + ": " + "mNum > 1.."); wait(); } mNum += 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } public synchronized void sub() throws InterruptedException { if (mNum <= 0) { System.out.println(Thread.currentThread().getName() + ": " + "mNum < 0.."); wait(); } mNum -= 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } } class Producer implements Runnable { private Clerk mClerk; public Producer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.add(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Clerk mClerk; public Consumer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.sub(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }当运行两个生产者、两个消费者时,会发现mCount为负数的情况,这是因为生产者notify店员后,多个消费者都进行了mCount-1操作
为了解决虚假唤醒问题,应该把wait()方法放在循环里调用
class Clerk { private int mNum = 0; public synchronized void add() throws InterruptedException { while (mNum >= 1) { System.out.println(Thread.currentThread().getName() + ": " + "mNum > 1.."); wait(); } mNum += 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } public synchronized void sub() throws InterruptedException { while (mNum <= 0) { System.out.println(Thread.currentThread().getName() + ": " + "mNum < 0.."); wait(); } mNum -= 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } } class Producer implements Runnable { private Clerk mClerk; public Producer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.add(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Clerk mClerk; public Consumer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.sub(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }同步锁也可以实现生产者消费者,锁对象里的Condition对象也有类似的等待与唤醒方法
class Clerk { private int mNum = 0; private Lock mLock = new ReentrantLock(); private Condition mCondition = mLock.newCondition(); public void add() throws InterruptedException { mLock.lock(); try { while (mNum >= 1) { System.out.println(Thread.currentThread().getName() + ": " + "mNum > 1.."); mCondition.await(50, TimeUnit.MILLISECONDS); } mNum += 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); mCondition.signalAll(); } finally { mLock.unlock(); } } public void sub() throws InterruptedException { mLock.lock(); try { while (mNum <= 0) { System.out.println(Thread.currentThread().getName() + ": " + "mNum < 0.."); mCondition.await(50, TimeUnit.MILLISECONDS); } mNum -= 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); mCondition.signalAll(); } finally { mLock.unlock(); } } } class Producer implements Runnable { private Clerk mClerk; public Producer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.add(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Clerk mClerk; public Consumer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.sub(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }启动数量相等的生产者消费者即可验证之
利用Condition可以实现线程有序交替,比如交替打印ABC
public class Main { public static void main(String[] args) throws InterruptedException { try { AlterDemo demo = new AlterDemo(); new Thread(() -> { for (int i = 0; i < 20; i++) { demo.printA(); } }).start(); new Thread(() -> { for (int i = 0; i < 20; i++) { demo.printB(); } }).start(); new Thread(() -> { for (int i = 0; i < 20; i++) { demo.printC(); } }).start(); } catch (Exception e) { e.printStackTrace(); } } } class AlterDemo { private int mNum = 1; private Lock mLock = new ReentrantLock(); private Condition mCondition1 = mLock.newCondition(); private Condition mCondition2 = mLock.newCondition(); private Condition mCondition3 = mLock.newCondition(); public void printA() { mLock.lock(); try { while (mNum != 1) { mCondition1.await(); } System.out.println("A"); mNum = 2; mCondition2.signal(); } catch (Exception e) { } finally { mLock.unlock(); } } public void printB() { mLock.lock(); try { while (mNum != 2) { mCondition2.await(); } System.out.println("B"); mNum = 3; mCondition3.signal(); } catch (Exception e) { } finally { mLock.unlock(); } } public void printC() { mLock.lock(); try { while (mNum != 3) { mCondition3.await(); } System.out.println("C"); mNum = 1; mCondition1.signal(); } catch (Exception e) { } finally { mLock.unlock(); } } }读写锁是为了保证写写/读写互斥
使用readLock()和writeLock()方法即可获取读写锁对象的读锁和写锁
class WriteReadDemo { private int mNum = 0; private ReadWriteLock mLock = new ReentrantReadWriteLock(); public void set(int num) { mLock.writeLock().lock(); try { mNum = num; } finally { mLock.writeLock().unlock(); } } public void get() { mLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + ": " + mNum); } finally { mLock.readLock().unlock(); } } }然后启动线程验证即可
WriteReadDemo demo = new WriteReadDemo(); for (int i = 0; i < 50; i++) { new Thread(() -> demo.set((int) (Math.random() * 100)), "Write").start(); } for (int i = 0; i < 100; i++) { new Thread(demo::get, "Read" + i).start(); }为了减少新建、维护、销毁线程的开销,我们可以使用线程池来创建调度线程,而不是直接new
可以通过Executors的相关new方法创建线程池
newFixedThreadPool()创建固定大小的线程池
newCachedThreadPool()创建缓存线程池,容量可根据需求自行调整
newSingleThreadExecutor()创建单个线程池
newScheduledThreadPool()创建固定大小的线程池,定时执行任务
ExecutorService pool = Executors.newFixedThreadPool(5);创建完后,就可以通过线程池的submit()方法就可以执行任务了
for (int i = 0; i < 10; i++) { pool.submit(() -> { int sum = 0; for (int j = 0; j < 51; j++) { sum += j; } System.out.println(Thread.currentThread().getName() + ": " + sum); }); }最后要关闭线程池
pool.shutdown();shutdown()会等待所有运行着的子线程结束后关闭,而shutdownNow()会立刻关闭,终止还没有结束的子线程。
对于ScheduledExecutorService调度任务,需要调用其schedule()方法,传入runnable或callable对象、延时和延时单位
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 10; i++) { pool.schedule(() -> { int sum = 0; for (int j = 0; j < 51; j++) { sum += j; } System.out.println(Thread.currentThread().getName() + ": " + sum); }, (long) (Math.random() * 1000), TimeUnit.MILLISECONDS); } pool.shutdown();如果有兴趣阅读线程池源码,可以参见文章安卓开发学习之线程池源码简析
Fork/Join框架:在必要情况下,把一个大任务拆分(Fork)成若干个小任务,直到不能再分为止;然后再把小任务的结果合并(Join)成大任务的结果
这类似于MapReduce框架,只是MapReduce的拆分过程是必然的
工作窃取模式:
当某个子任务线程完成了自己任务队列中的任务,而大任务还没有结束,那么它就会随机从别的子任务线程的队列的末尾偷一个任务来执行
先定义任务类,继承RecursiveTask父类,指定泛型,实现方法compute()
class FJSumCalculate extends RecursiveTask<Long> { private long mStart, mEnd; private static final long sTHRESHOLD = 100L; // 拆分临界值 public FJSumCalculate(long start, long end) { mStart = start; mEnd = end; } @Override protected Long compute() { long length = mEnd - mStart; if (length <= sTHRESHOLD) { long sum = 0L; for (long i = mStart; i <= mEnd; i++) { sum += i; } return sum; } // 拆分 long middle = (mEnd + mStart) / 2; FJSumCalculate left = new FJSumCalculate(mStart, middle - 1); left.fork(); FJSumCalculate right = new FJSumCalculate(middle, mEnd); right.fork(); // 合并 return left.join() + right.join(); }注意临界值的选择,不能太小
然后在主方法中测试,使用ForkJoinPool来执行任务
ForkJoinPool pool = new ForkJoinPool(); System.out.println(pool.invoke(new FJSumCalculate(1L, 99999L))); pool.shutdown();java8中也可以使用LongStream来执行类似的并行累加逻辑
LongStream.rangeClosed(1L, 99999L).parallel().reduce(Long::sum).getAsLong();JUC线程学习笔记至此结束了,欢迎大家斧正