delphiindyserver-sent-eventsidhttpdelphi-12-athens

How to terminate a TIdHTTP.Get() request when receiving SSE events?


I need to handle Server-Sent Events (SSE) via an HTTP GET request. Everything is working fine, except that I have no way of terminating the GET request immediately. Instead, I need to wait for some data from the server, which might take several minutes. Currently, I'm setting TThread.Terminated, and calling SysUtils.Abort() in the TIdHTTP.OnChunkReceived event.

So, how do I terminate the GET request immediately?

For bonus points, why do I get disconnected every 330 seconds by the server via an EIdConnClosedGracefully exception?

I can potentially call Free() on the TIdHTTP component from the main thread, but this is a bit of a sledgehammer, and I suspect it might have additional consequences.

My code is below, somewhat edited:

type
  TAPINotificationThread = class(TThread)
  protected
    procedure Execute; override;
    procedure Log(const s: string);
  public
    HTTP : TIdHTTP; //can we somehow make this stop without waiting for data
  end;

procedure TAPINotificationThread.Execute;
begin
  HTTP := CreateHTTP;
  var ms := TMemoryStream.Create;
  try
    try
      repeat
        try
          HTTP.Get(URL, ms);
        except
          on E: EIdConnClosedGracefully do
            Log('EIdConnClosedGracefully reconnecting...');
          on E: EIdReadTimeout do
            Log('EIdReadTimeout reconnecting...');
          else
            Raise
        end;
      until Terminated;

    except
      on E: EAbort do
        Log('Notification thread terminating normally');
      on E: Exception do
        Log('Exception in Notification Thread - ' + E.ClassName + ': ' + E.Message);
    end;
  finally
    HTTP.Free;
    ms.Free;
  end;
end;

function CreateHTTP: TIDHTTP;
begin
  Result := TIdHTTP.Create;
  Result.Request.CustomHeaders.Values['Api-Key'] := APIKey;

  var SSL := TIdSSLIOHandlerSocketOpenSSL.Create(Result);
  SSL.SSLOptions.Method := sslvSSLv23;
  SSL.SSLOptions.SSLVersions := [sslvTLSv1, sslvTLSv1_1, sslvTLSv1_2];

  Result.IOHandler := SSL;

  Result.OnChunkReceived := IdHTTP1ChunkReceived;
  Result.Request.Accept := 'text/event-stream';
  Result.Request.CacheControl := 'no-store';
  Result.Request.TransferEncoding := 'chunked';
  Result.ReadTimeout := 1_800_000; // 30 minutes, ping times seem to be an average of about 2.5 minutes, but are a bit erratic

  //not sure about these
  Result.Request.Connection := 'keep-alive';
  Result.Response.KeepAlive := True;
end;

procedure TdmoDraftAPI.IdHTTP1ChunkReceived(Sender: TObject; var Chunk: TIdBytes);
var
  s: string;
begin
  s := ExtractJSONFromChunkData(Chunk);
  TThread.Queue(nil,
    procedure
    var
      DD: TDraftDetails;
    begin
      if DD.FromJSON(s) then
        DraftEvent(DD)
      else
      begin
        Log(s); //probably a ping
      end;
      LastCommunication := now;
    end
  );
  if NotificationThread.Terminated then
    Abort;
end;

