angularrxjssubject-observer

File upload progress not being updated in the observable or completed before the observable is ready


Below is a snippet of code that calls the function below. This is a process that allows people to drag and drop files from the system to the website. It displays a list of all the files with a progress bar as they are loaded. It works fine most of the time but when there are a large number of files I run into some problems. I have a test directory that I am loading that has over 100 files in it. The first files that are getting loaded are pretty small so it seems that they are getting loaded before the observable is getting set up because the progress bar does not show any progress and the forkJoin does not complete but if I look on the system the files are actually loaded.

Am I not setting the Subject up correctly? Is there a better way to track the progress of files that are being uploaded? Any help would be appreciated.

if (this.files.size > 0) {
  this.progress = await this.uploadService.dndUpload(
    this.files, this.currentDir, this.currentProject, timestamp
  );
  let allProgressObservables = [];
  for (let key in this.progress) {
    allProgressObservables.push(this.progress[key].progress);
  }

  this.sfUploadSnackbar.openSnackbar(this.files, this.progress);

  forkJoin(allProgressObservables).subscribe(async end => {
    this.sfUploadSnackbar.closeSnackbar();
    this.uploadService.clearUploadDir(this.currentProject, timestamp)
      .subscribe();
    this.uploadInProgress = false;
    this.getFiles();
  });
}





 async dndUpload(files: Set<any>, dir: string, projectId: number, timestamp: number) {
    const status: { [key: string]: { progress: Observable<number> } } = {};

    for (let it = files.values(), file = null; file = it.next().value;) {

      let params = new HttpParams()
        .set('dir', dir)
        .set('path', file.fullPath.replace(file.name,''))
        .set('projectId', projectId.toString())
        .set('timestamp', timestamp.toString());
      let f: File = await new Promise((resolve, reject) => file.file(resolve, reject))
      const formData: FormData = new FormData();
      formData.append('file', f, f.name);

      const req = new HttpRequest('POST', '/api/dndUpload', formData, {
        reportProgress: true, params
      });

      const progress = new Subject<number>();

      status[file.name] = {
        progress: progress.asObservable()
      };

      this.http.request(req).subscribe(event => {
        if (event.type === HttpEventType.UploadProgress) {

          const percentDone = Math.round(100 * event.loaded / event.total);

          progress.next(percentDone);
        } else if (event instanceof HttpResponse) {

          progress.complete();
        }
      });
    }

    return status;
  }

Solution

  • In order for forkJoin to complete, you must make sure that all of the provided observables complete. What might happen is that forkJoin subscribes too late to the Subjects from allProgressObservables.

    I assume that this.sfUploadSnackbar.openSnackbar(this.files, this.progress); will subscribe to this.progress in order to receive the percent for each file.

    Here's an idea:

    dndUpload (files: Set<...>/* ... */): Observable<any> {
      // Note that there are no subscriptions
      return [...files].map(
        f => from(new Promise((resolve, reject) => file.file(resolve, reject)))
          .pipe(
            map(f => (new FormData()).append('file', f, f.name)),
          )
      )
    }
    
    const fileObservables$ = this.dndUpload(files);
    
    const progressObservables$ = fileObservables$.map(
      (file$, fileIdx) => file$.pipe(
        switchMap(formData => {
          const req = /* ... */;
    
          return this.http.request(req)
            .pipe(
              filter(event.type === HttpEventType.UploadProgress),
              // Getting the percent
              map(e => Math.round(100 * e.loaded / e.total)),
              tap(percent => this.updatePercentVisually(percent, fileIdx))
            )
        })
      )
    );
    
    // Finally subscribing only once to the observables
    forkJoin(progressObservables$).subscribe(...);
    

    Notice there are a few changes:

    this.http.request will complete when the request fulfills, so forkJoin should be able to complete as well, allowing you to do the 'cleanups'(removing loading progress etc...).