JUC线程学习笔记

    技术2022-07-11  157

    目录

    背景

    CAS

    简介

    应用

    CountDownLatch

    简介

    应用

    Callable接口

    简介

    使用

    Lock同步锁

    简介

    使用

    volatile关键字

    并发容器类

    简介

    其他容器类

    虚假唤醒

    Condition的唤醒与等待

    读写锁

    简介

    用法

    线程池

    简介

    用法

    ForkJoinPool分支/合并框架

    简介

    使用

    代替

    结语

    背景

    继续赋闲……整理上个月学习JUC线程的笔记。JUC是java.util.concurrent包的简称,示例代码都是在jdk8的环境下运行的

    CAS

    简介

    CAS是Compare And Swap的简称,用于保证数据的原子性,这是硬件对于并发操作共享数据的支持

    包含三个值:内存值V、预估值A,更新值B

    当且仅当V == A时,V = B,否则不进行任何操作

    应用

    CAS在java中的实现类为原子数据类,可以对值进行原子性修改

    原始的修改比如i = i++分为三步:

    int temp = i;

    i = i + 1;

    i = temp;

    这在多线程的环境下显然是不安全的

    public class Main { public static void main(String[] args) { CASDemo casDemo = new CASDemo(); for (int i = 0; i < 10; i++) { new Thread(casDemo).start(); } } } class CASDemo implements Runnable { private int mNum = 0; @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ":" + getNum()); } catch (InterruptedException e) { e.printStackTrace(); } } public int getNum() { return mNum++; } }

    输出结果可能会有值重复,但如果换成原子类,就不会有这种情况

    public class Main { public static void main(String[] args) { CASDemo casDemo = new CASDemo(); for (int i = 0; i < 10; i++) { new Thread(casDemo).start(); } } } class CASDemo implements Runnable { private AtomicInteger mAtomicNum = new AtomicInteger(0); @Override public void run() { try { Thread.sleep(500); System.out.println(Thread.currentThread().getName() + ":" + getAtomicNum()); } catch (InterruptedException e) { e.printStackTrace(); } } public int getAtomicNum() { return mAtomicNum.getAndIncrement(); } }

    原子变量除了使用CAS算法保证操作的原子性外,还使用了volatile关键字保证内存可见性

    CountDownLatch

    简介

    CountDownLatch(闭锁):在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行

    应用

    用法参加以下代码,及其注释

    public class Main { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); // 初始化闭锁对象,给出初值 LatchDemo demo = new LatchDemo(latch); long start = System.currentTimeMillis(); for (int i = 0; i < 5; i++) { new Thread(demo).start(); } latch.await(); // 等待闭锁值为0 System.out.println("耗费时间:" + (System.currentTimeMillis() - start)); } } class LatchDemo implements Runnable { private final CountDownLatch mLatch; public LatchDemo(CountDownLatch latch) { mLatch = latch; } @Override public void run() { for (int i = 0; i < 50000; i++) { if (i % 2 == 0) { System.out.println(i); } } synchronized (mLatch) { // 由于闭锁对象被多个线程引用,所以此处加个同步锁 mLatch.countDown(); // 子线程执行完,闭锁值-1 } } }

    Callable接口

    简介

    除了实现Runnable接口和继承Thread类,实现Callable接口是实现线程的第三种方式,相比于实现Runnable接口,Callable接口需要给出类型,方法有返回值和异常,且需要FutureTask的支持

    使用

    先实现Callable接口,指定泛型为String,生成长度为10的随机字母序列

    class CallableDemo implements Callable<String> { private static final String sCHARACTERS = "abcdefghijklmnopqrstuvmxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; private final Random mRandom = new Random(); @Override public String call() throws Exception { StringBuilder builder = new StringBuilder(); for (int i = 0; i < 10; i++) { int index = Math.abs(mRandom.nextInt()) % sCHARACTERS.length(); builder.append(sCHARACTERS.charAt(index)); } return builder.toString(); } }

    再使用FutureTask执行任务并获取结果

    public class Main { public static void main(String[] args) throws InterruptedException { try { CallableDemo demo = new CallableDemo(); FutureTask<String> task = new FutureTask<>(demo); // 实例化FutureTask,泛型和Callable的泛型一致 new Thread(task).start(); System.out.println(task.get()); // get()方法会阻塞,直到结果返回。因此FutureTask也可以用于闭锁 } catch (ExecutionException e) { e.printStackTrace(); } } }

    用FutureTask实现闭锁来统计线程运行时间,也比较容易

    try { CallableDemo demo = new CallableDemo(); long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { FutureTask<String> task = new FutureTask<>(demo); new Thread(task).start(); System.out.println(task.get()); } System.out.println("耗费时间:" + (System.currentTimeMillis() - start)); } catch (ExecutionException e) { e.printStackTrace(); }

    Lock同步锁

    简介

    除了同步代码块和同步方法,Lock是第三种解决多线程安全问题的方式,比同步方法和同步代码块更加灵活,需要通过lock()方法上锁,再通过unlock()方法释放锁

    使用

    直接上代码,unlock()方法要放在try-catch里的finally块里

    public class Main { public static void main(String[] args) throws InterruptedException { try { Ticket ticket = new Ticket(); for (int i = 0; i < 3; i++) { new Thread(ticket).start(); } } catch (Exception e) { e.printStackTrace(); } } } class Ticket implements Runnable { private int mTick = 100; private Lock mLock = new ReentrantLock(); @Override public void run() { while (mTick > 0) { mLock.lock(); mTick--; System.out.println(Thread.currentThread().getName() + ":" + mTick); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { mLock.unlock(); } } } }

    volatile关键字

    1、简介

    volatile用于解决内存可见性问题,内存可见性问题是指:多个线程操作共享数据时,由于访问的都是自己线程的TLAB,从而导致彼此操作不可见的问题。关于TLAB,请参见文章JVM学习笔记之堆中相关章节

    解决内存可见性问题可以用synchronized、锁和volatile等方法

    2、使用

    对于以下代码,主线程的demo.isFlag()一值为false,因为TLAB的存在,导致主线程和子线程中的flag值不能同步修改。

    public class Main { public static void main(String[] args) throws InterruptedException { try { ThreadDemo demo = new ThreadDemo(); new Thread(demo).start(); while (true) { if (demo.isFlag()) { System.out.println("........"); break; } } } catch (Exception e) { e.printStackTrace(); } } } class ThreadDemo implements Runnable { private boolean mFlag = false; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } mFlag = true; } public boolean isFlag() { return mFlag; } public void setFlag(boolean flag) { mFlag = flag; } }

    但加了volatile关键字就不一样了,volitale改变了flag的内存可见性,让子线程改变flag时,直接刷新到内存中;而主线程直接从内存中读取flag,flag的值就改成了true

    public class Main { public static void main(String[] args) throws InterruptedException { try { ThreadDemo demo = new ThreadDemo(); new Thread(demo).start(); while (true) { if (demo.isFlag()) { System.out.println("........"); break; } } } catch (Exception e) { e.printStackTrace(); } } } class ThreadDemo implements Runnable { private volatile boolean mFlag = false; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } mFlag = true; } public boolean isFlag() { return mFlag; } public void setFlag(boolean flag) { mFlag = flag; } }

    但随之而来的问题就是volatile会带来性能的下降,但还是比synchronized要高,不过volatile没有互斥性,且不能保证变量的原子性,仅仅是把变量操作从TLAB改到了主存中去

    并发容器类

    以ConcurrentHashMap为例,学习一下并发容器类

    简介

    HashMap线程不安全,HashTable线程安全,但效率低。

    ConcurrentHashMap采用锁分段机制,用concurrentLevel度量锁分段,默认值为16。每个段都维护一个哈希映射,是一个独立的锁,因此多个线程访问不同的锁分段时,不仅线程安全,而且效率高。

    但jdk1.8开始,ConcurrentHashMap内部也换成了CAS算法

    其他容器类

    同步TreeMap可以换成ConcurrentSkipListMap,当读数和遍历数远大于更新数时,可以使用CopyOnWriteArrayList替换同步的ArrayList

    同步容器类的用法和普通容器类一样,该怎么用还怎么用,此处略过,不过可以参见文章多线程实现数列的累加,里面用到了并发链式队列

    虚假唤醒

    虚假唤醒是指,当wait()方法被包裹在if语句里,当被唤醒时,会出现错误的现象,例如以下代码

    class Clerk { private int mNum = 0; public synchronized void add() throws InterruptedException { if (mNum >= 1) { System.out.println(Thread.currentThread().getName() + ": " + "mNum > 1.."); wait(); } mNum += 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } public synchronized void sub() throws InterruptedException { if (mNum <= 0) { System.out.println(Thread.currentThread().getName() + ": " + "mNum < 0.."); wait(); } mNum -= 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } } class Producer implements Runnable { private Clerk mClerk; public Producer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.add(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Clerk mClerk; public Consumer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.sub(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }

    当运行两个生产者、两个消费者时,会发现mCount为负数的情况,这是因为生产者notify店员后,多个消费者都进行了mCount-1操作

    为了解决虚假唤醒问题,应该把wait()方法放在循环里调用

    class Clerk { private int mNum = 0; public synchronized void add() throws InterruptedException { while (mNum >= 1) { System.out.println(Thread.currentThread().getName() + ": " + "mNum > 1.."); wait(); } mNum += 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } public synchronized void sub() throws InterruptedException { while (mNum <= 0) { System.out.println(Thread.currentThread().getName() + ": " + "mNum < 0.."); wait(); } mNum -= 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); notifyAll(); } } class Producer implements Runnable { private Clerk mClerk; public Producer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.add(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Clerk mClerk; public Consumer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.sub(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }

    Condition的唤醒与等待

    同步锁也可以实现生产者消费者,锁对象里的Condition对象也有类似的等待与唤醒方法

    class Clerk { private int mNum = 0; private Lock mLock = new ReentrantLock(); private Condition mCondition = mLock.newCondition(); public void add() throws InterruptedException { mLock.lock(); try { while (mNum >= 1) { System.out.println(Thread.currentThread().getName() + ": " + "mNum > 1.."); mCondition.await(50, TimeUnit.MILLISECONDS); } mNum += 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); mCondition.signalAll(); } finally { mLock.unlock(); } } public void sub() throws InterruptedException { mLock.lock(); try { while (mNum <= 0) { System.out.println(Thread.currentThread().getName() + ": " + "mNum < 0.."); mCondition.await(50, TimeUnit.MILLISECONDS); } mNum -= 1; System.out.println(Thread.currentThread().getName() + ": " + mNum); mCondition.signalAll(); } finally { mLock.unlock(); } } } class Producer implements Runnable { private Clerk mClerk; public Producer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.add(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private Clerk mClerk; public Consumer(Clerk mClerk) { this.mClerk = mClerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { mClerk.sub(); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }

    启动数量相等的生产者消费者即可验证之

    利用Condition可以实现线程有序交替,比如交替打印ABC

    public class Main { public static void main(String[] args) throws InterruptedException { try { AlterDemo demo = new AlterDemo(); new Thread(() -> { for (int i = 0; i < 20; i++) { demo.printA(); } }).start(); new Thread(() -> { for (int i = 0; i < 20; i++) { demo.printB(); } }).start(); new Thread(() -> { for (int i = 0; i < 20; i++) { demo.printC(); } }).start(); } catch (Exception e) { e.printStackTrace(); } } } class AlterDemo { private int mNum = 1; private Lock mLock = new ReentrantLock(); private Condition mCondition1 = mLock.newCondition(); private Condition mCondition2 = mLock.newCondition(); private Condition mCondition3 = mLock.newCondition(); public void printA() { mLock.lock(); try { while (mNum != 1) { mCondition1.await(); } System.out.println("A"); mNum = 2; mCondition2.signal(); } catch (Exception e) { } finally { mLock.unlock(); } } public void printB() { mLock.lock(); try { while (mNum != 2) { mCondition2.await(); } System.out.println("B"); mNum = 3; mCondition3.signal(); } catch (Exception e) { } finally { mLock.unlock(); } } public void printC() { mLock.lock(); try { while (mNum != 3) { mCondition3.await(); } System.out.println("C"); mNum = 1; mCondition1.signal(); } catch (Exception e) { } finally { mLock.unlock(); } } }

    读写锁

    简介

    读写锁是为了保证写写/读写互斥

    用法

    使用readLock()和writeLock()方法即可获取读写锁对象的读锁和写锁

    class WriteReadDemo { private int mNum = 0; private ReadWriteLock mLock = new ReentrantReadWriteLock(); public void set(int num) { mLock.writeLock().lock(); try { mNum = num; } finally { mLock.writeLock().unlock(); } } public void get() { mLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + ": " + mNum); } finally { mLock.readLock().unlock(); } } }

    然后启动线程验证即可

    WriteReadDemo demo = new WriteReadDemo(); for (int i = 0; i < 50; i++) { new Thread(() -> demo.set((int) (Math.random() * 100)), "Write").start(); } for (int i = 0; i < 100; i++) { new Thread(demo::get, "Read" + i).start(); }

    线程池

    简介

    为了减少新建、维护、销毁线程的开销,我们可以使用线程池来创建调度线程,而不是直接new

    用法

    可以通过Executors的相关new方法创建线程池

    newFixedThreadPool()创建固定大小的线程池

    newCachedThreadPool()创建缓存线程池,容量可根据需求自行调整

    newSingleThreadExecutor()创建单个线程池

    newScheduledThreadPool()创建固定大小的线程池,定时执行任务

    ExecutorService pool = Executors.newFixedThreadPool(5);

    创建完后,就可以通过线程池的submit()方法就可以执行任务了

    for (int i = 0; i < 10; i++) { pool.submit(() -> { int sum = 0; for (int j = 0; j < 51; j++) { sum += j; } System.out.println(Thread.currentThread().getName() + ": " + sum); }); }

    最后要关闭线程池

    pool.shutdown();

    shutdown()会等待所有运行着的子线程结束后关闭,而shutdownNow()会立刻关闭,终止还没有结束的子线程。

    对于ScheduledExecutorService调度任务,需要调用其schedule()方法,传入runnable或callable对象、延时和延时单位

    ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 10; i++) { pool.schedule(() -> { int sum = 0; for (int j = 0; j < 51; j++) { sum += j; } System.out.println(Thread.currentThread().getName() + ": " + sum); }, (long) (Math.random() * 1000), TimeUnit.MILLISECONDS); } pool.shutdown();

    如果有兴趣阅读线程池源码,可以参见文章安卓开发学习之线程池源码简析

    ForkJoinPool分支/合并框架

    简介

    Fork/Join框架:在必要情况下,把一个大任务拆分(Fork)成若干个小任务,直到不能再分为止;然后再把小任务的结果合并(Join)成大任务的结果

    这类似于MapReduce框架,只是MapReduce的拆分过程是必然的

     

    工作窃取模式:

    当某个子任务线程完成了自己任务队列中的任务,而大任务还没有结束,那么它就会随机从别的子任务线程的队列的末尾偷一个任务来执行

    使用

    先定义任务类,继承RecursiveTask父类,指定泛型,实现方法compute()

    class FJSumCalculate extends RecursiveTask<Long> { private long mStart, mEnd; private static final long sTHRESHOLD = 100L; // 拆分临界值 public FJSumCalculate(long start, long end) { mStart = start; mEnd = end; } @Override protected Long compute() { long length = mEnd - mStart; if (length <= sTHRESHOLD) { long sum = 0L; for (long i = mStart; i <= mEnd; i++) { sum += i; } return sum; } // 拆分 long middle = (mEnd + mStart) / 2; FJSumCalculate left = new FJSumCalculate(mStart, middle - 1); left.fork(); FJSumCalculate right = new FJSumCalculate(middle, mEnd); right.fork(); // 合并 return left.join() + right.join(); }

    注意临界值的选择,不能太小

    然后在主方法中测试,使用ForkJoinPool来执行任务

    ForkJoinPool pool = new ForkJoinPool(); System.out.println(pool.invoke(new FJSumCalculate(1L, 99999L))); pool.shutdown();

    代替

    java8中也可以使用LongStream来执行类似的并行累加逻辑

    LongStream.rangeClosed(1L, 99999L).parallel().reduce(Long::sum).getAsLong();

    结语

    JUC线程学习笔记至此结束了,欢迎大家斧正

    Processed: 0.017, SQL: 9