Object reuse of values in Hadoop Reduce

    Tech  2022-07-10

    In Hadoop's reduce, the value (v) objects passed in are reused (object reuse).

    void reduce(K2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter) throws IOException

    Reduces values for a given key.

    The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of. In many cases, all values are combined into zero or one value.

    Output pairs are collected with calls to OutputCollector.collect(Object,Object).
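    As a concrete illustration, here is a minimal word-count style reducer written against this old org.apache.hadoop.mapred interface (the class name SumReducer is ours; the signature is the one documented above):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class SumReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                // Reading the primitive out is safe; holding on to the
                // IntWritable object itself would not be (see below).
                sum += values.next().get();
            }
            // All values are combined into one value, as the javadoc suggests.
            output.collect(key, new IntWritable(sum));
        }
    }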

    Applications can use the Reporter provided to report progress or just indicate that they are alive. In scenarios where the application takes a significant amount of time to process individual key/value pairs, this is crucial since the framework might assume that the task has timed-out and kill that task. The other way of avoiding this is to set mapreduce.task.timeout to a high-enough value (or even zero for no time-outs).
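    For example, a reducer whose per-record work is slow can ping the framework on every record, or the job can raise the timeout on its JobConf. A hedged sketch (processRecord and MyJob are hypothetical placeholders):

    // Inside reduce(): report liveness while grinding through values.
    while (values.hasNext()) {
        processRecord(values.next());  // hypothetical long-running work
        reporter.progress();           // tells the framework the task is alive
    }

    // Or, when configuring the job, raise or disable the timeout:
    JobConf conf = new JobConf(MyJob.class);
    conf.setLong("mapreduce.task.timeout", 0);  // 0 = no time-out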

    Parameters:

    key - the key.
    values - the list of values to reduce.
    output - to collect keys and combined values.
    reporter - facility to report progress.

    Throws:

    IOException

    The javadoc for the reduce method already spells out the problem:

    The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.

    In other words, although the reduce method is invoked over and over, only two objects back all the keys and values it ever sees: the framework deserializes each record into the same key object and the same value object and hands those to reduce again and again. (Note: the reuse is not limited to the iterations within a single reduce call; the framework allocates the key and value instances once per task and reuses them across reduce calls as well.) So if you need to keep a key or a value, you must either extract the data it holds or clone it into a new object (e.g. Text store = new Text(value), or String a = value.toString()); you must not save the reference itself. The reference points to the same object from start to finish, so if you store references directly they will all end up describing the last input record, and the final result will be wrong.
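    A quick way to see the failure mode is with Text values (a minimal sketch; the variable names are ours):

    // Inside reduce(): storing references vs. storing copies.
    List<Text> refs = new ArrayList<Text>();
    List<Text> copies = new ArrayList<Text>();
    while (values.hasNext()) {
        Text v = values.next();
        refs.add(v);              // every entry aliases the one reused object
        copies.add(new Text(v));  // Text's copy constructor makes a real copy
    }
    // After the loop, every element of refs shows the bytes of the last
    // record; copies holds the distinct values you actually iterated over.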

    The snippet below contrasts the two patterns with a real-world bean; the loops are alternatives, not meant to run back to back (as noted below, values can only be traversed once anyway):

    ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();

    // Broken: adds the same reused object each time, so every element
    // ends up reflecting the last record.
    for (WebLogBean b : values) {
        beans.add(b);
    }

    // Correct: copy each record into a fresh bean before storing it.
    for (WebLogBean bean : values) {
        WebLogBean webLogBean = new WebLogBean();
        try {
            BeanUtils.copyProperties(webLogBean, bean);
        } catch (Exception e) {
            e.printStackTrace();
        }
        beans.add(webLogBean);
    }

    The Iterator you receive from that Iterable’s iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren’t really backed by a Collection, so it’s nontrivial to allow multiple iterations.

    The Iterator in the reduce function can only be iterated once.
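    To make the single-pass behaviour concrete, here is a hedged sketch against the newer org.apache.hadoop.mapreduce API (the one the quote above refers to):

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int firstPass = 0;
        for (IntWritable v : values) { firstPass++; }   // consumes the stream
        int secondPass = 0;
        for (IntWritable v : values) { secondPass++; }  // stays 0: exhausted
        // To traverse the data twice, buffer deep copies on the first pass,
        // as in the WebLogBean example above.
    }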

    Finally, it is worth noting how thoroughly the Hadoop framework authors thought things through: the framework reuses not only objects but also JVMs (JVM reuse), saving every resource that can be saved and squeezing out every bit of performance. At this scale of data processing, performance optimization matters enormously: with 100 records you will not notice any difference, but what about hundreds of billions or trillions of records?
