akka.net, akka.net-cluster, akka.net-persistence

Akka.NET with persistence dropping messages when CPU is under high pressure?


I did some performance testing of my PoC. What I saw is that my actor is not receiving all of the messages sent to it, and the performance is very low. I sent around 150k messages to my app, which pushed my processor to 100% utilization. But after I stopped sending requests, 2/3 of the messages had still not been delivered to the actor. Here are some simple metrics from Application Insights:

Number of requests sent

Number of messages received

To confirm this, the number of events persisted in Mongo is almost the same as the number of messages my actor received.

Mongo

Secondly, the performance of processing messages is very disappointing. I get around 300 messages per second.

Messages per second

I know Akka.NET message delivery is at-most-once by default, but I don't get any error saying that messages were dropped.

Here is the code. Cluster shard registration:

    services.AddSingleton<ValueCoordinatorProvider>(provider =>
    {
        var shardRegion = ClusterSharding.Get(_actorSystem).Start(
            typeName: "values-actor",
            entityProps: _actorSystem.DI().Props<ValueActor>(),
            settings: ClusterShardingSettings.Create(_actorSystem),
            messageExtractor: new ValueShardMsgRouter());
        return () => shardRegion;
    });

Controller:

    [ApiController]
    [Route("api/[controller]")]
    public class ValueController : ControllerBase
    {
        private readonly IActorRef _valueCoordinator;

        public ValueController(ValueCoordinatorProvider valueCoordinatorProvider)
        {
            _valueCoordinator = valueCoordinatorProvider();
        }

        [HttpPost]
        public Task<IActionResult> PostAsync(Message message)
        {
            _valueCoordinator.Tell(message);
            return Task.FromResult((IActionResult)Ok());
        }
    }

Actor:

    public class ValueActor : ReceivePersistentActor
    {
        public override string PersistenceId { get; }
        private decimal _currentValue;

        public ValueActor()
        {
            PersistenceId = Context.Self.Path.Name;
            Command<Message>(Handle);
        }

        private void Handle(Message message)
        {
            Context.IncrementMessagesReceived();
            var accepted = new ValueAccepted(message.ValueId, message.Value);
            Persist(accepted, valueAccepted =>
            {
                _currentValue = valueAccepted.BidValue;
            });
        }

    }

Message router:

    public sealed class ValueShardMsgRouter : HashCodeMessageExtractor
    {
        public const int DefaultShardCount = 1_000_000_000;

        public ValueShardMsgRouter() : this(DefaultShardCount)
        {
        }

        public ValueShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
        {
        }

        public override string EntityId(object message)
        {
            return message switch
            {
                IWithValueId valueMsg => valueMsg.ValueId,
                _ => null
            };
        }
    }

akka.conf

akka {
  stdout-loglevel = ERROR
  loglevel = ERROR

  actor {
    debug {
      unhandled = on
    }
    provider = cluster
    serializers {
      hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
    }
    serialization-bindings {
      "System.Object" = hyperion
    }
    deployment {
      /valuesRouter {
        router = consistent-hashing-group
        routees.paths = ["/values"]
        cluster {
          enabled = on
        }
      }
    }
  }

  remote {
    dot-netty.tcp {
      hostname = "desktop-j45ou76"
      port = 5054
    }
  }

  cluster {
    seed-nodes = ["akka.tcp://valuessystem@desktop-j45ou76:5054"]
  }

  persistence {
    journal {
      plugin = "akka.persistence.journal.mongodb"
      mongodb {
        class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
        connection-string = "mongodb://localhost:27017/akkanet"
        auto-initialize = off
        plugin-dispatcher = "akka.actor.default-dispatcher"
        collection = "EventJournal"
        metadata-collection = "Metadata"
        legacy-serialization = off
      }
    }

    snapshot-store {
      plugin = "akka.persistence.snapshot-store.mongodb"
      mongodb {
        class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
        connection-string = "mongodb://localhost:27017/akkanet"
        auto-initialize = off
        plugin-dispatcher = "akka.actor.default-dispatcher"
        collection = "SnapshotStore"
        legacy-serialization = off
      }
    }
  }
}




Solution

  • So there are two issues going on here: actor performance and missing messages.

    It's not clear from your writeup, but I'm going to make an assumption: 100% of these messages are going to a single actor.

    Actor Performance

    The end-to-end throughput of a single actor depends on:

    1. The amount of work it takes to route the message to the actor (i.e. through the sharding system, hierarchy, over the network, etc)
    2. The amount of time it takes the actor to process a single message, as this determines the rate at which a mailbox can be emptied; and
    3. Any flow control that affects which messages can be processed when - i.e. if an actor uses stashing and behavior switching, the amount of time an actor spends stashing messages while waiting for its state to change will have a cumulative impact on the end-to-end processing time for all stashed messages.

    You are seeing poor performance because of item 3 on this list. The design you are implementing calls Persist and blocks the actor from doing any additional processing until the message has been successfully persisted; all other messages sent to the actor are stashed internally until the previous one completes.

    Akka.Persistence offers four options for persisting messages from the point of view of a single actor:

    1. Persist - highest consistency (no other commands are processed until the write is confirmed), but the lowest throughput;
    2. PersistAsync - weaker ordering guarantees, but much higher throughput, since the actor keeps draining its mailbox while writes are in flight;
    3. PersistAll - persists a batch of events atomically with Persist-style flow control; and
    4. PersistAllAsync - persists a batch of events with PersistAsync-style flow control.

    To get an idea of how the performance characteristics of Akka.Persistence change with each of these methods, take a look at the detailed benchmark data the Akka.NET organization has put together around Akka.Persistence.Linq2Db, the new high-performance RDBMS Akka.Persistence library: https://github.com/akkadotnet/Akka.Persistence.Linq2Db#performance - it's the difference between roughly 15,000 writes per second and 250 writes per second on SQL; the write performance is likely even higher in a system like MongoDB.

    One of the key properties of Akka.Persistence is that it intentionally routes all of the persistence commands through a set of centralized "journal" and "snapshot" actors on each node in a cluster - so messages from multiple persistent actors can be batched together across a small number of concurrent database connections. There are many users running hundreds of thousands of persistent actors simultaneously - if each actor had their own unique connection to the database it would melt even the most robustly vertically scaled database instances on Earth. This connection pooling / sharing is why the individual persistent actors rely on flow control.

    You'll see similar performance using any persistent actor framework (i.e. Orleans, Service Fabric) because they all employ a similar design for the same reasons Akka.NET does.

    To improve your performance, you will need to either batch received messages together and persist them in a group with PersistAll (think of this as de-bouncing) or use asynchronous persistence semantics using PersistAsync.
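    As a rough sketch (reusing the ValueActor from your question and assuming the same Message and ValueAccepted types; the App Insights telemetry call is omitted), switching from Persist to PersistAsync relaxes that flow control so the mailbox keeps draining while writes are in flight:

        public class ValueActor : ReceivePersistentActor
        {
            public override string PersistenceId { get; }
            private decimal _currentValue;

            public ValueActor()
            {
                PersistenceId = Context.Self.Path.Name;
                Command<Message>(Handle);
            }

            private void Handle(Message message)
            {
                var accepted = new ValueAccepted(message.ValueId, message.Value);

                // Unlike Persist, PersistAsync does not stash subsequent commands
                // while the write is in flight; the callback still runs once the
                // journal confirms the write, but the mailbox keeps being processed.
                PersistAsync(accepted, valueAccepted =>
                {
                    _currentValue = valueAccepted.BidValue;
                });
            }
        }

    If you need Persist-level consistency, the alternative is to buffer incoming messages inside the actor and flush them periodically as a single atomic batch with PersistAll.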

    You'll also see better aggregate performance if you spread your workload out across many concurrent actors with different entity ids - that way you can benefit from actor concurrency and parallelism.
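    For example (purely illustrative - this assumes shardRegion is the IActorRef returned by ClusterSharding.Start in your registration code, and that Message exposes settable ValueId / Value properties), sending messages with distinct ValueId values lets the ValueShardMsgRouter hash them to different entities:

        // Distinct ValueIds fan out across many entity actors instead of
        // funnelling every message into a single mailbox.
        for (var i = 0; i < 100; i++)
        {
            shardRegion.Tell(new Message
            {
                ValueId = $"value-{i}", // distinct id => distinct ValueActor instance
                Value = 42.0m
            });
        }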

    Missing Messages

    There could be any number of reasons why this might occur - most often it's going to be the result of:

    1. Actors being terminated (not the same as restarting) and dumping all of their messages into the DeadLetter collection;
    2. Network disruptions resulting in dropped connections - this can happen when nodes are sitting at 100% CPU - messages that are queued for delivery at the time can be dropped; and
    3. The Akka.Persistence journal receiving timeouts back from the database will result in persistent actors terminating themselves due to loss of consistency.

    You should look for the following in your logs:

    1. DeadLetter warnings / counts; and
    2. Circuit breaker exceptions coming from Akka.Persistence (the journal's circuit breaker opens after repeated database timeouts).

    You'll usually see both of those appear together - I suspect that's what is happening to your system. The other possibility could be Akka.Remote throwing DisassociationExceptions, which I would also look for.

    You can fix the Akka.Remote issues by changing the heartbeat values for the Akka.Cluster failure detector in your configuration (https://getakka.net/articles/configuration/akka.cluster.html):

    akka.cluster.failure-detector {
    
          # FQCN of the failure detector implementation.
          # It must implement akka.remote.FailureDetector and have
          # a public constructor with a com.typesafe.config.Config and
          # akka.actor.EventStream parameter.
          implementation-class = "Akka.Remote.PhiAccrualFailureDetector, Akka.Remote"
    
          # How often keep-alive heartbeat messages should be sent to each connection.
          heartbeat-interval = 1 s
    
          # Defines the failure detector threshold.
          # A low threshold is prone to generate many wrong suspicions but ensures
          # a quick detection in the event of a real crash. Conversely, a high
          # threshold generates fewer mistakes but needs more time to detect
          # actual crashes.
          threshold = 8.0
    
          # Number of the samples of inter-heartbeat arrival times to adaptively
          # calculate the failure timeout for connections.
          max-sample-size = 1000
    
          # Minimum standard deviation to use for the normal distribution in
          # AccrualFailureDetector. Too low standard deviation might result in
          # too much sensitivity for sudden, but normal, deviations in heartbeat
          # inter arrival times.
          min-std-deviation = 100 ms
    
          # Number of potentially lost/delayed heartbeats that will be
          # accepted before considering it to be an anomaly.
          # This margin is important to be able to survive sudden, occasional,
          # pauses in heartbeat arrivals, due to for example garbage collect or
          # network drop.
          acceptable-heartbeat-pause = 3 s
    
          # Number of member nodes that each member will send heartbeat messages to,
          # i.e. each node will be monitored by this number of other nodes.
          monitored-by-nr-of-members = 9
    
          # After the heartbeat request has been sent the first failure detection
          # will start after this period, even though no heartbeat message has
          # been received.
          expected-response-after = 1 s
    
    }
    

    Bump the acceptable-heartbeat-pause = 3 s value to something larger, like 10, 20, or 30 seconds, if needed.
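
    For example, a minimal override in akka.conf (the 20 s value is illustrative, not tuned to your workload):

        akka.cluster.failure-detector {
          # Tolerate longer gaps between heartbeats while nodes sit at 100% CPU.
          acceptable-heartbeat-pause = 20 s
        }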

    Sharding Configuration

    One last thing I want to point out about your code: a shard count of 1,000,000,000 is far too high. You should have roughly 10 shards per node, so reduce it to something reasonable.
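
    As a sketch, assuming a small cluster of around 5 nodes, something in the tens is plenty:

        public sealed class ValueShardMsgRouter : HashCodeMessageExtractor
        {
            // Roughly 10 shards per node; 50 assumes a cluster of ~5 nodes.
            public const int DefaultShardCount = 50;

            public ValueShardMsgRouter() : this(DefaultShardCount)
            {
            }

            public ValueShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
            {
            }

            // Entity routing is unchanged from your original router.
            public override string EntityId(object message) =>
                message is IWithValueId valueMsg ? valueMsg.ValueId : null;
        }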