ocamlocaml-lwtlwt

This expression has type H2_lwt_unix.Client.t but an expression was expected of type 'weak702 Lwt.t


How to deal with 'a Lwt objects in a function?

My code is

Array.map (fun conn -> let* resp = (call_server conn 
              (RequestVoteArg({
                  candidateNumber = myState.myPersistentState.id;
                  term = myState.myPersistentState.currentTerm;
                  lastlogIndex = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).index;
                  lastlogTerm = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).term
              }))) in (match resp with
              | Error(s) -> Printf.printf "requestVote: connection failed: %s" s
              | Ok(repl, s) -> 
                (match repl with
                | RequestVoteRet(repl) ->
                               if repl.voteGranted then current_vote := !current_vote + 1; 
                               if not (repl.term = (-1l)) then myState.myPersistentState.currentTerm <- repl.term;
                               Printf.printf "requestVote: status: %s" s
                | _ -> failwith "Should not reach here")); conn) peers

But there is an error: This expression (conn in the end) has type H2_lwt_unix.Client.t but an expression was expected of type 'weak702 Lwt.t.

peers is an array of connections (type: H2_lwt_unix.Client.t).

The definition of call_server is:

val call_server: H2_lwt_unix.Client.t -> protobufArg -> (Types.protobufRet * Grpc.Status.t, Grpc.Status.t) result Lwt.t

and

let build_connection addr port =
  let* addrs = 
    Lwt_unix.getaddrinfo addr (string_of_int port)
      [ Unix.(AI_FAMILY PF_INET) ]
  in
  let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  let* () = Lwt_unix.connect socket (List.hd addrs).Unix.ai_addr in 
  let error_handler _ = print_endline "error" in
  let connection =
    H2_lwt_unix.Client.create_connection ~error_handler socket
  in connection

let call_server connection req = 
  let enc = Pbrt.Encoder.create() in
  match req with
  | RequestVoteArg(s) -> 
    let proto_s = 
      Proto.Proto_types.default_request_vote_arg ~candidate_number:s.candidateNumber ~term:s.term ~lastlog_term:s.lastlogTerm ~lastlog_index:s.lastlogIndex ()
    in
    Proto.Proto_pb.encode_request_vote_arg proto_s enc;
    Client.call ~service:"raft.Proto" ~rpc:"RequestVote"
      ~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
      ~handler:
        (Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder ->
            (let+ decoder = decoder in
            (match decoder with 
            | Some (decoder) -> 
                let decoder = Pbrt.Decoder.of_string decoder in
                let reply = Proto.Proto_pb.decode_request_vote_reply decoder in
                RequestVoteRet( { term=reply.term; voteGranted=reply.vote_granted } )
            | None -> RequestVoteRet( { term=(-1l); voteGranted=false } ))))) ()
  | AppendEntriesArg(s) -> 
    let proto_s = 
      Proto.Proto_types.default_append_entries_arg ~term: s.term ~leader_id:s.leaderID ~next_log_index:s.nextLogIndex ~next_log_term:s.nextLogTerm ~entries:(List.map (fun lg -> Proto.Proto_types.default_log ~command: lg.command ~term:lg.term ~index: lg.index ()) (Array.to_list (s.entries)) ) ()
    in
    Proto.Proto_pb.encode_append_entries_arg proto_s enc;
    Client.call ~service:"raft.Proto" ~rpc:"AppendEntries"
    ~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
    ~handler:
      (Client.Rpc.unary (Pbrt.Encoder.to_string enc) ~f:(fun decoder ->
          let+ decoder = decoder in 
          match decoder with
          | Some(decoder) ->
              let decoder = Pbrt.Decoder.of_string decoder in 
              let reply = Proto.Proto_pb.decode_append_entries_reply decoder in 
              AppendEntriesRet( { term=reply.term; success=reply.success } )
          | None -> AppendEntriesRet( { term=(-1l); success=false } ))) ()


Solution

  • Your code can be simplified to:

    let f (conn:H2_lwt_unix.Client.t) =
      let* resp = ... in
       ...; conn
    

    which fails because conn is a connection and not a promise of returning a connection.

    The easiest fix is to wrap this value inside a Lwt promise with either

    let f (conn:H2_lwt_unix.Client.t) =
      let* resp = ... in
       ...; Lwt.return conn
    

    or

    let f (conn:H2_lwt_unix.Client.t) =
      let+ resp = ... in
       ...; conn