基于spark的Facebook社团挖掘

从图论的角度来说,社团挖掘问题就是相当于一个图的分隔问题,即给定一个图G=(V,E),顶点集V表示社交网络中的用户,边集E表示用户之间的友好关系,社团挖掘的目的就是找到一种最优的分割图的方法,使得分割后形成若干个子图,跨越不同子图的边的数量尽可能得小,同一个子图内部的边的数量尽可能大。最常见的衡量最优图的标准有三种:
最小割:图中所有的割中,边权值和最小的割为最小割
邻接函数W(A,B)表示子图A和子图B之间所有的权值之和,即
W(A,B)=∑(i∈A,i∈B)▒S(i,j)
假设A_1,…,A_k是图分割称为k个子图的结果,那么k个子图的割(Cut)可以表示为
Cut(A_1,…,A_k)=1/2 ∑_(i=1)^k▒〖W(A_i,〖A〗_i)〗
式中,〖A〗_i为A_i,的补集,W(A_i,〖A〗_i )则表示A_i,与其他部分边的总权重。谱聚类的目标就是求解最小的割。
最小比例隔:
大部分时候最小割的指标通常会产生非常不合理得分隔结果。以二分类为例,最小割指标通常会将整个图分隔称为一个节点和其余节点所有点的两个子图。所以除了希望跨越的子图的边总权重尽可能小,同时也希望每个子图都有合理的大小,因此引入了最小比例割。
RatioCut(A_1,…,A_k)=1/2 ∑_(i=1)^k▒(W(A_i,〖A〗_i))/|A_i |
式中,|A_i |为子图A_i中点的个数。
最小归一化割:
归一化割是最小割的另一种改进指标,其定义如下:
NormCut(A_1,…,A_k)=1/2 ∑_(i=1)^k▒(W(A_i,〖A〗_i))/(volA_i |)
式中,vol〖(A〗i)=∑(t∈A_j)▒S_(t,j) ,表示子图A_i所有节点发出去边的总权重。

基于谱聚类的社团挖掘算法,
求解社交网络G=(V,E)的K聚类的社团挖掘算法如下:
计算社交网络G=(V,E)对应的拉普拉斯矩阵L;
计算拉普拉斯矩阵L的前K个特征值k1<=…<=kk以及对应的特征向量v1,…,vk;
把这k个特征(列)向量排列在一起组成一个N×K的矩阵,将其中的每一行看做K维空间中的一个向量,并使用K均值算法进行聚类。聚类的聚类的结果每一行所属的类别对应社交网络G=(V,E)中相应节点的最终划分类别。

代码:

package experiment3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.{SparkConf, SparkContext}
// 基于spark的社团挖掘实现
object train1 {
def main(args: Array[String]): Unit = {
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("com").setLevel(Level.OFF)
  System.setProperty("spark.ui.showConsoleProgress", "false")
  Logger.getRootLogger().setLevel(Level.OFF)
  // step1:由于需要计算社交网络的拉普拉斯矩阵L,因此需要把网络读取到CoordinateMatrix中作为邻接矩阵S
  val conf = new SparkConf().setAppName("ForeachDemo").setMaster("local")
  val sc = new SparkContext(conf)
  println("开始运行")
  val data = sc.textFile("src/experiment3/dataset/facebook/0.edges")
  val adjMatrixEntry = data.map(_.split(' ') match { case Array(id1,id2)=> MatrixEntry(id1.toLong- 1,id2.toLong-1,-1.0)})
  val adjMatrix = new CoordinateMatrix(adjMatrixEntry)
  println("Num of nodes="+adjMatrix.numCols() +",NUm of deges="+data.count())
  // step1_result:Num of nodes=348,NUm of deges=5038

  // step2: 计算社交网络的拉普拉斯矩阵L= D-S;
  // 计算矩阵D的对角元素值
  val rows = adjMatrix.toIndexedRowMatrix.rows
  val diagMatrixEntry = rows.map(row =>
      MatrixEntry(row.index,row.index,row.vector.toArray.sum)
  )
  // 计算拉普拉斯矩阵L=D-S
  val laplaceMatrix = new CoordinateMatrix(sc.union(adjMatrixEntry,diagMatrixEntry))
  println("计算拉普拉斯矩阵L=D-S",laplaceMatrix.toRowMatrix().toString)
  // step3:计算拉普拉斯矩阵的特征列向量构成的矩阵(假设聚类个数是5)
  val eigenMatrix = laplaceMatrix.toRowMatrix.computePrincipalComponents(5)

  // step4:特征列向量矩阵的行向量就是网络中节点对应的5维向量表示
  val nodes = eigenMatrix.transpose.toArray.grouped(5).toSeq
  val nodeSeq = nodes.map(node=> Vectors.dense(node))
  val nodeVectors = sc.parallelize(nodeSeq)

  // step5: 最后求解节点在新的向量表示下的K均值聚类结果
  val clusters = new KMeans().setK(5).setMaxIterations(100).run(nodeVectors)
  val result = clusters.predict(nodeVectors).zipWithIndex().groupByKey().sortByKey()
  result.collect.foreach(c => println("Nodes in cluster " + (c._1 + 1) + ": " +
    c._2.foreach(n => print(" " + n)) +
    println()))
      }
}

基于spark的链路预测算法

package experiment3

import org.apache.hadoop.metrics2.lib.MutableMetric
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd._
import org.apache.spark.graphx._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

//基于spark的链路预测算法
object train2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress", "false")
    Logger.getRootLogger().setLevel(Level.OFF)
    // 数据集是基于SNAP符号社交网络Epinions数据集
    // step1:读取数据集
    val conf = new SparkConf().setAppName("ForeachDemo").setMaster("local")
    val sc = new SparkContext(conf)
    println("开始运行")
    val data = sc.textFile("src/experiment3/dataset/soc-sign-epinions.txt")
    val parsedData = data.map(_.split('\t') match { case Array(id1, id2, sign) => (id1.toLong, id2.toLong, sign.toDouble)})
    //step2:创建节点对象的VertexRDD和正边构成的EdgeRDD对象,用于构建Graph
    val nodes: RDD[(VertexId, String)] = sc.parallelize("OL until 121828L").map(id => (id, id.toString))
    val edges: RDD[Edge[String]] =
      parsedData.filter(_._3 == 1.0).map { case (id1, id2, sign) =>
        Edge(id1, id2, id1 + "->" + id2)
      }
    val network = Graph(nodes, edges)
    val network_ops = network.ops
    //输出社交网络的节点数量和正边的数量
    println("Number of nodes = " + network_ops.numVertices)
    println("NUmber of edges = " + network_ops.numEdges)
    // 结果:Number of nodes = 114470
    // NUmber of edges = 717667

    // step3:只统计节点的度作为分类特征
    val degrees = network_ops.degrees.map{case (id,degree) => (id.toLong,degree)}
    val inDegrees = network_ops.inDegrees.map{case (id,degree) => (id.toLong,degree)}
    val outDegrees = network_ops.outDegrees.map{case (id,degree)=>(id.toLong,degree)}

    //step4:将度特征整合到边的信息当中
    val dataset = parsedData.map{case (id1,id2,sign)=> (id1,(id2,sign))
      }.join(degrees).join(inDegrees).join(outDegrees).map{
      case (id1,((((id2,sign),degree),inDegree),outDegree))=>
        (id2,(id1,sign,degree,inDegree,outDegree))
    }.join(degrees).join(inDegrees).join(outDegrees).map{
      case (id2,((((id1,sign,degree1,inDegree1,outDegree1),degree2),inDegree2),outDegree2))=>
        (sign,degree1,inDegree1,outDegree1,degree2,inDegree2,outDegree2)
    }

    //step5:按照3:2划分为训练集和测试集
    val parseDataset = dataset.map{case (s,d1,d2,d3,d4,d5,d6)=>
      if(s==1.0)
          LabeledPoint(1.0,Vectors.dense(d1,d2,d3,d4,d5,d6))
      else
        LabeledPoint(0.0,Vectors.dense(d1,d2,d3,d4,d5,d6))
    }
    val positiveDataset = parseDataset.filter(_.label == 1.0)
    val negativeDataset =  parseDataset.filter(_.label == 0.0)
    val positiveSplts =  positiveDataset.randomSplit(Array(0.6,0.4))
    val negativeSplts =  negativeDataset.randomSplit(Array(0.6,0.4))
    val training = sc.union(positiveSplts(0),negativeSplts(0)).cache()
    val testing =sc.union(positiveSplts(1),negativeSplts(1)).cache()

    //step6:按照二分类问题训练Logistic模型
    val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(training)

    //step7:输出训练的模型在测试样本上的准确率和召回率
    val predictonAndLabels = testing.map{case LabeledPoint(label,features)=>
      val prediction = model.predict(features)
      (prediction,label)}
    val metrics = new MulticlassMetrics(predictonAndLabels)
    println("Precision = "+metrics.precision(1.0))
    println("Recall = "+metrics.recall(1.0))

    //steps:8 输出每个特征的权值
    val weight = (1 to model.numFeatures) zip model.weights.toArray
      weight.foreach{case (k,w)=>
      println("Feature"+k+"="+w)
      }

  }
}