Stream creation, transformation, sinks, batching, and resilience. Use when building data pipelines with concurrency and backpressure.
const s = Stream.fromIterable(items)
const out = s.pipe(
Stream.mapEffect(processItem, { concurrency: 4 }),
Stream.filter((a) => a.valid),
Stream.grouped(100)
)
yield* Stream.runDrain(out)
// or
const all = yield* Stream.runCollect(out)
// Resource-backed stream: `open()` acquires, `close` is registered as the
// release step, and each acquired value is expanded through `readLines`.
const fileLines = Stream.acquireRelease(open(), close).pipe(Stream.flatMap(readLines))
// Resilience: every element's `op` effect is wrapped in `Effect.retry`
// using the `retry` policy before its result is emitted downstream.
const resilient = s.pipe(
  Stream.mapEffect((element) =>
    op(element).pipe(Effect.retry(retry))
  )
)
let downloadedBytes = 0
yield* Effect.gen(function* () {
// background progress ticker
yield* Effect.repeat(
Effect.gen(function* () {
const bytes = yield* Effect.succeed(downloadedBytes)
yield* Effect.log(`Downloaded ${bytes}/${contentLength} bytes`)
}),
Schedule.forever.pipe(Schedule.delayed(() => "2 seconds"))
).pipe(Effect.delay("100 millis"), Effect.forkScoped)
yield* s3.putObject(key,
resp.stream.pipe(
Stream.tap((chunk) => { downloadedBytes += chunk.length; return Effect.void })
),
{ contentLength }
)
}).pipe(Effect.scoped)
- Stream.mapEffect with concurrency to control parallel work
- grouped(n) for batching network/DB operations
- acquireRelease
- runDrain or chunked writes

CRITICAL: Search local Effect source before implementing
The full Effect source code is available at docs/effect-source/. Always search the actual implementation before writing Effect code.
- docs/effect-source/effect/src/Stream.ts
- docs/effect-source/effect/src/Sink.ts
- docs/effect-source/effect/src/Channel.ts

# Find Stream creation patterns
grep -F "fromIterable" docs/effect-source/effect/src/Stream.ts
grep -F "make" docs/effect-source/effect/src/Stream.ts
grep -F "fromEffect" docs/effect-source/effect/src/Stream.ts
# Study Stream transformations (mapEffect, filter, grouped)
grep -F "mapEffect" docs/effect-source/effect/src/Stream.ts
grep -F "filter" docs/effect-source/effect/src/Stream.ts
grep -F "grouped" docs/effect-source/effect/src/Stream.ts
# Find Stream consumption (the run* functions: runDrain, runCollect)
grep -F "runDrain" docs/effect-source/effect/src/Stream.ts
grep -F "runCollect" docs/effect-source/effect/src/Stream.ts
# Look at Stream test examples: every line in the test file that mentions "Stream."
grep -F "Stream." docs/effect-source/effect/test/Stream.test.ts
- See docs/effect-source/effect/src/Stream.ts for the implementation
- Real source code > documentation > assumptions