azure-blob-storage, azure-durable-functions, blobstorage

Append blob with .NET: data is appended as separate JSON arrays


I have a SubOrchestrator function that calls the following Activity Functions:

    public async Task GetTransitiveMembers(IDurableOrchestrationContext context, GroupMembershipRequest request)
    {
        if (!context.IsReplaying) _ = _log.LogMessageAsync(new LogMessage { RunId = request.RunId, Message = $"Getting results from 1st page" });
        var nextPageUrl = await context.CallActivityAsync<string>(nameof(MembersReaderFunction), new MembersReaderRequest { RunId = request.RunId, GroupId = request.SourceGroup.ObjectId, TargetGroupId = request.GroupId, CurrentPart = request.CurrentPart });
    
    
        while (!string.IsNullOrEmpty(nextPageUrl))
        {
            if (!context.IsReplaying) _ = _log.LogMessageAsync(new LogMessage { RunId = request.RunId, Message = $"Getting results from next page url: {nextPageUrl}" });
            var oldNextPageUrl = nextPageUrl;              
            nextPageUrl = await context.CallActivityAsync<string>(nameof(SubsequentMembersReaderFunction), new SubsequentMembersReaderRequest { RunId = request.RunId, NextPageUrl = nextPageUrl, GroupId = request.SourceGroup.ObjectId, TargetGroupId = request.GroupId, CurrentPart = request.CurrentPart });
        }
    }

Activity functions:

    [FunctionName(nameof(MembersReaderFunction))]
    public async Task<string> GetMembersAsync([ActivityTrigger] MembersReaderRequest request)
    {
        await _log.LogMessageAsync(new LogMessage { Message = $"{nameof(MembersReaderFunction)} function started", RunId = request.RunId }, VerbosityLevel.DEBUG);
        _calculator.RunId = request.RunId;
        var response = await _calculator.GetFirstTransitiveMembersPageAsync(request.GroupId, request.RunId);
        response.Users = new List<AzureADUser> { new AzureADUser { UserPrincipalName = "a" } };
        var fileName = $"/{request.TargetGroupId}/{request.RunId}_GroupMembership_{request.CurrentPart}.json";
        using (MemoryStream logEntryStream = new MemoryStream())
        {
            await JsonSerializer.SerializeAsync(logEntryStream, response.Users);
            logEntryStream.Position = 0;
            await _blobStorageRepository.AppendDataToBlobAsync(logEntryStream, fileName);
        }
        await _log.LogMessageAsync(new LogMessage { Message = $"{nameof(MembersReaderFunction)} function completed", RunId = request.RunId }, VerbosityLevel.DEBUG);
        return response.NextPageUrl;
    }

    [FunctionName(nameof(SubsequentMembersReaderFunction))]
    public async Task<string> GetMembersAsync([ActivityTrigger] SubsequentMembersReaderRequest request)
    {
        await _log.LogMessageAsync(new LogMessage { Message = $"{nameof(SubsequentMembersReaderFunction)} function started", RunId = request.RunId }, VerbosityLevel.DEBUG);
        _calculator.RunId = request.RunId;
        var response = await _calculator.GetNextTransitiveMembersPageAsync(request.NextPageUrl);
        response.Users = new List<AzureADUser> { new AzureADUser { UserPrincipalName = "b" } };
        var fileName = $"/{request.TargetGroupId}/{request.RunId}_GroupMembership_{request.CurrentPart}.json";
        using (MemoryStream logEntryStream = new MemoryStream())
        {
            await JsonSerializer.SerializeAsync(logEntryStream, response.Users);
            logEntryStream.Position = 0;
            await _blobStorageRepository.AppendDataToBlobAsync(logEntryStream, fileName);
        }
        await _log.LogMessageAsync(new LogMessage { Message = $"{nameof(SubsequentMembersReaderFunction)} function completed", RunId = request.RunId }, VerbosityLevel.DEBUG);
        return response.NextPageUrl;
    }

BlobStorageRepository:

    public async Task AppendDataToBlobAsync(MemoryStream logEntryStream, string logBlobName)
    {
        var appendBlobClient = _containerClient.GetAppendBlobClient(logBlobName);
    
        await appendBlobClient.CreateIfNotExistsAsync();
        logEntryStream.Position = 0;
    
        var maxBlockSize = appendBlobClient.AppendBlobMaxAppendBlockBytes;
        var bytesLeft = logEntryStream.Length;
        var buffer = new byte[maxBlockSize];
        while (bytesLeft > 0)
        {
            var blockSize = (int)Math.Min(bytesLeft, maxBlockSize);
            var bytesRead = await logEntryStream.ReadAsync(buffer.AsMemory(0, blockSize));
            await using (MemoryStream memoryStream = new MemoryStream(buffer, 0, bytesRead))
            {
                await appendBlobClient.AppendBlockAsync(memoryStream);
            }
            bytesLeft -= bytesRead;
        }
    }

I see the content in blob like this:

    [
        {
            "ObjectId": "00000000-0000-0000-0000-000000000000",
            "Mail": null,
            "UserPrincipalName": "a",
            "Properties": null,
            "MembershipAction": null,
            "SourceGroup": "00000000-0000-0000-0000-000000000000",
            "SourceGroups": null
        }
    ][
        {
            "ObjectId": "00000000-0000-0000-0000-000000000000",
            "Mail": null,
            "UserPrincipalName": "b",
            "Properties": null,
            "MembershipAction": null,
            "SourceGroup": "00000000-0000-0000-0000-000000000000",
            "SourceGroups": null
        }
    ][
        {
            "ObjectId": "00000000-0000-0000-0000-000000000000",
            "Mail": null,
            "UserPrincipalName": "b",
            "Properties": null,
            "MembershipAction": null,
            "SourceGroup": "00000000-0000-0000-0000-000000000000",
            "SourceGroups": null
        }
    ]

It should be:

    [
        {
            "ObjectId": "00000000-0000-0000-0000-000000000000",
            "Mail": null,
            "UserPrincipalName": "a",
            "Properties": null,
            "MembershipAction": null,
            "SourceGroup": "00000000-0000-0000-0000-000000000000",
            "SourceGroups": null
        },
        {
            "ObjectId": "00000000-0000-0000-0000-000000000000",
            "Mail": null,
            "UserPrincipalName": "b",
            "Properties": null,
            "MembershipAction": null,
            "SourceGroup": "00000000-0000-0000-0000-000000000000",
            "SourceGroups": null
        },
        {
            "ObjectId": "00000000-0000-0000-0000-000000000000",
            "Mail": null,
            "UserPrincipalName": "b",
            "Properties": null,
            "MembershipAction": null,
            "SourceGroup": "00000000-0000-0000-0000-000000000000",
            "SourceGroups": null
        }
    ]

How do I fix this?


Solution

Why the output looks wrong at the moment

The difference between your observed output and what you want comes down to how you're writing the JSON. The relevant lines of your code are:

    // You create a list
    response.Users = new List<AzureADUser> { new AzureADUser { UserPrincipalName = "b" } };
    
    // ...
    
    // Then you serialise the list and add it onto the end of the blob:
    await JsonSerializer.SerializeAsync(logEntryStream, response.Users);
    logEntryStream.Position = 0;
    await _blobStorageRepository.AppendDataToBlobAsync(logEntryStream, fileName);
    
    

Put another way: at the start of the function, your blob looks like this:

    [
        {
            // original object
        }
    ]
    

...which is an array containing one object.

But your current code just serialises a whole new array and glues it onto the end of the blob. It never adds the new items into the existing array, so you end up with:

    [
        {
            // original object
        }
    ]
    [
        {
            // new object
        }
    ]
    

It's not even valid JSON any more.

Why it's not practical to do exactly what you're asking

To get the output you're asking for, you would need to manipulate the existing JSON so that your new data gets inserted inside the array, like this:

    [
        {
            // original object        
        }
        , // Now insert an extra comma here
        // Now insert your new objects here
        {
    
        }
    ] // And make sure you keep the closing bracket at the end.
    

With an append blob this is effectively impossible. Append blobs are append-only, so you can't shave the closing bracket off the end of the blob, add more data, and put the bracket back on.

You can't sensibly add items to an existing JSON array unless you first read the entire file and deserialise it back into a list. That brings us to the potential solutions.

Solution 1: deserialise the whole JSON file into a list first

This is what I suggested in the comments: read the whole file in, add your new items to the end of the list, and write it back.

But I completely understand why you don't want to do that: re-reading and re-writing a list of 300,000 items on every append is a huge performance hit.
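
For completeness, a minimal sketch of that read-modify-write approach might look like the following. AppendUsersToJsonArrayAsync is a hypothetical helper, not part of your repository; it assumes the same _containerClient and AzureADUser types as your code, and it treats the file as an ordinary block blob that gets rewritten on every call:

    // Hypothetical helper - reads the whole array, adds the new items, writes it all back.
    public async Task AppendUsersToJsonArrayAsync(List<AzureADUser> newUsers, string blobName)
    {
        var blobClient = _containerClient.GetBlobClient(blobName);

        // Read the existing array back into a list, or start a new one.
        var users = new List<AzureADUser>();
        if (await blobClient.ExistsAsync())
        {
            await using var readStream = await blobClient.OpenReadAsync();
            users = await JsonSerializer.DeserializeAsync<List<AzureADUser>>(readStream)
                    ?? new List<AzureADUser>();
        }

        // Add the new items and write the whole array back out as a block blob.
        users.AddRange(newUsers);
        using var writeStream = new MemoryStream();
        await JsonSerializer.SerializeAsync(writeStream, users);
        writeStream.Position = 0;
        await blobClient.UploadAsync(writeStream, overwrite: true);
    }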

Solution 2: store each batch of items in its own blob file, and bring them together later

This is a much more scalable solution.

For your function here:

    [FunctionName(nameof(SubsequentMembersReaderFunction))]
    public async Task<string> GetMembersAsync([ActivityTrigger] SubsequentMembersReaderRequest request)
    

...make it write a brand-new JSON file with a different name each time. You can then write each batch of 1,000 new items very quickly, while leaving the previous batches intact in their own files.
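
For example, the write could become a plain upload of a uniquely named file. In this sketch, UploadDataToBlobAsync is a hypothetical repository method, and PageNumber is a hypothetical counter you'd carry along with NextPageUrl so that each page gets its own file:

    // In the activity: give each page its own file name.
    // PageNumber is hypothetical - a counter passed through from the orchestrator.
    var fileName = $"/{request.TargetGroupId}/{request.RunId}_GroupMembership_{request.CurrentPart}_{request.PageNumber}.json";
    await _blobStorageRepository.UploadDataToBlobAsync(logEntryStream, fileName);

    // In BlobStorageRepository: a plain block-blob upload - no appending, no broken JSON.
    public async Task UploadDataToBlobAsync(MemoryStream logEntryStream, string logBlobName)
    {
        var blobClient = _containerClient.GetBlobClient(logBlobName);
        logEntryStream.Position = 0;
        await blobClient.UploadAsync(logEntryStream, overwrite: true);
    }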

I'm guessing you'll say this will cause problems when you read the JSON files back later (for whatever the rest of your application does). You might think it'll be too slow or inconvenient to read 300 JSON files of 1,000 items each, compared to reading one JSON file of 300,000 items.

So let me convince you that reading lots of blobs is easy. You can even parallelise the reads, as shown in the example below. (Note that parallelising isn't worth it if each file is very large, but if you end up with lots of small files it can reduce the overall latency.)

Retrieving the files in parallel might look something like this:

    // Get a list of blobs matching a prefix.
    // These would be your 300 JSON files that each have a batch of 1,000 items.        
    var blobs = container.GetBlobsAsync(prefix: "blobfile-");
    var blobReaderTasks = new List<Task<Stream>>();
    
    // Enumerate the list of blobs.
    // This doesn't actually read them - it just gets the streams ready.
    await foreach (BlobItem blobItem in blobs)
    {
        blobReaderTasks.Add(container.GetBlobClient(blobItem.Name).OpenReadAsync());
    }
    var blobStreams = await Task.WhenAll(blobReaderTasks);
    
    // Kick off a deserialisation task for each stream.
    // Each file holds a JSON array, so deserialise to List<AzureADUser>, not a single user.
    var blobDeserialisationTasks = blobStreams
        .Select(async stream => await JsonSerializer.DeserializeAsync<List<AzureADUser>>(stream) ?? throw new Exception("Was null"))
        .ToArray();
    
    // Wait for all the reads and deserialisations to finish,
    // then flatten each file's list into one combined list.
    var adUsers = (await Task.WhenAll(blobDeserialisationTasks))
        .SelectMany(users => users)
        .ToList();
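
One thing to watch: Task.WhenAll fans out all of the downloads at once. If 300 concurrent reads turns out to be too aggressive for your environment, you can throttle them with a SemaphoreSlim; for small files, though, it's usually fine.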
    

Hope that helps.