jsonmultithreadingdelphidwscript

Why multithreaded memory allocate/deallocate intensive application does not scale with number of threads?


Notice:

Original post title

Why multithreaded JSON parser from DWScript does not scale with number of threads?

was changed because this problem is not related to processing JSON data with DWScript. The problem is in default memory manager in Delphi XE2 to XE7 ( tested were XE2 and trial XE7 ), but problem appeared first in such type of application.


I have multithreaded Win32/Win64 vcl application which process JSON data in Delphi XE2.

Each thread parses JSON data using TdwsJSONValue.ParseString(sJSON) from DWScript, reads values using DWScript methods and stores result as records.

For testing purposes I process same JSON data in each thread.

Single thead run takes N seconds within thread to process data. Increasing number of threads to M lineary (approx. M * N) increases time within single thread necessary to process same data.

In result there is no speed improvment. Other parts of this applications ( JSON data delivery, storing results in target environment ) - scale as expected.

What could be a reason ? Any ideas appreciated.

Supplemental information:

  1. Tested on Win7/32 and Win7/64, Win8/64 from 2-core to 12-core (w/w-out HT) systems

  2. DWScript was choosen as fastest available (tested a bunch, among them: Superobject, build-in Delphi). SO behaves similar as JSON unit from DWS.

  3. Below is complete console app illustrating the problem. To run it we need sample json data available here: https://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 This file contains data json1.dat for first thread. For threads up to 16 just copy json1.dat to json2.dat...json16.dat.

    Program and data shoule be in the same folder. To run: convert.exe N, where N is number of threads.

    Program writes time of execution in msecs to stout - spent in thread, time of parsing data and time of releasing (Destroy) TdwsJSONValue object. Statement _dwsjvData.Destroy; does not scale.


program Convert;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  System.Diagnostics,
  System.Classes,
  dwsJSON in 'dwsJSON.pas',
  dwsStrings in 'dwsStrings.pas',
  dwsUtils in 'dwsUtils.pas',
  dwsXPlatform in 'dwsXPlatform.pas';

type

  TWorkerThread = class (TThread)
  private
    _iUid:  Integer;
    _swWatch:  TStopwatch;
    _lRunning:  Boolean;

    _sFileJSonData:  String;
    _fJsonData:  TextFile;

  protected
    constructor Create (AUid: Integer);
    procedure Execute; override;

  published
    property Running: Boolean read _lRunning;

  end;

  TConverter = class (TObject)
  private
    _swWatch0, _swWatch1, _swWatch2:  TStopwatch;

    _dwsjvData:  TdwsJSONValue;

  protected
    constructor Create;
    destructor Destroy; override;

    function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
  end;

const
  MAX_THREADS = 16;

var
  iHowMany:  Integer;
  athWorker:  array [1..MAX_THREADS] of Pointer;
  aiElapsed:  array [1..MAX_THREADS] of Integer;
  aiElapsedParse:  array [1..MAX_THREADS] of Integer;
  aiElapsedDestroy:  array [1..MAX_THREADS] of Integer;
  aiFares:  array [1..MAX_THREADS] of Integer;
  swWatchT, swWatchP:  TStopwatch;


constructor TWorkerThread.Create (AUid: Integer);
begin
  inherited Create (True);

  _iUid := AUid;
  _swWatch := TStopwatch.Create;
  _sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + Trim (IntToStr (_iUid)) + '.dat';

  _lRunning := False;

  Suspended := False;
end;

procedure TWorkerThread.Execute;
var
  j:  Integer;
  sLine:  String;
  slLines:  TStringList;

  oS:  TConverter;
begin
  _lRunning := True;

  oS := TConverter.Create;

  slLines := TStringList.Create;
  System.AssignFile (_fJsonData, _sFileJSonData);
  System.Reset (_fJsonData);
  j := 0;
  repeat
    System.Readln (_fJsonData, sLine);
    slLines.Add (sLine);
    Inc (j);
  until (j = 50);
//  until (System.Eof (_fJsonData));
  System.Close (_fJsonData);

  Sleep (1000);

  _swWatch.Reset;
  _swWatch.Start;

  aiFares [_iUid] := 0;
  aiElapsedParse [_iUid] := 0;
  aiElapsedDestroy [_iUid] := 0;
  for j := 1 to slLines.Count do
    aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);

  _swWatch.Stop;

  slLines.Free;
  os.Destroy;

  aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;

  _lRunning := False;
end;

constructor TConverter.Create;
begin
  inherited Create;

  _swWatch0 := TStopwatch.Create;
  _swWatch1 := TStopwatch.Create;
  _swWatch2 := TStopwatch.Create;
end;

destructor TConverter.Destroy;
begin
  inherited;
end;

function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
  jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal:  Integer;
