neo4j与spark 的结合

image.png

image.png

正常来说 neo4j是用来图存储的,neo4j企业版 的性能远远高于 社区版,毕竟是收费的,不过 只要下载到就可以使用了,我已经用上了,非常棒。
spark 是用来 做 图计算的,Graphx,其实 spark 和Neo4j 有交叉点,在图论算法上都可以用上,

我们在使用 neo4j 和 spark 结合的时候
1.首先 如果你的neo4j 是需要账号密码登录的话,你就应该 在项目中配置一下,
两三种方式

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.spark.Neo4j
import org.neo4j.spark._
import collection.JavaConversions._
 val spark=SparkSession.builder().appName("play")
.master("local[*]")
.config("spark.neo4j.bolt.url", "bolt://localhost:7687")
                        .config("spark.neo4j.bolt.user", "neo4j")
                        .config("spark.neo4j.bolt.password", "hortmt")
                .getOrCreate()
 import spark.implicits._
 val neo=Neo4j(spark.sparkContext)
//这个是使用sparkSession配置

下一个是使用 sparkConf配置

 val conf = new SparkConf().setAppName("neoej")
                .setMaster("local[*]")
                .set("spark.neo4j.bolt.url", "bolt://localhost:7687")
                .set("spark.neo4j.bolt.user", "neo4j")
                .set("spark.neo4j.bolt.password", "hortmt")
        val sc =new SparkContext(conf)

        val neo=Neo4j(sc)

3 另外还有一种是通过 Neo4jConfig 来做配置

val sparkSession = SparkSession.builder()
              .master("local[*]")
                .appName("LoadDataToNeo4j")
                    .getOrCreate();

  val sc = sparkSession.sparkContext

  val config = Neo4jConfig("localhost:","neo4j",Option("root"))
  Neo4j(sc).cypher("CREATE (c:Client {id:1230}) return c").loadRdd
  sparkSession.close()
  1. 我们最常见的就是 把 neo4j 的node 转化为 spark 里的rdd 或者 dataframe 或者 graph ,其中 rdd 又分为 四种 ,普通rdd ,noderdd ,rowrdd,relrdd
    在整这个的时候首先 遇到两个障碍 ,第一是转化 ,第二是把 图的属性 提取出来变成 一个case class ,

比如我现在有一份 csv格式的 用户通话记录,header 里有五个属性
{.user,.other,.direction,.duration,.timestamp}) ,大概有 一万条记录
首先我先把 这个加载存储到 neo4j,注意 这个csv文件最好放在 neo 安装根目录下的import目录下,否则加载会出错,或者通过 http 下载 加载也可以

参考
https://raw.githubusercontent.com/data-commons/prep-buddy/b2e307f5f5261124ef2a97c4ff9225019804d864/data/calls.csv

https://raw.githubusercontent.com/data-commons/prep-buddy/b2e307f5f5261124ef2a97c4ff9225019804d864/data/calls_with_header.csv

LOAD CSV WITH HEADERS FROM "file:///calls_with_header.csv" AS line  
create (:person {user:line.user, other:line.other, direction:line.direction, duration:line.duration, timestamp:line.timestamp})

然后为这些节点创建关系 ,通过 拨打电话 呼叫 被叫 错过 来创建relationship

match (from:person),(to:person) where (from.other=to.user and from.direction="Outgoing")  
merge (from)-[r:rel{direction:from.direction,duration:from.duration,timestamp:from.timestamp}]->(to) 

match (from:person),(to:person) where (from.user=to.other and from.direction="Incoming")  
merge (from)-[r:rel{duration:from.direction,duration:from.duration,timestamp:from.timestamp}]->(to) 

单单创建关系 企业版比社区版至少要快五倍,
现在 Neo 里面有数据了 ,我们取数据就可以。
大家可以看到,我们的节点其实就是一个 person的实例,那我们在转化为rdd的时候 ,我们希望 rdd 其实wrapper的是person class ,所以我们首先建了一个case class

case class Person(user: String,other:String,direction:String,duration:String,timestamp:String)

然后我们的 执行Neo 的cypher语句

val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n{.user,.other,.direction,.duration,.timestamp}").loadNodeRdds

其实 loadRowRdd 也是可以的

  val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n{.user,.other,.direction,.duration,.timestamp}").loadRowRdd

通过调试和出错 结果发现 rdd里row 放的其实不是person对象,而是
java.util.Collections.UnmodifiableMap,可笑的是 这个类还是 一个私有类,不过通过发现它继承了 java的 java.util.Map
我们需要把它强制转化为java 的 map ,注意scala的map 是不可以直接转化的


