Flink用于外部数据访问的异步I/O

本页阐述了使用Flink的API来进行外部数据存储的异步I/O,对于不熟悉异步或者事件驱动编程的用户,一篇关于Future和事件驱动编程可能会很有用。

注意:关于异步I/O的详细设计和实现可以在异步I/O设计和实现这篇文章找到。

异步I/O操作的需要

当与外部系统进行交互(例如使用存储在数据库中的数据丰富流事件)时, 需要注意的是, 与外部系统的通信延迟并不决定流应用程序的总体工作。

原始的访问外部系统中的数据,例如通过一个MapFunction来访问,通常意味着同步交互:将一个请求发送到数据库,MapFunction等待直到接收到响应为止。很多情况下,这种等待会占用很大一部分函数的时间。

与外部数据库系统进行异步交互意味着一个并行函数实例可以并发地处理多个请求和并发地接收多个响应。那样的话,等待时间就可以被其他的请求或者响应所覆盖。至少,等待时间可以被多个请求摊销,这在很多情况下会导致更高的流吞吐量。


注意:通过扩展MapFunction到一个很高的并发度来提高吞吐量在一定程度上是可行的,但是常常会导致很高的资源消耗:有很多的并行MapFunction实例意味着更多的任务、线程、Flink内部网络连接、与数据库之间的网络连接、缓存以及通常的内部开销。

前提

如上节所述,实现一个连接数据库(或者key/value存储系统)的正确异步I/O需要一个客户端,数据库支持通过该客户端来进行异步请求。许多流行的数据库都支持这种客户端。

对于没有这种客户端的情况下,用户可以将异步客户端换成一个可以通过创建多个客户端并使用线程池处理同步调用来尝试将同步客户端转换为有限的并发客户端。然而,这个方法通常比纯粹的异步客户端性能要低一些。

异步I/O API

Flink的Async I/O允许用户在数据流中使用异步的请求客户端,这个API会处理与数据流的交互,同时还处理顺序、事件时间、容错等。

假设已经目标数据库已经有了异步客户端,要实现一个通过异步I/O来操作数据库还需要三个步骤:
  1、实现用来分发请求的AsyncFunction
  2、获取操作结果的callback,并将它提交到AsyncCollector
  3、将异步I/O操作作为转换操作应用于DataStream

下面代码展示了基本的模式:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {

        // issue the asynchronous request, receive a future for result
        Future<String> resultFuture = client.query(str);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the collector
        resultFuture.thenAccept( (String result) -> {

            asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
         
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())


    override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFuture: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the collector
        resultFuture.onSuccess {
            case result: String => asyncCollector.collect(Iterable((str, result)));
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

重要提醒:AsyncCollector在第一次调用AsyncCollector.collect时就完成了,所有后续的collect调用都会被忽略。

下面的两个参数控制了异步操作:
  ****Timeout****:timeout定义了异步操作过了多长时间后会被丢弃,这个参数是防止了死的或者失败的请求
  ****Capacity****:这个参数定义了可以同时处理多少个异步请求,虽然异步I/O方法会带来更好的吞吐量,但是算子任然会成为流应用的瓶颈。限制并发请求的数量确保了算子不会积累不断增加的积压的待处理请求,但一旦容量耗尽,它将触发背压。

结果顺序

由AsyncFunction发出的并发请求经常是以无序的形式完成,取决于哪个请求先完成。为了控制发出请求结果的顺序,Flink提供了两种模式:
  ****Unordered****:结果记录在异步请求完成后就发出,流中的记录的顺序通过异步I/O操作后会与先前的不一致。当使用处理时间作为时间特性时这种模式具有低延迟、低消耗特点。通过AsyncDataStream.unorderedWait(...)来使用这种模式。
  ****Ordered****:在这种情况下,流的顺序是保留的,结果记录发出的顺利与异步请求触发的顺序(算子输入记录的顺序)一致。为了实现这一点,算子会将结果记录缓存起来直到所有的处理记录都被发出(或者超时)为止。这常常会导致一定程度的延迟和checkpoint消耗,因为跟非排序模式相比,记录或者结果会被长时间保存在checkpoint State中。通过AsyncDataStream.orderedWait(...)来使用这种模式。

事件时间

当使用流程序使用事件时间时,异步I/O操作将正确处理水印,这具体说明了如下两种模式:
  ****Unordered****:水印不会超过记录反之亦然,这也就意味着水印建立起了一个秩序边界。记录在两个水印间无序地发出。在一个水印后产生的记录只能在这个水印发出之后才能发出,同样水印也只能在所有水印之前的记录都发出之后才能发出。
  ****Ordered****:保存水印的顺序,就如保存记录之间的顺序一样。与处理时间相比,开销没有显著变化。
请记住,摄入时间是一个特殊的事件时间,会基于源处理时间的自动产生水印。

容错性保证

异步I/O操作提供了exactly-once容错性保证,它将异步请求的记录存储在checkpoint中,并在从故障中恢复时恢复/重新触发请求。

实施提示

警告

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,946评论 6 13
  • “失败是成功之母”,这句话,我们大多数人从小到大都是耳熟能详的。但是这句话,讲了这么多年。真正能做到的能有几个呢?...
    鹿鹿无畏阅读 864评论 0 51
  • 花开君远行, 花落不见人, 相思愈久矣, 奈何阻重深。
    释迦干屎橛阅读 213评论 0 0
  • 知道自己漂泊了多久吗 你的影踪不难发现 呵,正在窗台发什么呆呢 还是早已厌倦了无边流浪路 躯体的停驻只为一颗心的闲...
    乐从心阅读 178评论 8 17