Akka之Source相关API总结

(1)apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
由Iterable创建Source。
例如:Source(Seq(1,2,3))
这类似于从迭代器开始, 但与此流的发布者直接连接的每个订阅者都将看到一个单独的元素流 (总是从开始处开始), 而不管它们何时订阅。

(2)fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed]
从一个产生迭代器的函数开始一个新的Source。生成的元素流将继续,直到迭代器运行为空或在评估next()方法时失败。根据来自下游转换步骤的需求,将元素从迭代器中拉出。
例如:

val iterator = Iterator.iterate(false)(!_)
//创建一个无限迭代器,重复地将给定的函数应用于先前的结果。
//第一个参数是初始值,第二个参数是将重复应用的函数。
Source.fromIterator(() ⇒ iterator)
        .grouped(10)
        .runWith(Sink.head)
        .futureValue

结果是
immutable.Seq(false, true, false, true, false, true, false, true, false, true)

(3)cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed]
从给定的元素(由产生迭代器的函数得到)开始一个循环的Source。元素的生产流将无限地重复由函数参数提供的元素序列。
例如:

Source.cycle(() ⇒ List(1, 2, 3).iterator)
         .grouped(9)
         .runWith(Sink.head)
         .futureValue

结果是
immutable.Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)

(4)fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M]
由source形状(即只有一个出口)的图创建Source。
例如:

val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
 
  // prepare graph elements
  val zip = b.add(Zip[Int, Int]())
  def ints = Source.fromIterator(() => Iterator.from(1))
 
  // connect the graph
  ints.filter(_ % 2 != 0) ~> zip.in0
  ints.filter(_ % 2 == 0) ~> zip.in1
 
  // expose port
  SourceShape(zip.out)
})

(5)fromFuture[T](future: Future[T]): Source[T, NotUsed]
从给定的Future创建Source。当Future以成功值完成时(可能在物化Flow之前或者之后发生),流由一个元素组成。当Future以失败完成时,流将终止并带有一个failure。
例如:
Source.fromFuture(Future.successful("Hello Streams!"))

(6)fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed]
类似于Scala的Future创建Source,此处是由Java的CompletionStage创建Source。

(7)fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]]
由给定的future source形状的图创建Source。一旦给定的Future成功完成,则元素从异步source流出。如果Future失败,则流失败。

(8)fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]]
类似于fromFutureSource

(9)tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]
元素定期以指定的间隔发出。
"滴答" 元素将被传递到请求任何元素的下游用户。
如果使用者在生成滴答元素时没有请求任何元素, 它以后将不会接收该滴答元素。它将在请求更多元素时立即接收新的滴答元素。
例如:
Source.tick(initialDelay = 2.second, interval = 1.second, "message!")

(10)single[T](element: T): Source[T, NotUsed]
由一个元素创建Source。
例如:Source.single("only one element")

(11)repeat[T](element: T): Source[T, NotUsed]
创建一个连续发送给定元素的Source。
例如:

Source.repeat(42)
        .grouped(3)
        .runWith(Sink.head) 
        .futureValue

结果是:
immutable.Seq(42,42,42)

(12)unfold[S, E](s: S)(f: S ⇒ Option[(S, E)]): Source[E, NotUsed]
创建一个Source,它会将S类型的值展开成一对下一个状态S,'E`类型的输出元素。
例如,10M以下的所有斐波纳契数字:

Source.unfold(0 → 1) {
    case (a, _) if a > 10000000 ⇒ None
    case (a, b) ⇒ Some((b → (a + b)) → a)
 }

(13)unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed]
与unfold相同,但是使用一个异步函数来产生下一个状态元素元组。

Source.unfoldAsync(0 → 1) {
     case (a, _) if a > 10000000 ⇒ Future.successful(None)
     case (a, b) ⇒ Future{
       Thread.sleep(1000)
       Some((b → (a + b)) → a)
     }
 }

(14)empty[T]: Source[T, NotUsed]
创建一个没有元素的Source,即为每个连接的Sink立即完成的空流。
例如:Source.empty

(15)maybe[T]: Source[T, Promise[Option[T]]]
创建一个Source,它物化为一个scala.concurrent.Promise,它控制什么元素从Source发出。

如果物化的promise以Some完成,那么该值将在下游生成,然后是完成。
如果物化的promise以None完成,那么下游不会产生值,并立即发出完成信号。
如果物化的promise以failure完成,那么返回的source将以那个错误终止。
如果在promise完成前,source的下游取消,那么promise将以None完成。

(16)failed[T](cause: Throwable): Source[T, NotUsed]
创建一个Source,它立刻终止流,并将错误cause给每一个连接的Sink。
例如:

val ex = new Exception("buh")
Source.failed(ex)
     .flatMapMerge(1, identity)
     .runWith(Sink.head)
      .futureValue

(17)lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]]
创建一个Source,直到下游有需求才物化,当source物化时,物化的future将以其值完成,如果下游取消或失败没有任何需求,则不会调用创建工厂,物化的Future是失败。

