multithreading delphi delphi-10.3-rio

How do I terminate a thread that is blocked waiting for a return value?


I need to terminate my running thread in a way that doesn't cause an error or a delay. The problem lies in the call "lMsg := lMsgQueue.Get(FQueueGetTimeout);", which blocks for up to the configured timeout (usually 5000 ms). Thus, if I call Terminate from outside the thread, my application is stuck until that wait finishes.

What would be the best way to terminate it in the middle of the process?

{ TConsumerThread }

{ Creates the consumer thread in a suspended state with default settings.
  The thread frees itself when Execute returns (FreeOnTerminate). }
constructor TConsumerThread.Create;
begin
  // Run the inherited constructor first so the TThread part of the object is
  // fully set up before any inherited property is touched. The thread is
  // created suspended so the caller can assign its properties before starting it.
  inherited Create(True);
  FreeOnTerminate := True;
  InitializeVars;
end;

{ Thread body: consumes messages from FQueue until one passes ValidateFilter,
  the FTimeout deadline is reached after a rejected message, or the thread is
  terminated.

  On success FCorrelationID and FReceivedMessage hold the accepted message;
  on timeout or on an unexpected error FReceivedMessage is left empty. }
procedure TConsumerThread.Execute;
var
  lMsgQueue: TAMQPMessageQueue;
  lMsg: TAMQPMessage;
  lStartTime: TDateTime;
begin
  // lMsg is a local and therefore not initialised by the compiler; start from
  // nil so every Assigned/Free below is well defined.
  lMsg := nil;
  lMsgQueue := TAMQPMessageQueue.Create;
  try
    // Inside try..finally so the queue is released if opening the channel
    // raises, but outside try..except so such a failure still propagates out
    // of Execute exactly as before.
    FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
    try
      FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
      lStartTime := Now;
      repeat
        // Forget the previous iteration's message. Without this, a Get that
        // raises would leave a stale pointer for the handler below to free again.
        lMsg := nil;
        try

          // Re-establish connection, channel and consumer if the link dropped.
          try
            if not(FConnectionAMQP.IsOpen) then
            begin
              FConnectionAMQP.Connect;
              FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
              FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
            end;
          except
            on E: Exception do
              Break;
          end;

          // Blocks for up to FQueueGetTimeout ms. TBlockingQueue.Get raises
          // AMQPTimeout when the wait times out; that exception is swallowed
          // by the handler at the bottom of the loop and the loop retries.
          lMsg := lMsgQueue.Get(FQueueGetTimeout);
          if (lMsg = nil) and not(Terminated) then
          begin
            // A nil item was queued: recycle the channel and consume again.
            if Assigned(FChannelAMQPThread) then
            begin
              FConnectionAMQP.CloseChannel(FChannelAMQPThread);
              FChannelAMQPThread := nil;
            end;

            FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
            FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
          end;

          if not(Terminated) then
          begin
            try
              if not(FConnectionAMQP.IsOpen) then
                FConnectionAMQP.Connect;
            except
              on E: Exception do
                Break;
            end;
          end;

          if Terminated then
          begin
            // Termination was requested while waiting: release a message that
            // arrived in the meantime instead of leaking it.
            FreeAndNil(lMsg);
            // Kept from the original: calling Terminate again re-runs
            // TerminatedSet, which closes any channel that is still assigned.
            Terminate;
          end
          else if Assigned(lMsg) then
          begin
            if ValidateFilter(lMsg) then
            begin
              FCorrelationID := lMsg.Header.PropertyList.CorrelationID.Value;

              FReceivedMessage := lMsg.Body.asString[TEncoding.ASCII];
              lMsg.Ack;
              FreeAndNil(lMsg);
              Terminate;
            end
            else
            begin
              lMsg.Reject;
              FreeAndNil(lMsg);

              // NOTE(review): the overall FTimeout deadline is only evaluated
              // after a rejected message, as in the original code; confirm
              // whether it should also apply while no message arrives.
              if not(FTimeout = INFINITE) then
              begin
                if (MilliSecondsBetween(Now, lStartTime) >= (Int64(FTimeout))) then
                begin
                  FReceivedMessage := '';
                  Terminate;
                end;
              end;
            end;
          end;
          // else: no message and not terminated -> nothing to process, loop again.
        except
          on E: Exception do
          begin
            // Best effort: drop whatever message was in flight and keep looping.
            FreeAndNil(lMsg);
          end;
        end;
      until (Terminated);
    except
      on E: Exception do
      begin
        FReceivedMessage := '';

        if not(Terminated) then
          Terminate;
      end;
    end;
  finally
    // Non-nil only when the loop was left through Break with a message in hand.
    lMsg.Free;
    lMsgQueue.Free;
  end;
