NiFi 应用

需求描述

1, 使用nifi每天跑一次,把confluence的昨天的谁写了什么题目的记录同步到一张新表
2, 使用superset设置一个dashboard,观看最近两周的每人每天的贡献度

相关数据

confluence.CONTENT

CREATE TABLE `CONTENT` (
`CONTENTID`  bigint(20) NOT NULL ,
`HIBERNATEVERSION`  int(11) NOT NULL DEFAULT 0 ,
`CONTENTTYPE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL ,
`TITLE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`LOWERTITLE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`VERSION`  int(11) NULL DEFAULT NULL ,
`CREATOR`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`CREATIONDATE`  datetime NULL DEFAULT NULL ,
`LASTMODIFIER`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`LASTMODDATE`  datetime NULL DEFAULT NULL ,
`VERSIONCOMMENT`  mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL ,
`PREVVER`  bigint(20) NULL DEFAULT NULL ,
`CONTENT_STATUS`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PAGEID`  bigint(20) NULL DEFAULT NULL ,
`SPACEID`  bigint(20) NULL DEFAULT NULL ,
`CHILD_POSITION`  int(11) NULL DEFAULT NULL ,
`PARENTID`  bigint(20) NULL DEFAULT NULL ,
`MESSAGEID`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PLUGINKEY`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PLUGINVER`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PARENTCCID`  bigint(20) NULL DEFAULT NULL ,
`DRAFTPAGEID`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`DRAFTSPACEKEY`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`DRAFTTYPE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`DRAFTPAGEVERSION`  int(11) NULL DEFAULT NULL ,
`PARENTCOMMENTID`  bigint(20) NULL DEFAULT NULL ,
`USERNAME`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
PRIMARY KEY (`CONTENTID`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_bin
ROW_FORMAT=DYNAMIC
;

confluence.user_mapping

CREATE TABLE `user_mapping` (
`user_key`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL ,
`username`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL ,
`lower_username`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
PRIMARY KEY (`user_key`),
UNIQUE INDEX `unq_lwr_username` (`lower_username`) USING BTREE 
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_bin
ROW_FORMAT=DYNAMIC
;

nifi_db.Commitments

CREATE TABLE `Commitments` (
`ContentId`  bigint(20) NOT NULL COMMENT '内容ID' ,
`WeekOfYear`  varchar(255) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '年份周数' ,
`Title`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '内容抬头' ,
`Modifier`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '更新人' ,
`LastModDate`  datetime NOT NULL COMMENT '最后更新时间' ,
`Creator`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '创建人' ,
`CreateDate`  datetime NULL DEFAULT NULL COMMENT '创建时间' ,
PRIMARY KEY (`ContentId`, `LastModDate`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=latin1 COLLATE=latin1_swedish_ci
ROW_FORMAT=DYNAMIC
;

配置服务控制器(Controller Service)

DBCPForConfluence(DBCPConnectionPool)

PROPERTIES

properties values
Database Connection URL jdbc:mysql://47.96.97.244:3306/confluence?useUnicode=true&characterEncoding=utf8
Database Driver Class Name com.mysql.jdbc.Driver
Database Driver Location(s) /usr/share/java/mysql-connector-java.jar
Database User root
Password ***

DBCPForNiFi_db(DBCPConnectionPool)

PROPERTIES

properties values
Database Connection URL jdbc:mysql://gateway001:3306/nifi_db?useUnicode=true&characterEncoding=utf8
Database Driver Class Name com.mysql.jdbc.Driver
Database Driver Location(s) /usr/share/java/mysql-connector-java.jar
Database User root
Password ***

全量导入历史贡献记录

获取数据(ExecuteSQL)

SCHEDULING

Scheduling Strategy Timer driven
Run Schedule 1 days
Execution Primary node
Concurrent Tasks 1

PROPERTIES

Property Value
Database Connection Pooling Service DBCPForConfluence(见上文Controller Service)
SQL select query 代码见下文
Max Wait Time 0 seconds
Normalize Table/Column Names false
Use Avro Logical Types false
Default Decimal Precision 10
Default Decimal Scale 0
SELECT
    a.CONTENTID,
    a.TITLE,
    b.USERNAME AS CREATOR,
    c.USERNAME AS MODIFIER,
    a.CREATIONDATE AS CREATEDATE,
    a.LASTMODDATE,
    YEARWEEK(a.LASTMODDATE) AS WeekOfYear
FROM
    CONTENT a
LEFT JOIN user_mapping b ON a.CREATOR = b.user_key
LEFT JOIN user_mapping c ON a.LASTMODIFIER = c.user_key
WHERE
    a.CONTENTTYPE = 'PAGE'
AND a.spaceid = 98306
AND a.title IS NOT NULL
AND a.PARENTID IS NOT NULL
ORDER BY
    WeekOfYear DESC,
    modifier DESC,
    LASTMODDATE;

格式转化(ConvertAvroToJSON)

直接新增该Processor,默认配置即可。

SQL生成(ConvertJSONToSQL)

PROPERTIES

properties values
JDBC Connection Pool DBCPForNiFi_db(见上文)
Statement Type INSERT
Table Name Commitments

SQL写入(PutSQL)

PROPERTIES

Property Value
JDBC Connection Pool DBCPForNiFi_db
SQL Statement 见下文
Support Fragmented Transactions true
Transaction TimeoutNo value setBatch Size 100
Obtain Generated Keys false
Rollback On Failure true

SQL Statement

注:

该参数可为空,

当为空时,则默认执行ConvertJSONToSQL处理器提供的SQL。

当该参数不为空时,则忽略ConvertJSONToSQL处理器提供的SQL,只取其数据。

本需求场景下,此处建议置空;

REPLACE INTO Commitments (
    ContentId,
    Title,
    Creator,
    Modifier,
    CreateDate,
    LastModDate,
    WeekOfYear
)
VALUES
    (?, ?, ?, ?, ?, ?, ?)

整体流程如图:

定期(每天)导入贡献记录

获取数据(ExecuteSQL)

SCHEDULING

Scheduling Strategy Timer driven
Run Schedule 1 days
Execution Primary node
Concurrent Tasks 1

PROPERTIES

Property Value
Database Connection Pooling Service DBCPForConfluence(见上文Controller Service)
SQL select query 代码见下文
Max Wait Time 0 seconds
Normalize Table/Column Names false
Use Avro Logical Types false
Default Decimal Precision 10
Default Decimal Scale 0
SELECT
    a.CONTENTID,
    a.TITLE,
    b.USERNAME AS CREATOR,
    c.USERNAME AS MODIFIER,
    a.CREATIONDATE AS CREATEDATE,
    a.LASTMODDATE,
    YEARWEEK(a.LASTMODDATE) AS WeekOfYear
FROM
    CONTENT a
LEFT JOIN user_mapping b ON a.CREATOR = b.user_key
LEFT JOIN user_mapping c ON a.LASTMODIFIER = c.user_key
WHERE
    a.CONTENTTYPE = 'PAGE'
AND a.spaceid = 98306
AND a.title IS NOT NULL
AND a.PARENTID IS NOT NULL
AND WEEK (a.CREATIONDATE) = WEEK (CURRENT_DATE())
ORDER BY
    WeekOfYear DESC,
    modifier DESC,
    LASTMODDATE;

后续处理器配置同全量导入即可(见上文);

整体流程如下图:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容