
I am trying to create a loop to execute transactions over more than 500 documents.

I have looked closely at posts like this one; however, the solutions there only apply to batch writes.

In my case I really need to perform an actual transaction where I lock down a doc I am reading from.

I am testing the code from the functions shell.

I am not clear on what I should be doing differently, since I am awaiting the read before performing the writes and returning on each iteration.

Any hint would be greatly appreciated. Thanks.

My code:


import * as fb from "../utils/init";
import * as functions from "firebase-functions";
import { sub, add, startOfDay } from "date-fns";
import { getFirestore, FieldValue } from "firebase-admin/firestore";
import { Transaction } from "@app/types";
const chunk = require("lodash.chunk");

const db = getFirestore();

export const cronExpireCredits = functions
  .runWith({
    failurePolicy: true,
  })
  .region(...fb.REGIONS)
  .pubsub.schedule("0 1 * * *")
  .timeZone("Europe/London")
  .onRun(async (context) => {
    const startDate = sub(startOfDay(new Date()), {
            months: Number(fb.EXPIRY_WINDOW_IN_MONTHS),
          });

    const endDate = add(startDate, { days: 1 });

    try {
      const querySnapshot = await db
        .collectionGroup("credits")
        .where("expire_on", ">", startDate)
        .where("expire_on", "<", endDate)
        .get();


      if (querySnapshot.empty) {
        console.log("No matching documents.");
        return;
      }

      chunk(querySnapshot.docs, 500).map(async (userDocs: any) => {
        return await db.runTransaction(async (transaction) => {
          userDocs.forEach(async (userDoc: any) => {

            let userPublicDoc = await transaction.get(
              db
                .collection("users")
                .doc(userDoc.data().assigned_to)
                .collection("public")
                .doc("profile")
            );
            let userCurrentBalance = userPublicDoc.data()?.balance;
            let creditsAmountToExpire = userDoc.data().balance;

            let newBalance = userCurrentBalance - creditsAmountToExpire;
            transaction.update(userPublicDoc.ref, {
              balance: newBalance,
            });
            transaction.update(userDoc.ref, {
              status: "expired",
              balance: 0,
            });

            const balanceTransaction: Transaction = { // my transaction object };

            transaction.set(
              userPublicDoc.ref
                .collection("balance")
                .doc(`bal-${context.eventId}`),
              balanceTransaction
            );

            const transaction: Transaction = { // my transaction object };

            transaction.set(
              userDoc.ref
                .collection("transactions")
                .doc(`cpt-${context.eventId}`),
              transaction
            );

          });
        });
      });
    } catch (e) {
      console.log("error", e);
    }
  });
Frank van Puffelen
Stf_F

1 Answer

Ultimately, the problem is that you are passing an async callback to forEach. forEach ignores the promises that callback returns, so they become "floating promises" that aren't chained to the caller. This means that the callback passed to runTransaction resolves before any of the work it is meant to do has completed, which leads to the error saying you are trying to modify an already completed batch.

To correctly chain this, you must replace someArray.forEach(async () => { /* ... */}) with the Promise.all(someArray.map(async () => { /* ... */ })) pattern.
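The difference between the two patterns can be seen without Firestore at all. The following is a minimal sketch, assuming a hypothetical `delay` helper that stands in for an asynchronous read; the function names are illustrative only.

```typescript
// Hypothetical stand-in for asynchronous work such as a Firestore read.
const delay = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// forEach discards the promise returned by each async callback,
// so this function returns before any item has been processed.
async function withForEach(items: number[]): Promise<number[]> {
  const done: number[] = [];
  items.forEach(async (item) => {
    await delay(10);
    done.push(item);
  });
  return [...done]; // snapshot of what has completed at return time
}

// map collects the promises and Promise.all waits for every one of them.
async function withPromiseAll(items: number[]): Promise<number[]> {
  const done: number[] = [];
  await Promise.all(
    items.map(async (item) => {
      await delay(10);
      done.push(item);
    })
  );
  return [...done];
}
```

Awaiting `withForEach([1, 2, 3])` yields an empty array because nothing has finished when it returns, whereas `withPromiseAll([1, 2, 3])` yields all three items.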

However, this isn't the only issue with this code.

  • When using transactions, you must perform all read operations before any write operations.
  • The chunk size in the code below has been reduced to 125 because there is a limit of 500 write operations per transaction (at the time of writing), and each credit document results in writes to 4 separate documents, so chunks of 500 would exceed the limit (125*4=500 vs. 500*4=2000).
  • If the user document referenced by assigned_to does not exist, the call will now throw an error. Otherwise you would be performing arithmetic on undefined (which produces NaN), leading to further head scratching. You should decide how you want to handle these "orphaned" balance calculations. One possibility is to collect any missing documents into an array in the current runTransaction callback and return it to the caller, so that all other entries in that batch are still processed.
  • Without seeing your database, I can't tell whether a transaction might hit the same assigned_to user more than once. If it does, document IDs built from context.eventId may collide. Instead, embed the eventId value in the document itself and let Firestore's automatic IDs handle the document ID.
  • Don't use any. Either use unknown, let TypeScript infer the type, or specify the proper type. If for whatever reason the types aren't inferred properly, consult the API Reference and use the proper types. The type comments in the code below are added for the reader's benefit here on Stack Overflow; you should delete them when importing the code into your codebase and let TypeScript handle them.
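The chunk-size arithmetic from the list above can be sketched as follows. This is a minimal illustration, assuming the 500-write limit and the 4 writes per credit document stated in this answer; `maxDocsPerTransaction` and `chunkArray` are hypothetical helper names (the question itself uses lodash.chunk).

```typescript
// Assumptions taken from the answer text, not from any SDK constant.
const MAX_WRITES_PER_TRANSACTION = 500;
const WRITES_PER_CREDIT_DOC = 4; // 2 updates + 2 sets per credit document

// Largest number of source documents that fits in one transaction.
function maxDocsPerTransaction(maxWrites: number, writesPerDoc: number): number {
  if (writesPerDoc <= 0) {
    throw new RangeError("writesPerDoc must be positive");
  }
  return Math.floor(maxWrites / writesPerDoc);
}

// Hypothetical stand-in for lodash.chunk: split items into groups of `size`.
function chunkArray<T>(items: readonly T[], size: number): T[][] {
  if (size <= 0) {
    throw new RangeError("size must be positive");
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

const docsPerTransaction = maxDocsPerTransaction(
  MAX_WRITES_PER_TRANSACTION,
  WRITES_PER_CREDIT_DOC
); // 125
```

Deriving the chunk size this way keeps it correct if the number of writes per document changes later.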
// written freehand, expect typos
// assumes: import { DocumentData } from "firebase-admin/firestore";
// await the outer Promise.all so that onRun does not return before the transactions finish
await Promise.all(
  chunk(querySnapshot.docs, 125).map(
    async (userDocs /* QueryDocumentSnapshot[] */) => {
      return db.runTransaction(async (transaction) => {
        // transaction read phase
        const arrAssignedToUserPubProfileData /* ({ ref: DocumentReference, balance: number })[] */ = await Promise.all(
          userDocs.map(
            async (userDoc /* QueryDocumentSnapshot */) => {
              const userPublicDocSnapshot = await transaction.get(
                db
                  .collection("users")
                  .doc(userDoc.data().assigned_to)
                  .collection("public")
                  .doc("profile")
              );

              // in the Admin SDK, `exists` is a property, not a method
              if (!userPublicDocSnapshot.exists) {
                throw new Error(`User #${userDoc.data().assigned_to} in the "assigned_to" field of #${userDoc.id} does not exist.`);
              }

              return {
                ref: userPublicDocSnapshot.ref,
                balance: userPublicDocSnapshot.get("balance")
              };
            }
          )
        );

        // transaction write phase
        return Promise.all(
          arrAssignedToUserPubProfileData.map(
            async ({ ref: assignedToUserPubProfileDocRef, balance: assignedToUserBalance }, index) => {
              // Promise.all preserves order, so index maps back to the matching credit doc
              const userDoc = userDocs[index];
              const creditsAmountToExpire = userDoc.get("balance");

              const newBalance = assignedToUserBalance - creditsAmountToExpire;
              transaction.update(assignedToUserPubProfileDocRef, {
                balance: newBalance,
              });
              transaction.update(userDoc.ref, {
                status: "expired",
                balance: 0,
              });

              const balanceEventData: DocumentData = {
                /* my updated data object */
                type: 'bal',
                eventId: context.eventId
              };

              transaction.set(
                assignedToUserPubProfileDocRef
                  .collection("balance")
                  .doc(), // <- no arguments generates an ID automatically
                balanceEventData
              );

              const cptEventData: DocumentData = {
                /* my updated data object */
                type: 'cpt',
                eventId: context.eventId
              };

              transaction.set(
                userDoc.ref
                  .collection("transactions")
                  .doc(),
                cptEventData
              );
            }
          )
        );
      });
    }
  )
);
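As an alternative to throwing when a profile document is missing, the "collect the orphans and return them" idea from the list above could look roughly like this. It is only a sketch of the partitioning step, written without Firestore so it runs on its own; `ProfileRead`, `PartitionedReads` and `partitionReads` are hypothetical names, and `balance` being `undefined` is an assumed way of representing a missing profile.

```typescript
// Hypothetical shape of one read-phase result.
interface ProfileRead {
  creditDocId: string;
  assignedTo: string;
  balance: number | undefined; // undefined when the profile doc is missing
}

interface PartitionedReads {
  found: Array<ProfileRead & { balance: number }>;
  missing: ProfileRead[];
}

// Split read results into entries that can be written and orphaned entries
// that should be reported back to the caller instead of failing the batch.
function partitionReads(reads: readonly ProfileRead[]): PartitionedReads {
  const result: PartitionedReads = { found: [], missing: [] };
  for (const read of reads) {
    if (typeof read.balance === "number") {
      result.found.push({ ...read, balance: read.balance });
    } else {
      result.missing.push(read);
    }
  }
  return result;
}
```

In the transaction callback, the write phase would then iterate over `found` only, and `missing` would be returned from runTransaction so the caller can log or retry those entries.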
samthecodingman
  • Hi Sam, many thanks for taking the time to answer my question in such an insightful way. I have followed your guidance on the structure of the code and the order of operations, and things are working now. I will think seriously about your comment on the eventId, although I don't think I will run into the issue. Cheers – Stf_F Mar 17 '22 at 11:36