PersistentBuffer
是一系列 Akka Streams 流组建的第一个。它像 Akka Streams缓冲区一样工作,不同的是,缓存的内容存储于一系列内存映射的文件中,由 PersistentBuffer
构造提供。这可以让缓冲区大小实际无上限,不使用JVM堆来存储,在同时处理百万级/每秒时性能优异。
依赖
以下依赖在Persistent Buffer工作时需要:
"org.squbs" %% "squbs-pattern" % squbsVersion,
"net.openhft" % "chronicle-queue" % "4.5.13"
例子
以下的例子显示了 PersistentBuffer
在流中的用法:
implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val countFuture = source.via(buffer.async).runWith(counter)
此版本在GraphDSL显示相同
implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
sink =>
import GraphDSL.Implicits._
source ~> buffer.async ~> sink
ClosedShape
})
val countFuture = streamGraph.run()
背压(Back-Pressure)
PersistentBuffer
不背压上游流量。它将获取给它的所有的流元素,并且通过增长或旋转队列文件的数量来增长存储。它没有任何方法确定缓冲区的限制或存储大小。下游流量背压按照每个Akka Streams和Reactive Streams的要求进行。
如果PersistentBuffer
stage被下游流量混淆, PersistentBuffer
不会缓存并且它实际上会背压。为了保证PersistentBuffer
确实运行在它自己的空间,在这之后加入一个async
边界。
失败 & 恢复
由于它的持久特性, PersistentBuffer
可以从突然的流关闭,故障,JVM故障甚至潜在的系统故障中恢复。在同一个目录通过 PersistentBuffer
重启流将启动发出存贮在缓存中的元素,在新的元素加入进来之前不会消费。在先前的流故障或关闭时缓存中正在消费的元素(并未消费完成)将会丢失。
因为缓存通过本地存储、心轴或SSD,因此这个缓存的性能和耐久性同样取决于存储的耐久性。所以,理解和推断缓存的耐久性非常重要,与那些数据库和其他热离线持久存储不是同一个级别,以换取更高的性能。
Akka Streams stage批处理请求并在内部缓存记录。 PersistentBuffer
保证回复和记录的持久化到达onPush
, Akka Stream stage内部缓存的记录未到达onPush
将会在故障中丢失。
提交保证(Commit Guarantee)
在一个不可预知的故障情况中,从 PersistentBuffer
stage发出的元素却未抵达sink
将会丢失。有些情况下,它可能需要避免此类数据丢失。在 sink
之前使用commit
stage对这类情况有帮助。加入 commit
stage,使用 PersistentBufferAtLeastOnce
。请参考下面commit
stage 用法的例子:
implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val tempPath = new File("/tmp/myqueue")
val config = ConfigFactory.parseMap {
Map(
"persist-dir" -> s"${tempPath.getAbsolutePath}"
)
}
val buffer = new PersistentBufferAtLeastOnce[ByteString](config)
val commit = buffer.commit[ByteString]
val flowSink = // do some transformation or a sink flow with expected failure
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
sink =>
import GraphDSL.Implicits._
// ensures that records are reprocessed when something fails at tranform flow
source ~> buffer ~> flowSink ~> commit ~> sink
ClosedShape
})
val countFuture = streamGraph.run()
请注意,commit
无法防止在 sink
(或者其他commit之后的stage)内部缓存中的丢失。
提交订单(Commit Order)
commit
stage应该正常按照顺序接收元素。然而,流中的一个潜在的bug可能引起一个元素丢弃或不按顺序抵达 commit
stage。默认的commit-order-policy
设置为 lenient
,使流继续运行在这个场景中。你可以设置为 strict
,以便抛出CommitOrderException
异常,并让Supervision.Decider
确定要执行的操作。
空间管理
一个典型的持久队列目录查看如下:
$ ls -l
-rw-r--r-- 1 squbs_user 110054053 83886080 May 17 20:00 20160518.cq4
-rw-r--r-- 1 squbs_user 110054053 8192 May 17 20:00 tailer.idx
当所有的读者成功处理读取queue,队列文件自动删除。
配置
队列通过传递一个保存了所有默认配置的持久化目录的地址创建。所有的例子可以在上面看到。或者,它可以通过传递在构建时一个 Config
对象创建。 Config
对象是一个标准的HOCON 配置。下面的例子展示了使用Config
构建PersistentBuffer
:
val configText =
"""
| persist-dir = /tmp/myQueue
| roll-cycle = xlarge_daily
| wire-type = compressed_binary
| block-size = 80m
""".stripMargin
val config = ConfigFactory.parseString(configText)
//使用Config构建缓存
val buffer = new PersistentBuffer[ByteString](config)
下面的配置属性用于 PersistentBuffer
persist-dir = /tmp/myQueue # Required
roll-cycle = daily # Optional, defaults to daily
wire-type = binary # Optional, defaults to binary
block-size = 80m # Optional, defaults to 64m
index-spacing = 16k # Optional, defaults to roll-cycle's spacing
index-count = 16 # Optional, defaults to roll-cycle's count
commit-order-policy = lenient # Optional, default to lenient
Roll-cycle可以用大写或小写指定。roll-cycle
支持以下值:
Roll Cycle | 容量(Capacity) |
---|---|
MINUTELY | 64 million entries per minute |
HOURLY | 256 million entries per hour |
SMALL_DAILY | 512 million entries per day |
DAILY | 4 billion entries per day |
LARGE_DAILY | 32 billion entries per day |
XLARGE_DAILY | 2 trillion entries per day |
HUGE_DAILY | 256 trillion entries per day |
Wire-type可以通过大写或者小写指定。wire-type
支持以下值:
- TEXT
- BINARY
- FIELDLESS_BINARY
- COMPRESSED_BINARY
- JSON
- RAW
- CSV
内存大小诸如 block-size
和index-spacing
依据memory size format defined in the HOCON specification指定。
序列化(Serialization)
QueueSerializer[T]
需要被隐式的提供给PersistentBuffer[T]
,如上面的例子所示:
implicit val serializer = QueueSerializer[ByteString]()
QueueSerializer[T]()
为你的目标类型调用生产一个序列化器(Serializer)。它基于基础设施的序列化和反序列化。
实现Serializer
控制队列中细粒度的持久化格式,你可能需要实现你自己的序列化器(serializer)如下:
case class Person(name: String, age: Int)
class PersonSerializer extends QueueSerializer[Person] {
override def readElement(wire: WireIn): Option[Person] = {
for {
name <- Option(wire.read().`object`(classOf[String]))
age <- Option(wire.read().int32)
} yield { Person(name, age) }
}
override def writeElement(element: Person, wire: WireOut): Unit = {
wire.write().`object`(classOf[String], element.name)
wire.write().int32(element.age)
}
}
使用这个序列化器(serializer),只需要在构建PersistentBuffer
之前声明它为隐式的,如下:
implicit val serializer = new PersonSerializer()
val buffer = new PersistentBuffer[Person](new File("/tmp/myqueue")
广播缓存(Broadcast Buffer)
BroadcastBuffer
是持久化缓存的一个变种。这个工作与PersistentBuffer
相似,流元素广播至多个输出端口。因此它是缓存和广播stage的组合。这个配置采用一个名为output-ports
的附加参数,用于指定输出端口的数量。
当流元素从每个输出端口发出(以独立的速度,取决于下游的速度要求)时,特别需要广播缓存。
val configText =
"""
| persist-dir = /tmp/myQueue
| roll-cycle = xlarge_daily
| wire-type = compressed_binary
| block-size = 80m
| output-ports = 3
""".stripMargin
val config = ConfigFactory.parseString(configText)
// Construct the buffer using a Config.
val bcBuffer = new BroadcastBuffer[ByteString](config)
例子
implicit val serializer = QueueSerializer[ByteString]()
val in = Source(1 to 100000)
val flowCounter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(flowCounter) { implicit builder =>
sink =>
import GraphDSL.Implicits._
val buffer = new BroadcastBufferAtLeastOnce[ByteString](config)
val commit = buffer.commit[ByteString]
val bcBuffer = builder.add(buffer.async)
val mr = builder.add(merge)
in ~> transform ~> bcBuffer ~> commit ~> mr ~> sink
bcBuffer ~> commit ~> mr
bcBuffer ~> commit ~> mr
ClosedShape
})
val countFuture = streamGraph.run()
积分(Credits)
PersistentBuffer
利用Chronicle-Queue 4.x作为高性能内存映射队列持久化。