image.png

我们先是 尝试了多遍 ,尤其是看输出

 rawGraphnode.foreach(xd=>{
            val ne=  xd(0).asInstanceOf[java.util.Map[String,String]]
            val pe=  new Person(ne("user"),ne("other"),ne("direction"),ne("duration"),ne("timestamp"))
            println(pe.user+"&&"+pe.other+"&&"+pe.direction+"&&"+pe.duration+"&&"+pe.timestamp)
        })

当我们发现 转化是正常的,然后再转化为 RDD[Person]

 val personRDD:RDD[Person]=  rawGraphnode.map(row=>
        {
            val rawMap= row(0).asInstanceOf[java.util.Map[String,String]]
            val pe=  new Person(rawMap("user"),rawMap("other"),rawMap("direction"),rawMap("duration"),rawMap("timestamp"))
            pe
        }
)

这里面其实最重要的是 map 转化为case class ,这里面其实有很多种优雅的方式实现 ,不过我用的是最笨的,不过我这个也不太容易出错 。
大家参考
https://stackoverflow.com/questions/20684572/scala-convert-map-to-case-class

不过 如果大家使用上 as 语法后,其实就灭有 那么困难了

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp").loadRowRdd
 rawGraphnode.take(10).foreach(println(_))
image.png

下面的重点是 把 neo4j的节点 转化为 Dataframe 和 graph 的尝试 ,分两种
第一种可以很好按照 schme 的要求保留具体属性 来创建 Dataframe

  val schme:StructType=StructType(Seq(
            StructField("user",StringType,nullable = false),
            StructField("other",StringType,nullable = false),
            StructField("direction",StringType,nullable = true),
            StructField("duration",StringType,nullable = true),
            StructField("timestamp",StringType,nullable = true)
        )
        )
val rawGNDF= spark.createDataFrame(rawGraphnode,schme)
        rawGNDF.printSchema()
image.png

第二种等于是根据 case class 的属性来的,但是 nullable 会都为true

 val dfs= spark.createDataFrame(personRDD)
        dfs.printSchema()
image.png

关于 spark sql row 对象与case class 的转化 ,大家可以看这里
https://stackoverflow.com/questions/28166555/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently

image.png
image.png

想直接转DataFrame 其实还是有阻碍的,单个属性可以,多个属性就报废了,schema的语法是 (fieldName,fieldtype),其中type 支持 String double long boolean ,如果是object integer float 他们会自动转化

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n{.user,.other,.direction,.duration,.timestamp}")
                .loadDataFrame(schema = ("user","String"),("other","double"),("direction","string"),("duration","integer"),("timestamp","boolean"))

后来我发现 其实是可以的,就是 需要再 查询语句中使用 as 语法,这样就转化成功了

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp")
                .loadDataFrame(schema = ("user","object"),("other","object"),("direction","string"),("duration","String"),("timestamp","String"))


        rawGraphnode.printSchema()
        rawGraphnode.show(10)
image.png
image.png

使用上 as 以后 转化为 rowRDD 也非常方便 省力,唯一的缺点就是 structType 都是 string

 val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp").loadRowRdd
        val dddf=rawGraphnode.first()
        println(dddf.schema)
image.png

之后的转化为case class 也比较顺手

  rawGraphnode.foreach(xd=>
        {
            val pe=new Person(xd.getAs[String]("user"),xd.getAs[String]("other"),xd.getAs[String]("direction"),xd.getAs[String]("duration"),xd.getAs("timestamp"))
            println(pe.user+"&&"+pe.other+"&&"+pe.direction+"&&"+pe.duration+"&&"+pe.timestamp)
        })

转化为 RDD版的case Class 也非常方便

 val perRDD:RDD[Person]=rawGraphnode.map(xd=>{
            val pe=new Person(xd.getAs[String]("user"),xd.getAs[String]("other"),xd.getAs[String]("direction"),xd.getAs[String]("duration"),xd.getAs("timestamp"))
            pe
        })
        perRDD.take(10).foreach(println(_))
image.png

然后我们 尝试造图

val neo4j: Neo4j = Neo4j(sc=spark.sparkContext).rels(relquery)
        val graph: Graph[Long, String] = neo4j.loadGraph[Long,String]
        println(graph.vertices.count())

打印 11015 ,正确,

推荐一个 GitHub的项目
https://github.com/luhm2017/graphx-analysis

git clone  https://github.com/luhm2017/graphx-analysis.git
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容