I’ve been going through the excellent ZIO course at RockTheJVM and I wanted to experiment with what I’ve learned.

A lot of the tools provided by ZIO make sense for actors, so I started working on a toy implementation. I got interested in the idea of a single-process actor library after reading about Shardcake. It’s easier for me to think about actors when there’s a clear boundary between in-server and cross-server communication, so I focused on the in-server side.

I could have started with ZIO Actors, but it was fun to try out different API ideas and see how they worked. For enterprise use cases that benefit from transparent actor distribution, Akka is the de facto solution on the JVM, but I was mostly interested in trying out ideas, not in building a full application.

My main takeaway from this experience is that single-process actor libraries are worth exploring. Working through the implementation let me hash out some of my ideas about actors and generated a few more. The source code is available in this GitHub repo.

Warnings:

  • I have not used the actor model or Scala in production before. One of the reasons for going through this was to get some better intuitions about the actor model.
  • This post gets disjointed at some points since I was working through a lot of ideas and there wasn’t really a common thread to tie them together.
  • The default Jekyll theme makes h1 24px and h2 32px, and that’s confusing.

Still here? Well then let’s go through the main parts of the implementation and the ideas I worked through.

Request-response messages

ZIO Actors has a nice description of the actor model. I’m mostly interested in two pieces:

  • Actors process one message at a time
  • Actors have supervising parent actors that handle faults by restarting supervised child actors.

A ZIO Queue is perfect for processing one message at a time. Before processing messages, though, I wanted an easy way to send a message to an actor and get a response back.

Since we’re sticking to a single process, we can use a ZIO Promise as the reply address. With multiple processes, we would need a way to match a remote reply back to the right promise.

package com.slopezerosolutions.zioactortest

import zio._
import zio.test._

class ActorSystemSpec extends zio.test.junit.JUnitRunnableSpec {

  def spec: Spec[Any, Throwable] = suite("ActorSystem tests")(
    test("can send a message") {
      val actorSystem = new ActorSystem

      val sendMessageZIO = for {
        resultPromise <- Promise.make[Throwable, String]
        destination <- ZIO.succeed(actorSystem.promiseMessageDestination(resultPromise))
        _ <- actorSystem.send("Hello world", destination)
        result <- resultPromise.await
      } yield result
      assertZIO(sendMessageZIO)(Assertion.equalTo("Hello world"))
    }
  )
}

The next issue is what to do when the reply needs to be translated before the requester can process it.

In a normal request-response call we can wait for the response and translate it there. In the actor style, the response should come back to the actor’s main inbox.

The problem is that the receiver should usually respond in its own format, not in the sender’s format. If a CashRegisterActor sends a PaymentActor a Payment message, then:

  1. PaymentActor can respond with a CashRegisterActor format message, but this makes PaymentActor harder to reuse
  2. PaymentActor can respond with a PaymentActor format message, but this makes CashRegisterActor’s message format harder to read.

Akka recommends the Adapted Response Pattern to solve this issue. The actor registers message adapters when it’s initialized and uses these adapter addresses as reply references.

In the single-process case, it’s easier to generate adapters on demand. For example, if we want to retry a payment, the adapter could carry an internal request id so the actor can tell multiple payment responses apart.

    test("can send a message to an adapted destination") {
      val actorSystem = new ActorSystem

      val sendMessageZIO = for {
        resultPromise <- Promise.make[Throwable, Int]
        destination <- ZIO.succeed(actorSystem.promiseMessageDestination(resultPromise))
        adapterDestination <- ZIO.succeed(actorSystem.adaptedMessageDestination(
          (stringValue:String) => stringValue.length,
          destination))
        _ <- actorSystem.send("Hello world", adapterDestination)
        result <- resultPromise.await
      } yield result
      assertZIO(sendMessageZIO)(Assertion.equalTo(11))
    }
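To make the on-demand adapter idea concrete, here is a small plain-Scala sketch of the retry case. The message types (PaymentResult, CashRegisterMessage, PaymentReply) and adapterFor are hypothetical names, not code from the repo. The point is that each adapter closes over its own request id, so the PaymentActor can keep replying in its own format.

```scala
// Hypothetical reply format owned by the PaymentActor.
final case class PaymentResult(approved: Boolean)

// Hypothetical inbox format owned by the CashRegisterActor.
sealed trait CashRegisterMessage
final case class PaymentReply(requestId: Int, result: PaymentResult) extends CashRegisterMessage

object OnDemandAdapters {
  // Builds a fresh adapter per payment attempt. The request id is captured
  // in the closure, so replies to different attempts stay distinguishable.
  def adapterFor(requestId: Int): PaymentResult => CashRegisterMessage =
    result => PaymentReply(requestId, result)
}

object AdapterExample {
  val firstAttempt: PaymentResult => CashRegisterMessage = OnDemandAdapters.adapterFor(1)
  val retry: PaymentResult => CashRegisterMessage = OnDemandAdapters.adapterFor(2)

  // The PaymentActor replies with a plain PaymentResult; the adapter tags it.
  val tagged: CashRegisterMessage = retry(PaymentResult(approved = true))
}
```

With the API in the test above, each of these functions would be passed to actorSystem.adaptedMessageDestination along with the cash register's own destination.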

In the initial implementation, I added a send method to ActorSystem. This ended up cluttering the code, so I changed it later.

package com.slopezerosolutions.zioactortest

import zio._
class ActorSystem {

  def promiseMessageDestination[T](promise: Promise[Throwable, T]): MessageDestination[T] = {
    PromiseMessageDestination(promise)
  }

  def adaptedMessageDestination[I,O](adapter: I => O, messageDestination: MessageDestination[O]): MessageDestination[I] = {
    AdaptedMessageDestination(adapter, messageDestination)
  }

  def send[T](message: T, messageDestination: MessageDestination[T]): UIO[Boolean] = {
    messageDestination match {
      case PromiseMessageDestination(promise) => promise.succeed(message)
      case AdaptedMessageDestination(adapter, messageDestination) => {
        val adaptedMessage = adapter.apply(message)
        send(adaptedMessage, messageDestination)
      }
    }
  }
}
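The snippets so far use MessageDestination, PromiseMessageDestination, and AdaptedMessageDestination without showing their definitions. Below is one guess at minimal definitions, written as a standalone ZIO 2 sketch rather than the repo's code. It gives each destination its own send method instead of matching on the destination type in ActorSystem, which is closer to the replyTo.send(...) style used later in this post.

```scala
import zio._

// Assumed definitions: the names match the post, the bodies are a sketch.
sealed trait MessageDestination[T] {
  def send(message: T): UIO[Boolean]
}

// Completes a promise with the reply (only works within one process).
final case class PromiseMessageDestination[T](promise: Promise[Throwable, T])
    extends MessageDestination[T] {
  def send(message: T): UIO[Boolean] = promise.succeed(message)
}

// Translates a message with `adapter` before forwarding it.
final case class AdaptedMessageDestination[I, O](adapter: I => O, destination: MessageDestination[O])
    extends MessageDestination[I] {
  def send(message: I): UIO[Boolean] = destination.send(adapter(message))
}

object DestinationDemo {
  // Mirrors the adapted-destination test: "Hello world" arrives as its length.
  val program: Task[Int] = for {
    promise <- Promise.make[Throwable, Int]
    adapted  = AdaptedMessageDestination((s: String) => s.length, PromiseMessageDestination(promise))
    _       <- adapted.send("Hello world")
    length  <- promise.await
  } yield length

  def run(): Int = Unsafe.unsafe { implicit unsafe =>
    Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
  }
}
```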

Creating actors and talking to them

To create actors, I started with the simplest case: a lambda that takes one message. Because this actor only knows about the message it has received, it can only reply to the sender.

case class PingMessage(replyTo: MessageDestination[String])

test("Can send a message to an actor and receive a reply") {
  val sendMessageZIO = for {
    actorSystem <- ActorSystem.initialize
    actorMessageDestination <- actorSystem.startActor((pingMessage: PingMessage) => for {
      result <- actorSystem.send("Pong!", pingMessage.replyTo)
    } yield result)
    resultPromise <- Promise.make[Throwable, String]
    destination <- ZIO.succeed(actorSystem.promiseMessageDestination(resultPromise))
    result <- actorSystem.send(PingMessage(destination), actorMessageDestination)
    promiseResult <- resultPromise.await
  } yield promiseResult
  assertZIO(sendMessageZIO)(Assertion.equalTo("Pong!"))
}

A bounded ZIO Queue as the inbox, plus a forked fiber that takes one message at a time from it, makes this implementation easy.

def startActor[T](handler: T => Task[Boolean]): UIO[MessageDestination[T]] = for {
  actorId <- ZIO.succeed(java.util.UUID.randomUUID().toString)
  inbox <- zio.Queue.bounded[T](100)
  actor <- ZIO.succeed(new Actor(inbox))
  _ <- actorLoop(actor, handler).forkDaemon
  _ <- STM.atomically {
    actors.put(actorId, actor.asInstanceOf[Actor[Nothing]])
  }
} yield ActorMessageDestination[T](actorId)

private def actorLoop[T](actor: Actor[T], handler: T => Task[Boolean]): Task[Boolean] = {
  val handleMessage: Boolean => Task[Boolean] = (state: Boolean) => for {
    message <- actor.inbox.take
    _ <- handler(message)
  } yield true
  val function: Boolean => Boolean = _ != false
  for {
    _ <- ZIO.loopDiscard[Any,Throwable,Boolean](true)(function, identity)(handleMessage)
  } yield true
}
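For comparison, the loopDiscard loop can also be written with forever. The standalone ZIO 2 sketch below is not the repo's code. It uses a hypothetical Ping message that carries a Promise as its reply address, a bounded queue as the inbox, and a daemon fiber that takes one message at a time. As in the loop above, a handler that can fail (a Task in the post) would end the loop, and the actor with it.

```scala
import zio._

object MiniActor {
  // Hypothetical message: the promise is the reply address.
  final case class Ping(replyTo: Promise[Nothing, String])

  // Starts a fiber that processes one message at a time from a bounded inbox.
  // It runs until the fiber is interrupted.
  def start[T](handler: T => UIO[Any]): UIO[Queue[T]] = for {
    inbox <- Queue.bounded[T](100)
    _     <- inbox.take.flatMap(handler).forever.forkDaemon
  } yield inbox

  val program: UIO[String] = for {
    inbox <- start[Ping](ping => ping.replyTo.succeed("Pong!"))
    reply <- Promise.make[Nothing, String]
    _     <- inbox.offer(Ping(reply))
    pong  <- reply.await
  } yield pong

  def run(): String = Unsafe.unsafe { implicit unsafe =>
    Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
  }
}
```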

I stored the actors in transactional memory instead of a plain concurrent data structure. I wanted to implement actor restarts with memory transactions, so I used them from the start.

  private def registerActor[T](actor: Actor[T], actors: TMap[String, Actor[Nothing]]): Task[MessageDestination[T]] = for {
    actorId <- ZIO.succeed(java.util.UUID.randomUUID().toString)
    _ <- STM.atomically {
      actors.put(actorId, actor.asInstanceOf[Actor[Nothing]])
    }
  } yield ActorMessageDestination[T](actorId, this)

So far so good, but the implementation isn’t very useful yet: an actor can only be reached through the reference returned when it was created. The nice thing about those references is that they are typed and easy to work with. It’s harder to keep the type signatures if we look up the references by a route string.
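One common way to keep the type when looking things up by name is to put the message type on the key. This is similar in spirit to the typed ServiceKey used by Akka Typed's Receptionist. The plain-Scala sketch below uses hypothetical names (RouteKey, ActorRef, Routes). Its single cast is only safe if no two keys reuse a name with different message types.

```scala
// Hypothetical typed key: the message type travels with the key.
final case class RouteKey[T](name: String)

// Hypothetical stand-in for a typed actor reference.
final case class ActorRef[T](actorId: String)

final class Routes private (entries: Map[String, ActorRef[_]]) {
  def register[T](key: RouteKey[T], ref: ActorRef[T]): Routes =
    new Routes(entries.updated(key.name, ref))

  // The only cast lives here. It is safe as long as every entry was added
  // through register and no name is reused with a different message type.
  def lookup[T](key: RouteKey[T]): Option[ActorRef[T]] =
    entries.get(key.name).map(_.asInstanceOf[ActorRef[T]])
}

object Routes {
  val empty: Routes = new Routes(Map.empty)
}

object RoutesExample {
  sealed trait SupervisorMessage
  val supervisorKey: RouteKey[SupervisorMessage] = RouteKey("blackjackSupervisor")
  val routes: Routes = Routes.empty.register(supervisorKey, ActorRef[SupervisorMessage]("actor-1"))
  // Typed result, no cast at the call site.
  val found: Option[ActorRef[SupervisorMessage]] = routes.lookup(supervisorKey)
}
```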

Finding actors to talk to

At this point I searched around for more information about actor implementations and found this nice quote in a Clojure article about state and the actor model.

It doesn’t let you fully leverage the efficiencies of being in the same process. It is quite possible to efficiently directly share a large immutable data structure between threads, but the actor model forces intervening conversations and, potentially, copying. Reads and writes get serialized and block each other, etc.

Since I was already optimizing for the single-process case, I decided to try using shared memory to look up actors.

The problem is that actors are created during the initialization phase, but messages from the outside world then have to look these actors up again.

In the multi-process case it makes sense to look up an actor by sending a message, but in the single-process case I wanted to try shared memory instead.

A simple example is a card game server:

  • players create games
  • each game is managed by an actor
  • players send commands to game actors they created

In the single-process case we could create game actors directly and store them in a concurrent data structure. Instead of this, I implemented a game supervisor to experiment with the bootstrapping/initialization experience for actors.

When the actor system is initialized, it’s easy to lose track of the actor references that are created. One option is to wrap the actor system in an object that holds references to the important actors, but it would also be nice if the actors themselves could see this directory of important actors.

Instead of wrapping the actor system I added an initialization method that stores the initial actor set in a directory data structure.

It’s easier to preserve type signatures when there’s a fixed set of actors.

case class GameDirectory(blackjackSupervisor: Option[MessageDestination[BlackjackSupervisorMessage]],
                         pokerSupervisor: Option[MessageDestination[PokerSupervisorMessage]])
test("Creates a fixed set of actors") {
  val initializeZIO = ActorSystem.initialize(GameDirectory(None, None), List(
    new ActorInitializer[GameDirectory] {
      override type MessageType = BlackjackSupervisorMessage

      override def actorTemplate: Task[ActorTemplate[BlackjackSupervisorMessage]] = {
        val value = ActorTemplate.handler((message: MessageType) => ZIO.succeed(true))
        ZIO.succeed(value)
      }

      override def injectActorReference(messageDestination: MessageDestination[BlackjackSupervisorMessage], directory: GameDirectory): GameDirectory = {
        directory.copy(blackjackSupervisor = Some(messageDestination))
      }
    }
  ))
  val testZIO = for {
    actorSystem <- initializeZIO
    resultPromise <- Promise.make[Throwable, String]
    destination <- ZIO.succeed(actorSystem.promiseMessageDestination(resultPromise))
    directory <- actorSystem.directory
    _ <- actorSystem.send(BlackjackSupervisorMessage("Hello", replyTo = destination),
      directory.blackjackSupervisor.get)
  } yield true
  assertZIO(testZIO)(Assertion.equalTo(true))
}
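ActorSystem.initialize itself isn't shown. One way it could work is to fold over the initializers, start each actor, and let each initializer inject its typed reference into the directory. The standalone ZIO 2 sketch below does this with cut-down, hypothetical Inbox and Initializer types rather than the repo's ActorTemplate and MessageDestination. The path-dependent MessageType is what keeps each reference typed.

```scala
import zio._

// Simplified stand-in for MessageDestination: just the actor's inbox.
final case class Inbox[T](queue: Queue[T]) {
  def send(message: T): UIO[Boolean] = queue.offer(message)
}

// Hypothetical, cut-down version of the post's ActorInitializer.
trait Initializer[D] {
  type MessageType
  def handler: MessageType => UIO[Any]
  def inject(inbox: Inbox[MessageType], directory: D): D
}

object DirectoryInit {
  // Starts each initial actor and threads the directory through inject,
  // so the finished directory holds a typed reference to every actor.
  def initialize[D](empty: D, initializers: List[Initializer[D]]): UIO[D] =
    ZIO.foldLeft(initializers)(empty) { (directory, init) =>
      for {
        queue <- Queue.bounded[init.MessageType](100)
        _     <- queue.take.flatMap(init.handler).forever.forkDaemon
      } yield init.inject(Inbox(queue), directory)
    }
}

object DirectoryDemo {
  final case class Dir(echo: Option[Inbox[String]])

  val program: UIO[String] = for {
    received <- Promise.make[Nothing, String]
    echo = new Initializer[Dir] {
      type MessageType = String
      def handler: String => UIO[Any] = s => received.succeed(s)
      def inject(inbox: Inbox[String], directory: Dir): Dir = directory.copy(echo = Some(inbox))
    }
    directory <- DirectoryInit.initialize(Dir(None), List(echo))
    _         <- directory.echo.get.send("hello")
    message   <- received.await
  } yield message

  def run(): String = Unsafe.unsafe { implicit unsafe =>
    Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
  }
}
```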

I wanted to have a way for the actor system to pass the directory down to actors but didn’t end up implementing it. The type signatures are tough to get right.

With this basic directory there’s a way to get the top-level supervisor actors. It doesn’t help with child actors, but there’s more on that later.

Supervising actors

Now that there is a game supervisor actor, we can send it a message to start a game actor.

private def blackjackSupervisor(gameHandler: ActorTemplate[BlackjackGameMessage]) = {
  new ActorInitializer[GameDirectory] {
    override type MessageType = BlackjackSupervisorMessage

    override def actorTemplate: Task[ActorTemplate[BlackjackSupervisorMessage]] = {
      val value = ActorTemplate.handler((actorService: ActorService,
                                         message: BlackjackSupervisorMessage) =>
        message match {
          case StartBlackJackGameMessage(replyTo) =>
            for {
              blackjackActor <- actorService.startActor(gameHandler)
              _ <- actorService.send(StartedBlackJackGameMessage(blackjackActor), replyTo)
            } yield true
          case _ => ZIO.succeed(true)
        })
      ZIO.succeed(value)
    }

    override def injectActorReference(messageDestination: MessageDestination[BlackjackSupervisorMessage],
                                      directory: GameDirectory): GameDirectory = {
      directory.copy(blackjackSupervisor = Some(messageDestination))
    }
  }
}

To implement this, I changed the actor’s message handler signature so that it also receives an ActorService, which lets the handler start child actors.

private def createActor[T](actorTemplate: ActorTemplate[T], parentActor: Option[String]): Task[ActorState[T]] = {
  val actorId = java.util.UUID.randomUUID().toString
  val actorSystem = this
  val actorService = new ActorService {
    override def startActor[M](template: ActorTemplate[M]): Task[MessageDestination[M]] = {
      for {
        actor <- createActor(template, Some(actorId))
        actorMessageDestination <- registerActor(actor)
      } yield actorMessageDestination
    }
  }
  actorTemplate match {
    case HandlerActorTemplate(handler) =>
      for {
        inbox <- zio.Queue.bounded[T](100)
        actor <- ZIO.succeed(new Actor(actorId, inbox))
        actorFibre <- if (parentActor.isEmpty)
          ActorSystem.actorLoop(actorService, actor, handler).forkDaemon
        else
          ActorSystem.actorLoop(actorService, actor, handler).fork
      } yield ActorState(phase = ActorState.Running(), parent = parentActor, children = Set(), actor = actor, fiber = actorFibre)
  }
}
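createActor forks top-level actors with forkDaemon and child actors with fork. The difference matters. In ZIO 2, a fiber started with fork is supervised by the fiber that forked it and is interrupted when that fiber ends, while forkDaemon attaches the fiber to the global scope. The small standalone sketch below is not from the repo; it checks both behaviors.

```scala
import zio._

object ForkSupervision {
  // Yields (child was interrupted along with its parent, daemon is still running).
  val program: UIO[(Boolean, Boolean)] = for {
    childStarted      <- Promise.make[Nothing, Unit]
    childInterrupted  <- Promise.make[Nothing, Unit]
    daemonStarted     <- Promise.make[Nothing, Unit]
    daemonInterrupted <- Promise.make[Nothing, Unit]
    parent <- (for {
      // Forked with fork: supervised by the parent fiber, like a child actor.
      _ <- (childStarted.succeed(()) *> ZIO.never)
             .onInterrupt(childInterrupted.succeed(()))
             .fork
      // Forked with forkDaemon: attached to the global scope, like a top-level actor.
      _ <- (daemonStarted.succeed(()) *> ZIO.never)
             .onInterrupt(daemonInterrupted.succeed(()))
             .forkDaemon
      _ <- ZIO.never
    } yield ()).fork
    _            <- childStarted.await *> daemonStarted.await
    _            <- parent.interrupt
    childStopped <- childInterrupted.await.timeout(5.seconds).map(_.isDefined)
    daemonDone   <- daemonInterrupted.isDone
  } yield (childStopped, !daemonDone)

  def run(): (Boolean, Boolean) = Unsafe.unsafe { implicit unsafe =>
    Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
  }
}
```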

When the game supervisor actor creates a child game actor, this parent-child relationship should be registered internally. To hold on to child information, and for some other bookkeeping, I created an ActorState class.

With restarts in the picture, there’s a window where an actor is stopping or restarting but is still processing a message. I should handle the case where that processing creates a child actor, but it’s tough to reason about, so I skipped it.

private def registerActor[T](actorState: ActorState[T]): Task[MessageDestination[T]] = for {
  actorId <- ZIO.succeed(actorState.actor.actorId)
  _ <- STM.atomically {
    for {
      _ <- if (actorState.parent.isDefined)
        for {
          actorRefOption <- actors.get(actorState.parent.get)
          _ <- actorRefOption match {
            case Some(actorRef) => actorRef.update(actorState => {
              // Parent could be stopped or restarting
              actorState.copy(children = actorState.children + actorId)
            })
            case None => STM.succeed(())
          }
        } yield ()
      else
        STM.succeed(())
      actorStateRef <- TRef.make(actorState.asInstanceOf[ActorState[Nothing]])
      _ <- actors.put(actorId, actorStateRef)
    } yield ()
  }
} yield new ActorMessageDestination[T](actorId, this) {
  def send(message: T): Task[Boolean] = for {
    actorOption <- STM.atomically(actors.get(this.actorId))
    actorRef <- ZIO.getOrFail(actorOption)
    actorState <- STM.atomically(actorRef.get)
    _ <- actorState.actor.asInstanceOf[Actor[T]].inbox.offer(message)
  } yield true
}

I could have used ZIO’s concurrent Ref at this point, but transactional memory comes in handy for restarts later.

Actors that remember things

The game supervisor can create game actors, but these game actors don’t have built-in state. We could hold this state in an external variable or a Ref, but giving the actor its own immutable state is a natural use case.

I’m guessing it’s common to process messages that don’t change the actor state, so I made state updates explicit: the handler returns either StatefulActor.Continue() or StatefulActor.UpdateState(newState).

private def dealerGameActor = {
  ActorTemplate.stateful(()=> BlackjackTable(player1Hand = List()),
    (actorSystem, message: BlackjackGameMessage, table: BlackjackTable) => message match {
    case ShowHand(replyTo) => for {
      _ <- replyTo.send(Hand(table.player1Hand))
    } yield StatefulActor.Continue()
    case Hit(replyTo) => for {
      newTable <- ZIO.attempt {
        table.copy(player1Hand = Card("Hearts", "Queen") +: table.player1Hand)
      }
      _ <- replyTo.send(Hand(newTable.player1Hand))
    } yield StatefulActor.UpdateState(newTable)
    case _ => ZIO.succeed(StatefulActor.Continue())
  })
}
test("Can send messages to a stateful game actor created by the supervisor") {
  val initializeZIO = ActorSystem.initialize(GameDirectory(None, None), List(
    blackjackSupervisor(dealerGameActor),
  ))
  val testZIO = for {
    actorSystem <- initializeZIO
    directory <- actorSystem.directory
    gameStarted <- startBlackjackGame(directory).flatMap(_.await)
    gameActor = gameStarted.get
    gameReply1 <- MessageDestination.promise[BlackjackGameMessage](destination => {
      gameActor.send(Hit(destination))
    }).flatMap(_.await)
    gameReply2 <- MessageDestination.promise[BlackjackGameMessage](destination => {
      gameActor.send(Hit(destination))
    }).flatMap(_.await)
    gameReply3 <- MessageDestination.promise[BlackjackGameMessage](destination => {
      gameActor.send(Hit(destination))
    }).flatMap(_.await)
  } yield List(gameReply1,gameReply2, gameReply3)

  val expectedHand1 = Hand(
    hand = List(Card(
      suit = "Hearts",
      rank = "Queen"
    ))
  )
  val expectedHand2 = Hand(
    hand = List(Card(
      suit = "Hearts",
      rank = "Queen"
    ), Card(
      suit = "Hearts",
      rank = "Queen"
    ))
  )
  val expectedHand3 = Hand(
    hand = List(Card(
      suit = "Hearts",
      rank = "Queen"
    ), Card(
      suit = "Hearts",
      rank = "Queen"
    ), Card(
      suit = "Hearts",
      rank = "Queen"
    ))
  )
  assertZIO(testZIO)(Assertion.equalTo(List(expectedHand1,
    expectedHand2,
    expectedHand3)))
}

Stateful actors were implemented by adding a new actor loop. It was kind of nice to have separate implementations for the stateless and stateful cases, and doing it this way also meant that I didn’t have to change my previous tests. It probably makes sense to have only one implementation in general, but separate loops make it easier to experiment with different actor creation APIs.

  private def createActorState[T](actorCreator: ActorService, actorTemplate: ActorTemplate[T], daemonFibre: Boolean): Task[Actor[T]] = actorTemplate match {
    case HandlerActorTemplate(handler) =>
      for {
        inbox <- zio.Queue.bounded[T](100)
        actor <- ZIO.succeed(new Actor(inbox))
        _ <- if (daemonFibre)
          actorLoop(actorCreator, actor, handler).forkDaemon
        else
          actorLoop(actorCreator, actor, handler).fork
      } yield actor
    case template: StatefulActorTemplate[T] => for {
      inbox <- zio.Queue.bounded[T](100)
      actor <- ZIO.succeed(new Actor(inbox))
      initialState = template.initialStateSupplier.apply()
      handler  = template.handler
      _ <- if (daemonFibre)
        statefulActorLoop(actorCreator, actor, initialState, handler).forkDaemon
      else
        statefulActorLoop(actorCreator, actor, initialState, handler).fork
    } yield actor
  }

  private def actorLoop[T](actorCreator: ActorService, actor: Actor[T], handler: (ActorService, T) => Task[Boolean]): Task[Boolean] = {
    val handleMessage: Boolean => Task[Boolean] = (state: Boolean) => for {
      message <- actor.inbox.take
      _ <- handler(actorCreator, message)
    } yield true
    for {
      _ <- ZIO.iterate[Any, Throwable, Boolean](true)(_ != false)(handleMessage)
    } yield true
  }

  private def statefulActorLoop[S,T](actorCreator: ActorService, actor: Actor[T], state: S, handler: (ActorService, T, S) => Task[StatefulActor.Result[S]]): Task[Boolean] = {
    val handleMessage = for {
      message <- actor.inbox.take
      result <- handler(actorCreator, message, state)
    } yield result
    handleMessage.flatMap {
      case StatefulActor.Continue() => statefulActorLoop(actorCreator, actor, state, handler)
      case StatefulActor.UpdateState(newState) => statefulActorLoop(actorCreator, actor, newState, handler)
    }
  }
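statefulActorLoop never returns. To look at the Continue/UpdateState semantics on their own, here is a plain-Scala sketch that threads state through a finite list of messages with foldLeft. Result, Continue, and UpdateState mirror the post's StatefulActor names but are redefined here as an assumption. The example replays a max-temperature sensor like the one in the next section.

```scala
// Assumed mirror of StatefulActor.Result from the post.
sealed trait Result[+S] extends Product with Serializable
final case class Continue() extends Result[Nothing]
final case class UpdateState[+S](state: S) extends Result[S]

object StatefulFold {
  // Applies the handler to each message in order, keeping the old state on
  // Continue and replacing it on UpdateState. Returns the final state.
  def run[S, T](initial: S, messages: List[T])(handler: (T, S) => Result[S]): S =
    messages.foldLeft(initial) { (state, message) =>
      handler(message, state) match {
        case Continue()        => state
        case UpdateState(next) => next
      }
    }
}

object MaxTemperatureExample {
  // Only readings above the current maximum produce a state update.
  val maxTemp: Double = StatefulFold.run(Double.MinValue, List(21.0, 23.0, 20.0)) {
    (celsius: Double, max: Double) =>
      if (celsius > max) UpdateState(celsius) else Continue()
  }
}
```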

Waking up sleeping actors

Having in-memory actors is nice because they can hold on to some state without loading it from disk or a database.

The problem is that we want to restore these actors from storage to memory after a server restart, or after they are shut down due to inactivity.

In this test case, we have a 3-level supervision structure for a simple home automation/monitoring system:

  • The (singleton) houseSupervisor actor supervises all house actors
  • Each house actor supervises its own set of temperatureSensor actors
  • temperatureSensor actors just receive sensor measurements with the current temperature and keep track of the max temperature.

There are a few reasons why we might want to activate actors on demand and deactivate them when they haven’t received messages for a while. We might expect a lot of sensor owners to turn off their sensors for long periods of time; some owners might buy newer versions and stop using the old ones. Also, in a prototype stage, we might just restart the server and let the system reactivate sensors when it receives messages from them.

test("Reactivates a persisted temperature sensor actor and sends it a message") {
  val houseDatabase: List[House] = List(House("123GreenStreet"))
  val temperatureSensorsDatabase: List[TemperatureSensor] = List(TemperatureSensor("123GreenStreet", "MainFloorSensor"))

  val temperatureActorTemplate = ActorTemplate.stateful[TemperatureSensorCommand, Double](() => Double.MinValue, (actorService, message, maxTemp) => message match {
    case RecordTemperature(celsius, maxTempReply) =>
      for {
        newMaxTemp <- ZIO.succeed(Math.max(celsius, maxTemp))
        _ <- maxTempReply.send(MaxTemperature(newMaxTemp))
      } yield StatefulActor.UpdateState(newMaxTemp)
    case _ => ZIO.succeed(StatefulActor.Continue())
  })

  val homeDirectoryInitializer = houseWithSensorsSupervisor(houseDatabase,
    temperatureSensorsDatabase,
    temperatureActorTemplate)
  for {
    actorSystem <- ActorSystem.initialize(HomeAutomationDirectory(None, None), List(
      homeDirectoryInitializer
    ))
    sensorId = "MainFloorSensor"
    temperatureSensorOption <- getOrActivateTemperatureSensor(sensorId, actorSystem, temperatureSensorsDatabase)
    temperatureSensor = temperatureSensorOption.get
    directory <- actorSystem.directory 
    maxTemp1 <- MessageDestination.promise[MaxTemperature](destination => {
      temperatureSensor.send(RecordTemperature(21, destination))
    }).flatMap(_.await)
    maxTemp2 <- MessageDestination.promise[MaxTemperature](destination => {
      temperatureSensor.send(RecordTemperature(23, destination))
    }).flatMap(_.await)
    maxTemp3 <- MessageDestination.promise[MaxTemperature](destination => {
      temperatureSensor.send(RecordTemperature(20, destination))
    }).flatMap(_.await)
  } yield assertTrue(maxTemp1.celsius == 21 &&
    maxTemp2.celsius == 23 &&
    maxTemp3.celsius == 23)
}

In the test case, I’m trying to model a situation where the house and temperature sensor configuration is stored in some sort of database. In a real implementation, we could probably manage this configuration with a non-actor web application or service.

The key question is how we know whether the temperature sensor actor we’re trying to send a message to is inactive. As part of the experiment, I decided to store the route to each active sensor in a transactional map. The nice thing about this is that if the sensor actor is active, recording a temperature takes just one memory lookup and one message send.

If the sensor’s house actor isn’t active either, it has to be activated first.

  private def houseWithSensorsSupervisor(houseDatabase: List[House],
                                         temperatureSensorsDatabase: List[TemperatureSensor],
                                         temperatureSensorTemplate: ActorTemplate[TemperatureSensorCommand]) = {
    val houseActorTemplate = (houseRoutes: HouseRoutes) => ActorTemplate.handler((actorService: ActorService, houseCommand: HouseCommand) => houseCommand match {
      case GetHouseSummary(replyTo) => replyTo.send(HouseSummary())
      case ActivateTemperatureSensor(temperatureSensorId, replyTo) => for {
        temperatureSensorOption <- ZIO.succeed(temperatureSensorsDatabase.find(_.sensorId == temperatureSensorId))
        _ <- if (temperatureSensorOption.isEmpty)
          replyTo.send(None)
        else
          for {
            temperatureActor <- actorService.startActor(temperatureSensorTemplate)
            _ <- STM.atomically(houseRoutes.temperatureSensorRoutes.routes.put(temperatureSensorId, temperatureActor))
            _ <- replyTo.send(Some(temperatureActor))
          } yield ()
      } yield true
    })
    new ActorInitializer[HomeAutomationDirectory] {
      override type MessageType = HouseSupervisorMessage

      override def initialize: Task[(ActorTemplate[HouseSupervisorMessage], (MessageDestination[HouseSupervisorMessage], HomeAutomationDirectory) => Task[HomeAutomationDirectory])] = {
        for {
          temperatureSensorMap <- STM.atomically(TMap.empty[String, MessageDestination[TemperatureSensorCommand]])
          temperatureSensorRoutes = TemperatureSensorRoutes(temperatureSensorMap)
          routes <- STM.atomically(TMap.empty[String, MessageDestination[HouseCommand]])
          houseRouteObject = HouseRoutes(routes, temperatureSensorRoutes)
          value = ActorTemplate.stateful(() => houseRouteObject,
            (actorService: ActorService, message: HouseSupervisorMessage, houseRoutes: HouseRoutes) => {
              message match {
                case ActivateHouse(houseId, replyTo) => for {
                  lookupHouse <- ZIO.succeed(houseDatabase.find(_.houseId == houseId))
                  _ <- lookupHouse match {
                    case Some(house) =>
                      for {
                        houseActor <- actorService.startActor(houseActorTemplate(houseRoutes))
                        _ <- STM.atomically(routes.put(houseId, houseActor))
                        _ <- replyTo.send(Some(houseActor))
                      } yield true
                    case None => replyTo.send(None)
                  }

                } yield StatefulActor.Continue()
              }
            }
          )
        } yield (value, (houseSupervisor, directory) => ZIO.succeed(directory.copy(
          homeSupervisor = Some(houseSupervisor),
          houseRoutes = Some(houseRouteObject)
        )))
      }
    }
  }

On the incoming message side, every time a message comes in we need to check whether the sensor actor is active. If it isn’t, the house actor may also need to be activated. Being in a single process helps here because checking whether an actor is active is a local memory lookup rather than a message to another process.

private def getOrActivateTemperatureSensor(temperatureSensorId: String,
                                           actorSystem: ActorSystem[HomeAutomationDirectory],
                                           temperatureSensorDatabase: List[TemperatureSensor]): Task[Option[MessageDestination[TemperatureSensorCommand]]] = {
  for {
    directory <- actorSystem.directory
    maybeTemperatureSensorRoutes = directory.houseRoutes
      .map(_.temperatureSensorRoutes)
    temperatureActorOption <- maybeTemperatureSensorRoutes.get
      .temperatureActorOption(temperatureSensorId)
    temperatureSensorOption <- temperatureActorOption match {
      case Some(temperatureSensor) => ZIO.succeed(Some(temperatureSensor))
      case None => for {
        temperatureSensor <- ZIO.succeed(temperatureSensorDatabase.find(_.sensorId == temperatureSensorId))
        sensorOption <- temperatureSensor match {
          case Some(sensor) => for {
            houseActorOption <- getOrActivateHouse(sensor.houseId, actorSystem)
            sensor <- MessageDestination.promise[Option[MessageDestination[TemperatureSensorCommand]]](destination =>
              houseActorOption.get.send(ActivateTemperatureSensor(temperatureSensorId, destination))
            ).flatMap(_.await)

          } yield sensor
          case None => ZIO.succeed(None)
        }
      } yield sensorOption
    }
  } yield temperatureSensorOption
}

private def getOrActivateHouse(houseId: String, actorSystem: ActorSystem[HomeAutomationDirectory]): Task[Option[MessageDestination[HouseCommand]]] = {
  for {
    directory <- actorSystem.directory
    houseActorOption <- directory.houseRoutes.get.houseActorOption(houseId)
    houseActorActivateOption <- houseActorOption match {
      case Some(houseActor) => ZIO.succeed(Some(houseActor))
      case None => MessageDestination.promise[Option[MessageDestination[HouseCommand]]](destination =>
        directory.homeSupervisor.get.send(ActivateHouse(houseId, destination))
      ).flatMap(_.await)
    }
  } yield houseActorActivateOption
}
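Stripped of the house and sensor details, the fast path above is a get-or-activate over a TMap. The standalone ZIO 2 sketch below uses a hypothetical getOrActivate helper, with a counter standing in for actor activation. As written, two concurrent messages that both miss the map would both activate, so some extra check (for example, in the activating actor) would still be needed.

```scala
import zio._
import zio.stm._

object RouteCache {
  // One transactional read on the fast path; on a miss, activate and record.
  // Two concurrent misses for the same key can both run `activate`.
  def getOrActivate[K, V](routes: TMap[K, V], key: K)(activate: UIO[V]): UIO[V] =
    routes.get(key).commit.flatMap {
      case Some(existing) => ZIO.succeed(existing)
      case None           => activate.tap(created => routes.put(key, created).commit)
    }

  val program: UIO[(String, String, Int)] = for {
    routes      <- TMap.empty[String, String].commit
    activations <- Ref.make(0)
    // Stand-in for starting a sensor actor: counts how often it runs.
    activate     = activations.updateAndGet(_ + 1).map(n => s"sensor-actor-$n")
    first       <- getOrActivate(routes, "MainFloorSensor")(activate)
    second      <- getOrActivate(routes, "MainFloorSensor")(activate)
    count       <- activations.get
  } yield (first, second, count)

  def run(): (String, String, Int) = Unsafe.unsafe { implicit unsafe =>
    Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
  }
}
```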

Using a shared data structure to store these routes doesn’t fit the actor model, but this was an interesting experiment to try out.

Waking up is hard to do

With a deep supervision hierarchy, waking up a descendant actor a few levels down requires waking up each inactive ancestor up the chain. A flatter hierarchy means fewer levels to traverse.
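To make the cost concrete, here is a plain-Scala sketch that computes which actors must be activated, root first, to reach a target. The parentOf and active inputs are hypothetical, and the ids come from the home automation example. Each id in the result is a separate activation, and in the design above also a separate request/response round trip.

```scala
object WakeUp {
  // parentOf maps an actor id to its supervisor's id (roots have no entry).
  // Returns the inactive actors on the path to `target`, root first.
  def activationPath(target: String, parentOf: Map[String, String], active: Set[String]): List[String] = {
    @annotation.tailrec
    def loop(id: String, acc: List[String]): List[String] =
      if (active.contains(id)) acc
      else parentOf.get(id) match {
        case Some(parent) => loop(parent, id :: acc)
        case None         => id :: acc
      }
    loop(target, Nil)
  }
}

object WakeUpExample {
  val parentOf: Map[String, String] = Map(
    "MainFloorSensor" -> "123GreenStreet",
    "123GreenStreet"  -> "houseSupervisor"
  )
  // Only the supervisor is running, so the house and the sensor must wake up.
  val path: List[String] = WakeUp.activationPath("MainFloorSensor", parentOf, Set("houseSupervisor"))
}
```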

Another question is whether an ancestor actor that wakes up should wake its child actors eagerly, ahead of time, or lazily as messages come in. If the ancestor has a large number of descendants, a mix of eager and lazy activation will probably work best.

Talking through the supervisor

The safest way to communicate with child actors is to route messages through the supervisor. I’m not sure what the performance/latency overhead of this ends up looking like, but a lot of use cases don’t need that level of safety.

At a minimum, the top-level actor’s inbox could fill up quickly if it forwards all the messages for all its descendant actors. Since the inboxes are bounded ZIO queues, a full inbox makes offer suspend the sender. Any blocking action in a supervising actor would hold up messages for descendant actors.
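Here is a small standalone ZIO 2 sketch of that failure mode. It assumes a supervisor inbox with room for only two messages, whose owner is busy and not taking. The first two forwards succeed, and the third offer suspends until the timeout gives up.

```scala
import zio._

object FullInbox {
  // The supervisor is busy, so nothing takes from its inbox.
  val program: UIO[Option[Boolean]] = for {
    inbox <- Queue.bounded[String](2)
    _     <- inbox.offer("forward-1") *> inbox.offer("forward-2")
    // A bounded queue back-pressures: this offer suspends until space frees
    // up, so the timeout gives up after 100 ms and yields None.
    third <- inbox.offer("forward-3").timeout(100.millis)
  } yield third

  def run(): Option[Boolean] = Unsafe.unsafe { implicit unsafe =>
    Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
  }
}
```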

Users playing an interactive game could probably tolerate some message loss/latency while a game actor initializes in exchange for better latency while playing the game.

A home automation system could probably miss some sensor readings while initializing its actors. Consumer device users are likely to put a device away for long periods of time and start it up again at random.

Maintaining supervised actor collections in the supervisor

Supervised actors are tracked by the actor system, but it looks like it’s common practice to manually keep track of supervised actors in the actor state at the same time.

One issue is that if an exception is thrown in the message handler after the supervised actor is created, the new actor might not be tracked in the supervisor’s state. This might not cause any logic bugs, but these untracked actors could leak resources.

I thought of experimenting with different ways of maintaining the collection of child actors, but my examples were too small to yield any insight.

Restarting actors with software transactional memory

After I had basic actor hierarchies implemented and worked through activating actors from a persistent store, the last thing to do was handle actor restarts.

I implemented the basic version of supervised actor restarts, where all descendant actors are stopped. Since the child actor ZIO fibers are child fibers of the parent actor ZIO fibers, interrupting a parent actor’s fiber also interrupts the fibers of its children.

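  def restartActor[T](messageDestination: MessageDestination[T]): Task[Option[ActorState[T]]] = messageDestination match {
    case actorDestination: ActorMessageDestination[T] => for {
      // Step 1 (inside suspendActorById): mark the actor Suspended in a transaction,
      // then interrupt its fiber, shut down its inbox and stop its children
      _ <- suspendActorById(actorDestination.actorId, ActorState.Suspended())
      // Step 2: read the suspended state to get the template, parent and id
      suspendedActorState <- STM.atomically {
        for {
          actorStateOptionRef <- actors.get(actorDestination.actorId)
          // Fail the Task instead of throwing from Option.get inside the transaction
          actorStateRef <- STM.fromOption(actorStateOptionRef)
            .mapError(_ => new NoSuchElementException(s"Unknown actor ${actorDestination.actorId}"))
          actorState <- actorStateRef.get
        } yield actorState
      }
      // The restarted actor fiber is a child of the fiber that calls restart, which may not be the parent actor's fiber
      newActorState <- createActor(suspendedActorState.actorTemplate, suspendedActorState.parent, suspendedActorState.actor.actorId)
      // Step 3: replace the state only if the actor is still marked Suspended
      _ <- STM.atomically {
        for {
          actorStateOptionRef <- actors.get(actorDestination.actorId)
          actorStateRef <- STM.fromOption(actorStateOptionRef)
            .mapError(_ => new NoSuchElementException(s"Unknown actor ${actorDestination.actorId}"))
          _ <- actorStateRef.update(actorState =>
            if (actorState.phase == ActorState.Suspended()) newActorState
            else actorState
          )
        } yield ()
      }
    } yield Some(newActorState.asInstanceOf[ActorState[T]])
    case _ => ZIO.succeed(None)
  }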

Here I use ActorTemplate to create a new ZIO fiber for the restarted actor. It creates a new ZIO queue for the new inbox, but it could easily reuse the old one.

ActorTemplate isn’t a great name for what this class does, because it holds all of the actor’s initial state. If there were some kind of entity id associated with this actor, the ActorTemplate would always make an actor with the same entity id.

Transactional memory feels like a good way to get a lot of the corner cases here right early on.

I used an initial transaction to mark the actor as suspended before doing the real restart work to prevent concurrent restarts. I’m not sure how that would work out in production but it was nice that it was easy to do.
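The guard idea can be sketched without ZIO, swapping STM for `java.util.concurrent.atomic.AtomicReference.compareAndSet`. This is a variation rather than what the code above does. `suspendActorById` overwrites the phase unconditionally, while here the suspend step only succeeds from `Running`, so a second restart that arrives mid-restart simply backs off. `ActorCell`, `beginRestart` and `finishRestart` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicReference

sealed trait Phase
case object Running extends Phase
case object Suspended extends Phase
case object Stopped extends Phase

// Hypothetical per-actor phase cell; compareAndSet stands in for an STM transaction.
final class ActorCell {
  private val phase = new AtomicReference[Phase](Running)

  // Only one caller can move Running -> Suspended; later callers get false
  // and skip the restart instead of racing the one already in progress.
  def beginRestart(): Boolean = phase.compareAndSet(Running, Suspended)

  // Like the last transaction in restartActor: only go back to Running if the
  // actor is still Suspended (a stop in the meantime wins).
  def finishRestart(): Boolean = phase.compareAndSet(Suspended, Running)

  def stop(): Unit = phase.set(Stopped)
  def current: Phase = phase.get
}
```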

The code to suspend an actor is also used for a full actor stop: all child actors are stopped, which should recursively stop any descendants.

  // Moves the actor to `nextPhase`, interrupts its fiber and shuts down its inbox, then
  // recursively stops every child and removes it from `actors`.
  private def suspendActorById(actorId: String, nextPhase: ActorState.Phase): Task[Option[ActorState[Nothing]]] =
    for {
      // Update the phase in one transaction; None if the actor isn't registered
      suspendedState <- STM.atomically {
        for {
          actorStateOptional <- actors.get(actorId)
          updatedState <- actorStateOptional match {
            case Some(actorStateRef) => for {
              actorState <- actorStateRef.get
              newActorState = actorState.copy(phase = nextPhase)
              _ <- actorStateRef.set(newActorState)
            } yield Some(newActorState)
            case None => STM.succeed(None)
          }
        } yield updatedState
      }
      // Interrupting the fiber and shutting down the queue happen outside the transaction
      _ <- suspendedState match {
        case Some(state) => state.fiber.interrupt *> state.actor.inbox.shutdown
        case None => ZIO.unit
      }
      // Children are always Stopped (not just Suspended) and removed from the registry
      _ <- ZIO.foreachDiscard(suspendedState.toList.flatMap(_.children)) { childActorId =>
        suspendActorById(childActorId, ActorState.Stopped()) *>
          STM.atomically(actors.delete(childActorId))
      }
    } yield suspendedState

Top-level actors

It’s standard advice, but this implementation showed me the value of having a limited set of solid top-level supervisor actors. It makes things a lot easier when you can assume that there’s a set of actors that will always be there, always running.

I think that top-level actors should never restart or stop, and if they have to, it’s probably better to restart the whole actor process instead of trying to handle this case everywhere.
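One way to write that rule down is as a failure policy that looks at the failed actor’s supervision path. This is a sketch assuming a hypothetical `FailurePolicy`. What “restart the process” means in practice (for example exiting and letting an external process manager start it again) is left out.

```scala
sealed trait Decision
case object RestartActor extends Decision
case object RestartProcess extends Decision

// Hypothetical policy: ordinary actors are restarted by their supervisor, but a
// failure in a top-level actor escalates to restarting the whole process.
object FailurePolicy {
  // `path` is the failed actor's supervision path, e.g. List("houseSupervisor", "house123")
  def onFailure(path: List[String]): Decision =
    if (path.length <= 1) RestartProcess else RestartActor
}
```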

Are actor restarts useful?

Going through the exercise of implementing restarts made me think about the benefits of using this fault tolerance mechanism. Actors can easily have a large number of supervised descendant actors. Even though ZIO fibers are lightweight, recreating 1,000 or 10,000 descendant actors should probably be a rare event.

It seems like the main faults that restarts would help with are:

  • The message handler is stuck on a blocking operation
  • The message handler is waiting for a message that will never come (message deadlock)

These situations could probably be solved with restarts that don’t stop child actors. Restarts that stop child actors are probably best when they’re limited to actors that don’t have a lot of descendants. In that case they could save a lot of time otherwise spent chasing down rare faults and heisenbugs.
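The cost difference between the two kinds of restarts is easy to count on a toy tree. This plain-Scala sketch uses hypothetical `Node` and `RestartStrategy` types and only counts actors. It doesn’t model fibers or the time it takes to recreate them.

```scala
// Toy supervision tree used to count how many actors a restart recreates.
final case class Node(id: String, children: List[Node] = Nil) {
  def subtreeSize: Int = 1 + children.map(_.subtreeSize).sum
}

sealed trait RestartStrategy
case object RestartWithDescendants extends RestartStrategy   // stops and recreates the whole subtree
case object RestartAloneKeepChildren extends RestartStrategy // recreates only the failed actor

object Restarts {
  def actorsRecreated(failed: Node, strategy: RestartStrategy): Int = strategy match {
    case RestartWithDescendants   => failed.subtreeSize
    case RestartAloneKeepChildren => 1
  }

  // Complete tree with `branching` children per actor and `levels` levels below the root
  def fullTree(branching: Int, levels: Int, id: String = "root"): Node =
    if (levels == 0) Node(id)
    else Node(id, List.tabulate(branching)(i => fullTree(branching, levels - 1, s"$id/$i")))
}
```

With ten children per actor and four levels below the failed actor, a restart that stops children recreates 11,111 actors (the actor itself plus 11,110 descendants). A restart that keeps its children recreates one.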

Invalidating actor references

If restarting an actor stops all its descendant actors then any references to those descendants become invalid. Going back to the earlier section, this is ok if all messages are forwarded from supervisor actors to descendant actors. Unfortunately it kind of limits opportunities for caching actor references.
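One possible way to keep some caching is to make stale references detectable instead of trying to keep them valid. This sketch assumes a generation counter per actor id, which isn’t something the implementation above does. `ActorRef` and `Registry` are hypothetical names. A sender holding a stale reference gets a `Left` and can fall back to routing through the supervisor.

```scala
import scala.collection.mutable

// Hypothetical reference type: an actor id plus the incarnation it was created in.
final case class ActorRef(id: String, generation: Long)

// Toy registry: restarting bumps the generation and stopping removes the entry,
// so references handed out earlier are detected as stale instead of silently
// reaching a different incarnation.
final class Registry {
  private val generations = mutable.Map.empty[String, Long]

  def spawn(id: String): ActorRef = {
    val gen = generations.getOrElse(id, 0L) + 1
    generations(id) = gen
    ActorRef(id, gen)
  }

  def restart(id: String): ActorRef = spawn(id)

  def stop(id: String): Unit = { generations -= id; () }

  def isLive(ref: ActorRef): Boolean = generations.get(ref.id).contains(ref.generation)

  def send(ref: ActorRef, msg: String): Either[String, String] =
    if (isLive(ref)) Right(s"delivered '$msg' to ${ref.id}")
    else Left(s"stale reference to ${ref.id} (generation ${ref.generation})")
}
```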

Missing stuff

I implemented most of the things that I wanted to try out, but a few things I missed come to mind:

  • Dead letter mailboxes
  • Actor inbox sizing and overflow behavior
  • Sending exception errors to the supervisor
  • Actors that can prevent themselves from stopping or restarting

Future ideas for actor implementations

Implementing actor messaging, supervision and restarts let me work through most of the ideas I wanted to try out. I tried out shared memory for managing actor routing and it looked promising.

Still, at the end of the experiment I got a few more ideas that I don’t think I’ll get to try.

Communicating between different servers

Even when using a single-process actor library, with enough scale, multiple servers will be required at some point.

For some applications it might be useful to make messaging between actor systems on different processes explicit.

For example, if actor processes could only talk to each other through gRPC, then this communication could be consolidated in adapter/bridge actors that just handle gRPC calls. An advantage of doing it this way is that only the adapter/bridge has to deal with serializable messages; the rest of the system can use values like lambdas in its messages.
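Here’s a rough plain-Scala sketch of that split. `ReadTemperature`, `WireRequest`, `WireResponse` and `Bridge` are hypothetical. No gRPC code is involved; the wire types just stand in for whatever a gRPC service would receive. Only the bridge sees the wire types, and the in-process message carries a reply closure.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

// In-process message: it can carry a closure because it never leaves this process
final case class ReadTemperature(sensorId: String, reply: Double => Unit)

// Wire messages: plain data that a serializer behind a gRPC service could handle
final case class WireRequest(requestId: Long, sensorId: String)
final case class WireResponse(requestId: Long, celsius: Double)

// Toy bridge: the only component that converts between wire messages and in-process
// messages. `deliverLocally` stands in for sending to a local actor's inbox.
final class Bridge(deliverLocally: ReadTemperature => Unit) {
  def handle(request: WireRequest): Future[WireResponse] = {
    val reply = Promise[Double]()
    deliverLocally(ReadTemperature(request.sensorId, celsius => reply.success(celsius)))
    reply.future.map(celsius => WireResponse(request.requestId, celsius))(ExecutionContext.parasitic)
  }
}
```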

This would also open up some options for plugging in different sharding and clustering methods for different use cases.

In some cases you want to ensure that only one actor associated with an entity is running at a time; a game server fits this use case. In other cases it could be ok to have multiple actors associated with an entity running for short periods of time, as long as it eventually settles down to one actor. Some simple sensor applications might be ok with multiple actors running for one entity for short intervals.

Breaking the actor model

Based on what I’ve read, most applications that use the actor model have to break out of it in some way. Erlang applications are the exception but on the JVM it seems like using a non-actor library for API calls or database access is bound to happen at some point.

Given that, two other cases came to mind where it might be worth breaking out of the actor model.

Stateless message processors

Code that wakes up actors and routes messages to them doesn’t benefit from single-threaded state. It seems reasonable to have message processors that are not single-threaded (single-fibered?). These processors could be stateless in the REST service sense.

If we have to wake up a sensor actor, and this requires a database call, we should be able to do a blocking database lookup in the wakeup case. A multi-fibered router/wakeup handler could do the blocking call while handling the active sensor case in separate fibers.
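A rough sketch of that router, using Scala `Future`s in place of ZIO fibers. `Router`, `SensorState` and `loadFromDb` are hypothetical, and “routing” here just returns the sensor’s state rather than forwarding a message. Active sensors are answered without blocking. The wake-up path runs its blocking lookup in its own `Future`, so it doesn’t hold up other sensors.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future, blocking}

final case class SensorState(reading: Double)

// Hypothetical multi-fibered router: no single inbox, many lookups can run at once.
final class Router(loadFromDb: String => SensorState)(implicit ec: ExecutionContext) {
  private val active = TrieMap.empty[String, SensorState]

  def route(sensorId: String): Future[SensorState] =
    active.get(sensorId) match {
      case Some(state) =>
        Future.successful(state) // active sensor: answered right away
      case None =>
        Future {
          // Wake-up path: `blocking` hints to the pool that this thread may block
          val state = blocking(loadFromDb(sensorId))
          // If two wake-ups raced, keep whichever activation was stored first
          active.putIfAbsent(sensorId, state).getOrElse(state)
        }
    }
}
```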

Akka recommends using multiple actors to handle this case, but it seems like a common use case where we could break out of the actor model and have things that process messages concurrently.

Actors and blackboards

The actor model is at its best with messages that change state. Within a message handler you’re free to do any state updates. For read heavy data, it seems like a version of the blackboard model would be useful to reduce the number of request-reply messages.
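A minimal version of that blackboard could be an immutable map behind an `AtomicReference`. This sketch assumes readers can tolerate slightly stale data, and `Blackboard` is a hypothetical name. Readers take a snapshot directly instead of sending a request-reply message, and writers swap in an updated map atomically.

```scala
import java.util.concurrent.atomic.AtomicReference

// Toy blackboard: readers see a consistent immutable snapshot without messaging;
// writers replace the snapshot atomically with updateAndGet.
final class Blackboard[K, V] {
  private val board = new AtomicReference[Map[K, V]](Map.empty[K, V])

  def read(key: K): Option[V] = board.get.get(key)
  def snapshot: Map[K, V] = board.get

  def write(key: K, value: V): Unit = {
    board.updateAndGet(m => m.updated(key, value))
    ()
  }
}
```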

Even in the multi-process case, if there’s enough tolerance for latency, large blackboards could be synchronized between servers using CRDTs.
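As an example of the kind of CRDT that could work here, this sketch uses a last-writer-wins map, one of the simplest state-based CRDTs. That choice is mine, not something the post settles on. `Stamped` and `LwwMap` are hypothetical names. It assumes each `(timestamp, nodeId)` pair is unique. Under that assumption the merge is commutative, associative and idempotent, so servers can exchange whole blackboards in any order and still converge.

```scala
// One entry of a last-writer-wins map: the value plus a (timestamp, nodeId) stamp.
final case class Stamped[V](value: V, timestamp: Long, nodeId: String)

object LwwMap {
  // Higher timestamp wins; ties are broken by node id so every replica picks the same entry
  private def newer[V](a: Stamped[V], b: Stamped[V]): Stamped[V] =
    if (a.timestamp > b.timestamp || (a.timestamp == b.timestamp && a.nodeId > b.nodeId)) a else b

  // Key-wise merge of two replicas
  def merge[K, V](x: Map[K, Stamped[V]], y: Map[K, Stamped[V]]): Map[K, Stamped[V]] =
    (x.keySet ++ y.keySet).iterator.map { k =>
      val winner = (x.get(k), y.get(k)) match {
        case (Some(a), Some(b)) => newer(a, b)
        case (Some(a), None)    => a
        case (None, Some(b))    => b
        case (None, None)       => throw new IllegalStateException("key came from one of the maps")
      }
      k -> winner
    }.toMap
}
```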

Hiding supervisor hierarchies

I have a feeling that supervisor hierarchies should be hidden. If the temperature sensor hierarchy is exposed as a route /houseSupervisor/house123/temperature then we might have to change a lot of code if we add another level of supervision /houseSupervisor/stateHouseSupervisorCA/house123/temperature.

On the other hand some routes like /house123/sensor456 might be useful for sensors that can move between houses, but this should also be decoupled from the supervision hierarchy.
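A small sketch of that decoupling, assuming a hypothetical `Directory` that maps stable logical names to whatever the current supervision path is. Callers only ever use the logical name. Adding the `stateHouseSupervisorCA` level, or moving `sensor456` to another house, only changes a directory entry.

```scala
// Hypothetical directory from stable logical names to current supervision paths.
final class Directory {
  private var routes = Map.empty[String, List[String]]

  // (Re)registering replaces the supervision path behind a logical name
  def register(logicalName: String, supervisionPath: List[String]): Unit =
    synchronized { routes = routes.updated(logicalName, supervisionPath) }

  def resolve(logicalName: String): Option[List[String]] =
    synchronized { routes.get(logicalName) }
}
```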

Leftover ideas

  • Passing the directory to actors, including updates that happen while the system is running
  • Giving actors initialization/termination software transactions to register/deregister related routes
  • Basic performance testing with different supervision hierarchies/routing strategies
  • Actors need to be able to eagerly start other actors on restart
  • Backpressure when actors are activated from persistence or lazily activated
  • Actor timeouts for blocking operations
  • Scheduling “tick” messages for polling or time based state changes

Conclusions

TL;DR: going through a toy implementation of actors was a useful exercise that left me with the following thoughts:

  • Single-process actor libraries offer interesting possibilities
  • If restarting an actor stops supervised/child actors, then child actor references are not stable
  • Consider shared memory or multiple copies of actor directories
    • Calling the phone operator vs mailing out phone books
    • CRDTs might be useful to maintain directory copies
  • It might be useful to separate actor routing concerns from the actor system
  • It might be useful to break out of the actor model and allow concurrent message processing for special cases like routing
  • Actors might be able to share read-heavy data using a blackboard model
  • ZIO is nice to work with
  • Software Transactional Memory is useful for control-plane functionality
    • The railway switching station vs the rails and trains
  • Working with Scala 3 was better than my previous experiences with earlier versions
    • Better compiler errors
    • More type system options
  • I revised this post a couple times but the ideas I was left with were not as organized as the ideas that I started with.
    • Thanks for reading!