【高并发】08线程安全同步容器

    技术2022-07-10  126

    一、Vector

    1、代码

    package com.current.flame.syncContainer; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class VectorExample1 { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Vector<Integer> vector = new Vector<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateVector(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("vector.size=>{}", vector.size()); } private static void updateVector(int count ) { vector.add(count); } }

    2、不安全

    package com.current.flame.syncContainer; import com.current.flame.annoations.NotThreadSafe; import java.util.Vector; /** * @author haoxiansheng * 同步容器因为线程执行顺序不同引发线程安全问题 */ @NotThreadSafe public class VectorExample2 { private static Vector<Integer> vector = new Vector<>(); public static void main(String[] args) { while (true) { for (int i = 0; i < 10; i++) { vector.add(i); } new Thread(() -> { for (int i = 0; i < vector.size(); i++) { vector.remove(i); } }).start(); new Thread(() -> { for (int i = 0; i < vector.size(); i++) { vector.get(i); } }).start(); } } }

    3、遍历出错

    package com.current.flame.syncContainer; import java.util.Iterator; import java.util.List; import java.util.Vector; /** * @author haoxiansheng * 集合遍历是同时做删除操作引发安全问题 */ public class VectorExample3 { public static void main(String[] args) { Vector<Integer> vector = new Vector<>(); //List<Integer> vector = new ArrayList<>(); vector.add(1); vector.add(2); vector.add(3); //testVocator1(vector); //testVocator2(vector); //testVocator3(vector); vector.forEach(x -> { //ConcurrentModificationException if (x.equals(3)) { vector.remove(x); } }); } // ConcurrentModificationException private static void testVocator1( List<Integer> vector) { // foreach for (Integer integer: vector) { if (integer.equals(3)) { vector.remove(integer); } } } // ConcurrentModificationException private static void testVocator1( Vector<Integer> vector) { // foreach for (Integer integer: vector) { if (integer.equals(3)) { vector.remove(integer); } } } // ConcurrentModificationException private static void testVocator2( Vector<Integer> vector) { // iterator Iterator<Integer> iterator = vector.iterator(); while (iterator.hasNext()) { Integer integer = iterator.next(); if (integer.equals(3)) { vector.remove(integer); } } } // success private static void testVocator3( Vector<Integer> vector) { for (int i = 0; i < vector.size(); i++) { if (vector.get(i).equals(3)) { vector.remove(i); } } } }

    二、HashTable(key、value 不能为null)

    1、代码 synchronizedList

    package com.current.flame.syncContainer; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Hashtable; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class HashTableExample { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Map<Integer, Integer> map = new Hashtable<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateHashtable(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map.size=>{}", map.size()); } private static void updateHashtable(int count ) { map.put(count, count); } }

    三、Collection.synchronizedXXX(List、Set、Map)

    1、

    package com.current.flame.syncContainer; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class CollectionsExample1 { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static List<Integer> list = Collections.synchronizedList(new ArrayList<>()); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateSynchronizedList(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("list.size=>{}", list.size()); } private static void updateSynchronizedList(int count ) { list.add(count); } }

    2、 代码 synchronizedSet

    package com.current.flame.syncContainer; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class CollectionsExample2 { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Set<Integer> set = Collections.synchronizedSet(new HashSet<>()); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateSynchronizedSet(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("set.size=>{}", set.size()); } private static void updateSynchronizedSet(int count ) { set.add(count); } }

    3、代码 synchronizedMap

    package com.current.flame.syncContainer; import com.current.flame.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author haoxiansheng */ @Slf4j @ThreadSafe public class CollectionsExample3 { // 请求总数 public static int clientTotal = 5000; // 允许同时执行的线程并发数 public static int threadTotal = 200; private static Map<Integer, Integer> map = Collections.synchronizedMap(new HashMap<>()); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); // 信号量 允许有多少个同时执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); // 闭锁 for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); updateSynchronizedMap(count); semaphore.release(); } catch (Exception e) { log.info("e=>{}", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("map.size=>{}", map.size()); } private static void updateSynchronizedMap(int count ) { map.put(count, count); } }
    Processed: 0.016, SQL: 9