SparkSql Series (7/25): Deduplication Methods
Duplicate rows can be removed with either the distinct or the dropDuplicates method. The difference between them is that distinct deduplicates over all columns: if your DataFrame has 10 columns, a row is removed only when all 10 columns match another row. dropDuplicates, on the other hand, lets you specify which columns to deduplicate on, so it is effectively a narrower, more targeted version of distinct.
import spark.implicits._

val simpleData = Seq(("James", "Sales", 3000),
  ("Michael", "Sales", 4600),
  ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000),
  ("James", "Sales", 3000),
  ("Scott", "Finance", 3300),
  ("Jen", "Finance", 3900),
  ("Jeff", "Marketing", 3000),
  ("Kumar", "Marketing", 2000),
  ("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()
The output is as follows:
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
//Distinct all columns
val distinctDF = df.distinct()
println("Distinct count: "+distinctDF.count())
distinctDF.show(false)
The distinct() function returns a new DataFrame; it does not modify the original in place.
Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Maria        |Finance   |3000  |
|Robert       |Sales     |4100  |
|Saif         |Sales     |4100  |
|Scott        |Finance   |3300  |
|Jeff         |Marketing |3000  |
|Jen          |Finance   |3900  |
|Kumar        |Marketing |2000  |
+-------------+----------+------+
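To illustrate that point, you can count the source DataFrame again and confirm it is untouched. This check is not part of the original example; it is just a minimal sketch:

// df is unchanged: it still contains all 10 rows, including the duplicate "James" row
println("Original count: " + df.count())          // 10
println("Distinct count: " + distinctDF.count())  // 9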
Alternatively, calling dropDuplicates without any arguments achieves the same effect:
val df2 = df.dropDuplicates()
println("Distinct count: "+df2.count())
df2.show(false)
Like distinct, dropDuplicates also returns a new DataFrame. To deduplicate on specific columns only, pass the column names as arguments:
//Distinct using dropDuplicates
val dropDisDF = df.dropDuplicates("department","salary")
println("Distinct count of department & salary : "+dropDisDF.count())
dropDisDF.show(false)
The result is as follows:
Distinct count of department & salary : 8
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Jen          |Finance   |3900  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Michael      |Sales     |4600  |
|Kumar        |Marketing |2000  |
|Robert       |Sales     |4100  |
|James        |Sales     |3000  |
|Jeff         |Marketing |3000  |
+-------------+----------+------+
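dropDuplicates also has an overload that takes a Seq of column names, which is handy when the deduplication columns are held in a collection. A minimal sketch equivalent to the call above (the variable names here are just illustrative):

// Equivalent call using the Seq[String] overload of dropDuplicates
val dedupCols = Seq("department", "salary")
val dropDisDF2 = df.dropDuplicates(dedupCols)
dropDisDF2.show(false)

Note that when several rows share the same department and salary, which row's employee_name survives is not guaranteed.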
For reference, the complete example is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SQLDistinct extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  val simpleData = Seq(("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

  //Distinct all columns
  val distinctDF = df.distinct()
  println("Distinct count: "+distinctDF.count())
  distinctDF.show(false)

  val df2 = df.dropDuplicates()
  println("Distinct count: "+df2.count())
  df2.show(false)

  //Distinct using dropDuplicates
  val dropDisDF = df.dropDuplicates("department","salary")
  println("Distinct count of department & salary : "+dropDisDF.count())
  dropDisDF.show(false)
}
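Since this series is about Spark SQL, it is worth noting that the same all-column deduplication can also be expressed in SQL with SELECT DISTINCT. A minimal sketch, assuming the DataFrame is registered under a hypothetical view name "employees":

// Hypothetical: register df as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("employees")

// SELECT DISTINCT * removes rows that are identical across all columns,
// matching the behavior of df.distinct() above
val sqlDistinctDF = spark.sql("SELECT DISTINCT * FROM employees")
println("SQL distinct count: " + sqlDistinctDF.count())
sqlDistinctDF.show(false)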