end;

{ Runs when the thread is flagged as terminated: closes the consumer channel
  (if any) on a best-effort basis and clears the reference to it. }
procedure TConsumerThread.TerminatedSet;
begin
  inherited;

  // Nothing to clean up when no channel is currently held.
  if not Assigned(FChannelAMQPThread) then
    Exit;

  try
    if FConnectionAMQP.IsOpen then
      FConnectionAMQP.CloseChannel(FChannelAMQPThread);
  except
    // Errors while closing are deliberately ignored.
    on E: Exception do
  end;

  FChannelAMQPThread := nil;
end;

{ Tells whether pMsg satisfies the configured filter.
  fmsgNone accepts every message; fmsgMessageID / fmsgCorrelationID compare the
  matching header property with FFilterValue. Any other filter value rejects. }
function TConsumerThread.ValidateFilter(pMsg: TAMQPMessage): Boolean;
begin
  case FMsgFilter of
    fmsgNone:
      Result := True;
    fmsgMessageID:
      Result := pMsg.Header.PropertyList.MessageID.Value = FFilterValue;
    fmsgCorrelationID:
      Result := pMsg.Header.PropertyList.CorrelationID.Value = FFilterValue;
  else
    Result := False;
  end;
end;

{ Resets every configurable field of the thread to its default value. }
procedure TConsumerThread.InitializeVars;
begin
  // Connection objects: none until the caller supplies them.
  FConnectionAMQP := nil;
  FChannelAMQPThread := nil;

  // Queue settings.
  FQueue := '';
  FQueuePrefetchSize := 0;
  FQueuePrefetchCount := 10;
  FQueueGetTimeout := 5000; // passed to the message queue's Get in Execute

  // No overall deadline by default.
  FTimeout := INFINITE;

  // Filtering and result.
  FMsgFilter := fmsgNone;
  FFilterValue := '';
  FReceivedMessage := '';
end;

To check the returned message or exception, I am using a function in OnTerminate.

Also, is setting "FreeOnTerminate" the best alternative in this case?

I start it suspended because I set the properties (initialized in InitializeVars) before the start.

This code is from the "Get" function, I didn't write it, but I can edit it if necessary.

{$I AMQP.Options.inc}
unit AMQP.Classes;

interface

Uses
  SysUtils, Classes, SyncObjs, Generics.Collections,
  AMQP.Frame, AMQP.Message, AMQP.Method, AMQP.Types
  {$IfDef fpc}
  , AMQP.SyncObjs
  {$EndIf}
  ;

