在 Java 5.0 提供了 java.util.concurrent (简称JUC )包,在此包中增加了在并发编程中很常用 的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步 IO 和轻量级任务框架。 提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等。
下面给出一个模拟CAS算法的例子
/* * 模拟 CAS 算法 */ public class TestCompareAndSwap { public static void main(String[] args) { final CompareAndSwap cas = new CompareAndSwap(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { int expectedValue = cas.get(); boolean b = cas.compareAndSet(expectedValue, (int)(Math.random() * 101)); System.out.println(b); } }).start(); } } } class CompareAndSwap{ private int value; //获取内存值 public synchronized int get(){ return value; } //比较 public synchronized int compareAndSwap(int expectedValue, int newValue){ int oldValue = value; if(oldValue == expectedValue){ this.value = newValue; } return oldValue; } //设置 public synchronized boolean compareAndSet(int expectedValue, int newValue){ return expectedValue == compareAndSwap(expectedValue, newValue); } }测试CopyOnWriteArrayList
/* * CopyOnWriteArrayList/CopyOnWriteArraySet : “写入并复制” * 注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。并发迭代操作多时可以选择。 */ public class TestCopyOnWriteArrayList { public static void main(String[] args) { HelloThread ht = new HelloThread(); for (int i = 0; i < 10; i++) { new Thread(ht).start(); } } } class HelloThread implements Runnable{ // private static List<String> list = Collections.synchronizedList(new ArrayList<String>()); private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); static{ list.add("AA"); list.add("BB"); list.add("CC"); } @Override public void run() { Iterator<String> it = list.iterator(); while(it.hasNext()){ System.out.println(it.next()); list.add("AA"); } } }显示锁 Lock
在 Java 5.0 之前,协调共享对象的访问时可以使用的机制只有 synchronized 和 volatile 。Java 5.0 后增加了一些新的机制,但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。ReentrantLock 实现了 Lock 接口,并提供了与synchronized 相同的互斥性和内存可见性。但相较于 synchronized 提供了更高的处理锁的灵活性。 /* * 一、用于解决多线程安全问题的方式: * * synchronized:隐式锁 * 1. 同步代码块 * * 2. 同步方法 * * jdk 1.5 后: * 3. 同步锁 Lock * 注意:是一个显示锁,需要通过 lock() 方法上锁,必须通过 unlock() 方法进行释放锁 */ public class TestLock { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(ticket, "1号窗口").start(); new Thread(ticket, "2号窗口").start(); new Thread(ticket, "3号窗口").start(); } } class Ticket implements Runnable{ private int tick = 100; private Lock lock = new ReentrantLock(); @Override public void run() { while(true){ lock.lock(); //上锁 try{ if(tick > 0){ try { Thread.sleep(200); } catch (InterruptedException e) { } System.out.println(Thread.currentThread().getName() + " 完成售票,余票为:" + --tick); } }finally{ lock.unlock(); //释放锁 } } } }下面看一个生产者消费者案例来使用Condition 控制线程通信
/* * 生产者消费者案例: */ public class TestProductorAndConsumerForLock { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor pro = new Productor(clerk); Consumer con = new Consumer(clerk); new Thread(pro, "生产者 A").start(); new Thread(con, "消费者 B").start(); new Thread(pro, "生产者 C").start(); new Thread(con, "消费者 D").start(); } } class Clerk { private int product = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); // 进货 public void get() { lock.lock(); try { //if (product >= 1) { 存在虚假唤醒 while (product >= 1) { System.out.println("产品已满!"); try { condition.await(); // 应该使用在循环中,以防有虚假唤醒 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " : " + ++product); condition.signalAll(); } finally { lock.unlock(); } } // 卖货 public void sale() { lock.lock(); try { // if (product <= 0) { while (product <= 0) { System.out.println("缺货!"); try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " : " + --product); condition.signalAll(); } finally { lock.unlock(); } } } // 生产者 class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } clerk.get(); } } } // 消费者 class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { clerk.sale(); } } }ScheduledExecutorService 一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。
/* * 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。 * * 二、线程池的体系结构: * java.util.concurrent.Executor : 负责线程的使用与调度的根接口 * |--**ExecutorService 子接口: 线程池的主要接口 * |--ThreadPoolExecutor 线程池的实现类 * |--ScheduledExecutorService 子接口:负责线程的调度 * |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService * * 三、工具类 : Executors * ExecutorService newFixedThreadPool() : 创建固定大小的线程池 * ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。 * ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程 * * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。 */ public class TestScheduledThreadPool { public static void main(String[] args) throws Exception { ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 5; i++) { Future<Integer> result = pool.schedule(new Callable<Integer>(){ @Override public Integer call() throws Exception { int num = new Random().nextInt(100);//生成随机数 System.out.println(Thread.currentThread().getName() + " : " + num); return num; } }, 1, TimeUnit.SECONDS); System.out.println(result.get()); } pool.shutdown(); } }Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。
Fork/Join 框架与线程池的区别:
采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。使用Fork/Join进行计算
public class TestForkJoinPool { public static void main(String[] args) { Instant start = Instant.now(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L); Long sum = pool.invoke(task); System.out.println(sum); Instant end = Instant.now(); System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//166-1996-10590 } @Test public void test1(){ Instant start = Instant.now(); long sum = 0L; for (long i = 0L; i <= 50000000000L; i++) { sum += i; } System.out.println(sum); Instant end = Instant.now(); System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//35-3142-15704 } //java8 新特性 @Test public void test2(){ Instant start = Instant.now(); Long sum = LongStream.rangeClosed(0L, 50000000000L) .parallel() .reduce(0L, Long::sum); System.out.println(sum); Instant end = Instant.now(); System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//1536-8118 } } class ForkJoinSumCalculate extends RecursiveTask<Long>{ /** * */ private static final long serialVersionUID = -259195479995561737L; private long start; private long end; private static final long THURSHOLD = 10000L; //临界值 public ForkJoinSumCalculate(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if(length <= THURSHOLD){ long sum = 0L; for (long i = start; i <= end; i++) { sum += i; } return sum; }else{ long middle = (start + end) / 2; ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); left.fork(); //进行拆分,同时压入线程队列 ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end); right.fork(); // return left.join() + right.join(); } } }