3G基础知识
Hadoop备忘:Reduce阶段Iterablevalues中的每个值都共享一个对象
/**
* Iterate through the values for the current key, reusing the same value
* object, which is stored in the context.
* @return the series of values associated with the current key. All of the
* objects returned directly and indirectly from this method are reused.
*/
public
Iterable getValues() throws IOException, InterruptedException {
return iterable;
}
在Reduce阶段,具有相同key的的所有的value都会被组织到一起,形成一种key:values的形式。
一般情况下,我们会针对某个key下的所有的values进行处理,这里需要注意一个问题,当我们写下如下代码的时候:
protected void reduce(KEYIN key, Iterable values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
我们在一个循环中,每次得到的value实际上都是指向的同一个对象,只是在每次迭代的时候,将新的值反序列化到这个对象中,以更新此对象的值:
/**
* Advance to the next key/value pair.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) {
key = null;
value = null;
return false;
}
firstValue = !nextKeyIsSame;
DataInputBuffer nextKey = input.getKey();
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
key = keyDeserializer.deserialize(key);
DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(),
nextVal.getLength() - nextVal.getPosition());
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
hasMore = input.next();
if (hasMore) {
nextKey = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
inputValueCounter.increment(1);
return true;
}
为什么要注意这种情况呢?正如本文开始引用的那段Hadoop源代码中的注释所指明的,这里主要涉及到引用。如果值的类型是自己实现的某种Writable,比如说AType,并且在其中持有对其它对象的引用,同时,在Reduce阶段还要对相邻的两个值(current_value,value)进行同时进行操作,这个时候,如果你仅仅是将value强制类型转换成相应的AType,这个时候,current_value和value其实是指向的同一段内存空间,也就是说,当我们迭代完第一次的时候,current_value缓存了当前的值,但是当进行下一次迭代,取到的value,其实就是将它们共同指向的那段内存做了更新,换句话说,current_value所指向的内存也会被更新成value的值,如果不了解Reduce阶段values的迭代实现,很可能就会造成莫名其妙的程序行为,而解决方案就是创建一个全新的对象来缓存“上一个”值,从而避免这种情况。
全部0条评论
快来发表一下你的评论吧 !