Spark范例:K-means算法

标签: 分布式 海量数据 算法 Scala spark | 发表时间:2011-12-26 12:01 | 作者:yiihsia
出处:http://www.yiihsia.com

k-means 算法接受输入量 k ;然后将n个数据对象划分为 k个聚类以便使得所获得的聚类满足:同一聚类中的对象相似度较高;而不同聚类中的对象相似度较小。聚类相似度是利用各聚类中对象的均值所获得一个“中心对象”(引力中心)来进行计算的。

算法首先会随机确定K个中心位置(位于空间中代表聚类中心的点),然后将各个数据项分配给最临近的中心点。待分配完成之后,聚类中心就会移到分配给该聚类的所有节点的平均位置处,然后整个分配过程重新开始。这个过程会一直重复下去,直到分配过程不再产生变化为止。下图是涉及5个数据项和2个聚类的过程。

《集体智慧编程》笔记之三:聚类发现群组

spark的例子里也提供了K-means算法的实现,我加了注释方便理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package spark.examples
 
import java.util.Random
import spark.SparkContext
import spark.SparkContext._
import spark.examples.Vector._
 
object SparkKMeans {
  /**
   * line -> vector
   */
  def parseVector(line: String): Vector = {
    return new Vector(line.split(' ').map(_.toDouble))
  }
 
  /**
   * 计算该节点的最近中心节点
   */
  def closestCenter(p: Vector, centers: Array[Vector]): Int = {
    var bestIndex = 0
    var bestDist = p.squaredDist(centers(0))//差平方之和
    for (i <- 1 until centers.length) {
      val dist = p.squaredDist(centers(i))
      if (dist < bestDist) {
        bestDist = dist
        bestIndex = i
      }
    }
    return bestIndex
  }
 
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: SparkKMeans <master> <file> <dimensions> <k> <iters>")
      System.exit(1)
    }
    val sc = new SparkContext(args(0), "SparkKMeans")
    val lines = sc.textFile(args(1), args(5).toInt)
    val points = lines.map(parseVector(_)).cache() //文本中每行为一个节点,再将每个节点转换成Vector
    val dimensions = args(2).toInt//节点的维度
    val k = args(3).toInt //聚类个数
    val iterations = args(4).toInt//迭代次数
 
    // 随机初始化k个中心节点
    val rand = new Random(42)
    var centers = new Array[Vector](k)
    for (i <- 0 until k)
      centers(i) = Vector(dimensions, _ => 2 * rand.nextDouble - 1)
    println("Initial centers: " + centers.mkString(", "))
    val time1 = System.currentTimeMillis()
    for (i <- 1 to iterations) {
      println("On iteration " + i)
 
      // Map each point to the index of its closest center and a (point, 1) pair
      // that we will use to compute an average later
      val mappedPoints = points.map { p => (closestCenter(p, centers), (p, 1)) }
 
      val newCenters = mappedPoints.reduceByKey {
        case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) //(向量相加, 计数器相加)
      }.map { 
        case (id, (sum, count)) => (id, sum / count)//根据前面的聚类,重新计算中心节点的位置
      }.collect
 
      // 更新中心节点
      for ((id, value) <- newCenters) {
        centers(id) = value
      }
    }
    val time2 = System.currentTimeMillis()
    println("Final centers: " + centers.mkString(", ") + ", time: "+(time2- time1) )
  }
}

例子中使用了iterations来限制迭代次数,并不是一种好的方法。可以设置一个阀值,在更新中心节点前,判断新的节点和上一次计算的中心计算差平方之和是否已经到了阀值内,如果是,则不需要继续计算下去。

其中用到的Vector类 https://github.com/mesos/spark/blob/master/examples/src/main/scala/spark/examples/Vector.scala

相关 [spark means 算法] 推荐:

Spark范例:K-means算法

- - yiihsia[互联网后端技术]_yiihsia[互联网后端技术]
k-means 算法接受输入量 k ;然后将n个数据对象划分为 k个聚类以便使得所获得的聚类满足:同一聚类中的对象相似度较高;而不同聚类中的对象相似度较小. 聚类相似度是利用各聚类中对象的均值所获得一个“中心对象”(引力中心)来进行计算的. 算法首先会随机确定K个中心位置(位于空间中代表聚类中心的点),然后将各个数据项分配给最临近的中心点.

K-Means 算法

- - 酷壳 - CoolShell.cn
最近在学习一些数据挖掘的算法,看到了这个算法,也许这个算法对你来说很简单,但对我来说,我是一个初学者,我在网上翻看了很多资料,发现中文社区没有把这个问题讲得很全面很清楚的文章,所以,把我的学习笔记记录下来,分享给大家. k-Means 算法是一种  cluster analysis 的算法,其主要是来计算数据聚集的算法,主要通过不断地取离种子点最近均值的算法.

利用Mahout实现在Hadoop上运行K-Means算法

- - CSDN博客云计算推荐文章
    K-Means算法是基于分划分的最基本的聚类算法,是学习机器学习、数据挖掘等技术的最基本的 知识,所以掌握其运行原理是很重要的.     转载请注明出处:  http://hanlaiming.freetzi.com/?p=144.      一、介绍Mahout.     Mahout是Apache下的开源机器学习软件包,目前实现的机器学习算法主要包含有 协同过滤/推荐引擎, 聚类和 分类三个部分.

TensorFlow实战之K-Means聚类算法实践

- - SegmentFault 最新的文章
Google 最近开源了它的第二代人工智能与数值计算库TensorFlow. TensorFlow由Google大脑团队开发,并且能够灵活地运行在多个平台上——包括GPU平台与移动设备中. TensorFlow的核心就是使用所谓的数据流,可以参考Wikipedia上的有关于 Genetic Programming 的相关知识,譬如:.

k-means聚类JAVA实例

- - CSDN博客互联网推荐文章
《mahout in action》第六章. datafile/cluster/simple_k-means.txt数据集如下:. 1、从D中随机取k个元素,作为k个簇的各自的中心. 2、分别计算剩下的元素到k个簇中心的相异度,将这些元素分别划归到相异度最低的簇. 3、根据聚类结果,重新计算k个簇各自的中心,计算方法是取簇中所有元素各自维度的算术平均数.

基于Spark MLlib平台的协同过滤算法---电影推荐系统

- - zzm
又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用.     说到推荐系统,大家可能立马会想到协同过滤算法. 本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用. 基于模型的协同过滤应用---电影推荐.     一、协同过滤算法概述.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.