Solution

  • So, how do I terminate the GET request immediately?

    I can offer a couple of suggestions.

    One way is to create the TIdHTTP object in the thread's constructor instead of its Execute() method, and then override the thread's virtual TerminatedSet() method and have it close the TIdHTTP's underlying socket, eg:

    type
      TAPINotificationThread = class(TThread)
      protected
        procedure Execute; override;
        procedure TerminatedSet; override;
        ...
      public
        constructor Create; reintroduce;
        destructor Destroy; override;
        HTTP : TIdHTTP;
      end;
    
    constructor TAPINotificationThread.Create;
    begin
      inherited Create(False);
      HTTP := CreateHTTP;
    end;
    
    destructor TAPINotificationThread.Destroy; override;
    begin
      HTTP.Free;
      inherited;
    end;
    
    procedure TAPINotificationThread.Execute;
    begin
      // everything you already have, just without the CreateHTTP ...
    end;
    
    procedure TAPINotificationThread.TerminatedSet;
    begin
      inherited;
      if HTTP.Socket.BindingAllocated then
        HTTP.Socket.Binding.CloseSocket;
    end;
    

    Another way is to enable the hoNoReadChunked option in the TIdHTTP.HTTPOptions property. TIdHTTP.Get() will exit after it receives the initial HTTP response headers, but will leave the TCP connection open. You can then manually read the SSE events directly from your IOHandler object and not use the TIdHTTP.OnChunkReceived event at all. This will allow you more control over the reading, such as using your own timeout logic when waiting for a new event to arrive, eg:

    type
      TAPINotificationThread = class(TThread)
      protected
        procedure Execute; override;
        ...
      public
        HTTP : TIdHTTP;
      end;
    
    procedure TAPINotificationThread.Execute;
    var
      ChunkSize: Integer;
      Chunk: TIdBytes;
      s: string;
      j: Integer;
    begin
      HTTP := CreateHTTP;
      try
        ...
        HTTP.Get(URL, TStream(nil));
        if (HTTP.ResponseCode = 200) and
           (Pos('chunked', LowerCase(HTTP.Response.TransferEncoding)) > 0) then
        begin
          while not Terminated do
          begin
            // wait for data to arrive
            if HTTP.IOHandler.InputBufferIsEmpty then
            begin
              HTTP.IOHandler.CheckForDataOnSource(100);
              if HTTP.IOHandler.InputBufferIsEmpty then Continue;
            end;
    
            // read chunk size
            s := HTTP.IOHandler.ReadLn;
            if HTTP.IOHandler.ReadLnTimedout then Break;
            j := Pos(';', s);
            if j > 0 then begin
              s := Copy(s, 1, j - 1);
            end;
            ChunkSize := StrToIntDef('$' + Trim(s), 0);
            if ChunkSize = 0 then Break;
    
            // read chunk data
            SetLength(Chunk, ChunkSize);
            HTTP.IOHandler.ReadBytes(Chunk, ChunkSize, False);
            s := ExtractJSONFromChunkData(Chunk);
            ...
    
            // read CRLF at end of chunk data
            HTTP.IOHandler.ReadLn;
            if HTTP.IOHandler.ReadLnTimedout then Break;
          end;
        end;
        ...
      finally
        HTTP.Free;
      end;
    end;
    
    function CreateHTTP: TIdHTTP;
    begin
      Result := TIdHTTP.Create;
      ...
      Result.HTTPOptions := Result.HTTPOptions + [hoNoReadChunked];
      ...
    end;
    

    why do I get disconnected every 330 seconds by the server via an EIdConnClosedGracefully exception?

    Are you sure it is the server itself that is closing the connection? It might be that a firewall or NAT router is sitting in between your client and the server and is the one closing the TCP connection after ~5 minutes of inactivity.

    For example, this is a common problem for FTP clients that sit behind FTP-unaware NATs and perform a long file transfers. Their command channel would be sitting idle and might get disconnected by the NAT while their data channel is transmitting. Indy's TIdFTP component has a NATKeepAlive property to address this issue. You might need similar behavior for your HTTP connection.

    So, if your SSE events are not arriving frequent enough to keep the TCP connection alive, then try enabling TCP keep-alives (not HTTP keep-alives) on the underlying socket, eg:

    function CreateHTTP: TIDHTTP;
    begin
      Result := TIdHTTP.Create;
      ...
      Result.OnConnected := IdHTTP1Connected;
      ...
    end;
    
    procedure TdmoDraftAPI.IdHTTP1Connected(Sender: TObject);
    begin
      TIdHTTP(Sender).Socket.Binding.SetKeepAliveValues(True, 30000, 5000); // ie, probe every 5s after 30s of idleness
    end;
    

    On a side note: your use of a TMemoryStream to receive SSE events is wasting memory. Even though you are processing each HTTP chunk individually in the TIdHTTP.OnChunkReceived event, the chunks are still going to be written to the TMemoryStream, growing its memory buffer over time.

    If you don't use the hoNoReadChunked approach above, then you can avoid this memory issue by having your OnChunkReceived handler release each chunk so there is nothing for TIdHTTP to write into the TMemoryStream, eg:

    procedure TdmoDraftAPI.IdHTTP1ChunkReceived(Sender: TObject; var Chunk: TIdBytes);
    var
      s: string;
    begin
      s := ExtractJSONFromChunkData(Chunk);
      Chunk := nil;          // <-- add this
      //SetLength(Chunk, 0); // <-- or this
      ...
    end;
    

    Alternatively, you can use a TIdEventStream instead of a TMemoryStream, and then simply don't assign any handler to the TIdEventStream.OnWrite event.