parallelStream线程不安全问题分析及解决办法

    技术2023-07-15  97

    问题代码

    public static void main(String[] args) { for (int i = 0; i < 5; i++) { //调用多次,复现多线程的问题 test(); } } public static void test() { //声明数据源集合 List<Integer> list = new ArrayList<>(); for (int i = 0; i < 100; i++) { //添加100个元素到集合中 list.add(i); } //添加数据的集合 List<Integer> list2 = new ArrayList<>(); //使用parallelStream的遍历方法来添加元素到新的集合 list.parallelStream().forEach(i -> { list2.add(i); }); //打印添加元素之后的集合长度 System.out.println(list2.size()); }

    执行结果

    100 98 100 Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598) at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) at com.example.demo.collections.LambdaList.test(LambdaList.java:25) at com.example.demo.collections.LambdaList.main(LambdaList.java:11) Caused by: java.lang.ArrayIndexOutOfBoundsException: 73 at java.util.ArrayList.add(ArrayList.java:459) at com.example.demo.collections.LambdaList.lambda$test$0(LambdaList.java:26) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.execLocalTasks(ForkJoinPool.java:1040) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1058) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Process finished with exit code 1

    问题分析

    代码很简单,就是使用parallelStream流遍历集合,并把元素添加到新的ArrayList集合中,根据执行结果可以看出来,会出现数据条数对不上,或者直接抛异常的问题。

    因为parallelStream是通过ForkJoinPoll来利用多核多线程执行任务,而ArrayList是非线程安全的,所以会有线程安全问题,解决办法也很简单,把parallelStream换成stream串行执行遍历或者把ArrayList换成线程安全的CopyOnWriteArrayList即可。因为换成stream在数据量较大的情况下会显著降低速度,所以建议换成CopyOnWriteArrayList。代码如下。

    解决方案

    方案一:替换parallelStream为stream,因为main方法代码不变,所以只贴出来test方法的代码

    public static void test() { //声明数据源集合 List<Integer> list = new ArrayList<>(); for (int i = 0; i < 100; i++) { //添加100个元素到集合中 list.add(i); } //添加数据的集合 List<Integer> list2 = new ArrayList<>(); //使用stream替换parallelStream list.stream().forEach(i -> { list2.add(i); }); //打印添加元素之后的集合长度 System.out.println(list2.size()); }

    执行结果

    100 100 100 100 100

    方案二:使用CopyOnWriteArrayList来替换ArrayList

    public static void test() { //声明数据源集合 List<Integer> list = new ArrayList<>(); for (int i = 0; i < 100; i++) { //添加100个元素到集合中 list.add(i); } //添加数据的集合,使用CopyOnWriteArrayList替换ArrayList List<Integer> list2 = new CopyOnWriteArrayList<>(); //使用parallelStream的遍历方法来添加元素到新的集合 list.parallelStream().forEach(i -> { list2.add(i); }); //打印添加元素之后的集合长度 System.out.println(list2.size()); }

    执行结果

    100 100 100 100 100
    Processed: 0.010, SQL: 9