一、线程安全并发容器JUC 1、CopyOnWriteArrayList
package com.current.flame.current; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.*; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class CopyOnWriteArrayListExample { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static List<Integer> list = new CopyOnWriteArrayList<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateCopyOnWriteArrayLis(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("list.size=>{}", list.size()); } private static void updateCopyOnWriteArrayLis(int count ) { list.add(count); } } 缺点:读多写少、性能消耗、不能实时性的需求 优点:读写分离、最终一致性、开辟新的空间解决线程冲突 源码 public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }2、ConcurrentSkipListSet
package com.current.flame.current; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Set; import java.util.concurrent.*; /** * @author haoxiansheng * 单个remove add 操作是原子性的 removeAll addAll底层调对应的remove add 所以可能被其他线程影响 */ @Slf4j @ThreadSafe public class ConcurrentSkipListSetExample { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Set<Integer> set = new ConcurrentSkipListSet<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateConcurrentSkipListSet(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set.size=>{}", set.size()); } private static void updateConcurrentSkipListSet(int count ) { set.add(count); } }3、CopyOnWriteArraySet
package com.current.flame.current; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Set; import java.util.concurrent.*; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class CopyOnWriteArraySetExample { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Set<Integer> set = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateCopyOnWriteArraySet(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set.size=>{}", set.size()); } private static void updateCopyOnWriteArraySet(int count ) { set.add(count); } }4、ConcurrentHashMap
package com.current.flame.current; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.*; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class ConcurrentHashMapExample { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentHashMap<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateConcurrentHashMap(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map.size=>{}", map.size()); } private static void updateConcurrentHashMap(int count ) { map.put(count, count); } }5、ConcurrentSkipListMap
package com.current.flame.current; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.*; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class ConcurrentSkipListMapExample { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateConcurrentSkipListMap(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map.size=>{}", map.size()); } private static void updateConcurrentSkipListMap(int count ) { map.put(count, count); } }6、ConcurrentSkipListMap是有序的、支持更高的并发
1、线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
2、共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它
3、线程安全对象: 一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问它。
4、被守护对象: 被守护对象只能通过获取特定的锁来访问