How to deal with 'a Lwt objects in a function?
My code is
Array.map (fun conn -> let* resp = (call_server conn
(RequestVoteArg({
candidateNumber = myState.myPersistentState.id;
term = myState.myPersistentState.currentTerm;
lastlogIndex = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).index;
lastlogTerm = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).term
}))) in (match resp with
| Error(s) -> Printf.printf "requestVote: connection failed: %s" s
| Ok(repl, s) ->
(match repl with
| RequestVoteRet(repl) ->
if repl.voteGranted then current_vote := !current_vote + 1;
if not (repl.term = (-1l)) then myState.myPersistentState.currentTerm <- repl.term;
Printf.printf "requestVote: status: %s" s
| _ -> failwith "Should not reach here")); conn) peers
But there is an error: This expression (conn in the end) has type H2_lwt_unix.Client.t but an expression was expected of type 'weak702 Lwt.t.
peers is an array of connections (type: H2_lwt_unix.Client.t).
The definition of call_server is:
val call_server: H2_lwt_unix.Client.t -> protobufArg -> (Types.protobufRet * Grpc.Status.t, Grpc.Status.t) result Lwt.t
and
let build_connection addr port =
let* addrs =
Lwt_unix.getaddrinfo addr (string_of_int port)
[ Unix.(AI_FAMILY PF_INET) ]
in
let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
let* () = Lwt_unix.connect socket (List.hd addrs).Unix.ai_addr in
let error_handler _ = print_endline "error" in
let connection =
H2_lwt_unix.Client.create_connection ~error_handler socket
in connection
let call_server connection req =
let enc = Pbrt.Encoder.create() in
match req with
| RequestVoteArg(s) ->
let proto_s =
Proto.Proto_types.default_request_vote_arg ~candidate_number:s.candidateNumber ~term:s.term ~lastlog_term:s.lastlogTerm ~lastlog_index:s.lastlogIndex ()
in
Proto.Proto_pb.encode_request_vote_arg proto_s enc;
Client.call ~service:"raft.Proto" ~rpc:"RequestVote"
~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder ->
(let+ decoder = decoder in
(match decoder with
| Some (decoder) ->
let decoder = Pbrt.Decoder.of_string decoder in
let reply = Proto.Proto_pb.decode_request_vote_reply decoder in
RequestVoteRet( { term=reply.term; voteGranted=reply.vote_granted } )
| None -> RequestVoteRet( { term=(-1l); voteGranted=false } ))))) ()
| AppendEntriesArg(s) ->
let proto_s =
Proto.Proto_types.default_append_entries_arg ~term: s.term ~leader_id:s.leaderID ~next_log_index:s.nextLogIndex ~next_log_term:s.nextLogTerm ~entries:(List.map (fun lg -> Proto.Proto_types.default_log ~command: lg.command ~term:lg.term ~index: lg.index ()) (Array.to_list (s.entries)) ) ()
in
Proto.Proto_pb.encode_append_entries_arg proto_s enc;
Client.call ~service:"raft.Proto" ~rpc:"AppendEntries"
~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
~handler:
(Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder ->
let+ decoder = decoder in
match decoder with
| Some(decoder) ->
let decoder = Pbrt.Decoder.of_string decoder in
let reply = Proto.Proto_pb.decode_append_entries_reply decoder in
AppendEntriesRet( { term=reply.term; success=reply.success } )
| None -> AppendEntriesRet( { term=(-1l); success=false } ))) ()
Your code can be simplified to:
let f (conn:H2_lwt_unix.Client.t) =
let* resp = ... in
...; conn
which fails because conn
is a connection and not a promise of returning a connection.
The easiest fix is to wrap this value inside a Lwt
promise with either
let f (conn:H2_lwt_unix.Client.t) =
let* resp = ... in
...; Lwt.return conn
or
let f (conn:H2_lwt_unix.Client.t) =
let+ resp = ... in
...; conn