Type
  AMQPException = Class(Exception);
  AMQPTimeout  = class(AMQPException);

  // Read-only holder for values taken from AMQP connection methods.
  // The fields are filled in by the three Read* procedures below.
  TAMQPServerProperties = Class
  Strict Private
    FCapabilities : TStringList;
    FMechanisms   : TStringList;
    FLocales      : TStringList;
    FClusterName  : String;
    FCopyright    : String;
    FInformation  : String;
    FPlatform     : String;
    FProduct      : String;
    FVersion      : String;
    FKnownHosts   : String;
    FVersionMajor : Integer;
    FVersionMinor : Integer;
    FChannelMax   : Integer;
    FFrameMax     : Integer;
    FHeartbeat    : Integer;
  Public
    // Name/value pairs from the 'capabilities' table of 'server-properties'.
    Property Capabilities         : TStringList read FCapabilities;
    // Space-delimited 'mechanisms' field, split into one entry per item.
    Property Mechanisms           : TStringList read FMechanisms;
    // Space-delimited 'locales' field, split into one entry per item.
    Property Locales              : TStringList read FLocales;
    Property ClusterName          : String      read FClusterName;
    Property Copyright            : String      read FCopyright;
    Property Information          : String      read FInformation;
    Property &Platform            : String      read FPlatform;
    Property Product              : String      read FProduct;
    Property Version              : String      read FVersion;
    Property KnownHosts           : String      read FKnownHosts;
    Property ProtocolVersionMajor : Integer     read FVersionMajor;
    Property ProtocolVersionMinor : Integer     read FVersionMinor;
    Property ChannelMax           : Integer     read FChannelMax;
    Property FrameMax             : Integer     read FFrameMax;
    Property Heartbeat            : Integer     read FHeartbeat;

    // Reads mechanisms, locales, protocol version and 'server-properties'.
    Procedure ReadConnectionStart( AConnectionStart: TAMQPMethod );
    // Reads 'channel-max', 'frame-max' and 'heartbeat'.
    Procedure ReadConnectionTune( AConnectionTune: TAMQPMethod );
    // Reads 'known-hosts'.
    Procedure ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );

    Constructor Create;
    Destructor Destroy; Override;
  End;

  // Generic FIFO queue guarded by a critical section, with a condition
  // variable that consumers wait on until an item is available.
  TBlockingQueue<T> = Class
  Strict Protected
    // Lock taken by Count, Get and Put around every access to FQueue.
    FGuard     : {$IFDEF FPC}TRTLCriticalSection{$ELSE}TCriticalSection{$ENDIF};
    // Signalled by Put; waited on by Get while the queue is empty.
    FCondition : TConditionVariableCS;
    FQueue     : TQueue<T>;
  Public
    // Number of queued items, read under the lock.
    Function Count: Integer; Virtual;
    // Dequeues the next item; waits while the queue is empty and raises
    // AMQPTimeout when a wait of ATimeOut elapses without a signal.
    Function Get(ATimeOut: LongWord): T; Virtual;
    // Enqueues Item and releases all waiting threads.
    Procedure Put( Item: T ); Virtual;

    Constructor Create; Virtual;
    Destructor Destroy; Override;
  End;

  TAMQPQueue = TBlockingQueue<TAMQPFrame>;

  TAMQPMessageQueue = TBlockingQueue<TAMQPMessage>;

implementation

{ TAMQPServerProperties }


{ Allocates the three string lists and resets every scalar property. }
constructor TAMQPServerProperties.Create;
begin
  // Owned lists (released in Destroy).
  FCapabilities := TStringList.Create;
  FMechanisms := TStringList.Create;
  FLocales := TStringList.Create;

  // Mechanisms and locales are parsed from space-separated text.
  FMechanisms.StrictDelimiter := True;
  FMechanisms.Delimiter := ' ';
  FLocales.StrictDelimiter := True;
  FLocales.Delimiter := ' ';

  // Textual properties start out empty.
  FClusterName := '';
  FCopyright := '';
  FInformation := '';
  FPlatform := '';
  FProduct := '';
  FVersion := '';
  FKnownHosts := '';

  // Numeric properties start out at zero.
  FVersionMajor := 0;
  FVersionMinor := 0;
  FChannelMax := 0;
  FFrameMax := 0;
  FHeartbeat := 0;
end;

{ Copies the fields of AConnectionStart into this object: mechanisms, locales,
  protocol version, the textual entries of 'server-properties' and every entry
  of its 'capabilities' table. }
procedure TAMQPServerProperties.ReadConnectionStart( AConnectionStart: TAMQPMethod );
var
  lProps: TFieldTable;
  lCaps: TFieldTable;
  lEntry: TFieldValuePair;
begin
  // Top-level fields of the method.
  FMechanisms.DelimitedText := AConnectionStart.Field['mechanisms'].AsLongString.Value;
  FLocales.DelimitedText := AConnectionStart.Field['locales'].AsLongString.Value;
  lProps := AConnectionStart.Field['server-properties'].AsFieldTable;
  FVersionMajor := AConnectionStart.Field['version-major'].AsShortShortUInt.Value;
  FVersionMinor := AConnectionStart.Field['version-minor'].AsShortShortUInt.Value;

  // Entries of the nested server-properties table.
  FClusterName := lProps.Field['cluster_name'].AsShortString.Value;
  FCopyright := lProps.Field['copyright'].AsShortString.Value;
  FInformation := lProps.Field['information'].AsShortString.Value;
  FPlatform := lProps.Field['platform'].AsShortString.Value;
  FProduct := lProps.Field['product'].AsShortString.Value;
  FVersion := lProps.Field['version'].AsShortString.Value;

  // Capabilities are stored as name=value pairs.
  lCaps := lProps.Field['capabilities'].AsFieldTable;
  for lEntry in lCaps do
    FCapabilities.Values[ lEntry.Name.Value ] := lEntry.Value.AsString('');
end;

{ Stores 'channel-max', 'frame-max' and 'heartbeat' from AConnectionTune. }
procedure TAMQPServerProperties.ReadConnectionTune( AConnectionTune: TAMQPMethod );
begin
  FChannelMax := AConnectionTune.Field['channel-max'].AsShortUInt.Value;
  FFrameMax := AConnectionTune.Field['frame-max'].AsLongUInt.Value;
  FHeartbeat := AConnectionTune.Field['heartbeat'].AsShortUInt.Value;
end;

{ Stores the 'known-hosts' field of AConnectionOpenOK. }
procedure TAMQPServerProperties.ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
begin
  FKnownHosts := AConnectionOpenOK.Field['known-hosts'].AsShortString.Value;
end;

{ Releases the string lists allocated in Create. }
destructor TAMQPServerProperties.Destroy;
begin
  // The three lists are independent of each other; release them in reverse
  // order of creation.
  FLocales.Free;
  FMechanisms.Free;
  FCapabilities.Free;
  inherited;
end;

{ TBlockingQueue<T> }

{ Returns the number of queued items, read while holding the guard. }
function TBlockingQueue<T>.Count: Integer;
begin
  {$IFDEF FPC}EnterCriticalSection(FGuard);{$ELSE}FGuard.Acquire;{$ENDIF}
  try
    Result := FQueue.Count;
  finally
    {$IFDEF FPC}LeaveCriticalSection(FGuard);{$ELSE}FGuard.Release;{$ENDIF}
  end;
end;

{ Sets up the guard, the condition variable and the underlying queue. }
constructor TBlockingQueue<T>.Create;
begin
  inherited;
  // Under FPC the guard is a record that is initialised in place; otherwise
  // it is a TCriticalSection object.
  {$IFDEF FPC}InitCriticalSection(FGuard);{$ELSE}FGuard := TCriticalSection.Create;{$ENDIF}
  FCondition := TConditionVariableCS.Create;
  FQueue := TQueue<T>.Create;
end;

{ Releases the queue, the condition variable and the guard, in that order. }
destructor TBlockingQueue<T>.Destroy;
begin
  FreeAndNil(FQueue);
  FreeAndNil(FCondition);
  {$IFDEF FPC}DoneCriticalSection(FGuard);{$ELSE}FreeAndNil(FGuard);{$ENDIF}
  inherited;
