I am trying to implement request streaming from my Python client to my C# server using gRPC. This is my proto file:
syntax = "proto3";

service PerceiveAPIDataService {
  rpc UploadResource (stream UploadResourceRequest) returns (UploadResourceResponse);
}

message UploadResourceRequest {
  oneof request_data {
    ResourceChunk resource_chunk = 1;
    UploadResourceParameters parameters = 2;
  }
}

message ResourceChunk {
  bytes content = 1;
}

message UploadResourceParameters {
  string path = 1;
}
This is my C# implementation:
public override async Task<UploadResourceResponse> UploadResource(IAsyncStreamReader<UploadResourceRequest> requestStream, ServerCallContext context)
{
    if (!await requestStream.MoveNext())
    {
        throw new RpcException(new Status(StatusCode.FailedPrecondition, "No upload parameters found."));
    }

    var initialMessage = requestStream.Current;
    if (initialMessage.RequestDataCase != UploadResourceRequest.RequestDataOneofCase.Parameters)
    {
        throw new RpcException(new Status(StatusCode.FailedPrecondition, "First message must contain upload parameters."));
    }

    var path = initialMessage.Parameters.Path;
    if (string.IsNullOrWhiteSpace(path))
    {
        throw new RpcException(new Status(StatusCode.InvalidArgument, "Upload path is required."));
    }

    using (var ms = new MemoryStream())
    {
        while (await requestStream.MoveNext())
        {
            var chunk = requestStream.Current.ResourceChunk;
            if (chunk == null)
            {
                continue; // Skip any messages that are not resource chunks
            }
            await ms.WriteAsync(chunk.Content.ToByteArray().AsMemory(0, chunk.Content.Length));
        }

        ms.Seek(0, SeekOrigin.Begin); // Reset memory stream position to the beginning for reading during upload
        var uploadResult = await _dataService.UploadResourceAsync(path, ms);
        return new UploadResourceResponse { Succeeded = uploadResult.IsSuccessful };
    }
}
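The message-ordering rules that the handler enforces (parameters first, then chunks, anything else skipped) can be exercised without a running server. The following is only a rough Python model of those rules, not the C# code itself: `assemble_upload` and the `(kind, value)` tuples are hypothetical stand-ins for the generated message types.

```python
from typing import Iterable, Tuple, Union

# Hypothetical stand-in for UploadResourceRequest: (oneof case name, payload).
Message = Tuple[str, Union[str, bytes]]


def assemble_upload(messages: Iterable[Message]) -> Tuple[str, bytes]:
    """Apply the handler's rules: parameters first, then zero or more chunks."""
    it = iter(messages)
    try:
        kind, value = next(it)
    except StopIteration:
        raise ValueError("No upload parameters found.") from None
    if kind != "parameters":
        raise ValueError("First message must contain upload parameters.")
    if not isinstance(value, str) or not value.strip():
        raise ValueError("Upload path is required.")
    path = value

    buffer = bytearray()
    for kind, value in it:
        if kind != "resource_chunk":
            continue  # skip any messages that are not resource chunks
        buffer.extend(value)
    return path, bytes(buffer)
```

Feeding it a parameters tuple followed by chunk tuples returns the path and the concatenated bytes, which makes it easy to confirm that a given sequence of client messages is one the server would accept.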
And this is my Python client code:
def generate_request(self, data: bytearray, next_cloud_path: str) -> Generator:
    first_req = perceive_api_data_service_pb2.UploadResourceRequest(
        parameters=perceive_api_data_service_pb2.UploadResourceParameters(path=next_cloud_path)
    )
    yield first_req
    print("Sent initial request with path:", next_cloud_path)

    chunk_size = 2048
    total_chunks = (len(data) + chunk_size - 1) // chunk_size  # Ceiling division to get total number of chunks
    print(f"Data size: {len(data)} bytes, chunk size: {chunk_size} bytes, total chunks: {total_chunks}")

    for i in range(0, len(data), chunk_size):
        chunk = data[i:i+chunk_size]
        yield perceive_api_data_service_pb2.UploadResourceRequest(
            resource_chunk=perceive_api_data_service_pb2.ResourceChunk(content=chunk)
        )
        print(f"Sent chunk {((i // chunk_size) + 1)} of {total_chunks}")

async def upload_file(self, data: bytearray, next_cloud_path: str) -> bool:
    async with grpc.aio.insecure_channel("localhost:5228") as channel:
        stub = perceive_api_data_service_pb2_grpc.PerceiveAPIDataServiceStub(channel)
        request_iterator = self.generate_request(data, next_cloud_path)
        response = await stub.UploadResource(request_iterator)
        return response.succeeded
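The chunking step on its own can be checked without gRPC. Below is a minimal sketch; `iter_chunks` is a hypothetical helper that is not part of the client above. It converts each slice to `bytes`, on the assumption that a proto `bytes` field is most safely given a Python `bytes` value rather than a `bytearray` slice. Whether that difference plays any role in the failure described here is not established.

```python
from typing import Iterator, Union


def iter_chunks(data: Union[bytes, bytearray], chunk_size: int = 2048) -> Iterator[bytes]:
    """Yield successive chunks of at most chunk_size bytes, each as immutable bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(data), chunk_size):
        # Slicing a bytearray yields a bytearray; bytes(...) makes an immutable copy.
        yield bytes(data[start:start + chunk_size])


chunks = list(iter_chunks(bytearray(5000)))
print([len(c) for c in chunks])  # three chunks: 2048, 2048, 904
```

Each yielded chunk could then be wrapped in a `ResourceChunk` in the same way as in `generate_request`.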
The error I get on the server is this:
Grpc.AspNetCore.Server.ServerCallHandler: Error: Error when executing service method 'UploadResource'.
System.IO.IOException: The client reset the request stream.
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
And the error I get on the client is this:
  File "C:\Users\user_name\AppData\Local\Programs\Python\Python311\Lib\site-packages\grpc\aio\_call.py", line 690, in _conduct_rpc
    serialized_response = await self._cython_call.stream_unary(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "src\python\grpcio\grpc\_cython\_cygrpc/aio/call.pyx.pxi", line 458, in stream_unary
  File "src\python\grpcio\grpc\_cython\_cygrpc/aio/callback_common.pyx.pxi", line 166, in _receive_initial_metadata
  File "src\python\grpcio\grpc\_cython\_cygrpc/aio/callback_common.pyx.pxi", line 99, in execute_batch
asyncio.exceptions.CancelledError
The first message (the one carrying the parameters) is sent successfully. However, as soon as the server calls requestStream.MoveNext() to read the next message, it throws this error.
I have already tried numerous solutions, but none of them worked. Can anyone see where I am going wrong?
It turns out that the file data had to be base64-encoded before being streamed. That was the whole issue.
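How the encoding was applied is not shown above. One possible arrangement, offered only as a sketch, is to encode the whole payload once on the client before chunking and to decode it on the server after all chunks have been reassembled. The helper names `encode_for_upload` and `decode_after_upload` are hypothetical.

```python
import base64


def encode_for_upload(data: bytes) -> bytes:
    """Base64-encode the raw payload; the result is ASCII-only bytes."""
    return base64.b64encode(data)


def decode_after_upload(encoded: bytes) -> bytes:
    """Reverse encode_for_upload once all chunks have been concatenated."""
    return base64.b64decode(encoded)


original = bytes(range(256))
encoded = encode_for_upload(original)
assert decode_after_upload(encoded) == original
```

Note that base64 grows the payload by roughly a third, and that the C# side would need a matching decode step (for example `Convert.FromBase64String`) before using the data.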