SparkSql Series (7/25): Deduplication Methods
Duplicate rows can be removed with the distinct and dropDuplicates methods. The difference between them is that distinct deduplicates across all columns: if your DataFrame has 10 columns, a row is dropped only when all 10 columns match another row. dropDuplicates, on the other hand, lets you specify which columns to deduplicate on, so it is effectively a narrowed-down version of distinct.
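In a nutshell (a minimal sketch, assuming some DataFrame df with columns c1, c2, c3):
// a row is removed only when EVERY column matches another row
val fullDedup = df.distinct()
// a row is removed when just the listed columns match another row;
// called with no arguments it behaves exactly like distinct()
val partialDedup = df.dropDuplicates("c1", "c2")
The sections below walk through both methods on a small sample DataFrame: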
import spark.implicits._
val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 4600),
("Robert", "Sales", 4100),
("Maria", "Finance", 3000),
("James", "Sales", 3000),
("Scott", "Finance", 3300),
("Jen", "Finance", 3900),
("Jeff", "Marketing", 3000),
("Kumar", "Marketing", 2000),
("Saif", "Sales", 4100)
)
val df = simpleData.toDF("employee_name", "department", "salary")
df.show()
The output looks like this:
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+
//Distinct all columns
val distinctDF = df.distinct()
println("Distinct count: "+distinctDF.count())
distinctDF.show(false)
Note that distinct() returns a new DataFrame rather than modifying the original in place.
Distinct count: 9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Maria        |Finance   |3000  |
|Robert       |Sales     |4100  |
|Saif         |Sales     |4100  |
|Scott        |Finance   |3300  |
|Jeff         |Marketing |3000  |
|Jen          |Finance   |3900  |
|Kumar        |Marketing |2000  |
+-------------+----------+------+
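Since this is the SparkSql series, it is worth noting that the same full-row deduplication can also be written in SQL by registering the DataFrame as a temporary view; a minimal sketch (the view name EMP is just an example):
df.createOrReplaceTempView("EMP")
val sqlDistinctDF = spark.sql("SELECT DISTINCT * FROM EMP")
println("Distinct count via SQL: " + sqlDistinctDF.count())  // 9, same as df.distinct()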
Alternatively, calling dropDuplicates without any arguments achieves the same effect:
val df2 = df.dropDuplicates()
println("Distinct count: "+df2.count())
df2.show(false)
Just like distinct, this method returns a new DataFrame.
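You can verify that neither call touches the original DataFrame, for example:
println(df.count())   // still 10: the original keeps its duplicate row
println(df2.count())  // 9: the deduplicated copy returned by dropDuplicates()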
//Distinct using dropDuplicates
val dropDisDF = df.dropDuplicates("department","salary")
println("Distinct count of department & salary : "+dropDisDF.count())
dropDisDF.show(false)
The result is as follows:
Distinct count of department & salary : 8
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Jen          |Finance   |3900  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Michael      |Sales     |4600  |
|Kumar        |Marketing |2000  |
|Robert       |Sales     |4100  |
|James        |Sales     |3000  |
|Jeff         |Marketing |3000  |
+-------------+----------+------+
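One thing to keep in mind: when several rows share the same department and salary, dropDuplicates keeps only one of them, and which one survives is not guaranteed. If you need a deterministic choice (say, the alphabetically first employee_name within each department/salary pair), a window function with row_number is a common pattern; the sketch below is an illustration and not part of the original example:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("department", "salary").orderBy(col("employee_name"))
val deterministicDedup = df
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)  // keep exactly one row per (department, salary)
  .drop("rn")
deterministicDedup.show(false)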
Putting it all together, here is the complete runnable example:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SQLDistinct extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._

  // Sample data containing one fully duplicated row (James, Sales, 3000)
  val simpleData = Seq(("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  )
  val df = simpleData.toDF("employee_name", "department", "salary")
  df.show()

  //Distinct all columns
  val distinctDF = df.distinct()
  println("Distinct count: "+distinctDF.count())
  distinctDF.show(false)

  //dropDuplicates() without arguments is equivalent to distinct()
  val df2 = df.dropDuplicates()
  println("Distinct count: "+df2.count())
  df2.show(false)

  //Distinct using dropDuplicates on selected columns
  val dropDisDF = df.dropDuplicates("department","salary")
  println("Distinct count of department & salary : "+dropDisDF.count())
  dropDisDF.show(false)
}