Dataset根据指定列去重保留最后一条数据

      单纯的对Dataset去重可以采用spark的API中的dropDuplicate()方法以及可以指定列名的4个重载方法:dropDuplicates(scala.collection.Seq<String> colNames)dropDuplicates(String[] colNames)dropDuplicates(String col1, scala.collection.Seq<String> cols)dropDuplicates(String col1, String... cols),返回值都是去重后的新的Dataset。但是有时候我们需要根据指定列去重后保留最后一条数据,采用dropDuplicate()方法就无法实现了。这时候我们可以将指定的列作为key(如果值过长,可以对其进行HASH,进行压缩映射),转换原数据为mapPartionsRDD,然后再reduceByKey进行去重,保留最后的一条数据。

java版实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final String[] keys = view.getPrimKeys();//获取主键
JavaPairRDD<String, Row> reducePairRDD = viewRDD.toJavaRDD().mapToPair(
new PairFunction<Row, String, Row>() {
public Tuple2<String, Row> call(Row row) throws Exception {
String key = "";
for (int i = 0; i < keys.length; i++) {
key += row.getAs(keys[i]);
}
return new Tuple2<String, Row>(key, row);
}
}
).reduceByKey(new Function2<Row, Row, Row>() {
public Row call(Row row, Row row2) throws Exception {
return row2;
}
}).values;

scala版实现:

1
val unRdd = ds.rdd.map(line=>((line.getAs[String]("col1")+line.getAs[String]("col2")),line)).reduceByKey((x,y)=>y,6).map(_._2)

      在对Dataset转换成mapPartitionsRDD时也可以使用keyBy():

1
2
keyBy()
public static <U> JavaPairRDD<U,T> keyBy(Function<T,U> f)

      对于keyBy()和mapToPair()两个方法的本质并没有区别:

1
2
3
4
5
//mapToPair实现源码
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}

1
2
3
4
5
6
7
8
9
10
//keyBy()实现源码
def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}
//它会调用下面的keyBy方法
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
val cleanedF = sc.clean(f)
map(x => (cleanedF(x), x))
}

      可以看到两个方法都是使用rdd.map方法来生成新的RDD。