在我们常见的数据结构中,图是一种有点冷门,时常被忽视的数据表达方式。大部分场景下,我们的数据关系足够简单,例如,一对一,一对多,这种情况下,表结构(关系数据库)或者文档结构(文档数据库)足以刻画数据之前的关系,对于稍复杂的关联/嵌套数据,通过join和文档嵌套的方式也可以轻松构建,此时在抽象层面,我们可以把数据的结构看作是一个树结构(数仓中也称为星型模型和雪花模型 )。那什么时候会用到图呢?答案是在数据中存在大量多对多的模式(Pattern)时,我们可以应用图数据结构(图数据库)来刻画和解决现实世界的问题。典型的图分析场景有:

下面我们从图的基础概念出发,分别介绍在单机与分布式环境下的图的存储方法图分析的常见算法。此外,还会重点分析一些图算法的分布式实现以及社区挖掘的应用

这里主要介绍的内容来自于

图基本概念与算法

图在数据结构中多少都有一些接触,下面我们自行回忆一下图的基本概念:

图的表示方法

图的基本表示方法有

在neo4j数据库中使用了类似邻接表的表示方法,每一个node节点具有(下图):

每个Relation中有(下图):

我们可以看出通过这个数据结构,方便的实现如下功能:通过图顶点Vertex A遍历他的所有边,找到与他关联的节点的值属性值。

图算法综述

这里我们把常见的图相关的算法分为3类,分别为

下面分别介绍一下这些算法算法与应用场景

遍历寻路

遍历算法是最常见的图算法,

最短路径算法也是十分普遍的算法

其他

中心点

这是一类衡量评估某个节点在图中重要性的算法,例如某个点是否是图的中心,是否具有枢纽作用等

聚类与社区发现

可以将这些算法理解为无监督的聚类算法(clustering algorithms)或者分区算法(partitioning algorithms)

https://neo4j.com/blog/graph-algorithms-neo4j-15-different-graph-algorithms-and-what-they-do/

分布式图分析

在数据量超过单机容量时,我们会考虑使用分布式图来做数据分析。在分布式场景下,图的表示方法与算法有些许差异,例如,数据如何分布,算法如何并行与同步,此外,还需保证计算的效率,在合理的时间内结果的收敛。下面我们分布从存储计算角度了解这个问题。

分布式图的存储

图存储的基本方法

大型图的存储总体上有边分割和点分割两种存储方式

一个例子如下,下图是对3条边的图进行分布,a是使用边分割的方式,节点ABCD的节点数据值存储了一份,但是边的数据有6份(还包含节点的引用,虚节点),但是每台机器有节点的完整的边信息。b是使用了点分割,可以看到123条边都只存储了一次,单顶点数据却冗余存储

当前主流系统实现均为点分割方式。考虑到不均衡节点在大图中是常见现象,以及磁盘成本的下降,使用冗余的节点存储减少网络通信。

Spark中的图存储

spark中图存储的基本方法使用了点分割的方式:每台机器存储Edge数据,同时对Vertex构建查找路由。Graph由EdgeRDD与VertexRDD组成,其中EdgeRDD存储了边,包含了src/dst/边的attr ;VertexRDD则含有顶点数据,有vid和属性attr,此外,VertexRDD还需要能通过顶点定位到所在的边,既即路由功能。下面简要介绍一下EdgeRDD与VertexRDD的数据结构:

分布式图计算模型

图计算的基本思想是BSP模型,即Bulk Synchronous Parallell(分布式批同步),他的基本思想是将图的计算分解成一系列串行的superstep(超步):

每一个超步内部强调计算与通讯分离,具体而言包含三个阶段:

我们发现上述模型也有类似与shuffle的过程,但是对比MR,BSP模型具有下列不同:

BSP模型的传统实现

BSP的有两种常见的计算框架:

下面是分别使用Pregel框架和GraphLab框架实现pagerank的伪代码:

// Pregel框架实现pagerank,被图的顶点调用
def PageRank(v: Id, msgs: List[Double]) {
	// 计算消息和
	var msgSum = 0
	for (m <- msgs) { msgSum = msgSum + m }
	// 更新 PageRank (PR)
	A(v).PR = 0.15 + 0.85 * msgSum
	// 广播新的PR消息
	for (j <- OutNbrs(v)) {
		msg = A(v).PR / A(v).NumLinks
		send_msg(to=j, msg)
	}
	// 检查终止
	if (converged(A(v).PR)) voteToHalt(v)
}


