Effect notes: streams and such
We shipped another bit of Effect-powered code today, for the Val Town user-facing logs pipeline. The initial issue was that we've been seeing high usage on the ClickHouse database used to power logs, and I suspect that part of this is that we're spamming the database with tons of single-row writes. ClickHouse really likes batch writes and doesn't like high-frequency writes, so I wanted to do some basic batching.
Queue
Effect has a lot of nice tools for this kind of problem: in particular, Queues, and Stream operations are really nice expansive API surfaces for this kind of thing.
export const globalLogQueue = Effect.runSync(Queue.unbounded<LogLine>());
runtime.runFork(
Stream.fromQueue(globalLogQueue).pipe(
Stream.groupedWithin(20, "1 seconds"),
Stream.runForEach((batch) =>
Effect.tryPromise(() =>
clickhouseClient.insert<LogLine>({
table: logLinesTableName,
values: Chunk.toArray(batch)
})
).pipe(
Effect.catchAll((e) => Effect.sync(() => Sentry.captureException(e)))
)
)
)
);This is pretty nice stuff: you have a global queue, and can add stuff to it like this:
Queue.offer(globalLogQueue, newLogLines);Pretty nifty APIs here, I think. And I like that there are bounded, dropping, and sliding queues right off the shelf. Effect definitely is built for a world with unexpected scale points, in which any potential buffer or array could fill up unexpectedly, and it comes ready with solutions.
Duration
For all of this timing-related code, I've found the Effect Duration data type to be really, really nifty, and also found the first instance where the pipe() function came in handy. For example, I wanted to update a rate limit counter but set it to decrement in 7 days, at the end of the day. This used to involve a lot of magic numbers and date math to calculate seconds in a day and do multiplication, but after the Effect refactor it looks like:
const expireAt = pipe(
DateTime.unsafeNow(),
DateTime.endOf("day"),
DateTime.add({ days: 7 }),
DateTime.toEpochMillis
);Pretty slick, and I think this is fairly readable - take the current time, then shift it to the end of the day, add 7 days, and convert to milliseconds. Really slick. This is part of the really simple API surface of Effect, but it's nice.
Issues
The Streams documentation is okay, but it's oddly short about interoperability: like how do you convert from existing kind of streams in other ecosystems like Node.js and the web, into Effect streams? This is a common theme in the documentation. See: Stream documentation doesn't mention interop ยท Issue #1254.
Anyway, the short story is that you can convert a Node.js Stream into an Effect Stream by treating it as an async iterable, because it is one of those. And you can convert streams into web streams using Stream.toReadableStreamEffect.