单纯的对Dataset去重可以采用spark的API中的dropDuplicate()
方法以及可以指定列名的4个重载方法:dropDuplicates(scala.collection.Seq<String> colNames)
、dropDuplicates(String[] colNames)
、dropDuplicates(String col1, scala.collection.Seq<String> cols)
、dropDuplicates(String col1, String... cols)
,返回值都是去重后的新的Dataset。但是有时候我们需要根据指定列去重后保留最后一条数据,采用dropDuplicate()方法就无法实现了。这时候我们可以将指定的列作为key(如果值过长,可以对其进行HASH,进行压缩映射),转换原数据为mapPartionsRDD,然后再reduceByKey进行去重,保留最后的一条数据。
** java版实现: **
1 | final String[] keys = view.getPrimKeys();//获取主键 |
** scala版实现: **
1 | val unRdd = ds.rdd.map(line=>((line.getAs[String]("col1")+line.getAs[String]("col2")),line)).reduceByKey((x,y)=>y,6).map(_._2) |
在对Dataset转换成mapPartitionsRDD时也可以使用keyBy():
1 | keyBy() |
对于keyBy()和mapToPair()两个方法的本质并没有区别:
1 | //mapToPair实现源码 |
1 | //keyBy()实现源码 |
可以看到两个方法都是使用rdd.map方法来生成新的RDD。