// GraphLab框架实现pagerank
// gather汇总,被图的边调用
def Gather(a: Double, b: Double) = a + b
// apply更新顶点,图的顶点调用(msg是数据交换了)
def Apply(v, msgSum) {
	A(v).PR = 0.15 + 0.85 * msgSum
	if (converged(A(v).PR)) voteToHalt(v)
}
// scatter更新边
def Scatter(v, j) = A(v).PR / A(v).NumLinks
Spark Graphx中BSP模型的实现

GraphX也是基于BSP模式。GraphX的核心API是aggregateMessages,此外基于次API还封装了一个类似Pregel的操作,它结合了的PregelGraphLab框架的特点。我们可以**把Spark中的Pregel理解为多轮的aggregateMessages + joinVertices的组合(即一个superstep)。**首先我们看一下aggregateMessages API:

 // api定义
class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
 
	def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
}

aggregateMessages计算过程:先遍历边,再遍历顶点(只遍历send后的顶点),最终形成(vid,msg)的RDD,VertexRDD[Msg]

joinVertices操作则是把原来的graph与收到的msg关联起来,并定义了顶点的更新函数mapFunc`

// 关联消息的操作,vprog是顶点更新
g = g.joinVertices(messages)(vprog) 

具体实现参考Graphx的Pregel源码

对比:Graphx中的Pregel与原始的Pregel框架和GraphLab有什么异同?

部分图算法的分布式实现举例

我们使用前面介绍的Graphx的Pregel方法来实现图算法,这里举两个源码中的例子,分别是PageRank和LPA,可以发现使用该方法,对于这两种迭代算法的实现都十分简短

一个Graphx版本的PageRank实现如下:

val pagerankGraph: Graph[Double, Double] = graph
  // Associate the degree with each vertex
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  // Set the weight on the edges based on the degree
  .mapTriplets(e => 1.0 / e.srcAttr)
  // Set the vertex attributes to the initial pagerank values
  .mapVertices((id, attr) => 1.0)

def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
   resetProb + (1.0 - resetProb) * msgSum
def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
    Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
val initialMessage = 0.0

// Execute Pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
}}}

一个Graphx版本的LPA实现如下(源码文件):

    val lpaGraph = graph.mapVertices { case (vid, _) => vid }
// 发送自己的标签给邻居,使用顶点id作为标签
    def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, Map[VertexId, Long])] = {
      Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
    }
// 统计邻居节点标签的数量
    def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
      : Map[VertexId, Long] = {
      val map = mutable.Map[VertexId, Long]()
      (count1.keySet ++ count2.keySet).foreach { i =>
        val count1Val = count1.getOrElse(i, 0L)
        val count2Val = count2.getOrElse(i, 0L)
        map.put(i, count1Val + count2Val)
      }
      map
    }
// 使用数量最多的邻居标签作为顶点的属性值
    def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
      if (message.isEmpty) attr else message.maxBy(_._2)._1
    }
    val initialMessage = Map[VertexId, Long]()
// 迭代计算
    Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)(
      vprog = vertexProgram,
      sendMsg = sendMessage,
      mergeMsg = mergeMessage)
  }

社区发现算法

社区发现是什么

首先,我们需要解释在图场景下社区的概念。社区是在图中具有如下特征的子图

从上述定义可以看出:社区是一个比较含糊的概念,只给出了一个定性的刻画。

我们可以把社区发现算法对比与无监督学习中的聚类算法,典型的如 K-means系列算法,他们也是把相邻的节点进行聚合分类,不同的只有k-means中通过特征的『距离』作为衡量的标准。图的社区发现中通过边以及边的权重作为依据。

社区发现算法系列

算法分类

社区发现中,我们希望把一个图分割成N个互不相交的社区(N往往未知),一般有两种思路:第一种是cut,也就是划分,把无关联的边去掉,进而取到核心的社区。第二种是gather,也就是聚合,将关联性比较大的顶点聚集起来,关联性较小的顶点剔除出去。

其中cut算法的典型代表是:k-core decomposition。基本思想是对图中的所有顶点进行判定,若度小于K,则将该点和所有关联的边从图中删除,然后再进行下一轮迭代,直到图中所有的顶的度都大于K。

LOOP

​ (1)找到小于K的点

​ (2)删除步骤1找到的点和边

​ (3)继续进行步骤1,若找不到符合条件的点则结束

END

gather算法的典型代表是:Label Propagation Algorithm标签传播算法,每个顶点在开始的时候都设立自己的标签,然后向所有的邻居进行广播。每个顶点在接收到广播的时候呢,将收到的最多的标签作为自己的标签,进行下一轮的迭代。

初始化:(1)顶点标签

LOOP

(2)发送自己的标签

(3)接收标签

(4)将最大标签作为自己的标签

​ (5)继续进行方法2,直到达到最大迭代次数。

END

效果评估-Modularity

不同算法的效果如何衡量,即社区质量评估是社区发现中的一个重要问题,一般我们使用一个数值标量来做为评估标准:Modularity。定义如下:网络中连接社区结构内部顶点的边所占的比例与另外一个随机网络中连接社区结构内部顶点的边所占比例的期望值相见得到的差值。

Modularity的定义历史可以参考这篇文章。重点理解:

常见算法

除了我们之前提到的k-core decomposition与LPA外,有一系列基于他们衍生算法,主要解决算法中存在的问题。这里我们讨论一下LPA的衍生算法。在LPA算法中,只能将顶点划分一个社区内,这样会导致一个经常出现的顶点的分类结果震荡问题。最常见的改进是SLPA算法(Speaker Listener Label Label Propagation Algorithm),SLPA 中引入了 Listener 和 Speaker 两个比较形象的概念,你可以这么来理解:在刷新节点标签的过程中,任意选取一个节点作为 listener,则其所有邻居节点就是它的 speaker 了,speaker 通常不止一个,一大群 speaker 在七嘴八舌时,listener 到底该听谁的呢?这时我们就需要制定一个规则。在 LPA 中,我们以出现次数最多的标签来做决断,其实这就是一种规则。只不过在 SLPA 框架里,规则的选取比较多罢了(可以由用户指定)。当然,与 LPA 相比,SLPA 最大的特点在于:它会记录每一个节点在刷新迭代过程中的历史标签序列(例如迭代 T 次,则每个节点将保存一个长度为 T 的序列,如上图所示),当迭代停止后,对每一个节点历史标签序列中各(互异)标签出现的频率做统计,按照某一给定的阀值过滤掉那些出现频率小的标签,剩下的即为该节点的标签(通常有多个)。过程如下:

初始化:(1)顶点标签

LOOP

​ (2)根据规则发送标签

​ (3)接收标签,将标签以一定的规则追加到自己的标签中

​ (4)继续进行方法2,直到达到最大迭代次数。

END

​ (5)对结果进行解析,自行决定单顶点社区的个数。

此外,还有一些衍生算法致力于解决LPA中的大社区问题,即分类中会出现某些社区具有特别巨量的节点。我们往往通过引入传播距离作为控制,改进算法的结果,常见的算法有HANP,DCLP等,可以参考这篇文章

另一大类的社区算法还会在算法运行过程中使用Modularity作为优化目标来进一步指导节点的分类,这就是我们最后会详细说明的Fast unfolding Louvain算法,该算法除了分类效果最好外,还可以生成多分辨率图,巧妙的避免了『巨型社区』的问题。

总结常见的社区发现算法如下:

社区发现算法实现-Louvain实现

当前效果比较好的社区发现算法是Fast unfolding Louvain算法。该算法对于大数据量的图(亿节点+)具有快速的聚类效果,此外,通过多分辨率图的方式解决『大社区』问题。可以阅读论文:fast unfolding of communities in large networks,这篇论文十分紧凑推荐阅读。

DGA库介绍与源码

背景:实际开发中并没有重头开发算法库,而是在第一版本中使用了第三方的库,这个库在github上使用的人非常少,风险很高
文档:http://sotera.github.io/distributed-graph-analytics/
介绍:这个库主要是在Hadoop环境(Giraph)和Spark环境(GraphX)下实现了一些图的算法。这些框架都使用了BSP模型(同步栅栏模型).

DGA实现了如下5种算法:

#####Louvain Modularity算法源码分析与导读

文档资料

代码流程分析

louvain迭代过程分析:

louvainCore.louvain(sc, louvainGraph, minimumCompressionProgress, progressCounter),输入louvainGraph

  1. 计算totalGraphWeight,(这里是图的边的权重*2,或者叫度的权重)
  2. 使用aggregateMessages,将每个vertex的附近的邻居的社区id和communitySigmaTot 拿到
  1. do-while迭代:注意终止条件:stop--含义表示update<minProgress的进展太小(合并节点太少)
  1. 计算最终的actualQ(modularity)返回:算法就是先用公式算每个社区的modularity,然后整体求和

可能优化点:aggregateMessages不用map,而是直接算deltaQ并取最大值

deltaQ的计算问题

我们发现算法实现时的deltaQ计算与原始论文不同。由于我们只需要比较deltaQ值的相对大小,因此上述两个公式是等价的。证明可以参考这篇文章
重点理解:我们只需要计算相对大小即可(就可以比较找出最大变换),因此在unfold fast论文中deltaQ计算公式:

ΔQ=[in+ki,in2m(tot+ki2m)2][in2m(tot2m)2(ki2m)2]

为了简化计算,可以转变为相对值:

ΔQ=ki,intot×kim

DGA实现中的deltaQ问题的理解:

论文中理解deltaQ的关键点是:首先假设某个节点在图中在一个独立的社区中,然后有deltaQ的公式:

ΔQ=[in+ki,in2m(tot+ki2m)2][in2m(tot2m)2(ki2m)2]

这个公式中的,最后一项就是基于前面的假设,即这个节点是个独立的社区,所以他自身的modularity是(ki2m)2

但是,注意但是!如果节点不在一个独立的社区中,怎么办?论文的方案很简单:把这个过程分为两步,第一步把节点从原来的社区移走,第二步再把节点加入新社区。这里的第一步就是上面的公式的负数。如果算法实现没错的化,就是只移动节点到有增加deltQ的社区,那么移出一定是个负数,因此,最后完整的我们需要看看这个第一步的deltaQ的负数和第二步deltaQ的正数之和是不是>0。

但是,但是。。实际实现算法的时候没有这么干!!
上面我们说为了简化计算,deltaQ可以转变为相对值:

ΔQ=ki,intot×kim

在DGA库中,我们遍历每个节点,我们尝试把当前节点加入到他邻居的社区(注意表述的顺序,谁加入到谁)。因此,如果这个节点他不是一个单独的社区,他必然要先脱离自己原来的社区,对每个节点而言这是一个固定的值(一个节点比如划分到某个社区了!)。因此我们在寻找『最大deltaQ的相邻节点社区』的过程中,只需要比较相对值,可以忽略这个固定的值。。。。
但是,正如上一段所述,如何保证这个两步之和大于0呢?这就需要在扫描相邻社区时,『特殊』处理,如果发现相邻社区与节点自己的社区相同时,不跳过!!(我们可能认为加入相同社区deltaQ=0)照常计算deltaQ,但是$\sum_{t o t} $中需要减去该节点自身的值,然后算出deltaQ。把这个deltaQ和其他deltaQ一起参与到最终的『最大deltaQ的相邻节点社区』选举中。如果加入其他社区没有收益,就会导致这个『特殊』的deltaQ脱颖而出。(隐含我们不需要对deltaQ>0进行判断,但是lib中还是判断了)

多层图的Modularity/图压缩算法理解

论文在阶段二中有个重要的点:如何压缩图,具体而言每个社区是一个新顶点,如何处理原来的社区内关系?

理解:图的压缩不影响Modularity的值
我们在compressGraph图之后,此时同一个社区的多个节点会合并成一个1个节点,并根据社区内连接到数量N相应的增加N个self-loop连接(在DGA中使用internalWeight表示)。
注意:压缩后的图的Modularity与压缩前的Modularity相等!!!,使用的公式相同:

Q=12mvw[Avwkvkw2m]δ(cv,cw)=i=1c(eiiai2)

再次理解这个公式:

DGA库优化与Bug分析

在使用此库时,发现了大量的问题(并且test case无法通过),最终使得Louvain算法的实现完全错误,导致大量情况下提前结束迭代,此时算法并未收敛。除了错误外,还有大量的性能问题,总结三类问题如下:

Spark误用时导致的Bug:

  1. rdd内map函数重复计算多次。原因:没有cache(或者unpersist)的rdd会有这种现象
  2. unpersist过早的问题:如果在正在触发计算之前unpersist会导致cache的效果没有(lazy),如下
val rdd=sc.read(xxx).cache
val rdd2=rdd.map(x=>x+1)
val rdd3=rdd.map(x=>x+1)
rdd.unpersist() // 无效代码
rdd2.count rdd3.count
// 应当在这里!!
rdd.unpersist()
  1. rdd是不可变数据集!!,一般情况下RDD[CustomObj]对象都是case class定义的,但是,代码中奇葩的使用了class定义(即LouvainData类),并定义了内部的var变量,在map,join等中直接修改变量。导致结果玄学,没有意义。
val rdd:RDD[CustomObj]=sc.read(xxx)
val rdd2=rdd.map(d=>d.flag=1) // 不可以直接修改,而是应该返回新对象!!!
rdd2.filter(d.flag==1)
  1. innerJoin误用,在计算最终的actualQ值时原始代码中使用了GraphX API的innerJoin函数,该函数只会返回两个RDD中都存在的join key。丢弃左侧RDD中存在,但是右侧RDD不存在的Key,因此最终计算的整个图的modularity值缺失了大量的图的值(具体到算法中丢弃了那些没有在本层迭代中发生modularity变化的社区),正确算法应该使用leftJoin

deltaQ计算公式错误:

算法实现中q函数的计算公式是deltaQ = k_i_in - (k_i * sigma_tot / M),但是,这的M的含义是edge_weight*2,原始公式中对应的是edge_weight之和,这里的M应该替换为M/2

Scala代码性能问题:

性能问题主要发生在GraphX中最重要的API:aggregateMessages中。在原始算法中代码如下:


  private def sendCommunityData(e: EdgeContext[LouvainData, Long, Map[(Long, Long), Long]]) = {
    val m1 = (Map((e.srcAttr.community, e.srcAttr.communitySigmaTot) -> e.attr))
    val m2 = (Map((e.dstAttr.community, e.dstAttr.communitySigmaTot) -> e.attr))
    e.sendToSrc(m2)
    e.sendToDst(m1)
  }


  /**
   * Merge neighborhood community data into a single message for each vertex
   */
  private def mergeCommunityMessages(m1: Map[(Long, Long), Long], m2: Map[(Long, Long), Long]) = {
    val newMap = scala.collection.mutable.HashMap[(Long, Long), Long]()
    m1.foreach({ case (k, v) =>
      if (newMap.contains(k)) newMap(k) = newMap(k) + v
      else newMap(k) = v
    })
    m2.foreach({ case (k, v) =>
      if (newMap.contains(k)) newMap(k) = newMap(k) + v
      else newMap(k) = v
    })
    newMap.toMap
  }

// 调用
communityRDD = louvainGraph.aggregateMessages(sendCommunityData, mergeCommunityMessages)

核心问题发生在mergeCommunityMessages函数的newMap.toMap中,在scala的实现中,这里会新创建大量的对象,导致GC严重,拖慢了整个算法。改进方法如下:

    */
  private def mergeCommunityMessages(m1: Map[(Long, Long), Long], m2: Map[(Long, Long), Long]) = {
    val merged = (m2 foldLeft m1) (
      (acc, v) => acc + (v._1 -> (v._2 + acc.getOrElse(v._1, 0L)))
    )
    merged
  }

由于该函数被大量调用,此优化带来性能5x以上的提升。

Louvain算法应用思考

尾巴:使用Gephi可视化图分析

图可视化方法是我们在开发测试和效果验证的重要手段,调研了如下,对于巨型图(百万节点)的可视化问题(也叫多分辨率显示)没有太好的现成工具。下面是一些少量顶点的图可视化工具,其中最最常见的,也是在图的论文中广泛使用来展示实验效果的工具就是gephi。

使用Gephi来运行louvain算法的简单可视化教程如下:

  1. 导入数据,数据格式,为tsv,即tab分隔的边

  1. 点击右侧,模块化的运行按钮启动运行算法

  1. 此时在数据页面出现新的一列,表示分类结果

  1. 在图中展示结果,给节点上色

  1. 挑选一直算法尝试调整图的布局分布

总结

参考文档: