I need to handle Server-Sent Events (SSE) via an HTTP GET
request. Everything is working fine, except that I have no way of terminating the GET
request immediately. Instead, I need to wait for some data from the server, which might take several minutes. Currently, I'm setting TThread.Terminated
, and calling SysUtils.Abort()
in the TIdHTTP.OnChunkReceived
event.
So, how do I terminate the GET
request immediately?
For bonus points, why do I get disconnected every 330 seconds by the server via an EIdConnClosedGracefully
exception?
I can potentially call Free()
on the TIdHTTP
component from the main thread, but this is a bit of a sledgehammer, and I suspect it might have additional consequences.
My code is below, somewhat edited:
type
TAPINotificationThread = class(TThread)
protected
procedure Execute; override;
procedure Log(const s: string);
public
HTTP : TIdHTTP; //can we somehow make this stop without waiting for data
end;
procedure TAPINotificationThread.Execute;
begin
HTTP := CreateHTTP;
var ms := TMemoryStream.Create;
try
try
repeat
try
HTTP.Get(URL, ms);
except
on E: EIdConnClosedGracefully do
Log('EIdConnClosedGracefully reconnecting...');
on E: EIdReadTimeout do
Log('EIdReadTimeout reconnecting...');
else
Raise
end;
until Terminated;
except
on E: EAbort do
Log('Notification thread terminating normally');
on E: Exception do
Log('Exception in Notification Thread - ' + E.ClassName + ': ' + E.Message);
end;
finally
HTTP.Free;
ms.Free;
end;
end;
function CreateHTTP: TIDHTTP;
begin
Result := TIdHTTP.Create;
Result.Request.CustomHeaders.Values['Api-Key'] := APIKey;
var SSL := TIdSSLIOHandlerSocketOpenSSL.Create(Result);
SSL.SSLOptions.Method := sslvSSLv23;
SSL.SSLOptions.SSLVersions := [sslvTLSv1, sslvTLSv1_1, sslvTLSv1_2];
Result.IOHandler := SSL;
Result.OnChunkReceived := IdHTTP1ChunkReceived;
Result.Request.Accept := 'text/event-stream';
Result.Request.CacheControl := 'no-store';
Result.Request.TransferEncoding := 'chunked';
Result.ReadTimeout := 1_800_000; // 30 minutes, ping times seem to be an average of about 2.5 minutes, but are a bit erratic
//not sure about these
Result.Request.Connection := 'keep-alive';
Result.Response.KeepAlive := True;
end;
procedure TdmoDraftAPI.IdHTTP1ChunkReceived(Sender: TObject; var Chunk: TIdBytes);
var
s: string;
begin
s := ExtractJSONFromChunkData(Chunk);
TThread.Queue(nil,
procedure
var
DD: TDraftDetails;
begin
if DD.FromJSON(s) then
DraftEvent(DD)
else
begin
Log(s); //probably a ping
end;
LastCommunication := now;
end
);
if NotificationThread.Terminated then
Abort;
end;
So, how do I terminate the
GET
request immediately?
I can offer a couple of suggestions.
One way is to create the TIdHTTP
object in the thread's constructor instead of its Execute()
method, and then override the thread's virtual TerminatedSet()
method and have it close the TIdHTTP
's underlying socket, eg:
type
TAPINotificationThread = class(TThread)
protected
procedure Execute; override;
procedure TerminatedSet; override;
...
public
constructor Create; reintroduce;
destructor Destroy; override;
HTTP : TIdHTTP;
end;
constructor TAPINotificationThread.Create;
begin
inherited Create(False);
HTTP := CreateHTTP;
end;
destructor TAPINotificationThread.Destroy; override;
begin
HTTP.Free;
inherited;
end;
procedure TAPINotificationThread.Execute;
begin
// everything you already have, just without the CreateHTTP ...
end;
procedure TAPINotificationThread.TerminatedSet;
begin
inherited;
if HTTP.Socket.BindingAllocated then
HTTP.Socket.Binding.CloseSocket;
end;
Another way is to enable the hoNoReadChunked
option in the TIdHTTP.HTTPOptions
property. TIdHTTP.Get()
will exit after it receives the initial HTTP response headers, but will leave the TCP connection open. You can then manually read the SSE events directly from your IOHandler
object and not use the TIdHTTP.OnChunkReceived
event at all. This will allow you more control over the reading, such as using your own timeout logic when waiting for a new event to arrive, eg:
type
TAPINotificationThread = class(TThread)
protected
procedure Execute; override;
...
public
HTTP : TIdHTTP;
end;
procedure TAPINotificationThread.Execute;
var
ChunkSize: Integer;
Chunk: TIdBytes;
s: string;
j: Integer;
begin
HTTP := CreateHTTP;
try
...
HTTP.Get(URL, TStream(nil));
if (HTTP.ResponseCode = 200) and
(Pos('chunked', LowerCase(HTTP.Response.TransferEncoding)) > 0) then
begin
while not Terminated do
begin
// wait for data to arrive
if HTTP.IOHandler.InputBufferIsEmpty then
begin
HTTP.IOHandler.CheckForDataOnSource(100);
if HTTP.IOHandler.InputBufferIsEmpty then Continue;
end;
// read chunk size
s := HTTP.IOHandler.ReadLn;
if HTTP.IOHandler.ReadLnTimedout then Break;
j := Pos(';', s);
if j > 0 then begin
s := Copy(s, 1, j - 1);
end;
ChunkSize := StrToIntDef('$' + Trim(s), 0);
if ChunkSize = 0 then Break;
// read chunk data
SetLength(Chunk, ChunkSize);
HTTP.IOHandler.ReadBytes(Chunk, ChunkSize, False);
s := ExtractJSONFromChunkData(Chunk);
...
// read CRLF at end of chunk data
HTTP.IOHandler.ReadLn;
if HTTP.IOHandler.ReadLnTimedout then Break;
end;
end;
...
finally
HTTP.Free;
end;
end;
function CreateHTTP: TIdHTTP;
begin
Result := TIdHTTP.Create;
...
Result.HTTPOptions := Result.HTTPOptions + [hoNoReadChunked];
...
end;
why do I get disconnected every 330 seconds by the server via an
EIdConnClosedGracefully
exception?
Are you sure it is the server itself that is closing the connection? It might be that a firewall or NAT router is sitting in between your client and the server and is the one closing the TCP connection after ~5 minutes of inactivity.
For example, this is a common problem for FTP clients that sit behind FTP-unaware NATs and perform a long file transfers. Their command channel would be sitting idle and might get disconnected by the NAT while their data channel is transmitting. Indy's TIdFTP
component has a NATKeepAlive
property to address this issue. You might need similar behavior for your HTTP connection.
So, if your SSE events are not arriving frequent enough to keep the TCP connection alive, then try enabling TCP keep-alives (not HTTP keep-alives) on the underlying socket, eg:
function CreateHTTP: TIDHTTP;
begin
Result := TIdHTTP.Create;
...
Result.OnConnected := IdHTTP1Connected;
...
end;
procedure TdmoDraftAPI.IdHTTP1Connected(Sender: TObject);
begin
TIdHTTP(Sender).Socket.Binding.SetKeepAliveValues(True, 30000, 5000); // ie, probe every 5s after 30s of idleness
end;
On a side note: your use of a TMemoryStream
to receive SSE events is wasting memory. Even though you are processing each HTTP chunk individually in the TIdHTTP.OnChunkReceived
event, the chunks are still going to be written to the TMemoryStream
, growing its memory buffer over time.
If you don't use the hoNoReadChunked
approach above, then you can avoid this memory issue by having your OnChunkReceived
handler release each chunk so there is nothing for TIdHTTP
to write into the TMemoryStream
, eg:
procedure TdmoDraftAPI.IdHTTP1ChunkReceived(Sender: TObject; var Chunk: TIdBytes);
var
s: string;
begin
s := ExtractJSONFromChunkData(Chunk);
Chunk := nil; // <-- add this
//SetLength(Chunk, 0); // <-- or this
...
end;
Alternatively, you can use a TIdEventStream
instead of a TMemoryStream
, and then simply don't assign any handler to the TIdEventStream.OnWrite
event.