I'm using RavenDB to hold several thousand documents. The data comes from a daily xml feed which I'll update by running a C# console app. Below is the code that processes the feed to keep the database in sync with any changes. I've had quite a few problems with this so I'm wondering if I've picked the wrong strategy. Here are some important things to note.
Can anyone give me some pointers?
public void ProcessFeed(string rawXml)
{
XDocument doc = XDocument.Parse(rawXml);
var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId).Take(500);
using (IDocumentSession session = _store.OpenSession())
{
var dbItems = session.Query<AccItem>().OrderBy(x => x.SourceId).Take(500);
foreach (var item in items)
{
var existingRecord = dbItems.SingleOrDefault(x => x.SourceId == item.SourceId);
if (existingRecord == null)
{
session.Store(item);
_logger.Info("Saved new item {0}.", item.ShortName);
}
else
{
// update just one field for now
existingRecord.Village = item.Village;
_logger.Info("Updated item {0}.", item.ShortName);
}
}
session.SaveChanges();
}
}
Below is the code I ended up with for this. I think the initial problem with the original version was simply that I was trying to use the same session for every item, breaking the 30 limit.
Tipped off by some code on screen in a TekPub screencast I fixed this by batching the whole process into sets of 15 (to allow for one read and one write, so 30 requests in total per batch). This is pretty slow, but not nearly slow as I'd expected. I'm expecting maybe 10,000 records at a time so I'll just leave it ticking away until it's done.
public void ProcessFeed(string rawXml)
{
XDocument doc = XDocument.Parse(rawXml);
var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId);
int numberOfItems = items.Count;
int batchSize = 15;
int numberOfBatchesRequired = numberOfItems / batchSize;
int numberOfBatchesProcessed = 0;
int numberOfItemsInLastBatch = numberOfItems - (numberOfBatchesRequired * batchSize);
for (var i = 0;i <= numberOfBatchesRequired;i++)
{
using (IDocumentSession session = _store.OpenSession())
{
var numberOfItemsProcessedSoFar = numberOfBatchesProcessed * batchSize;
var numberOfItemsRemaining = numberOfItems - numberOfItemsProcessedSoFar;
int itemsToTake = 15;
if (numberOfItemsRemaining > 0 && numberOfItemsRemaining < 15)
itemsToTake = numberOfItemsRemaining;
foreach (var item in items.Skip(numberOfItemsProcessedSoFar).Take(itemsToTake))
{
var existingRecords = session.Query<AccItem>().Where(x => x.SourceId == item.SourceId).ToList();
if (!existingRecords.Any())
{
session.Store(item);
_logger.Info("Saved new item {0}.", item.ShortName);
}
else
{
if (existingRecords.Count() > 1)
_logger.Warn("There's more than one item in the database with the sourceid {0}", item.SourceId);
existingRecords.First().Village = item.Village;
_logger.Info("Updated item {0}.", item.ShortName);
}
session.SaveChanges();
}
}
numberOfBatchesProcessed++;
}
}