流式数据库PipelineDB(集成Kafka)

1. 前言

1.1 PipelineDB 介绍

偶然发现了个流式数据库PipelineDB,它是基于PostgreSQL数据库改造的,允许我们通过sql的方式,对数据流做操作,并把操作结果储存起来。

这年头,真是SQL on everything。

其基本的过程是:

  • 创建PipelineDB Stream。
  • 编写SQL,对Stream做操作。
  • 操作结果被保存到 continuous view,其背后是物理表在支撑。

1.2 安装PipelineDB

我们的安装是在Centos 7上面进行的。 PipelineDB不让用root权限的用户操作,请提前创建用户。

#下载
wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos7-x86_64.rpm
# 安装
rmp -ivh ----prefix=/opt/pipelinedb
# 初始化 pipeline-init -D <data directory>
pipeline-init -D /opt/pipelinedb/dbdata
pipelinedb -D /opt/pipelinedb/dbdata
# 激活 continuous query(仅需执行一次,后续重启不用再做)
psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"

2. Quick Start例子

本例是关于 Wikipedia页面访问数据的统计。每一条访问记录,包括以下字段,以英文逗号分割。

hour project page title view count bytes served

2.1 创建continuous视图

首先,我们创建一个continuous view,使用psql工具。从sql里,我们能够看到统计方法和访问记录的对应关系。

psql -h localhost -p 5432 -d pipeline -c "
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour, project,
        count(*) AS total_pages,
        sum(view_count) AS total_views,
        min(view_count) AS min_views,
        max(view_count) AS max_views,
        avg(view_count) AS avg_views,
        percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
        sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;"

2.2 创建Stream

我们通过curl工具,获取wiki的数据集,并压缩数据,作为一个Stream写入到stdin。因为数据集比较大,当我们执行了几秒钟之后,可以使用ctrl+c中断curl操作。

curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
        psql -h localhost -p 5432 -d pipeline -c "
        COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"

2.3 查看结果

通过下面的命令,从视图(continuous view)读取streaming的统计计算结果。

psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";

3. PipelineDB和kafka的集成

3.1 pipeline_kafka组件安装

PipelineDB默认是没有pipeline_kafka扩展组件的,需要我们自己安装。安装需要git,如果没有git,请使用yum -y install git 安装git。

1.安装librdkafka

pipeline_kafka依赖librdkafka,需要先安装librdkafka。

git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install

2.安装pipeline_kafka

编译安装pipeline_kafk。如果有编译依赖的缺失,请根据缺失补充安装依赖。

./configure
make
make install

配置pipeline_kafka

# 编辑配置文件
vi /opt/pipelinedb/dbdata/pipelinedb.conf
# 在结尾输入以下内容并保存(:wq)
# Add settings for extensions here
shared_preload_libraries = 'pipeline_kafka'

重启数据库,使得扩展组件生效

# pipeline-ctl -D <data directory> start|stop|restart
pipeline-ctl -D /opt/pipelinedb/dbdata restart

3.2 Stream SQL开发过程

# 连接数据库
psql -h localhost -p 5432 -d pipeline
# 创建pipeline_kafka
CREATE EXTENSION pipeline_kafka;
# 配置kafka broker
SELECT pipeline_kafka.add_broker('192.168.1.170:9092');
# 创建Stream,从kafka里接受三个参数
CREATE STREAM msg_stream (sjc varchar, thread_name varchar, msg varchar);
# 创建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW msg_result AS SELECT sjc,thread_name,msg FROM msg_stream;
# 开始消费kafka消息
# topic是my-topic,连接PipelineDB Stream名是msg_stream,消息类型是text,消息以英文逗号分割。
SELECT pipeline_kafka.consume_begin ( 'my-topic', 'msg_stream', format := 'text', 
            delimiter := ',', quote := NULL, escape := NULL, batchsize := 1000,
            maxbytes := 32000000, parallelism := 1, start_offset := NULL );


# 如果要停止Stream,请使用以下命令。
SELECT pipeline_kafka.consume_end('my-topic', 'msg_stream');

3.3 验证

1.向kafka发送消息

登录kafka节点的服务器,进入到kafka home路径,使用以下命令发送消息

# 启动producer
bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic my-topic
# 输入以下数据
a,b,c

2.在PipelineDB中查询收到的消息

从CONTINUOUS VIEW中 查询数据,可以看到有一条记录,即[a,b,c]。

psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM msg_result";

ps: 当我们连接到PipelineDB,我们可以使用PostgreSQL的命令,来查看有那些数据库对象生成。例如通过 \d 可以查看到,当我们创建CONTINUOUS VIEW的时候,额外创建了msg_result_mrel、msg_result_seq和msg_result_osrel,实际的数据就存储在msg_result_mrel中。

Schema Name Type Owner
public msg_result continuous view pipelinedb
public msg_result_mrel table pipelinedb
public msg_result_osrel stream pipelinedb
public msg_result_seq sequence pipelinedb
public msg_stream stream pipelinedb

http://docs.pipelinedb.com/quickstart.html

https://github.com/pipelinedb/pipeline_kafka

(完)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容