I'm trying to automate the process of updating some data in the backend of my program. In my Angular frontend I created a function that only the master user can access. It should log him in to every administration (tenant), download some objects that contain wrong data, ask a Google service for the right data, and update the data in the backend, repeating all of these operations for each tenant.

I thought of writing each of these operations as an observable and using concat to run everything in order. However, before it has even finished getting the right data, which I do inside a tap, it already tries to log in to the next tenant. So when it finally has the right data, it can't upload it to the backend, which refuses it as coming from the wrong tenant.

I think this problem is caused by the long operations needed in the tap (and there is some other stuff I'll need to do that will require even more time).

This is my code snippet (without unrelated stuff):

const obsList = [] as Observable<any>[];
this.assignedTenants.forEach(tenant => {
  const obsList2 = [] as Observable<any>[];
  obsList.push(this.authenticationService.login(new Credentials(usr, psw), tenant.id));
  obsList.push(this.structureService.getStructuresWithWrongAltitude()
    .pipe(tap(structuresReceived => {
      obsList2 = [] as Observable<any>[];
      if (structuresReceived != null && structuresReceived.length > 0) {
        structuresReceived.forEach(s => {
          this.getElevation(new google.maps.LatLng(s.centro.coordinates[0], s.centro.coordinates[1]))
            .then(a => {
              s.centroAltitudine = a;
              this.obsList2.push(this.structureService.putStructure(s));
            })
            .catch();
        });
      }
  })));
  obsList.push(forkJoin(obsList2)
    .pipe(tap(() => this.storageService.logout())));
});
concat(...obsList).subscribe();

As you can see, this code should create and execute 3 observables for each tenant: the first one logs in, the second one gets the wrong data, gets the right data and prepares the third, which updates the data. As I said, usually when entering the tap of the second observable (the getStructuresWithWrongAltitude one), I can already see from the logs that it tries to log in to other tenants.

My theory is that as soon as it gets the wrong data it tries to execute the third observable, which is still empty, and goes on to the next tenant, but I don't know how to fix this.

I need a way for the second observable to not emit until the tap has completed, or another way to prevent the concat from going on before the other operations have finished.

Thank you for your help

EDIT:

I was able to fix this by converting the getElevation calls (which return a promise) into a list of observables, which in turn creates a new list of observables to save the data.

As I said before, I need to do something very similar, with the difference that this time the tap will actually have to do a lot of calculations that take a long time, so I won't be able to use the same fix. As such, my question remains: can I make the concat wait until the tap is finished?

EDIT 2 for clarification

As I said in my last edit, that specific example got solved by converting the stuff inside the tap into other observables, but I have almost the same problem with another function.

This function needs to find files inside a folder before uploading them:

const folderInput = this.folderInput.nativeElement;
folderInput.onchange = () => {
  this.filesUploaded = folderInput.files;
  const obsList = [] as any[];

  this.assignedTenants.forEach(tenant => {
    const obsList2 = [] as Observable<any>[];

    obsList.push(this.authenticationService.login(new Credentials(usr, psw), tenant.id));

    obsList.push(this.fileService.getAll()
      .pipe(
        tap(filesReceived => {
          if (filesReceived != null && filesReceived.length > 0) {
            console.log('upload picture: received list of files to update');

            let i = filesReceived?.length;
            filesReceived?.forEach(f => {
              const pathReceived = (f.originalFilename as string).substr(1).split('\\');

              let found = false;
              let index = -1;
              
              // searching the file in the folder
              //...
              
              if (found) {
                console.log('found a file');
                const selectedFile = this.filesUploaded[index];
                const formData = new FormData();
                formData.append('file', selectedFile, selectedFile.name);
                obsList2.push(this.fileService.updateFile(formData, f.id));
              }
              i--;
            });
            console.log('upload picture: updated obsList2');
            obsList.push(forkJoin(obsList2).subscribe(() => {
              console.log('upload picture: uploaded pictures');
              this.storageService.logout();
            }));
          }
      }))
    );
  });

  this.loadingIndicatorService.loading$.next(true);
  let counter = obsList.length;
  concat(...obsList).subscribe(() => {
    counter--;
    console.log('upload pictures: remaining phases: ' + counter);
    if (counter <= 0) {
      this.loadingIndicatorService.loading$.next(false);
    }
  });
};
folderInput.click();
  • There are a few things here that I do not understand, but the first one is related to the constant `obsList2`. You declare it in the third line but then reset it in the seventh line with the code `obsList2 = [] as Observable<any>[];`. Are you sure this actually works in TypeScript? Once we clarify this we can proceed with the rest. – Picci Feb 11 '21 at 15:08
  • Have you tried this SO post? https://stackoverflow.com/questions/43336549/how-to-force-observables-to-execute-in-sequence. This article also talks about event ordering - https://juristr.com/blog/2019/01/Guarantee-Event-Order-with-RxJS/ – Andy Danger Gagne Feb 11 '21 at 15:12
  • @Picci yes it works, I just make it empty again to be sure, but it does not generate any error or warning – MassimoDeFacci4EmmeService Feb 11 '21 at 15:24
  • @AndyDangerGagne the problem is not executing them in order, but making them wait for the tap of the second pipe, which will take some time. As I said, the one in the example was solved, but it does not work with a more complex tap. – MassimoDeFacci4EmmeService Feb 11 '21 at 15:26

2 Answers


If I have understood the problem correctly, the core point is that the operation in the tap is asynchronous, so tap is not the right operator if you want to wait for its result. You are better off using something like concatMap. The other thing I would do is transform the Promise into an Observable and then pipe it, so that fetching the data, calling the Google service and updating are chained together. The last point is about the use of concat at the end. This means that you process the tenants sequentially. If this is what you want, that is fine. If you think you can proceed in parallel, you may want to replace concat with forkJoin.

The code would look something like this.

import { Observable, concat, forkJoin, from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const obsList = [] as Observable<any>[];
this.assignedTenants.forEach(tenant => {
  obsList.push(
    // create here an Observable which logs in, fetches the wrong data, asks Google for the right data and updates, sequentially
    this.authenticationService.login(new Credentials(usr, psw), tenant.id).pipe(
      concatMap(() => this.structureService.getStructuresWithWrongAltitude()),
      concatMap(structuresReceived => {
        const obsList3 = [] as Observable<any>[];
        if (structuresReceived != null && structuresReceived.length > 0) {
          structuresReceived.forEach(s => {
            // transform the Promise into an Observable using the from function
            // and then concatenate with the update operation
            obsList3.push(
              from(this.getElevation(new google.maps.LatLng(s.centro.coordinates[0], s.centro.coordinates[1]))).pipe(
                concatMap(a => {
                  s.centroAltitudine = a;
                  return this.structureService.putStructure(s);
                })
              )
            );
          });
        }
        // execute the calls to Google and the updates in parallel (for each tenant)
        return forkJoin(obsList3);
      }),
      // assumes logout() returns an Observable; if it is synchronous, use tap here instead
      concatMap(() => this.storageService.logout())
    )
  );
});
concat(...obsList).subscribe();
Picci

In short: No

You can never ever never make synchronous code wait for asynchronous code in JavaScript. JS runs on one thread, and if you try this your program will stop. JS does have async-await to make it look like synchronous code is waiting (but it just places the continuation on the event loop and doesn't wait at all).

On the other hand, all the synchronous code in your tap will finish (100% of the time) before the next part of your observable pipeline can execute.
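
A minimal sketch of that ordering, using plain Promises rather than RxJS so it runs on its own. The names work and order are made up for the illustration; the point is only that the synchronous part runs to completion while the code after await is deferred:

```typescript
const order: string[] = [];

async function work(): Promise<void> {
  order.push('sync part of work');        // runs immediately, before work() returns
  await Promise.resolve();                // hands control back to the caller here
  order.push('continuation after await'); // runs later, once the caller is done
}

void work();                    // the caller does not wait for the continuation
order.push('caller continues');

// Snapshot taken before any deferred code had a chance to run.
const orderBeforeMicrotasks = [...order];
console.log(orderBeforeMicrotasks);

// A timer callback runs after the pending continuation, so by then it is in the log.
setTimeout(() => console.log(order), 0);
```

The same thing happens inside a tap: whatever is synchronous finishes first, and anything scheduled with a promise is simply left behind.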

But

There's good news, you never need to

You never need synchronous code to wait for asynchronous code in JavaScript. If you're using observables, you have all the tools you need to decide the order your code runs in.

If you have a .then or a .subscribe inside your tap, you're likely doing something wrong. In RxJS, this is considered a code smell for good reason.
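
Here is a rough sketch of why, again without RxJS so it is self-contained. tapLike and lookup are hypothetical stand-ins (for tap and for something like getElevation), not real library functions. A side-effect callback cannot hand its async work back to the caller, so the next step sees a list that is still empty:

```typescript
// Hypothetical stand-in for an async lookup such as getElevation().
function lookup(id: number): Promise<number> {
  return Promise.resolve(id * 10);
}

// tap-like helper: runs the callback for its side effect and ignores what it returns.
function tapLike<T>(value: T, sideEffect: (v: T) => void): T {
  sideEffect(value);
  return value;
}

const collected: number[] = [];

// Anti-pattern: the .then callbacks only run after tapLike has already returned.
tapLike([1, 2, 3], ids => {
  ids.forEach(id => lookup(id).then(r => collected.push(r)));
});
const seenByNextStep = collected.length; // nothing has resolved yet
console.log('items visible to the next step:', seenByNextStep);

// Alternative: return the async work, so the next step is chained onto its completion.
const allDone: Promise<number[]> = Promise.all([1, 2, 3].map(lookup));
allDone.then(results => console.log('next step may start now', results));
```

In RxJS terms, the second form is what concatMap, switchMap or forkJoin give you: the inner work is returned to the pipeline instead of being fired and forgotten.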

Your code (as it sits right now) is tough to read, so it's hard to get more than the broad strokes of what you're attempting.

Here's how I understand it:

For each user:

  1. Log in a user using an id
  2. call this.fileService.getAll() // Is this done as the logged in user? Does your service handle this for you?
  3. call this.fileService.updateFile on 0+ files

Here's a rough go at it. This definitely won't compile. Also, it could be cleaned up a bunch if I knew a little bit more about the functioning of your observables, but they're a bit cryptic from the code shown above.

from(this.assignedTenants).pipe(
  concatMap(tenant => concat(
    this.authenticationService.login(new Credentials(usr, psw), tenant.id),
    this.fileService.getAll().pipe(
      switchMap(filesReceived => forkJoin(
        filesReceived.map(f => {
          //Code to get formData and such
          if(found){
            return this.fileService.updateFile(formData, f.id);
          }
          return null;
        }).filter(v => v != null)
      )
    ))
  )),
).subscribe(result => {
  console.log("Result of forkjoin: ", result);
});

Some Refactoring:

/*****
 * An Observable that gets all files, updates them, then completes
 *****/
function updateFiles(): Observable<any[]>{
  return this.fileService.getAll().pipe(
    // This map should turn every file received into either:
    //  1. A service call to update that file
    //  2. null
    map(filesReceived => filesReceived.map(f => {
      //Code to get formData and such
      if(found){
        return this.fileService.updateFile(formData, f.id);
      }
      return null;
    })),
    // Filter out null entries in our serviceCalls array
    map(serviceCalls => serviceCalls.filter(
      serviceCall => serviceCall != null
    )),
    // subscribe to all our service calls at once
    switchMap(serviceCalls => forkJoin(serviceCalls))
  );
}

from(this.assignedTenants).pipe(
  // ConcatMap won't start the second tenant until the first one's 
  // updateFiles() observable completes.
  concatMap(tenant => concat(
    this.authenticationService.login(new Credentials(usr, psw), tenant.id),
    updateFiles()
  )),
).subscribe({
  next: result => console.log("The Result of login(...) or updateFiles()", result),
  complete: () => console.log("Every file for every tenant is done")
})
Mrk Sef
  • @MassimoDeFacci4EmmeService I took a moment to refactor and comment on the code I sent you. It still won't compile as-is, but I've tried to create some separation of concerns to break apart the mapping, filtering and forkJoining into separate functionality so it's easier to read. Hopefully, that's a better place to start/learn from. – Mrk Sef Feb 12 '21 at 16:38