(18)asSubscriber[T]: Source[T, Subscriber[T]]
创建一个Source,其物化为一个org.reactivestreams.Subscriber

(19)actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef]
如何先定义流,而后给流传递数据呢?答案就是Source.actorRef。说明:Source.actorRef没有背压策略。
创建一个Source,其物化为一个akka.actor.ActorRef

如果下游有需求, 发送到该actor的消息将被发送到流中, 否则它们将被缓冲, 直到收到需求请求为止。

根据定义的akka.stream.OverflowStrategy,如果缓冲区中没有可用空间, 则可能会丢弃元素。

策略akka.stream.OverflowStrategy.backpressure不受支持, 如果将其作为参数传递, 则会抛出异常 llegalArgument ( "Backpressure overflowStrategy not supported")。

可以使用0的bufferSize禁用缓冲区, 如果下游没有需求, 则会丢弃接收到的消息。当 bufferSize是 0, overflowStrategy并不重要。在此源之后添加一个异步边界;因此, 假定下游总是会产生需求是绝不会安全的。

通过将akka.actor.Status.Failure发送到actor引用, 从而失败来完成流。在标示完成前,以防actor仍消耗其内部缓冲区(在收到一个akka.actor.Status.Success之后),它收到一个akka.actor.Status.Failure,故障将为 立即向下游发信号(而不是完成信号)。

当流完成、失败或从下游取消时, actor将被停止, 即您可以在发生此情况时观察它以获得通知。

val stringSourceinFuture=Source.actorRef[String](100,OverflowStrategy.fail) // 缓存最大为100,超出的话,将以失败告终
  val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source数据流中把不是以"haha"开头的字符串过滤掉
  val actor=hahaStrSource.to(Sink.foreach(println)).run()
  actor!"asdsadasd"
  actor!"hahaasd"
  actor!Success("ok")// 数据流成功完成并关闭

(20)combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
MergeConcat按照扇入策略将多个Source合并,返回一个Source。
例如:

val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))

val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))

(21)zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed]
将多个流的元素合并到一个序列流中。

val sources = immutable.Seq(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))

Source.zipN(sources)
        .runWith(Sink.seq)
        .futureValue

结果是:

immutable.Seq(
          immutable.Seq(1, 10, 100),
          immutable.Seq(2, 20, 200),
          immutable.Seq(3, 30, 300))

(22)zipWithN[T, O](zipper: immutable.Seq[T] ⇒ O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed]
使用组合函数将多个流的元素合并到一个序列流中。

val sources = immutable.Seq(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))

Source.zipWithN[Int, Int](_.sum)(sources)
        .runWith(Sink.seq)
        .futureValue

结果是:

immutable.Seq(111, 222, 333)

(23)queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]]
创建一个Source,它物化为````akka.stream.scaladsl.SourceQueue```。

您可以将元素推送到队列中, 如果下游有需求, 则它们将被发送到流中, 否则它们将被缓冲, 直到收到需求请求为止。如果下游终止, 缓冲区中的元素将被丢弃。

根据定义的akka.stream.OverflowStrategy,如果缓冲区中没有可用空间, 则可能会丢弃元素。

确认机制可用。
akka.stream.scaladsl.SourceQueue.offer返回Future [QueueOfferResult],如果元素被添加到缓冲区或发送到下游,则它将以QueueOfferResult.Enqueued完成。 如果元素被丢弃,它将以QueueOfferResult.Dropped完成。当流失败时,以QueueOfferResult.Failure完成 或者下游完成时,以
QueueOfferResult.QueueClosed完成。

当缓冲区已满时,策略akka.stream.OverflowStrategy.backpressure不会完成最后offer():Future调用。

可以使用akka.stream.scaladsl.SourceQueue.watchCompletion查看流的可访问性。当流完成时,它返回一个以成功完成的future或者当流失败时,它返回一个以失败完成的future。

可以通过设置bufferSize为0关闭缓冲区,然后接收到的消息将等待下游的需求,如果有另一个消息等待下游需求,这种情况下结果将根据溢出策略完成。

(24)unfoldResource[T, S](create: () ⇒ S, read: (S) ⇒ Option[T], close: (S) ⇒ Unit): Source[T, NotUsed]
从某个可以打开、读取、关闭的资源,创建一个Source。
以阻塞的方式与资源交互。

可以使用监管策略来处理read函数的异常。所有由createclose抛出的异常,都将使流失败。

Restart监管策略将关闭并再次创建阻塞IO。默认策略是Stop,意味着在read函数出现错误流将终止。

通过变更akka.stream.blocking-io-dispatcher或者为提供的Source使用ActorAttributes设置,来配置默认的调度器。

遵守ActorAttributes. SupervisionStrategy属性。

(25)unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed]
类似于unfoldResource,但是使用返回Futures而不是纯值的函数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容