end;

// Removes and returns the item at the head of the queue.
// While the queue is empty the caller waits on FCondition; if a wait ends with
// wrTimeout, AMQPTimeout is raised. A timeout is therefore reported by an
// exception, never by the return value.
function TBlockingQueue<T>.Get(ATimeOut: LongWord): T;
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    // Re-check the count after every wake-up: Put releases all waiters, so
    // the queue may still be empty when this thread gets to run.
    while FQueue.Count = 0 do
    begin
     // Each pass through the loop waits for the full ATimeOut again.
     // NOTE(review): the total time spent here can therefore exceed ATimeOut
     // when the thread is woken while the queue is still empty - confirm
     // whether that is intended.
     {$IFDEF FPC}
      if FCondition.WaitForRTL(FGuard, ATimeOut) = wrTimeout then
     {$Else}
      if FCondition.WaitFor(FGuard, ATimeOut) = wrTimeout then
     {$EndIf}
       raise AMQPTimeout.Create('Timeout!');
    end;
    Result := FQueue.Dequeue
  finally
  // The guard is released on every exit path, including the raise above.
  {$IFDEF FPC}
   LeaveCriticalSection(FGuard);
  {$ELSE}
  FGuard.Release;
  {$ENDIF}
  end;
end;

{ Appends Item to the queue and releases every thread waiting on the
  condition variable. Both steps happen while holding the guard. }
procedure TBlockingQueue<T>.Put(Item: T);
begin
  {$IFDEF FPC}EnterCriticalSection(FGuard);{$ELSE}FGuard.Acquire;{$ENDIF}
  try
    FQueue.Enqueue(Item);
    FCondition.ReleaseAll;
  finally
    {$IFDEF FPC}LeaveCriticalSection(FGuard);{$ELSE}FGuard.Release;{$ENDIF}
  end;
end;

end.

Solution

  • TBlockingQueue<T>.Get() is waiting on a TConditionVariableCS to be signaled up to the specified timeout. To make that exit more quickly, you would have to signal the ConditionVariable even if nothing was added to the queue. If you did that, you would have to update Get() to make sure the queue is not empty before dequeueing it, and have a flag somewhere that the cancel is intentional. Then you would just set that flag and signal the ConditionVariable when you want to cancel the wait.

    But in your case, it looks like your thread is queuing object pointers, and already handling nil pointers, so updating Get() is unnecessary. When terminating the thread, simply queue up a nil object pointer as your flag, and then the thread can check its Terminated property when it receives a nil pointer from the queue.

    Something like this:

    // move the MsgQueue to be a member of the thread class
    // instead of being a local variable of Execute...
    private
      FMsgQueue: TAMQPMessageQueue;
    ...
    
    constructor TConsumerThread.Create;
    begin
      // The queue now lives as long as the thread object, so code outside
      // Execute can reach it.
      FMsgQueue := TAMQPMessageQueue.Create;
      ...
    end;
    
    destructor TConsumerThread.Destroy;
    begin
      // Releases the queue created in the constructor.
      FMsgQueue.Free;
      ...
    end;
    
    procedure TConsumerThread.Execute;
    var
      ...
    begin
      ...
      lMsg := FMsgQueue.Get(FQueueGetTimeout);
      // A nil item is the wake-up marker queued by TerminatedSet.
      // Note: Get as shown in AMQP.Classes raises AMQPTimeout when the wait
      // times out, so a nil result can only come from an explicit Put(nil).
      if lMsg = nil then
      begin
        // Woken up because the thread was asked to stop: leave immediately.
        if Terminated then Exit;
        ...
      end;
      ...
    end;
        
    procedure TConsumerThread.TerminatedSet;
    begin
      inherited;
      // Queue a nil item so a Get that is currently blocked in Execute
      // returns right away instead of waiting for its timeout.
      FMsgQueue.Put(nil);
      ...
    end;