begin
  _swWatch0.Reset;
  _swWatch0.Start;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData := TdwsJSONValue.ParseString (AJSonData);
  _swWatch1.Stop;
  iElapsedParse := _swWatch1.ElapsedMilliseconds;

  if (_dwsjvData.ValueType = jvtArray) then
  begin
    _swWatch2.Reset;
    _swWatch2.Start;

    jTotalFares := _dwsjvData.ElementCount;
    for jFare := 0 to (jTotalFares - 1) do
      if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
      begin

        _swWatch1.Reset;
        _swWatch1.Start;

        _swWatch1.Stop;
      end;
  end;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData.Destroy;
  _swWatch1.Stop;
  iElapsedDestroy := _swWatch1.ElapsedMilliseconds;

  _swWatch0.Stop;
  iElapsedTotal := _swWatch0.ElapsedMilliseconds;

  Inc (AParse, iElapsedParse);
  Inc (ADestroy, iElapsedDestroy);

  result := jTotalFares;
end;

procedure MultithreadStart;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    if (athWorker [j] = nil) then
    begin
      athWorker [j] := TWorkerThread.Create (j);

      TWorkerThread (athWorker [j]).FreeOnTerminate := False;
      TWorkerThread (athWorker [j]).Priority := tpNormal;
    end;
end;

procedure MultithreadStop;
var
  j:  Integer;
begin
  for j := 1 to MAX_THREADS do
    if (athWorker [j] <> nil) then
    begin
      TWorkerThread (athWorker [j]).Terminate;
      TWorkerThread (athWorker [j]).WaitFor;

      TWorkerThread (athWorker [j]).Free;
      athWorker [j] := nil;
    end;
end;

procedure Prologue;
var
  j:  Integer;
begin
  iHowMany := StrToInt (ParamStr (1));

  for j := 1 to MAX_THREADS do
    athWorker [j] := nil;

  swWatchT := TStopwatch.Create;
  swWatchT.Reset;

  swWatchP := TStopwatch.Create;
  swWatchP.Reset;
end;

procedure RunConvert;

  function __IsRunning: Boolean;
  var
    j:  Integer;
  begin
    result := False;
    for j := 1 to MAX_THREADS do
      result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
  end;

begin

  swWatchT.Start;

  MultithreadStart;

  Sleep (1000);
  while (__isRunning) do
    Sleep (500);

  MultithreadStop;

  swWatchT.Stop;
  Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;

procedure Epilogue;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    Writeln ( #13#10, 'Thread # ', j, '  tot.time:', aiElapsed [j], '  fares:', aiFares [j], '  tot.parse:', aiElapsedParse [j], '  tot.destroy:', aiElapsedDestroy [j]);

  Readln;
end;

begin
  try
    Prologue;
    RunConvert;
    Epilogue;

  except
    on E: Exception do
      Writeln (E.ClassName, ': ', E.Message);
  end;
end.

Solution

  • The solution is exchange default Delphi XE2 or XE7 memory manager with Intel® Threading Building Blocks memory manager. In example application it scales ca. lineary with number of threads up to 16 when app is 64 bits.

    update: with assumption that number of threads running is less than number of cores

    This was tested on machines from 2cores/4ht to 12cores/24ht running KVM virtualized Windows 7 with 124GB RAM

    Interesting thing is virtualizing Win 7. memory allocation and deallocation is from 2 x faster as in native Win 7.

    Conclusion: if you do a lot of memory allocation / deallocation operations of 10kB-10MB blocks in threads of multithreaded ( more than 4-8 threads) application - use only memory manager from Intel.

    @André: thanks for tip pointing me to right direction!

    Here is unit with TBB memory manager taken for tests, it has to appear as 1st on unit list in main project file .dpr

    unit TBBMem;
    
    interface
    
    function  ScalableGetMem  (ASize: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_malloc';
    procedure ScalableFreeMem (APtr: Pointer); cdecl; external 'tbbmalloc' name 'scalable_free';
    function  ScalableReAlloc (APtr: Pointer; Size: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_realloc';
    
    implementation
    
    Function TBBGetMem (ASize: Integer): Pointer;
    begin
      result := ScalableGetMem (ASize);
    end;
    
    Function TBBFreeMem (APtr: Pointer): Integer;
    begin
      ScalableFreeMem (APtr);
      result := 0;
    end;
    
    Function TBBReAllocMem (APtr: Pointer; ASize: Integer): Pointer;
    begin
      result := ScalableRealloc (APtr, ASize);
    end;
    
    const
      TBBMemoryManager:  TMemoryManager = ( GetMem: TBBGetmem;
                                            FreeMem: TBBFreeMem;
                                            ReAllocMem:  TBBReAllocMem; );
    var
      oldMemoryManager:  TMemoryManager;
    
    initialization
      GetMemoryManager (oldMemoryManager);
      SetMemoryManager (TBBMemoryManager);
    
    finalization
      SetMemoryManager (oldMemoryManager);
    
    end.