I've got Event Hubs outputting to Blob Storage using the Capture functionality - this writes whatever we throw into the event hub out as a .avro file.
If I download this file and try to parse it with a library like avro-js, I have no problems - I can read the file just fine and handle the contents as I see fit.
However - when handling Azure Blob Storage with Node, I'd like to process the files as they're downloaded rather than saving them to disk first. The format I get back when reading the file is a Buffer, but I can't find a way of successfully parsing it with a library (I can't find the right method, if there is one).
Code used to download the blob from Azure with a few bits omitted:
const { BlobServiceClient } = require('@azure/storage-blob');

// Connect to the storage account and point at the capture container/blob.
const blobServiceClient = BlobServiceClient.fromConnectionString(AZURE_STORAGE_CONNECTION_STRING);
const containerClient = blobServiceClient.getContainerClient("data");
const blockBlobClient = containerClient.getBlockBlobClient(blob.name);

// Download the blob from offset 0, i.e. the whole file.
const downloadBlockBlobResponse = await blockBlobClient.download(0);
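The omitted bit that turns the download response into the Buffer below is roughly this (async iteration over the response's readableStreamBody):

const chunks = [];
for await (const chunk of downloadBlockBlobResponse.readableStreamBody) {
  // Normalise each chunk to a Buffer before concatenating.
  chunks.push(chunk instanceof Buffer ? chunk : Buffer.from(chunk));
}
const buffer = Buffer.concat(chunks);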
Buffer snippet when output to console:
<Buffer 4f 62 6a 01 04 14 61 76 72 6f 2e 63 6f 64 65 63 08 6e 75 6c 6c 16 61 76 72 6f 2e 73 63 68 65 6d 61 ec 06 7b 22 74 79 70 65 22 3a 22 72 65 63 6f 72 64 ... 589 more bytes>
When converted to a string, the content is mostly readable, but there are a few garbled characters, so it doesn't parse as JSON (and I don't want to make assumptions about the content to try and pull the message bodies out).
Has anyone successfully lifted .avro content from Azure based on Buffers? I see a lot of guidance online for loading these into Spark or Kafka, but not for simply reading the files from a stream.
Thanks!
Regarding the issue, we can use the avsc package to parse the Avro file from a Buffer. For more details, please refer to the avsc documentation.
For example:
const avro = require("avsc");
const {
  BlobServiceClient,
  StorageSharedKeyCredential,
} = require("@azure/storage-blob");

const accountName = "";
const accountKey = "";

async function main() {
  // Authenticate against the storage account and build the blob clients.
  const creds = new StorageSharedKeyCredential(accountName, accountKey);
  const blobServiceClient = new BlobServiceClient(
    `https://${accountName}.blob.core.windows.net`,
    creds
  );
  const containerClient = blobServiceClient.getContainerClient("");
  const blockBlobClient = containerClient.getBlockBlobClient("");

  // Download the whole blob and collect the stream into a single Buffer.
  const downloadBlockBlobResponse = await blockBlobClient.download(0);
  const buf = await streamToBuffer(
    downloadBlockBlobResponse.readableStreamBody
  );

  // BlockDecoder understands the Avro object container format that Capture
  // writes (header, embedded schema, codec, data blocks). parseHook lets us
  // inspect the schema embedded in the file before the records are decoded.
  const decoder = new avro.streams.BlockDecoder({
    parseHook: (schema) => {
      console.log("the avro file schema:");
      console.log(schema);
      return avro.Type.forSchema(schema, { wrapUnions: true });
    },
  });

  // Each "data" event is one decoded record from the file.
  decoder.on("data", (data) => {
    console.log(data);
  });

  // Feed the downloaded Buffer into the decoder and signal end of input.
  decoder.end(buf);
}

// Drain a Node.js readable stream into a single Buffer.
async function streamToBuffer(readableStream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readableStream.on("data", (data) => {
      chunks.push(data instanceof Buffer ? data : Buffer.from(data));
    });
    readableStream.on("end", () => {
      resolve(Buffer.concat(chunks));
    });
    readableStream.on("error", reject);
  });
}

main().catch(console.error);
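Since the file comes from Event Hubs Capture, each decoded record should follow the Capture schema - fields like SequenceNumber, Offset, EnqueuedTimeUtc, SystemProperties, Properties, and Body - with the original event payload stored in Body as bytes. A minimal sketch of pulling the message bodies out, assuming the events you sent were UTF-8 JSON:

decoder.on("data", (data) => {
  // Body holds the raw event payload as bytes; decode it as UTF-8.
  // JSON.parse will only succeed if the events were JSON to begin with.
  const body = JSON.parse(data.Body.toString("utf8"));
  console.log(body);
});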