multithreadingdelphidelphi-2010omnithreadlibrary

Download and process files with IOmniPipeline


My goal is a VCL app where I need to concurrently download some files (URLs stored in TDataSet), then each of downloaded file must be processed (one-by-one). During app working GUI must not be hanged, user user should be able to cancel (interrupt) process in any stage.

I totally rewrite my first example, now there is no third-party classes (Omni Thread Library 3.07.6 and VCL only). Of course, it's still demo and some checks was removed from code, but however, this sample still not short unfortunately. Downloading part based on this answer (thanks!).

So, when any file downloading I need to show progress to this files in GUI. Downloader class generate "event" OnProgressChange (because in real app I'll use TALWinInetHTTPClient class instance from Alcinoe library and it has real event OnProgressChange). I think it's enough just write progress value in DB, then DBGrid show progress values correctly.

Of course I saw this answers (and some other related with OTL): How to Stop all Pipeline tasks correctly How to use Pipeline pattern in Delphi (it's similar to my tasks, but differences in details).

Also I saw OTL docs and examples but I can't find some real example for doing similar task.

I created some classes to solve this task, and it's works, but have some critical troubles:

  1. At first stage downloading not starts separately (not parallels, but one-by-one).

  2. Cancellation not works properly.

Below some code to illustrate my problem. It's contain two units, one is main form (GUI, preparing data, interaction with user), second is Pipeline wrapper and downloader.

DFM for main form is:

object fmMain: TfmMain
  Left = 628
  Top = 172
  Caption = 'WorkSpace preparer'
  ClientHeight = 262
  ClientWidth = 700
  Color = clBtnFace
  Font.Charset = DEFAULT_CHARSET
  Font.Color = clWindowText
  Font.Height = -13
  Font.Name = 'Segoe UI'
  Font.Style = []
  OldCreateOrder = False
  Position = poScreenCenter
  OnCloseQuery = FormCloseQuery
  OnCreate = FormCreate
  PixelsPerInch = 96
  TextHeight = 17
  object DBGridApps: TDBGrid
    AlignWithMargins = True
    Left = 3
    Top = 3
    Width = 694
    Height = 207
    Align = alClient
    DataSource = dsApps
    ReadOnly = True
    TabOrder = 0
    TitleFont.Charset = DEFAULT_CHARSET
    TitleFont.Color = clWindowText
    TitleFont.Height = -13
    TitleFont.Name = 'Segoe UI'
    TitleFont.Style = []
  end
  object Panel1: TPanel
    AlignWithMargins = True
    Left = 3
    Top = 216
    Width = 694
    Height = 43
    Align = alBottom
    TabOrder = 1
    object bbExit: TBitBtn
      AlignWithMargins = True
      Left = 549
      Top = 4
      Width = 141
      Height = 35
      Align = alRight
      Caption = 'Exit'
      TabOrder = 0
      OnClick = bbExitClick
    end
    object bbCancel: TBitBtn
      AlignWithMargins = True
      Left = 151
      Top = 4
      Width = 141
      Height = 35
      Align = alLeft
      Caption = 'Cancel'
      TabOrder = 1
      OnClick = bbCancelClick
      ExplicitTop = 0
    end
    object bbStart: TBitBtn
      AlignWithMargins = True
      Left = 4
      Top = 4
      Width = 141
      Height = 35
      Align = alLeft
      Caption = 'Start'
      TabOrder = 2
      OnClick = bbStartClick
    end
  end
  object dsApps: TDataSource
    DataSet = cdsApps
    Left = 32
    Top = 88
  end
  object cdsApps: TClientDataSet
    Aggregates = <>
    Params = <>
    Left = 16
    Top = 72
  end
end

Main form code:

unit MainForm;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, DB, DBClient, Grids, DBGrids, StdCtrls, Buttons, ExtCtrls,
  PipelineHolder;

type
  TfmMain = class(TForm)
    DBGridApps: TDBGrid;
    dsApps: TDataSource;
    Panel1: TPanel;
    bbExit: TBitBtn;
    bbCancel: TBitBtn;
    bbStart: TBitBtn;
    cdsApps: TClientDataSet;
    procedure bbExitClick(Sender: TObject);
    procedure bbCancelClick(Sender: TObject);
    procedure bbStartClick(Sender: TObject);
    procedure FormCloseQuery(Sender: TObject; var CanClose: Boolean);
    procedure FormCreate(Sender: TObject);
  private
    { Private declarations }
    PH : TPipelineHolder;
    procedure SwitchControlState;
  public
    { Public declarations }
  end;

var
  fmMain: TfmMain;

implementation

{$R *.dfm}

procedure TfmMain.bbExitClick(Sender: TObject);
begin
  Close;
end;

procedure TfmMain.bbCancelClick(Sender: TObject);
begin
  if Assigned(PH) then
    begin
      SwitchControlState;
      PH.Stop;
    end;
end;

procedure TfmMain.bbStartClick(Sender: TObject);
begin
  if not Assigned(PH) then
    PH := TPipelineHolder.Create;
  SwitchControlState;
  PH.Make(cdsApps);
end;

procedure TfmMain.FormCloseQuery(Sender: TObject; var CanClose: Boolean);
begin
  CanClose := MessageBox(0, 'Exit now?', 'Exit', MB_YESNO + MB_ICONQUESTION + MB_DEFBUTTON2 + MB_TOPMOST) = IDYES;
  if CanClose then bbCancel.Click;
end;

procedure TfmMain.FormCreate(Sender: TObject);
begin
  //Prepare dataset
  cdsApps.Close;
  With cdsApps do
    begin
      FieldDefs.Add('progress', ftFloat);
      FieldDefs.Add('status', ftString, 30);
      FieldDefs.Add('id', ftString, 30);
      FieldDefs.Add('uid', ftString, 30);
      FieldDefs.Add('title', ftString, 30);
      FieldDefs.Add('url', ftString, 255);
      FieldDefs.Add('silent_parameters', ftString, 255);
      FieldDefs.Add('target_file', ftString, 255);
      CreateDataSet;
      LogChanges := False;
      Open;

      // Below you can change URL as you wish.
      // For example I'll use VirtualBox distrib from this page: https://www.virtualbox.org/wiki/Downloads
      // To correct progress values web-server must response with correct content-lenght values and must
      // support HEAD command.
      // Record 1
      Append;
      Fields[0].AsFloat := 0.0;
      Fields[1].AsString := 'Ready';
      Fields[2].AsString := '5be2e746ce46a1000cdc8b90';
      Fields[3].AsString := 'SomeApp1';
      Fields[4].AsString := 'VirtualBox 6.0.10';
      Fields[5].AsString := 'https://download.virtualbox.org/virtualbox/6.0.10/VirtualBox-6.0.10-132072-Win.exe';
      Fields[6].AsString := '/S';
      Fields[7].AsString := '001_installer.exe';
      Post;
      // Record 2
      Append;
      Fields[0].AsFloat := 0.0;
      Fields[1].AsString := 'Ready';
      Fields[2].AsString := '5be31c63ce46a1000b268bb2';
      Fields[3].AsString := 'SomeApp2';
      Fields[4].AsString := 'VirtualBox 6.0.10';
      Fields[5].AsString := 'https://download.virtualbox.org/virtualbox/6.0.10/VirtualBox-6.0.10-132072-Win.exe';
      Fields[6].AsString := '';
      Fields[7].AsString := '002_installer.exe';
      Post;
      // Record 3
      Append;
      Fields[0].AsFloat := 0.0;
      Fields[1].AsString := 'Ready';
      Fields[2].AsString := '5be3428ace46a1000b268bc0';
      Fields[3].AsString := 'SomeApp3';
      Fields[4].AsString := 'VirtualBox 6.0.10';
      Fields[5].AsString := 'https://download.virtualbox.org/virtualbox/6.0.10/VirtualBox-6.0.10-132072-Win.exe';
      Fields[6].AsString := '/VERY_SILENT';
      Fields[7].AsString := '003_installer.exe';
      Post;
      // Record 4
      Append;
      Fields[0].AsFloat := 0.0;
      Fields[1].AsString := 'Ready';
      Fields[2].AsString := '5be3428ace46a1000b268bc1';
      Fields[3].AsString := 'SomeApp4';
      Fields[4].AsString := 'VirtualBox 6.0.10';
      Fields[5].AsString := 'https://download.virtualbox.org/virtualbox/6.0.10/VirtualBox-6.0.10-132072-Win.exe';
      Fields[6].AsString := '';
      Fields[7].AsString := '004_installer.exe';
      Post;
      // Record 5 - it's not exe, just simple testing file, in this demo at
      // Install method with this file will set status to error.
      Append;
      Fields[0].AsFloat := 0.0;
      Fields[1].AsString := 'Ready';
      Fields[2].AsString := '5be512bb4a9bbb000b6de944';
      Fields[3].AsString := 'SomeFile';
      Fields[4].AsString := 'Demo File (not executable)';
      Fields[5].AsString := 'https://speed.hetzner.de/100MB.bin';
      Fields[6].AsString := '';
      Fields[7].AsString := '005_sample_100MB.bin';
      Post;

      First;
    end;
end;

procedure TfmMain.SwitchControlState;
begin
  bbStart.Enabled := not bbStart.Enabled;
end;

end.

Second unit for pipeline working implementation:

unit PipelineHolder;

interface

uses
  Windows, SysUtils, Classes, OtlCommon, OtlCollections, OtlParallel, Forms,
  DB, Generics.Defaults, StrUtils, Generics.Collections, Messages, OtlComm,
  OtlTask, OtlTaskControl, ShellAPI, Dialogs, OtlSync, Math, WinInet;

// Messages
const
  WM_PROGRESSCHANGED = WM_APP + 105;

// Process states
type
  TAppState  = (asReady = 0, asCancelled = 1, asError = 2, asDownloading = 3, asDownloaded = 4, asInstalling = 5, asCompleted = 6);
  TAppStateNames = array[asReady..asCompleted] of string;
const
  AppState: TAppStateNames = ('Ready', 'Canceled', 'Error', 'Downloading', 'Downloaded', 'Installing', 'Installed');

type
  // Data structs for progress message
  PProgressInfo = ^TProgressInfo;
  TProgressInfo = record
    Read  : Int64;
    Total : Int64;
    ID    : string;
    URL   : string;
  end;

  //Structure for record info
  TRecordInfo = record
    Filename: string;
    URL: string;
    ID: string;
    Cmd : string;
  end;

  // Class for downloading
  TDBAppItem = class
  private
    FHandle : HWND;
    FDS : TDataSet;
    FFilename: string;
    FURL: string;
    FId: string;
    FCmd : string;
    FFileSize : Int64;
    FDownloaded : Int64;
    function GetWinInetError(ErrorCode: Cardinal): string;
    procedure ParseURL(const lpszUrl: string; var Host, Resource: string);
    function GetRemoteFileSize(const Url : string): Integer;
    function DownloadFile(const url: string; const TargetFileName: string): boolean;
    procedure InternalDownloadProgress(Sender: TObject; Read: Integer; Total: Integer);
  public
    constructor Create(const OwnerHandle: HWND; var DS: TDataSet; const URL, ID: string; const Cmd: string; const TargetFilename: string);
    destructor Destroy; override;
    function Download : Boolean; overload;
  end;


  // Main class, pipeline holder
  TPipelineHolder = class
  private
    FDS : TDataSet;
    FHandle : HWND;
    FPipeline : IOmniPipeline;
    FInProcess: Boolean;
    procedure Retrieve(const input: TOmniValue; var output: TOmniValue);
    procedure RetrieveAll(const input, output: IOmniBlockingCollection);
    procedure Install(const input, output: IOmniBlockingCollection);
    procedure JobDone;
    procedure WndProc(var Message: TMessage);
    procedure WMProgressChanged(var msg: TMessage); message WM_PROGRESSCHANGED;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Make(SourceDS : TDataSet);
    function Stop: Boolean;
    property InProcess: Boolean read FInProcess write FInProcess;
  end;


implementation

{ Tools }
function RunAsAdmin(const Handle: HWnd; const Filename, Params: string): Boolean;
var
  sei: TShellExecuteInfo;
begin
  FillChar(sei, SizeOf(sei), 0);
  sei.cbSize := SizeOf(sei);
  sei.Wnd := Handle;
  sei.fMask := SEE_MASK_FLAG_DDEWAIT or SEE_MASK_FLAG_NO_UI;
  sei.lpVerb := 'runas';
  sei.lpFile := PChar(Filename);
  sei.lpParameters := PChar(Params);
  sei.nShow := SW_SHOWNORMAL;
  Result := ShellExecuteEx(@sei);
end;

{TPipelineHolder}

constructor TPipelineHolder.Create;
begin
  inherited Create;
  FHandle := AllocateHWnd(WndProc);
  FInProcess := False;
end;

destructor TPipelineHolder.Destroy;
begin
  if FInProcess then
    if Assigned(FPipeline) then
      begin
        FPipeline.Cancel;
        FPipeline := nil;
        FInProcess := False;
      end;

  if FHandle <> 0 then DeallocateHWnd(FHandle);

  inherited;
end;

procedure TPipelineHolder.Install(const input, output: IOmniBlockingCollection);
var
  app      : TOmniValue;
  appFile  : string;
  appParams: string;
  ID       : string;
  State    : string;
  AppInfo  : TRecordInfo;
begin
  // In real app here is downloaded file must be started as separate process and
  // we must wait when it will be completed.
  for app in input do
    begin
      if not app.IsEmpty then
        begin
          AppInfo := app.ToRecord<TRecordInfo>;
          appFile := AppInfo.Filename;
          appParams := AppInfo.Cmd;
          ID := AppInfo.ID;
          if (appFile <> EmptyStr) and (FileExists(appFile)) then
            begin
              // Change file state
              FDS.DisableControls;
              try
                if FDS.Locate('id', ID, [loCaseInsensitive]) then
                  begin
                    FDS.Edit;
                    FDS.FieldByName('Status').AsString := AppState[asInstalling];
                    FDS.Post;
                  end;
              finally
                FDS.EnableControls;
              end;

              // Try to execute intsaller
              if RunAsAdmin(Application.Handle, appFile, appParams) then
                begin
                  State := AppState[asCompleted]
                end
              else
                begin
                  State := AppState[asError];
                end;

              // Change state again
              FDS.DisableControls;
              try
                if FDS.Locate('id', ID, [loCaseInsensitive]) then
                  begin
                    FDS.Edit;
                    FDS.FieldByName('Status').AsString := State;
                    FDS.Post;
                  end;
              finally
                FDS.EnableControls;
              end;
            end;
        end;
    end;
end;

procedure TPipelineHolder.Retrieve(const input: TOmniValue; var output: TOmniValue);
var
  App: TDBAppItem;
  Info : TRecordInfo;
begin
  // Checking cancellation flag
  if not FInProcess then Exit;

  // Preparing
  Info := input.ToRecord<TRecordInfo>;
  App := TDBAppItem.Create(FHandle, FDS, Info.URL, Info.ID, Info.Cmd,  Info.Filename);

  // Downloading
  try
    if App.Download then
      output := TOmniValue.FromRecord<TRecordInfo>(Info)
    else
      output.Clear;
  finally
    FreeAndNil(App);
  end;
end;

procedure TPipelineHolder.RetrieveAll(const input,
  output: IOmniBlockingCollection);
var
  App: TDBAppItem;
  Info : TRecordInfo;
  value : TOmniValue;
begin
  // Preparing
  for value in input do
    begin
      if not FInProcess then Exit;

      Info := value.ToRecord<TRecordInfo>;
      App := TDBAppItem.Create(FHandle, FDS, Info.URL, Info.ID, Info.Cmd, Info.Filename);

      // Downloading
      try
        if App.Download then
          output.Add(TOmniValue.FromRecord<TRecordInfo>(Info));
      finally
        FreeAndNil(App);
      end;
    end;
end;

function TPipelineHolder.Stop: Boolean;
begin
  if FInProcess then
    begin
      if Assigned(FPipeline) then
        begin
          FPipeline.Cancel;
          FPipeline := nil;
          FInProcess := False;
        end;
    end;
  Result := not FInProcess;
end;

procedure TPipelineHolder.WMProgressChanged(var msg: TMessage);
var
  MsgRec  : TProgressInfo;
  Percent, Current : Double;
  Read, Total : Int64;
  ID : string;
begin
  MsgRec := PProgressInfo(Msg.LParam )^;

  Read := MsgRec.Read;
  Total := MsgRec.Total;
  Percent := 100 * Read / Total;
  ID := MsgRec.ID;
  // Write data to db
  if FDS.Locate('id', ID, [loCaseInsensitive]) then
    begin
      FDS.DisableControls;
      try
        Current := FDS.FieldByName('Progress').AsFloat;
        if Current <> Trunc(Percent) then
          begin
            FDS.Edit;
            FDS.FieldByName('Progress').AsFloat := Round(Percent);
            if Percent >= 99 then
              begin
                FDS.FieldByName('Status').AsString := AppState[asDownloaded];
              end;
            FDS.Post;
          end;
      finally
        FDS.EnableControls;
      end;
    end;
end;

procedure TPipelineHolder.WndProc(var Message: TMessage);
begin
  Dispatch(Message);
  inherited;
end;

procedure TPipelineHolder.JobDone;
begin
  FPipeline := nil;
  FInProcess := False;
end;

procedure TPipelineHolder.Make(SourceDS: TDataSet);
var
  BM            : TBookmark;
  RecInfo       : TRecordInfo;
begin
  if SourceDS = nil then Exit;
  if not SourceDS.Active then Exit;
  if SourceDS.IsEmpty then Exit;

  FDS := SourceDS;
  FInProcess := True;

  // Here at first stage calling Retrive or RetrieveAll gives same effect, no
  // matter what we uses value or queue.
  FPipeline := Parallel.Pipeline
   .Stage(RetrieveAll, //Retrieve
 Parallel.TaskConfig.OnMessage(Self)).NumTasks(Environment.Process.Affinity.Count * 2)
   .Stage(Install)
   .OnStop(JobDone)
   .Run;

  // Get URLs to be downloaded
  BM := FDS.GetBookmark;
  FDS.DisableControls;
  try
    FDS.First;
    while not FDS.Eof do
      begin
        // Get data from database
        RecInfo.URL := Trim(FDS.FieldByName('url').AsString);
        RecInfo.Id := Trim(FDS.FieldByName('id').AsString);
        RecInfo.Cmd := Trim(FDS.FieldByName('silent_parameters').AsString);
        RecInfo.Filename := ExtractFilePath(ParamStr(0)) + 'Downloads\' + Trim(FDS.FieldByName('target_file').AsString);

        if RecInfo.URL = EmptyStr then
          begin
            // Skips empty URLs
            FDS.Next;
            Continue;
          end;
        FDS.Edit;
        FDS.FieldByName('Status').AsString := AppState[asDownloading];
        FDS.Post;

        FPipeline.Input.Add(TOmniValue.FromRecord<TRecordInfo>(RecInfo));
        FDS.Next;
      end;
  finally
    if FDS.BookmarkValid(BM) then SourceDS.GotoBookmark(BM);
    FDS.FreeBookmark(BM);
    FDS.EnableControls;
  end;

  FPipeline.Input.CompleteAdding;

  // Wait for pipeline to complete - I'm not use it to avoid GUI freezing
//  FPipeline.WaitFor(INFINITE);
end;

constructor TDBAppItem.Create(const OwnerHandle: HWND; var DS: TDataSet; const URL, ID, Cmd, TargetFilename: string);
begin
  inherited Create;
  FDS         := DS;
  FURL        := URL;
  FId         := ID;
  FCmd        := Cmd;
  FFilename   := TargetFilename;
  FHandle     := OwnerHandle;
  FFileSize   := -1;
  FDownloaded := 0;
end;

destructor TDBAppItem.Destroy;
begin
  FDS := nil;
  inherited;
end;

function TDBAppItem.Download: Boolean;
var
  path : string;
begin
  path := ExtractFilePath(FFilename);
  if not DirectoryExists(path) then
    if not ForceDirectories(path) then
      raise Exception.Create('Cannot create directory: "'+path+'".');

  if FileExists(FFilename) then
    try
      if not DeleteFile(FFilename) then
        raise Exception.Create('Cannot delete file: "'+FFilename+'".');
    except on E: Exception do
      raise Exception.Create('Cannot delete file: "'+FFilename+'".'+sLineBreak + E.Message);
    end;

  Result := DownloadFile(FURL, FFilename);
  if Result then Result := FileExists(FFilename);
end;

function TDBAppItem.DownloadFile(const url, TargetFileName: string): boolean;
var
  hInet: HINTERNET;
  hFile: HINTERNET;
  localFile: file;
  buffer: array[1..65535] of Byte;
  bytesRead: DWORD;
  b: boolean;
begin
  b := False;
  if FFileSize < 0 then FFileSize := GetRemoteFileSize(url);
  FDownloaded := 0;
  hInet := WinInet.InternetOpen('MyFileAgent', INTERNET_OPEN_TYPE_DIRECT, nil, nil, 0);
  if Assigned(hInet) then
    begin
      hFile := InternetOpenURL(hInet, PChar(url), nil, 0, INTERNET_FLAG_PRAGMA_NOCACHE, 0);
      if Assigned(hFile) then
        begin
          AssignFile(localFile, TargetFileName);
          Rewrite(localFile, 1);
          bytesRead := 0;
          repeat
            InternetReadFile(hFile, @buffer, SizeOf(buffer), bytesRead);
            BlockWrite(localFile, buffer, bytesRead);
            Inc(FDownloaded, bytesRead);
            //In real app this event occurs in TALWinHttpClient from Alcinoe library.
            InternalDownloadProgress(Self, FDownloaded, FFileSize);
          until bytesRead = 0;
          CloseFile(localFile);
          InternetCloseHandle(hFile);
        end;
      InternetCloseHandle(hInet);
      b := true;
    end;
  DownloadFile := b;
  FFileSize := -1;
  FDownloaded := 0;
end;

function TDBAppItem.GetRemoteFileSize(const Url: string): Integer;
const
  sUserAgent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36';
var
  hInet    : HINTERNET;
  hConnect : HINTERNET;
  hRequest : HINTERNET;
  lpdwBufferLength: DWORD;
  lpdwReserved    : DWORD;
  ServerName, Resource: string;
//  Prot, Host, User, Pass, Path, Extra: string;
  ErrorCode : Cardinal;
begin
  Result := -1;
  ParseURL(Url, ServerName, Resource);

  hInet := InternetOpen(PChar(sUserAgent), INTERNET_OPEN_TYPE_PRECONFIG, nil, nil, 0);
  if hInet=nil then
    begin
      ErrorCode:=GetLastError;
      raise Exception.Create(Format('InternetOpen Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
    end;

  try
    hConnect := InternetConnect(hInet, PChar(ServerName), INTERNET_DEFAULT_HTTP_PORT, nil, nil, INTERNET_SERVICE_HTTP, 0, 0);
    if hConnect=nil then
      begin
        ErrorCode:=GetLastError;
        raise Exception.Create(Format('InternetConnect Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
      end;

    try
      hRequest := HttpOpenRequest(hConnect, PChar('HEAD'), PChar(Resource), nil, nil, nil, 0, 0);
      if hRequest<>nil then
        begin
          try
            lpdwBufferLength := SizeOf(Result);
            lpdwReserved     := 0;
            if not HttpSendRequest(hRequest, nil, 0, nil, 0) then
              begin
                ErrorCode := GetLastError;
                raise Exception.Create(Format('HttpOpenRequest Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
              end;

            if not HttpQueryInfo(hRequest, HTTP_QUERY_CONTENT_LENGTH or HTTP_QUERY_FLAG_NUMBER, @Result, lpdwBufferLength, lpdwReserved) then
              begin
                Result := 0;
                ErrorCode := GetLastError;
                raise Exception.Create(Format('HttpQueryInfo Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
              end;
          finally
            InternetCloseHandle(hRequest);
          end;
        end
      else
        begin
          ErrorCode:=GetLastError;
          raise Exception.Create(Format('HttpOpenRequest Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
        end;
    finally
      InternetCloseHandle(hConnect);
    end;
  finally
    InternetCloseHandle(hInet);
  end;
end;

function TDBAppItem.GetWinInetError(ErrorCode: Cardinal): string;
const
  winetdll = 'wininet.dll';
var
  Len: Integer;
  Buffer: PChar;
begin
  Len := FormatMessage(
  FORMAT_MESSAGE_FROM_HMODULE or FORMAT_MESSAGE_FROM_SYSTEM or
  FORMAT_MESSAGE_ALLOCATE_BUFFER or FORMAT_MESSAGE_IGNORE_INSERTS or  FORMAT_MESSAGE_ARGUMENT_ARRAY,
  Pointer(GetModuleHandle(winetdll)), ErrorCode, 0, @Buffer, SizeOf(Buffer), nil);
  try
    while (Len > 0) and (CharInSet(Buffer[Len - 1], [#0..#32, '.'])) do Dec(Len);
    SetString(Result, Buffer, Len);
  finally
    LocalFree(HLOCAL(Buffer));
  end;
end;

procedure TDBAppItem.ParseURL(const lpszUrl: string; var Host,
  Resource: string);
var
  lpszScheme      : array[0..INTERNET_MAX_SCHEME_LENGTH - 1] of Char;
  lpszHostName    : array[0..INTERNET_MAX_HOST_NAME_LENGTH - 1] of Char;
  lpszUserName    : array[0..INTERNET_MAX_USER_NAME_LENGTH - 1] of Char;
  lpszPassword    : array[0..INTERNET_MAX_PASSWORD_LENGTH - 1] of Char;
  lpszUrlPath     : array[0..INTERNET_MAX_PATH_LENGTH - 1] of Char;
  lpszExtraInfo   : array[0..1024 - 1] of Char;
  lpUrlComponents : TURLComponents;
begin
  ZeroMemory(@lpszScheme, SizeOf(lpszScheme));
  ZeroMemory(@lpszHostName, SizeOf(lpszHostName));
  ZeroMemory(@lpszUserName, SizeOf(lpszUserName));
  ZeroMemory(@lpszPassword, SizeOf(lpszPassword));
  ZeroMemory(@lpszUrlPath, SizeOf(lpszUrlPath));
  ZeroMemory(@lpszExtraInfo, SizeOf(lpszExtraInfo));
  ZeroMemory(@lpUrlComponents, SizeOf(TURLComponents));

  lpUrlComponents.dwStructSize      := SizeOf(TURLComponents);
  lpUrlComponents.lpszScheme        := lpszScheme;
  lpUrlComponents.dwSchemeLength    := SizeOf(lpszScheme);
  lpUrlComponents.lpszHostName      := lpszHostName;
  lpUrlComponents.dwHostNameLength  := SizeOf(lpszHostName);
  lpUrlComponents.lpszUserName      := lpszUserName;
  lpUrlComponents.dwUserNameLength  := SizeOf(lpszUserName);
  lpUrlComponents.lpszPassword      := lpszPassword;
  lpUrlComponents.dwPasswordLength  := SizeOf(lpszPassword);
  lpUrlComponents.lpszUrlPath       := lpszUrlPath;
  lpUrlComponents.dwUrlPathLength   := SizeOf(lpszUrlPath);
  lpUrlComponents.lpszExtraInfo     := lpszExtraInfo;
  lpUrlComponents.dwExtraInfoLength := SizeOf(lpszExtraInfo);

  InternetCrackUrl(PChar(lpszUrl), Length(lpszUrl), ICU_DECODE or ICU_ESCAPE, lpUrlComponents);

  Host := lpszHostName;
  Resource := lpszUrlPath;
end;

procedure TDBAppItem.InternalDownloadProgress(Sender: TObject; Read,
  Total: Integer);
var
  MsgRec : PProgressInfo;
begin
  // Create progress changed message
  New(MsgRec);
  MsgRec^.ID := Fid;
  MsgRec^.Read := Read;
  MsgRec^.Total := Total;
  MsgRec^.URL := FURL;

  SendMessage(FHandle, WM_PROGRESSCHANGED, 0, LongInt(MsgRec));
end;

end.

My basic idea is creating pipeline with 2 stages:

  1. Retrieve: downloading all files at the same time (threads count is constrained by NumTasks from OTL).

  2. Install: As soon as any file be downloaded, it must be processed by this stage. Action in this stage must be one by one, i.e. only one action in same time (in real app I won't start many installers together).

I try to understand how OTL works here, but I have not many experience with this library yet.

So, dear community, how I must rewrite my code to:

  1. Have parallel downloadings at Stage 1 (now it works one-by-one).

  2. Have possibility to correctly stop Pipeline with GUI (now I call FPipeline.Cancel by TButton pressing and it cannot stop tasks immediately).

Sources also placed here.

Thanks in advance. I'll be glad meet any advice here.


Solution

  • 1) Download works in parallel just fine - as far as OTL is concerned. On my machine the test app starts three parallel downloads each time I press F9. The other two downloads get stuck in the

    hFile := InternetOpenURL(hInet, PChar(url), nil, 0, INTERNET_FLAG_PRAGMA_NOCACHE, 0);
    

    call. IOW, all five downloader threads enter InternetOpenURL, but only three exit immediately and start downloading. I have no idea why (and it is related to WinINET, not OTL).

    2) Cancellation doesn't work because noone tells the DownloadFile method to stop. IOmniPipeline.Cancel just calls CompleteAdding on each pipeline and tells each stage to stop processing input. It cannot stop the code which is already working on an input element (i.e. your DownloadFile method). You must do that yourself.

    One way to do that is to create a global Cancel flag and change DownloadFile so that it checks whether this flag is set in the following loop:

    repeat
      InternetReadFile(hFile, @buffer, SizeOf(buffer), bytesRead);
      BlockWrite(localFile, buffer, bytesRead);
      Inc(FDownloaded, bytesRead);
      //In real app this event occurs in TALWinHttpClient from Alcinoe library.
      InternalDownloadProgress(Self, FDownloaded, FFileSize);
      if FCancelled then break; // <-----------
    until bytesRead = 0;
    

    You could also change InternalDownloadProgress and add a var cancelled: boolean parameter which could be set in the event handler when the pipeline needs to shut down.