Akka Stream之使用流式IO

Akka Sream提供了一种使用流处理文件IO和TCP连接的方法。虽然一般方法与使用Akka IO 的基于Actor的 TCP 处理非常相似, 但通过使用Akka Stream, 您将不必手动对背压信号做出反应, 因为该库对您来说是透明的。

流式TCP

为了实现一个简单的EchoServer,我们绑定到一个给定的地址,它返回一个Source[IncomingConnection, Future[ServerBinding]],该Source为服务器要处理的每一个新连接射入一个IncomingConnection元素:

val binding: Future[ServerBinding] =
  Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run()

binding.map { b =>
  b.unbind() onComplete {
    case _ => // ...
  }
}
tcp-stream-bind.png

The last boolean argument indicates that we require an explicit line ending even for the last message before the connection is closed. In this example we simply add exclamation marks to each incoming text message and push it through the flow:
接下来,我们简单地使用一个Flow处理每个传入连接,该Flow将用作处理阶段,以从和到TCP Socket处理和发送ByteString s。由于一个ByteString不一定对应于一行文本(客户端可能会以分块的形式发送行),因此我们使用Framing.delimiter帮助Flow 将输入块组合到实际文本行中。最后一个布尔参数表示,我们需要一个明确的行结束,即使是连接关闭之前的最后一个消息。 在本例中, 我们只需将感叹号添加到每个传入的文本消息中, 并将其推送到流中:

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)
connections runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}
tcp-stream-run.png

请注意,虽然Akka Streams中的大部分构建块都是可重用且可自由共享的,但传入连接Flow并不是这样,因为它直接对应于现有的已经接受的连接,因此它的处理只能物化一次。

通过从服务器逻辑中取消传入的连接流 (例如,将其下游连接到Sink.cancelled并将其上游连接到Source.empty)可以关闭连接。 也可以通过取消IncomingConnection source 连接来关闭服务器的套接字。

然后,我们可以通过使用netcat向TCP Socket发送数据来测试TCP服务器:

$ echo -n "Hello World" | netcat 127.0.0.1 8888
Hello World!!!

连接:REPL客户端

在这个例子中,我们通过TCP实现了一个相当朴素的Read Evaluate Print Loop客户端。 假设我们知道一台服务器已经在 TCP上公开了一个简单的命令行接口, 并希望通过 TCP上的Akka Stream与它进行交互。要打开一个传出连接套接字,我们使用outgoingConnection方法:

val connection = Tcp().outgoingConnection("127.0.0.1", 8888)

val replParser =
  Flow[String].takeWhile(_ != "q")
    .concat(Source.single("BYE"))
    .map(elem => ByteString(s"$elem\n"))

val repl = Flow[ByteString]
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))
  .map(_.utf8String)
  .map(text => println("Server: " + text))
  .map(_ => readLine("> "))
  .via(replParser)

connection.join(repl).run()

我们用来处理服务器交互的 repl 流首先打印服务器响应,然后等待命令行的输入(这个阻塞调用在这里只是为了简单起见),并将其转换为ByteString,然后将其发送到服务器。

一个有弹性的REPL客户端将比这更复杂,例如它应该将输入读取分解成一个单独的mapAsync步骤,并且有一种方法让服务器在任何给定时刻写入比一个ByteString块更多的数据,但这些改进仍然存在 作为读者的锻炼。

避免背压循环中的死锁和活动问题

当编写这样的端到端背压系统时, 有时可能会出现循环的情况, 其中任何一方都在等待另一方开始对话。在前面所示的两个例子中, 我们总是假设我们所连接的一方会开始对话, 这实际上意味着双方都在承受背压, 无法开始对话。有多种处理方式,在图形循环,活动和死锁章节中有深入的解释,但是在客户端 - 服务器场景中,通常最简单的方法是让任何一方发送一个初始消息。

注意
在背压循环(即使在不同的系统之间也可能出现)的情况下,有时您必须确定哪一方开始对话才能将其解除。这通常可以通过从一方中注入一个初始消息(对话启动器)来实现。

为了破解背压循环,我们需要注入某个初始消息,一个“对话启动器”。首先,我们需要确定连接的哪一边应该是被动的,哪个方面是主动的。幸运的是,在大多数情况下, 找到开始对话的正确位置是相当简单的, 因为它经常是我们试图使用流实现的协议所固有的。在类似于聊天的应用程序中,如我们的示例,通过发出“hello”消息使服务器启动会话是有意义的:

connections.runForeach { connection =>

  // server logic, parses incoming commands
  val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")

  import connection._
  val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
  val welcome = Source.single(welcomeMsg)

  val serverLogic = Flow[ByteString]
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
    .map(_.utf8String)
    .via(commandParser)
    // merge in the initial banner after parser
    .merge(welcome)
    .map(_ + "\n")
    .map(ByteString(_))

  connection.handleWith(serverLogic)
}

为了发出初始消息,我们将带有一个元素的Source合并,在命令处理之后,但在进行帧化和转换成ByteString之前,这样我们不必重复这样的逻辑。

本例中,服务器端基于分析出命令- BYE 关闭流,而客户端情况下是q。这是通过以下方式实现:从流中提取直到q,连接带有单一Bye元素的Source,在原source完成后发送它。

在协议中使用帧化

诸如TCP之类的流传输协议只是传递字节流,并且从应用程序的角度来看不知道逻辑块的大小是多少。通常在实现网络协议时,你想要引入自己的帧化。这可以有两种方式:帧结束标记,例如, 结束行\ n,可以通过Framing.delimiter进行帧化。或者可以使用长度字段来构建成帧协议。有一个Framing.simpleFramingProtocol提供的bidi协议实现,在ScalaDoc查看更多信息。

流化文件IO

Akka Streams提供简单的Source和Sink,可以使用ByteString实例来对文件执行IO操作。

给一个目标路径,使用FileIO.fromPath从文件创建数据流,还有一个可选的chunkSize,它决定了在这样的流中确定为一个 "元素" 的缓冲区大小:

import akka.stream.scaladsl._
val file = Paths.get("example.csv")

val foreach: Future[IOResult] = FileIO.fromPath(file)
  .to(Sink.ignore)
  .run()

请注意, 这些处理阶段由Actor支持, 默认情况下预先配置的线程池(配置为在专门用于文件 IO )支持的调度器上运行。这一点非常重要, 因为它将阻塞的文件 IO 操作与 ActorSystem 的其余部分隔离开来, 从而使每个调度器能够以最有效的方式进行利用。如果要在全局范围内为文件 IO 操作配置自定义调度器, 可以更改akka.stream.blocking-io-dispatcher,或者在代码中为某个具体的阶段指定一个自定义Dispatcher,像这样:

FileIO.fromPath(file)
  .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 229,362评论 6 537
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,013评论 3 423
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 177,346评论 0 382
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,421评论 1 316
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,146评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,534评论 1 325
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,585评论 3 444
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,767评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,318评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,074评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,258评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,828评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,486评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,916评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,156评论 1 290
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,993评论 3 395
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,234评论 2 375

推荐阅读更多精彩内容

  • 1 基本流处理 让我们首先看看使用akka-stream处理流的真正含义。图1展示了在某个处理节点上,元素是一个个...
    乐言笔记阅读 2,675评论 1 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,807评论 18 139
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,330评论 11 349
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 11,064评论 6 13
  • 身高1.63,体重飚到130,想减肥的念头一直像蚊子一样飞来飞去,意志力很不坚定。 今天是21天减肥实行的第一天下...
    蒙节阅读 288评论 0 1