Java8 paralleStream的线程安全性

/ Java / 没有评论 / 198浏览

parallelStream

Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。

    public static void main(String[] args) {
        List<Integer> list1 = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list1.add(i);
        }
        List<String> list2 =new ArrayList<>();
        list1.parallelStream().forEach(i-> list2.add(String.valueOf(i)));
        System.out.println("null count : "+list2.stream().filter(Objects::isNull).count());
        List<String> list3 = Collections.synchronizedList(new ArrayList<>());
        list1.parallelStream().forEach(i-> list3.add(String.valueOf(i)));
        System.out.println("null count : "+list3.stream().filter(Objects::isNull).count());
    }

输出


null count : 108
null count : 0

Process finished with exit code 0

list2的实现没有保证线程安全,导致赋值时可能多个任务重复赋值到list2的一个位置,剩下的位置没得到赋值,最后结果为null。 解决方法就是list2,list3使用线程安全的实现或者用Stream的collect()方法。


public static void main(String[] args) {
        List<Integer> list1 = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list1.add(i);
        }
        List<String> list2 =new ArrayList<>();
        list1.parallelStream().forEach(i-> list2.add(String.valueOf(i)));
        System.out.println("null count : "+list2.stream().filter(Objects::isNull).count());
        List<String> list3 = Collections.synchronizedList(new ArrayList<>());
        list1.parallelStream().forEach(i-> list3.add(String.valueOf(i)));
        System.out.println("null count : "+list3.stream().filter(Objects::isNull).count());
        List<String> list4 = list1.parallelStream().map(i -> String.valueOf(i)).collect(Collectors.toList());
        System.out.println("null count : "+list4.stream().filter(Objects::isNull).count());
    }