ZStream companion object API
zioの非同期streamコンポーネントであるZStreamのコンパニオンオブジェクトのAPI集。
但し書き
以下のコード例はすべて最終的にZStreamをrunしたZIOに変換されたものだが、最後のexitCodeとZIO自体のrun部分は省略してあるので適宜保管して読んでほしい。
absolve
EitherのstreamからRightなものまでを拾う。
またLeftはstreamをrunしたZIOのエラー型に統合される。
ZStream
.absolve(ZStream(Right(1), Right(2), Right(3)))
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream
.absolve(ZStream(Right(1), Left("error"), Right(3)))
.foreach(i => putStrLn(i.toString))
.catchSome { case s: String => putStrLn(s) }
// 1
// error
access
環境の値にアクセスして、その値を持つZStreamを作成できる。
ZStream
.access[Has[String]](_.get)
.foreach(i => putStrLn(i))
.provideSomeLayer[ZEnv](ZLayer.succeed("str"))
// str
accessM
accessの環境値をeffectを伴いながら取得できる。
ZStream
.accessM[Has[String]](s => ZIO.succeed(s.get))
.foreach(i => putStrLn(i))
.provideSomeLayer[ZEnv](ZLayer.succeed("str"))
// str
accessStream
環境値を取得しつつ、それがZStreamの場合に対応できる。
ZStream
.accessStream[Has[String]](s => ZStream(s.get, s.get, s.get))
.foreach(i => putStrLn(i))
.provideSomeLayer[ZEnv](ZLayer.succeed("str"))
// str
// str
// str
apply(ZManaged)
ZManagedからZStreamを作成できる。
ZStreamはZManagedがNoneのエラー値を持つZIOを排出してきた時点でstopする。
ZStream自体がエラーにならないことに注意。
ZStream(ZManaged.succeed(ZIO.succeed(Chunk(1)))).foreach(i =>
putStrLn(i.toString)
)
// 1が無限にでる
ZStream(ZManaged.succeed(ZIO.fail(None).as(Chunk(1))))
.foreach(i => putStrLn(i.toString))
// 何も出ない
apply(*A)
任意の数の値をとって、それを排出するZStreamを作成する。
ZStream(1, 2, 3).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
bracket
値を取得しつつ、そのクリーンアップ処理を差し込む事ができる。
ZStream
.bracket(ZIO(1))(a => ZIO.succeed(println(s"clean up: ${a.toString}")))
.foreach(i => putStrLn(i.toString))
// 1
// clean up: 1
bracketExit
bracketでExitを利用できる。
ZStream
.bracketExit(ZIO(1))((a, e) => URIO(println(s"finish: $a $e")))
.foreach(i => putStrLn(i.toString))
// 1
// finish: 1 Success(())
crossN
stream同士のデカルト積をとる。
引数のstreamの数の違いでいくつかのvariantが存在する。
ZStream
.crossN(ZStream(1, 2), ZStream(3, 4))((a, b) => a + b)
.foreach(i => putStrLn(i.toString))
// 4
// 5
// 5
// 6
concatAll
streamのChunk(ZIOが持つArrayみたいなデータ型)を結合する。
ZStream
.concatAll(Chunk(ZStream(1, 2), ZStream(3, 4)))
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
// 4
die
アプリケーション外のエラーとして失敗するようなstreamを生成する。
ZStream
.die(new Exception("error")) // 死ぬstreamを
.runDrain // とりあえず走らせて
.sandbox // アプリケーション外のエラーをアプリケーションエラーとして取り出して
.catchSome { case e => ZIO(e) } // catchして
.tap(e => putStrLn(e.toString)) // print
// Traced(Die(java.lang.Exception: error), ... )
dieMessage
dieのメッセージのStringだけから死ぬようにできるバージョン。
ZStream
.dieMessage("error")
.runDrain
.sandbox
.catchSome(e => ZIO(e))
.tap(i => putStrLn(i.toString))
// Traced(Die(java.lang.RuntimeException: error), ... )
done
Exit値からstreamを作成する。
ZStream.done(Exit.succeed(1)).foreach(i => putStrLn(i.toString))
// 1
empty
空っぽのstreamを作成する。
ZStream.empty.foreach(i => putStrLn(i.toString))
// 何も出ない
environment
環境値に依存するstreamを作成する。
環境値はrunしたあとにZIOに変換されたあとでも、前でもどちらのタイミングでもprovideできる。
ZStream
.environment[Has[String]]
.foreach(r => putStrLn(r.get[String]))
.provideCustomLayer(ZLayer.succeed("str"))
// str
fail
アプリケーションエラーとして死ぬstreamを作成する。
ZStream
.fail(new Exception("error"))
.runDrain
.catchSome { case e => ZIO(e) }
.tap(i => putStrLn(i.toString))
// java.lang.Exception: error
finalizer
終了後の処理を持つstreamを作成する。
このメソッドで作成されたstreamはUnitを唯一要素として持つ。
ZStream.finalizer(UIO(println("finish"))).foreach(i => putStrLn(i.toString))
// ()
// finish
fromChunk
Chunkからstreamを作成する。
ZStream.fromChunk(Chunk(1, 2)).foreach(i => putStrLn(i.toString))
// 1
// 2
fromHub/fromChunkHub
Hubというzioが持つpub/sub実装からstreamを作成する。
中身がChunk版もある。
for {
hub <- ZHub.unbounded[Int]
stream = ZStream.fromHub(hub)
fiber <- stream.foreach(i => putStrLn(i.toString)).fork
_ <- clock.sleep(1.seconds) // 適当にsleepしないとstreamのセットアップが終了する前にpublishしてしまう
_ <- hub.publish(1)
_ <- fiber.join
} yield ()
// 1
for {
hub <- ZHub.unbounded[Chunk[Int]]
stream = ZStream.fromChunkHub(hub)
fiber <- stream.foreach(i => putStrLn(i.toString)).fork
_ <- clock.sleep(1.seconds)
_ <- hub.publish(Chunk(1, 2, 3))
_ <- fiber.join
} yield ()
// 1
// 2
// 3
fromHubManaged/fromChunkHubManaged
fromHubとかのようにHubからstreamを作成するのだが、streamを作成したあとの処理を追加することによってstreamの作成を待ってpublishなどの処理を行うことができる。
中身がChunk版もある。
val fromHubManaged = for {
p <- Promise.make[Nothing, Unit]
hub <- ZHub.unbounded[Int]
managed = ZStream.fromHubManaged(hub).tapM(_ => p.succeed(()))
stream = ZStream.unwrapManaged(managed)
fiber <- stream.foreach(i => putStrLn(i.toString)).fork
_ <- p.await
_ <- hub.publish(1)
_ <- fiber.join
} yield ()
// 1
for {
p <- Promise.make[Nothing, Unit]
hub <- ZHub.unbounded[Chunk[Int]]
managed = ZStream.fromChunkHubManaged(hub).tapM(_ => p.succeed(()))
stream = ZStream.unwrapManaged(managed)
fiber <- stream.foreach(i => putStrLn(i.toString)).fork
_ <- p.await
_ <- hub.publish(Chunk(1, 2, 3))
_ <- fiber.join
} yield ()
// 1
// 2
// 3
fromHubWithShutdown/fromChunkHubWithShutdown
Hubからstreamを作成しつつ、streamが止まったあとにhubをshutdownする。
中身がChunk版もある。
for {
hub <- ZHub.unbounded[Int]
stream = ZStream.fromHubWithShutdown(hub)
fiber <- stream.take(1).runCollect.tap(i => putStrLn(i.toString)).fork
_ <- clock.sleep(1.seconds)
_ <- hub.publish(1)
_ <- fiber.join
s <- hub.isShutdown
_ <- putStrLn(s.toString)
} yield ()
// Chunk(1)
// true
for {
hub <- ZHub.unbounded[Chunk[Int]]
stream = ZStream.fromChunkHubWithShutdown(hub)
_ <- stream.runHead.tap(i => putStrLn(i.toString)).fork
_ <- clock.sleep(1.seconds)
_ <- hub.publish(Chunk(1, 2, 3))
_ <- clock.sleep(1.seconds)
s <- hub.isShutdown
_ <- putStrLn(s.toString)
} yield ()
// Some(1)
// true
fromHubManagedWithShutdown/fromChunkHubManagedWithShutdown
Managedから作ることでstreamの作成後を保証し、streamが閉じたあとにhubも閉じる。
モリモリ。
中身がChunk版もある。
for {
p <- Promise.make[Nothing, Unit]
hub <- ZHub.unbounded[Int]
managed = ZStream.fromHubManagedWithShutdown(hub).tapM(_ => p.succeed(()))
stream = ZStream.unwrapManaged(managed)
fiber <- stream.take(1).runCollect.tap(i => putStrLn(i.toString)).fork
_ <- p.await
_ <- hub.publish(1)
_ <- fiber.join
s <- hub.isShutdown
_ <- putStrLn(s.toString)
} yield ()
// Chunk(1)
// true
for {
p <- Promise.make[Nothing, Unit]
hub <- ZHub.unbounded[Chunk[Int]]
managed =
ZStream.fromChunkHubManagedWithShutdown(hub).tapM(_ => p.succeed(()))
stream = ZStream.unwrapManaged(managed)
_ <- stream.runHead.tap(i => putStrLn(i.toString)).fork
_ <- p.await
_ <- hub.publish(Chunk(1, 2, 3))
s <- hub.isShutdown
_ <- putStrLn(s.toString)
} yield ()
// Some(1)
// true
fromQueue/fromChunkQueue
QueueというZIOの非同期queue実装からstreamを作成する。
中身がChunk版もある。
val fromQueue = for {
q <- Queue.unbounded[Int]
stream = ZStream.fromQueue(q)
fiber <- stream.take(2).runCollect.tap(i => putStrLn(i.toString)).fork
_ <- q.offer(1)
_ <- q.offer(2)
_ <- fiber.join
} yield ()
// Chunk(1,2)
for {
q <- Queue.unbounded[Chunk[Int]]
stream = ZStream.fromChunkQueue(q)
fiber <- stream.foreach(i => putStrLn(i.toString)).fork
_ <- q.offer(Chunk(1, 2, 3))
_ <- fiber.join
} yield ()
// 1
// 2
// 3
fromQueueWithShutdown/fromChunkQueueWithShutdown
Queueからstreamを作成するのだが、streamが閉じたあとにqueueも閉じる。
for {
q <- Queue.unbounded[Int]
stream = ZStream.fromQueueWithShutdown(q)
fiber <- stream.take(2).runCollect.tap(i => putStrLn(i.toString)).fork
_ <- q.offer(1)
_ <- q.offer(2)
_ <- fiber.join
s <- q.isShutdown
_ <- putStrLn(s.toString)
} yield ()
// Chunk(1,2)
// true
for {
q <- Queue.unbounded[Chunk[Int]]
stream = ZStream.fromChunkQueueWithShutdown(q)
fiber <- stream.take(3).runCollect.tap(i => putStrLn(i.toString)).fork
_ <- q.offer(Chunk(1, 2, 3))
_ <- fiber.join
s <- q.isShutdown
_ <- putStrLn(s.toString)
} yield ()
// Chunk(1,2,3)
// true
fromChunks
Chunkたちからstreamを作成する。
ZStream
.fromChunks(Chunk(1, 2), Chunk(3, 4))
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
// 4
fromEffect
ZIOからstreamを作成。
ZStream.fromEffect(ZIO(1)).foreach(i => putStrLn(i.toString))
// 1
fromEffectOption
Noneで失敗するZIOからstreamを作成する。
Noneの場合にもstreamは失敗せずに、何も値が入っていないだけ。
ZStream
.fromEffectOption(ZIO.fromOption(Some(1)))
.foreach(i => putStrLn(i.toString))
// 1
ZStream
.fromEffectOption(ZIO.fromOption(None))
.foreach(i => putStrLn(i.toString))
// Nothing to show
fromIterable/fromIterableM
Iterableからstreamを作成する。
MはZIO[_, _, Iterable]から作る。
ZStream.fromIterable(Seq(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream.fromIterableM(ZIO(Seq(1, 2, 3))).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
fromIterator/fromIteratorEffect
Iteratorから作成。
EffectはZIO[_, _, Iterator]から。
JavaのIteratorから作成する版もある。
ZStream.fromIterator(Seq(1, 2, 3).iterator).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream
.fromIteratorEffect(ZIO(Seq(1, 2, 3).iterator))
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
fromIteratorManaged
ZManaged入のIteratorからstreamを作成する。
Javaのiteratorから作成する版もある。
ZStream
.fromIteratorManaged(ZManaged.succeed(Seq(1, 2, 3).iterator))
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
fromIteratorTotal
Iteratorからstreamを作成するのだが、突っ込むiteratorは例外を射出してはいけない。
JavaのIteratorから作成する版もある。
ZStream
.fromIteratorTotal(Seq(1, 2, 3).iterator)
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
fromSchedule
Scheduleからstreamを作成する。
ZStream
.fromSchedule(Schedule.recurs(4))
.runCollect
.tap(i => putStrLn(i.toString))
// Chunk(0,1,2,3)
fromTQueue
zioのstmモジュールのTQueueからstreamを作成する。
for {
tq <- STM.atomically(TQueue.unbounded[Int])
stream = ZStream.fromTQueue(tq)
fiber <- stream.take(2).runCollect.tap(i => putStrLn(i.toString)).fork
_ <- STM.atomically(tq.offer(1))
_ <- STM.atomically(tq.offer(2))
_ <- fiber.join
} yield ()
// Chunk(1,2)
halt
Causeからstreamを作成する。
ZStream
.halt(Cause.fail(new Exception("error")))
.runDrain
.foldCause(println, _ => ())
// Fail(java.lang.Exception: error)
iterate
reduceの様に初期値に与えた関数を繰り返し適用した結果の列をstreamとして作成する。
ZStream.iterate(1)(_ + 1).take(3).foreach(i => putStrLn(i.toString))
// Chunk(1,2,3)
managed
ZManagedからstreamを作成する。
ZStream.managed(ZManaged.succeed(1)).foreach(i => putStrLn(i.toString))
// 1
mergeAll
与えたstreamを並列にマージする。
並列数は引数で制御できる。
ZStream
.mergeAll(3)(ZStream(1, 2), ZStream(3, 4), ZStream(5, 6))
.foreach(i => putStrLn(i.toString))
// 実行するたびに順序が違う
// 3
// 4
// 5
// 6
// 1
// 2
ZStream
.mergeAll(2)(ZStream(1, 2), ZStream(3, 4), ZStream(5, 6))
.foreach(i => putStrLn(i.toString))
// 1~4は実行のたびに順序が違うが、5,6はかならず最後
// 3
// 4
// 1
// 2
// 5
// 6
mergeAllUnbounded
並列数が限界突破したmergeAll。
ZStream
.mergeAllUnbounded()(ZStream(1, 2), ZStream(3, 4), ZStream(5, 6))
.foreach(i => putStrLn(i.toString))
// 実行するたびに順序が違う
// 3
// 4
// 5
// 6
// 1
// 2
never
while {}のようなstreamを作成する。
ZStream.never.foreach(i => putStrLn(i.toString))
// 何も出ないし、終了しない
paginate/paginateM
stateを引き回しつつ、Some値のときは次の値も計算するがNoneのときはその回の処理でstreamを終了する。
ZStream
.paginate(1)(i => (i.toString, Option.when(i < 3)(i + 1)))
.foreach(i => putStrLn(i))
// 1
// 2
// 3
ZStream
.paginateM(1)(i => ZIO((i.toString, Option.when(i < 3)(i + 1))))
.foreach(i => putStrLn(i))
// 1
// 2
// 3
paginateChunk/paginateChunkM
paginate系のChunk版。
ZStream
.paginateChunk(1)(i =>
(Chunk(i.toString, i.toString), Option.when(i < 3)(i + 1))
)
.foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3
ZStream
.paginateChunkM(1)(i =>
ZIO((Chunk(i.toString, i.toString), Option.when(i < 3)(i + 1)))
)
.foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3
range
minとmaxを指定してその間の値を生成する。
minは含まれるがmaxは含まれない。
ZStream.range(1, 4).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
repeat/repeatEffect
同じ値を繰り返すstreamを作成する。
ZStream.repeat(1).take(3).foreach(i => putStrLn(i.toString))
// 1
// 1
// 1
val repeatEffect =
ZStream.repeatEffect(ZIO(1)).take(3).foreach(i => putStrLn(i.toString))
// 1
// 1
// 1
repeatEffectOption
Noneで失敗するZIOをとってstreamを作成する。
NoneでZIOが失敗する場合でもstream自体は失敗せずに終了する。
for {
ref <- Ref.make(1)
stream = ZStream.repeatEffectOption(
ref.get.flatMap(i =>
if (i < 4) ref.update(_ + 1).as(i)
else ZIO.fail(None)
)
)
_ <- stream.foreach(i => putStrLn(i.toString))
} yield ()
// 1
// 2
// 3
repeatEffectChunk
ZIOのChunkからstreamを作成する。
ZStream
.repeatEffectChunk(ZIO(Chunk(1, 2)))
.take(3)
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 1
repeatWith/repeatEffectWith
Scheduleで何度値の取得を行うか制御する。
ZStream.repeatWith(1, Schedule.recurs(2)).foreach(i => putStrLn(i.toString))
// 1
// 1
// 1
ZStream
.repeatEffectWith(ZIO(1), Schedule.recurs(2))
.foreach(i => putStrLn(i.toString))
// 1
// 1
// 1
service
Hasに囲まれた形で依存を宣言する。
ZStream
.service[String]
.foreach(i => putStrLn(i))
.provideCustomLayer(ZLayer.succeed("str"))
// str
services
Has付きで依存を複数宣言できる。
いくつ依存を定義できるかで複数のvariantが存在する。
ZStream
.services[String, Int]
.foreach(a => putStrLn(a.toString()))
.provideCustomLayer(ZLayer.succeed("str") ++ ZLayer.succeed(1))
// (str,1)
serviceWith
依存からZIO付きで値を取り出せる。
trait A {
def a: UIO[String]
}
ZStream
.serviceWith[A](_.a)
.foreach(i => putStrLn(i))
.provideCustomLayer(ZLayer.succeed(new A {
override def a: UIO[String] = UIO("str")
}))
// str
succeed
単一要素を持つstreamを作成。
ZStream.succeed(1).foreach(i => putStrLn(i.toString))
// 1
tick
Durationを渡すとその間隔ごとにUnitを吐き出す。
ZStream.tick(1.seconds).foreach(i => putStrLn(i.toString))
// 1秒おきに()
unit
単一のUnit値をもつstream。
ZStream.unit.foreach(i => putStrLn(i.toString))
// ()
unfold/unfoldM
paginateの様にSome値を返せば継続、Noneならば終了。
paginateと違うのはNoneのときにその回のイテレーションで生まれた値も捨てる。
ZStream
.unfold(1)(i => Option.when(i < 4)((i.toString, i + 1)))
.foreach(i => putStrLn(i))
// 1
// 2
// 3
ZStream
.unfoldM(1)(i => ZIO(Option.when(i < 4)((i.toString, i + 1))))
.foreach(i => putStrLn(i))
// 1
// 2
// 3
unfoldChunk/unfoldChunkM
unfold系の中身がChunk版。
ZStream
.unfoldChunk(1)(i =>
Option.when(i < 4)((Chunk(i.toString, i.toString), i + 1))
)
.foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3
ZStream
.unfoldChunkM(1)(i =>
ZIO(Option.when(i < 4)((Chunk(i.toString, i.toString), i + 1)))
)
.foreach(i => putStrLn(i))
// 1
// 1
// 2
// 2
// 3
// 3
unwrap/unwrapManaged
ZIOやZManagedに包まれたstreamを取り出す。
ZStream.unwrap(ZIO(ZStream(1, 2, 3))).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream
.unwrapManaged(ZManaged.succeed(ZStream(1, 2, 3)))
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
when/whenM
Boolの値によってstream引数のstream、もしくはからのstreamを返す。
ZStream.when(true)(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream.when(false)(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 何も出ない
ZStream.whenM(ZIO(true))(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream.whenM(ZIO(false))(ZStream(1, 2, 3)).foreach(i => putStrLn(i.toString))
// 何も出ない
whenCase/whenCaseM
PartialFunctionでZStreamが返すと全体がそのstreamになる。
PartialFunctionが何も返さないときはempty。
ZStream
.whenCase(1) { case 1 => ZStream(1, 2, 3) }
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream
.whenCase(2) { case 1 => ZStream(1, 2, 3) }
.foreach(i => putStrLn(i.toString))
// 何も出ない
ZStream
.whenCaseM(ZIO(1)) { case 1 => ZStream(1, 2, 3) }
.foreach(i => putStrLn(i.toString))
// 1
// 2
// 3
ZStream
.whenCaseM(ZIO(2)) { case 1 => ZStream(1, 2, 3) }
.foreach(i => putStrLn(i.toString))
// 何も出ない
zipN
複数のstreamをzipする。
いくつzipするかでvariantが存在する。
ZStream
.zipN(ZStream(1, 2), ZStream(3, 4))(_ + _)
.foreach(i => putStrLn(i.toString))
// 4
// 6