I am writing an implementation of Akka.Persistence for Service Fabric, and I can't get snapshotting to work. When the actor recovers its state it loads the latest snapshot, but it does not replay the events persisted since that snapshot. It's not clear to me whether I have simply not wired up the components correctly or whether my implementation of the persistence library is incorrect.

My actor is a simple counter, and my state is just the current count. I expect the Recover<SnapshotOffer> handler to be called first, and then the Recover<Evt> handler to be called once for each journal entry between the last snapshot and the highest sequence number. There is a function ReplayMessagesAsync(...) in the journal that looks like it should do this, but it never gets called.

The code for my counter is below; the rest of my code is: Code
using Akka.Actor;
using Akka.Persistence;
using Akka.Persistence.ServiceFabric.Journal;
using Akka.Persistence.ServiceFabric.Snapshot;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace AkkaPersistence.Actors
{
    public class Counter : ReceivePersistentActor
    {
        public class GetCount { }

        private int counter;
        private CounterState State = new CounterState();
        private int _msgsSinceLastSnapshot = 0;

        public Counter()
        {
            Recover<Evt>(evt =>
            {
                State.Update(evt);
            });

            Recover<SnapshotOffer>(offer =>
            {
                var snapshotEntry = offer.Snapshot as SnapshotEntry;
                if (snapshotEntry != null)
                {
                    State = (CounterState)snapshotEntry.Snapshot;
                }
            });

            Command<string>(str => Persist(str, s =>
            {
                ++counter;
                var evt = new Evt(s);
                State.Update(evt);
                if (++_msgsSinceLastSnapshot % 10 == 0)
                {
                    //time to save a snapshot
                    SaveSnapshot(State.Copy());
                }
            }));

            Command<GetCount>(get => Sender.Tell(State.Count));

            Command<SaveSnapshotSuccess>(success =>
            {
                ServiceEventSource.Current.Message($"Saved snapshot");
                DeleteMessages(success.Metadata.SequenceNr);
            });

            Command<SaveSnapshotFailure>(failure =>
            {
                // handle snapshot save failure...
                ServiceEventSource.Current.Message($"Snapshot failure");
            });
        }

        public override string PersistenceId
        {
            get
            {
                return "counter";
            }
        }
    }

    internal class CounterState
    {
        private long count = 0L;

        public long Count
        {
            get { return count; }
            set { count = value; }
        }

        public CounterState(long count)
        {
            this.Count = count;
        }

        public CounterState() : this(0)
        {
        }

        public CounterState Copy()
        {
            return new CounterState(count);
        }

        public void Update(Evt evt)
        {
            ++Count;
        }
    }

    public class Evt
    {
        public Evt(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

    public class Cmd
    {
        public Cmd(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }
}

There were a few things I had wrong:

1) I needed to return the snapshot that was passed in, not my SnapshotEntry, which is an implementation detail of my persistence mechanism.

2) I made a simple mistake when I changed the Journal from saving strings to saving objects.

3) Finally, the underlying issue was that serialization was failing for child objects. I did not want to have to include the type of the child object in this code, so I added a custom serializer (the Wire serializer) for the Journal, alongside the already existing SnapshotSerializer, and it is now working.
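
For readers hitting the same serialization failure, the fragment below sketches one common way to make Wire the serializer in Akka.NET, through HOCON configuration. It is an assumption about how such a setup could look, not the wiring used in this plugin: the answer does not show whether the serializer was registered through configuration or called directly inside the journal. It also assumes the Akka.Serialization.Wire package is referenced; binding "System.Object" routes every message type through Wire, which may be broader than needed.

```hocon
akka {
  actor {
    serializers {
      # Assumes the Akka.Serialization.Wire NuGet package is installed
      wire = "Akka.Serialization.WireSerializer, Akka.Serialization.Wire"
    }
    serialization-bindings {
      # Illustrative binding: send all objects through Wire
      "System.Object" = wire
    }
  }
}
```

A custom journal that serializes payloads itself would still need to look up and use the configured serializer (or call Wire directly) when writing and reading entries.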