Accessing HashMap inside flatMapToPair
Question
Edit: Already solved using RDD.collectAsMap()
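For context, a minimal sketch of what that fix might look like, assuming the counts were originally computed as a JavaPairRDD (the name countsRDD below is hypothetical):

import java.util.Map;

// Hypothetical: countsRDD is a JavaPairRDD<Integer, Integer> holding the same
// (id, count) pairs as the HashMap described below. collectAsMap() brings
// them back to the driver as an ordinary java.util.Map, which can then be
// used (or broadcast) like any other local variable.
Map<Integer, Integer> hashMap = countsRDD.collectAsMap();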
I am trying to replicate the solution to the problem from pages 28-30 of http://on-demand.gputechconf.com/gtc/2016/presentation/S6424-michela-taufer-apache-spark.pdf
I have a HashMap<Integer, Integer> that I instantiate outside of the map function. The HashMap contains the following data:
{1:2, 2:3, 3:2, 4:2, 5:3}
A previously defined RDD, previousRDD, has the type:
JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>
and contains the data:
1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]
I try to create a new RDD with a flatMapToPair:
JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
        Integer count;
        ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
        count = hashMap.get(integerIterableTuple2._1);
        for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
            Integer tcount = hashMap.get(t._2);
            if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
                list.add(t);
            }
        }
        return list.iterator();
    }
});
But here, hashMap.get(t._2) inside the for loop returns NULL most of the time, even though I have checked that the proper values are in the HashMap.
Is there a way to properly get the values of a HashMap inside a Spark function?
Answer 1
Score: 0
It should work. Spark should capture your variable, serialize it, and send it to each worker with each task. You might try broadcasting this map with sc.broadcast(hashMap) and using the result instead of hashMap. It is also more efficient memory-wise (shared storage per executor).
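A minimal sketch of that suggestion, assuming sc is the JavaSparkContext used to build previousRDD; apart from reading the counts from the broadcast value, the loop body mirrors the code in the question:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

// Broadcast the map once; every executor then keeps a single read-only copy.
final Broadcast<HashMap<Integer, Integer>> bcMap = sc.broadcast(hashMap);

JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> pair) throws Exception {
        HashMap<Integer, Integer> counts = bcMap.value(); // read the executor-local broadcast copy
        Integer count = counts.get(pair._1);
        List<Tuple2<Integer, Integer>> kept = new ArrayList<>();
        for (Tuple2<Integer, Integer> t : pair._2) {
            Integer tcount = counts.get(t._2);
            if (count < tcount || (count.equals(tcount) && pair._1 < t._2)) {
                kept.add(t);
            }
        }
        return kept.iterator();
    }
});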
Answer 2
Score: 0
I had a similar problem with class variables. You can try making your variable local, or declare one more, like this:
Map<Integer, Integer> localMap = hashMap;

JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(
    ...
    Integer tcount = localMap.get(t._2);
    ...
);
I think this is due to the Spark serialization mechanism. You can read more about it here.
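To make the scoping concrete, here is a hedged sketch of where such a local copy would sit, assuming hashMap is a field of some driver-side class (the class and method names below are made up for illustration):

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical driver-side class; only the shape of the workaround matters here.
public class GraphJob implements Serializable {

    private final Map<Integer, Integer> hashMap = new HashMap<>();

    public void buildNewRdd() {
        // Copy the field into a local variable before defining the closure and
        // reference only localMap inside flatMapToPair (as in the snippet above);
        // per this answer, that sidesteps the issue with class variables.
        final Map<Integer, Integer> localMap = hashMap;
        // ... previousRDD.flatMapToPair(...) using localMap, as in the question ...
    }
}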