Suppose that in an office application there are three EventSourcedBehavior actors:
trait OfficeFridgeCommand
case object OpenFridge extends OfficeFridgeCommand
case object CloseFridge extends OfficeFridgeCommand
trait OfficeFridgeEvent
case object FridgeOpened extends OfficeFridgeEvent
case object FridgeClosed extends OfficeFridgeEvent
trait OfficeCoffeeMachineCommand
case object MakeCoffee extends OfficeCoffeeMachineCommand
trait OfficeCoffeeMachineEvent
case object CoffeeMade extends OfficeCoffeeMachineEvent
val fridgeEntity =
  EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge]()
val frontDoorEntity =
  EventSourcedBehavior[OfficeFrontDoorCommand, OfficeFrontDoorEvent, OfficeFridge]()
val coffeeMachineEntity =
  EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge]()
Suppose there was some activity with the fridge, and 1000 events were registered for it under various persistence ids [0-1000], so that the journal looks like:
ordering | persistence_id | event_ser_manifest |
---|---|---|
1 | 200 | FridgeOpened |
2 | 200 | FridgeClosed |
... | ... | ... |
500 | 500 | FridgeOpened |
... | ... | ... |
1000 | 501 | FridgeClosed |
If a GetCoffeeMachineState message came in with persistence id 500 to the coffeeMachineEntity actor, that actor would attempt to replay the journal events for persistence_id = 500.
It would fail because it cannot cast an OfficeFridgeEvent to an OfficeCoffeeMachineEvent (Akka Typed, remember?).
Is this a common setup for this type of system? Or does every entity require its own database, with an event journal that contains only the "valid" event types that the actor accepts?
I am seeing this exact issue in my system right now. If someone (by accident) were to run 1000 of these queries I would have 1000 entity actors attempting to replay these events forever or until I restarted the pods.
What I end up with is endless attempts to restart the entity actor, with the following stack trace:
at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:200)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:98)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:73)
Caused by: java.lang.ClassCastException
which makes sense, because a typed actor is attempting to process events of a different type.
You have more than one entity (fridge, front door and coffee machine in your example). Each entity responds to different commands and persists different events.
When you create an EventSourcedBehavior using EventSourcedBehavior.apply():
/**
* Create a `Behavior` for a persistent actor.
*
* @param persistenceId stable unique identifier for the event sourced behavior
* @param emptyState the intial state for the entity before any events have been processed
* @param commandHandler map commands to effects e.g. persisting events, replying to commands
* @param eventHandler compute the new state given the current state when an event has been persisted
*/
def apply[Command, Event, State](
    persistenceId: PersistenceId,
    emptyState: State,
    commandHandler: (State, Command) => Effect[Event, State],
    eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = {
  val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList)
  EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass)
}
The first parameter is a PersistenceId. You are responsible for making sure that ID is unique. The PersistenceId companion object offers a factory method that takes an entity type hint and the entityId:
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
* concatenating them with `|` separator.
*
* Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
* }}}
*
* That format of the `PersistenceId` is not mandatory and only provided as a convenience of
* a "standardized" format.
*
* Another separator can be defined by using the `apply` that takes a `separator` parameter.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator.
*
* @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `|`
*/
def apply(entityTypeHint: String, entityId: String): PersistenceId
As detailed in Event Sourcing - PersistenceId:

Persistence Id

The PersistenceId is the stable unique identifier for the persistent actor in the backend event journal and snapshot store.

Cluster Sharding is typically used together with EventSourcedBehavior to ensure that there is only one active entity for each PersistenceId (entityId). There are techniques to ensure this uniqueness, an example of which can be found in the Persistence example in the Cluster Sharding documentation. This illustrates how to construct the PersistenceId from the entityTypeKey and entityId provided by the EntityContext.

The entityId in Cluster Sharding is the business domain identifier of the entity. The entityId might not be unique enough to be used as the PersistenceId by itself. For example two different types of entities may have the same entityId. To create a unique PersistenceId the entityId should be prefixed with a stable name of the entity type, which typically is the same as the EntityTypeKey.name that is used in Cluster Sharding. There are PersistenceId.apply factory methods to help with constructing such PersistenceId from an entityTypeHint and entityId.
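To make the quoted passage concrete, here is a minimal sketch of how the type prefix separates two entity types that share the same business id. It assumes akka-persistence-typed is on the classpath; the object name PersistenceIdDemo and the id "500" are only illustrative.

```scala
import akka.persistence.typed.PersistenceId

object PersistenceIdDemo {
  // Same business identifier ("500"), two different entity types.
  val fridgeId: PersistenceId = PersistenceId("Fridge", "500")
  val coffeeMachineId: PersistenceId = PersistenceId("CoffeeMachine", "500")

  // Without a type prefix both entities would end up with this same journal key.
  val bareId: PersistenceId = PersistenceId.ofUniqueId("500")

  def main(args: Array[String]): Unit = {
    println(fridgeId.id)        // Fridge|500
    println(coffeeMachineId.id) // CoffeeMachine|500
    println(bareId.id)          // 500
  }
}
```

With prefixed ids, a coffee machine entity with id 500 never asks the journal for the fridge's events, which is the situation that produced the ClassCastException in the question.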
The ShoppingCart behavior in the Akka Persistence shopping cart sample is a good example:
def apply(cartId: String): Behavior[Command] = {
  EventSourcedBehavior[Command, Event, State](
    PersistenceId("ShoppingCart", cartId),
    State.empty,
    (state, command) => ...,
    (state, event) => ...
  )
}
Your code should be something like
import java.util.UUID

// A random UUID is used here only for illustration; a real entity id must stay stable across restarts.
val fridgeEntity =
  EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge](
    PersistenceId("Fridge", UUID.randomUUID().toString),
    Fridge.emptyState,
    Fridge.commandHandler,
    Fridge.eventHandler
  )
val frontDoorEntity =
  EventSourcedBehavior[OfficeFrontDoorCommand, OfficeFrontDoorEvent, OfficeFridge](
    PersistenceId("FrontDoor", UUID.randomUUID().toString),
    FrontDoor.emptyState,
    FrontDoor.commandHandler,
    FrontDoor.eventHandler
  )
val coffeeMachineEntity =
  EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge](
    PersistenceId("CoffeeMachine", UUID.randomUUID().toString),
    CoffeeMachine.emptyState,
    CoffeeMachine.commandHandler,
    CoffeeMachine.eventHandler
  )
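The Fridge object referenced above is not defined anywhere in this thread, so the following is only one possible shape for it, sketched under assumptions: the state type OfficeFridge(open: Boolean) and the apply(fridgeId) factory are hypothetical, and the command and event types are repeated from the question so the block stands alone. It assumes akka-persistence-typed is on the classpath, and it takes the entity id as a parameter so the persistence id stays the same across restarts.

```scala
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }

sealed trait OfficeFridgeCommand
case object OpenFridge extends OfficeFridgeCommand
case object CloseFridge extends OfficeFridgeCommand

sealed trait OfficeFridgeEvent
case object FridgeOpened extends OfficeFridgeEvent
case object FridgeClosed extends OfficeFridgeEvent

// Hypothetical state: the thread never shows what OfficeFridge contains.
final case class OfficeFridge(open: Boolean)

object Fridge {
  val emptyState: OfficeFridge = OfficeFridge(open = false)

  // Each command is turned into exactly one persisted event.
  val commandHandler: (OfficeFridge, OfficeFridgeCommand) => Effect[OfficeFridgeEvent, OfficeFridge] =
    (_, command) =>
      command match {
        case OpenFridge  => Effect.persist(FridgeOpened)
        case CloseFridge => Effect.persist(FridgeClosed)
      }

  // Applied both when an event is persisted and when it is replayed on recovery.
  val eventHandler: (OfficeFridge, OfficeFridgeEvent) => OfficeFridge =
    (_, event) =>
      event match {
        case FridgeOpened => OfficeFridge(open = true)
        case FridgeClosed => OfficeFridge(open = false)
      }

  // fridgeId is the business identifier; the "Fridge" hint keeps it apart from other entity types.
  def apply(fridgeId: String): Behavior[OfficeFridgeCommand] =
    EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge](
      PersistenceId("Fridge", fridgeId),
      emptyState,
      commandHandler,
      eventHandler)
}
```

Running this against a real journal would also need a serializer configured for the event types, which is left out of the sketch.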
Once you persist an event for the entity in the journal, you would be able to see something like the following in the database
ordering | persistence_id | event_ser_manifest |
---|---|---|
1 | Fridge\|<uuid> | FridgeOpened |
2 | Fridge\|<uuid> | FridgeClosed |
... | ... | ... |
1 | CoffeeMachine\|<uuid> | CoffeeMade |
2 | CoffeeMachine\|<uuid> | CoffeeMade |
... | ... | ... |
1 | FrontDoor\|<uuid> | FrontDoorOpened |
2 | FrontDoor\|<uuid> | FrontDoorClosed |
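As a rough illustration of why prefixed ids let all entity types share one journal, here is a simplified in-memory model of replay. It is not how any journal plugin is implemented; Row, journal and replay are hypothetical names, and the only point is that recovery selects rows by persistence id.

```scala
object JournalModel {
  // Simplified stand-in for a journal row; real plugins store more columns.
  final case class Row(ordering: Long, persistenceId: String, manifest: String)

  // One shared journal holding events of several entity types.
  val journal: Vector[Row] = Vector(
    Row(1, "Fridge|500", "FridgeOpened"),
    Row(2, "Fridge|500", "FridgeClosed"),
    Row(3, "CoffeeMachine|500", "CoffeeMade"))

  // Recovery only asks for the rows that belong to a single persistence id.
  def replay(persistenceId: String): Vector[String] =
    journal.filter(_.persistenceId == persistenceId).map(_.manifest)

  def main(args: Array[String]): Unit = {
    println(replay("Fridge|500"))        // Vector(FridgeOpened, FridgeClosed)
    println(replay("CoffeeMachine|500")) // Vector(CoffeeMade)
  }
}
```

In this model the coffee machine with id 500 only ever sees CoffeeMade, even though a fridge with the same business id has rows in the same journal.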
As always, it depends. If you need to persist millions of events per second and those events come from millions of different devices, the answer will be no. Akka Persistence offers different persistence plugins.
Each of them has its own pros and cons.
You can find more plugins at scala index - akka persistence, but they could be outdated, lack commercial support, and so on.