stream.parallel.forEach()中执行的操作非线程安全,可使用ReentrantLock和实现collector接口来解决
https://www.cnblogs.com/puyangsky/p/7608741.html
import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.IntStream; public class Test { private static List<Integer> list1 = new ArrayList<>(); private static List<Integer> list2 = new ArrayList<>(); private static List<Integer> list3 = new ArrayList<>(); private static Lock lock = new ReentrantLock(); private List<Integer> nums = new ArrayList<>(); private Set<Integer> result = new HashSet<>(); public class Container { // 定义本地的result public Set<Integer> set; public Container() { this.set = new HashSet<>(); } public Container accumulate(int num) { this.set.add((2 * num)); return this; } public Container combine(Container container) { this.set.addAll(container.set); return this; } public Set<Integer> getResult() { return this.set; } } public void run(){ IntStream.range(0,10000).forEach(nums::add); //实现collector接口 result = nums.parallelStream().collect(new Collector<Integer, Container, Set<Integer>>() { //拆分 @Override public Supplier<Container> supplier() { return Container::new; } //分开运算 @Override public BiConsumer<Container, Integer> accumulator() { return Container::accumulate; } //合并 @Override public BinaryOperator<Container> combiner() { return Container::combine; } @Override public Function<Container, Set<Integer>> finisher() { return Container::getResult; } @Override public Set<Characteristics> characteristics() { return Collections.emptySet(); } }); } public static void main(String[] args) { IntStream.range(0, 10000).forEach(list1::add); IntStream.range(0, 10000).parallel().forEach(list2::add); IntStream.range(0, 10000).parallel().forEach(i -> { lock.lock(); try { list3.add(i); }finally { lock.unlock(); } }); System.out.println("串行执行的大小:" + list1.size()+"->大小不变"); System.out.println("并行执行的大小:" + list2.size()+"->比原始数据少"); System.out.println("加锁并行执行的大小:" + list3.size()+"->大小不变"); Test test = new Test(); test.run(); System.out.println("原始数据大小:"+test.nums.size()); test.nums.forEach(i -> System.out.print(i + " ")); System.out.println("\n\ncollect方法加工后的数据大小:"+test.result.size()); test.result.forEach(i -> System.out.print(i + " ")); } }