The most important Streaming abstraction

By William "Scala William" Narmontas

accept: (State, Input) => State

Any mutable but side-effect-free code can be rewritten into this form and reused on anything that has a `scanLeft` method, such as an Iterator.

Any mutable, side-effecting code can be rewritten into this form, followed by your side-effecting code.
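
To make these two claims a bit more concrete, here is a minimal sketch that takes a running average, which you would normally write with two mutable vars, and puts it into the accept form. The names RunningAverage, Average, initial and averages are made up for this illustration and are not part of any library.

```scala
object RunningAverage {
  // State carried between inputs: how many numbers were seen, and their sum.
  final case class Average(count: Int, sum: Double) {
    def value: Option[Double] = if (count == 0) None else Some(sum / count)
  }

  val initial: Average = Average(0, 0.0)

  // The mutable version would do `count += 1; sum += input` on two vars.
  // Here the same logic has the shape accept: (State, Input) => State.
  def accept(state: Average, input: Double): Average =
    Average(state.count + 1, state.sum + input)

  // Reused on an Iterator through scanLeft. No side effects happen in here.
  def averages(inputs: Iterator[Double]): Iterator[Option[Double]] =
    inputs.scanLeft(initial)(accept).map(_.value)
}
```

The side-effecting part then comes last, for example `RunningAverage.averages(Iterator(2.0, 4.0)).foreach(println)`.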

Because this function is not tied to any particular streaming implementation, you can plug it into an akka-streams Flow (and thus akka-streams-kafka), FS2 functional streams, a scanLeft/scanRight or foldLeft/foldRight, a LazyList, a mutable while loop, a pure test, a tail-recursive function, whatever. You are never locked into any one particular orchestration.
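
As a rough sketch of that portability, the snippet below drives one accept function from a foldLeft, a foldRight, a LazyList and a plain loop. The accept used here (keep the longest word seen so far) and the object name are hypothetical, and LazyList assumes Scala 2.13 or later. Note that foldRight expects its arguments the other way round, so accept is wrapped in a small lambda there.

```scala
object SameAcceptEverywhere {
  // Hypothetical accept: the state is the longest word seen so far.
  def accept(longest: String, word: String): String =
    if (word.length > longest.length) word else longest

  val words: List[String] = List("a", "streaming", "fold")

  // foldLeft gives only the final state.
  val viaFoldLeft: String = words.foldLeft("")(accept)

  // foldRight takes (Input, State) => State, so the arguments are flipped.
  val viaFoldRight: String =
    words.foldRight("")((word, state) => accept(state, word))

  // scanLeft on a LazyList (Scala 2.13+) gives every intermediate state, lazily.
  val viaLazyList: List[String] =
    words.to(LazyList).scanLeft("")(accept).toList

  // A plain mutable loop around the very same function.
  val viaLoop: String = {
    var state = ""
    val it = words.iterator
    while (it.hasNext) state = accept(state, it.next())
    state
  }
}
```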

I shall give you some examples of this:

Base example

while-loop

var state: State = initialState
while (true) {
  dumpState(state)
  state = accept(state, fetchInput())
}

Iterator

iteratorInput()
  .scanLeft(initialState)(accept)
  .foreach(dumpState)

akka-streams

flowInput
  .scan(initialState)(accept)
  .runForeach(dumpState)

tail recursion

import scala.annotation.tailrec

@tailrec
def go(state: State): Unit = {
  dumpState(state)
  go(accept(state, fetchInput()))
}

go(initialState)

Transforming state

Now suppose you don't want to output the full state, just an Output derived from it by a function extract: State => Output. Your Output can be an Iterator[Stuff], an Either[Bad, Good], a Try[Good], SomeTrait, or anything you like.
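
For instance, here is a small sketch where the Output is an Either[Bad, Good]. Everything in it (ExtractSketch, Totals, the parsing rule) is a hypothetical example rather than something prescribed by this approach: the state is a running total plus the last input that failed to parse, and extract turns that into a Left or a Right.

```scala
import scala.util.Try

object ExtractSketch {
  // Hypothetical state: running total, plus the last input that failed to parse.
  final case class Totals(sum: Int, lastBad: Option[String])

  val initialState: Totals = Totals(0, None)

  // accept: (State, Input) => State
  def accept(state: Totals, input: String): Totals =
    Try(input.trim.toInt).toOption match {
      case Some(n) => Totals(state.sum + n, None)
      case None    => state.copy(lastBad = Some(input))
    }

  // extract: State => Output, where Output = Either[Bad, Good]
  def extract(state: Totals): Either[String, Int] =
    state.lastBad.toLeft(state.sum)

  def outputs(inputs: Iterator[String]): Iterator[Either[String, Int]] =
    inputs.scanLeft(initialState)(accept).map(extract)
}
```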

while-loop

var state: State = initialState
while (true) {
  dump(extract(state))
  state = accept(state, fetchInput())
}

Iterator

iteratorInput()
  .scanLeft(initialState)(accept)
  .map(extract)
  .foreach(dump)

akka-streams

flowInput
  .scan(initialState)(accept)
  .map(extract)
  .runForeach(dump)

tail recursion

import scala.annotation.tailrec

@tailrec
def go(state: State): Unit = {
  dump(extract(state))
  go(accept(state, fetchInput()))
}

go(initialState)

Deduplication state machine

We'll deduplicate consecutive items here. Sample usage:

Iterator.apply[String]("X", "Y", "Y").scanLeft(Deduplicate.initial[String])(_.accept(_)).flatMap(_.emit)

Implementation:

/**
  * accept: (Deduplicate[T], T) => Deduplicate[T]
  * extract (here called emit): Deduplicate[T] => Option[T]
  */
case class Deduplicate[T](lastSeen: Option[T], emit: Option[T]) {
  def accept(input: T): Deduplicate[T] = {
    if (lastSeen.contains(input)) copy(emit = None)
    else Deduplicate(Some(input), Some(input))
  }
}

object Deduplicate {
  def initial[T]: Deduplicate[T] = Deduplicate(None, None)
}
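
Running the sample usage end to end may help show what the scan actually produces. The sketch below repeats the Deduplicate definition only so that it compiles on its own; the DedupeDemo wrapper and the dedupe helper are names invented here.

```scala
object DedupeDemo {
  final case class Deduplicate[T](lastSeen: Option[T], emit: Option[T]) {
    def accept(input: T): Deduplicate[T] =
      if (lastSeen.contains(input)) copy(emit = None)
      else Deduplicate(Some(input), Some(input))
  }

  object Deduplicate {
    def initial[T]: Deduplicate[T] = Deduplicate(None, None)
  }

  // scanLeft yields the initial state first; its emit is None, so flatMap
  // drops it along with every suppressed consecutive duplicate.
  def dedupe[T](inputs: Iterator[T]): Iterator[T] =
    inputs.scanLeft(Deduplicate.initial[T])(_.accept(_)).flatMap(_.emit)
}
```

With this, `DedupeDemo.dedupe(Iterator("X", "Y", "Y")).toList` gives `List("X", "Y")`. A value that comes back after a different one, as in "X", "Y", "X", is emitted again, because only consecutive repeats are dropped.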

Testing

Look at just how easy it is!

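import org.scalatest.{FunSuite, Matchers}

// Matchers provides `shouldBe`. This assumes ScalaTest 3.0.x, where both traits live in org.scalatest.
class DeduplicationSample$Test extends FunSuite with Matchers {

  import DeduplicationSample.Deduplicate.initial

  // `initial` gets an explicit type argument: without it, T is inferred as
  // Nothing and accept("Stuff") does not type-check.
  test("Empty is empty") {
    initial[String].emit shouldBe empty
  }
  test("Single is emitted") {
    initial[String].accept("Stuff").emit shouldBe Some("Stuff")
  }
  test("Two in a row makes one emit") {
    initial[String].accept("Stuff").accept("Stuff").emit shouldBe empty
  }
  test("Three in a row makes one emit") {
    initial[String].accept("Stuff").accept("Stuff").accept("Stuff").emit shouldBe empty
  }
  test("X and then Y gives Y at the end") {
    initial[String].accept("X").accept("Y").emit shouldBe Some("Y")
  }
  test("X and then Y and then Y gives Y") {
    initial[String].accept("X").accept("Y").accept("Y").emit shouldBe empty
  }
}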

Corollary

The input type can be a List[I], so now you can support batching. Your lists can have size 0, size 1 or size n.
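
One possible way to get there, sketched under the assumption that you already have a per-item accept, is to lift it into a batch-accepting function with foldLeft. The names BatchingSketch, batched and count are hypothetical.

```scala
object BatchingSketch {
  // Lifts a per-item accept into one whose Input is a whole batch.
  // An empty batch leaves the state untouched.
  def batched[S, I](accept: (S, I) => S): (S, List[I]) => S =
    (state, batch) => batch.foldLeft(state)(accept)

  // Hypothetical per-item logic: count how many inputs were seen.
  def count(seen: Int, input: String): Int = seen + 1

  // Still of the form (State, Input) => State, with Input = List[String].
  val acceptBatch: (Int, List[String]) => Int = batched[Int, String](count)
}
```

The result plugs into the same orchestration as before, for example `Iterator(List("a", "b"), Nil, List("c")).scanLeft(0)(BatchingSketch.acceptBatch)`.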

Conclusion

By using this pure, immutable approach, your code can be used from multiple places and tested extremely easily. Overcomplicate it, and your wiring becomes your "domain logic".

Note that you can reuse this code from now on, in any project, in any sort of implementation. You can even plonk it inside a Free Monad if you wish, because it is simple and is the minimal abstraction necessary to get most work done.

You can test the whole thing step-by-step with simple pure functions.

These things can also be called state machines. Separate pure logic from side effects; that's all.

Social media

Just wrote this: https://t.co/TXQbzeKL2y
It's a draft but important enough to go out :) #streaming #kafka #reactive #akka #scala #jvm

— William Narmontas (@ScalaWilliam) December 14, 2016