
Essentially, what is the best way to calculate a rolling balance with Google Cloud Datastore to determine when it is time to replenish the user's wallet?

Payments & Transactions

I maintain a payment platform where our users can pay for various items from third-party agencies. Unfortunately, the nature of my industry is that these payment events are not sent to us in real time; they get bundled up into a batch and sent to us a few hours to a few weeks later.

These are the primary objects that would affect a user's wallet balance:

class Transaction(ndb.Model):
    user = ndb.KeyProperty(User, required=True)
    amount = ndb.FloatProperty(required=True)
    # ... other fields

class Payment(ndb.Model):
    user = ndb.KeyProperty(User, required=True)
    amount = ndb.FloatProperty(required=True)
    # ... other fields

    @classmethod
    def charge(cls, user, amount):
        # ... make a call to braintree/stripe & save result if successful

(not shown are Refunds, 'Store Credit', Adjustments, etc)

The Wallet

However, a large percentage of the transaction amounts are < $1. Since we have to pass the cost of credit-card processing on to the user, our users maintain wallets with us to minimize these fees.

They can load up $10-$200, Transactions deduct from that balance, and when their balance is low (less than $2), we charge their card to replenish their account.

This is how I envision the model for wallet activity working:

class WalletActivity(ndb.Model):
    user = ndb.KeyProperty(User, required=True)
    post_date = ndb.DateTimeProperty(required=True)
    balance_increment = ndb.FloatProperty(required=True)
    balance_result = ndb.FloatProperty(required=True)
    # the key to the Transaction or Payment object that this is for
    object_key = ndb.KeyProperty(required=True)

    @classmethod
    def create(cls, obj, previous_balance):
        return WalletActivity(
            user=obj.user,
            post_date=datetime.datetime.now(),
            balance_increment=obj.amount,
            balance_result=previous_balance+obj.amount,
            object_key=obj.key)

    @classmethod
    def fetch_last_wallet_activity(cls, user_key):
        return cls.query(cls.user == user_key).order(-cls.post_date).get()

Calculating Balance

To determine balance, the 2 ends of the spectrum seem to be:

  • calculating on the fly: summing an account's entire wallet history
  • storing a precomputed value (WalletActivity.fetch_last_wallet_activity().balance_result)

The right answer here sounds like a combination of the 2. Store some kind of BalanceUpdate / WalletDaySummary object at the end of each day on each account. You then only sum up today's activity and add it to yesterday's BalanceUpdate. https://stackoverflow.com/a/4376221/4458510

class BalanceUpdate(ndb.Model):
    user = ndb.KeyProperty(User)
    cut_off_date = ndb.DateTimeProperty()
    balance = ndb.FloatProperty()

    @classmethod
    def current_balance(cls, user_key):
        last_balance_update = cls.query(cls.user == user_key).order(
            -cls.cut_off_date).get()
        recent_wallet_activity = WalletActivity.query(
            WalletActivity.user == user_key,
            WalletActivity.post_date > last_balance_update.cut_off_date).fetch()
        return (last_balance_update.balance + 
            sum([i.balance_increment for i in recent_wallet_activity]))

However, this may not work well for a corporate account that generates a lot of transactions in a single day; it may be better to use the balance_result of the most recent WalletActivity.

How to Process Transactions

Option 1

To process a batch of transactions we would

  1. Get the user's balance
  2. Replenish their account if the existing balance is low
  3. Add the transactions to their wallet

Code:

def _process_transactions(user, transactions, last_wallet_activity):
    transactions_amount = sum([i.amount for i in transactions])
    # 2. Replenish their account if the existing balance is low
    if last_wallet_activity.balance_result - transactions_amount < user.wallet_bottom_threshold:
        payment = Payment.charge(
            user=user,
            amount=user.wallet_replenish_amount + transactions_amount)
        payment.put()
        last_wallet_activity = WalletActivity.create(
            obj=payment,
            previous_balance=last_wallet_activity.balance_result)
        last_wallet_activity.put()
    # 3. Add the transactions to their wallet
    new_objects = []
    for transaction in transactions:
        last_wallet_activity = WalletActivity.create(
            obj=transaction,
            previous_balance=last_wallet_activity.balance_result)
        new_objects.append(last_wallet_activity)
    ndb.put_multi(new_objects)
    return new_objects

def process_transactions_1(user, transactions):
    # 1. Get the user's balance from the last WalletActivity
    last_wallet_activity = WalletActivity.fetch_last_wallet_activity(user_key=user.key)
    return _process_transactions(user, transactions, last_wallet_activity)

An issue with both WalletActivity.fetch_last_wallet_activity().balance_result and BalanceUpdate.current_balance() is that Datastore queries are only eventually consistent.

I thought about using entity-groups & ancestor queries; however, it sounds like you'd face contention errors.
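
For reference, a rough sketch of what that would look like (assuming WalletActivity were created with the user's key as parent, which is not how my models above are defined):

# Sketch only: the same fetch, but via a strongly consistent ancestor query.
# Assumes every WalletActivity was written with parent=user_key, which puts
# all of a user's activity into one entity group (~1 write op per second).
def fetch_last_wallet_activity_consistent(user_key):
    return WalletActivity.query(ancestor=user_key).order(
        -WalletActivity.post_date).get()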

Option 2 - Fetch Last WalletActivity by key

We can track the key of the last WalletActivity since fetching by key is strongly consistent:

class LastWalletActivity(ndb.Model):
    last_wallet_activity = ndb.KeyProperty(WalletActivity, required=True)

    @classmethod
    def get_for_user(cls, user_key):
        # LastWalletActivity has the same key as the user it is for
        return ndb.Key(cls, user_key.id()).get(use_cache=False, use_memcache=False)

def process_transactions_2(user, transactions):
    # 1. Get the user's balance from the last WalletActivity
    last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
    new_objects = _process_transactions(
        user, transactions, last_wallet_activity.last_wallet_activity.get())

    # update LastWalletActivity
    last_wallet_activity.last_wallet_activity = new_objects[-1].key
    last_wallet_activity.put()
    return new_objects

Alternatively, I could have stored last_wallet_activity on the User object, but I didn't want to worry about race conditions where a user updates their email and wipes out my new value for last_wallet_activity.

Option 3 - Payment Lock

But what about a race condition where 2 jobs are trying to process transactions on the same user at the same time. We could add another object to 'lock' an account.

class UserPaymentLock(ndb.Model):
    lock_time = ndb.DateTimeProperty(auto_now_add=True)

    @classmethod
    @ndb.transactional()
    def lock_user(cls, user_key):
        # UserPaymentLock has the same key as the user it is for
        key = ndb.Key(cls, user_key.id())
        lock = key.get(use_cache=False, use_memcache=False)
        if lock:
            # If the lock is older than a minute, still return False, but delete it
            # There are situations where the instance can crash and a user may never get unlocked
            if datetime.datetime.now() - lock.lock_time > datetime.timedelta(seconds=60):
                lock.key.delete()
            return False
        cls(key=key).put()
        return True

    @classmethod
    def unlock_user(cls, user_key):
        ndb.Key(cls, user_key.id()).delete()

def process_transactions_3(user, transactions):
    # Attempt to lock the account, abort & try again if already locked 
    if not UserPaymentLock.lock_user(user_key=user.key):
        raise Exception("Unable to acquire payment lock")

    # 1. Get the user's balance from the last WalletActivity
    last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
    new_objects = _process_transactions(
        user, transactions, last_wallet_activity.last_wallet_activity.get())

    # update LastWalletActivity
    last_wallet_activity.last_wallet_activity = new_objects[-1].key
    last_wallet_activity.put()

    # unlock the account
    UserPaymentLock.unlock_user(user_key=user.key)
    return new_objects

I thought of trying to wrap this whole thing up in a transaction, but I need to prevent making two HTTP calls to Braintree/Stripe.

I'm leaning towards option 3, but the system feels increasingly fragile with each new model I introduce.

Alex
  • Contention errors occur if there is more than 1 write op per second into the same entity group. This also applies to entities that have been read from Datastore inside a Datastore transaction without writing them back explicitly (my understanding is that they also are written in the background for serialization). Since I don't see particular user-facing requests or time-constraints, if you put all transactions and the loading of a user into the same entity group and use Datastore transactions, you could just throttle down the write ops and implement a safe fail and exponential back-off per user. – Ani Feb 13 '18 at 09:05
  • PS: In general, it is easier to design an app in a way to work safely within write limits than writing logic that tries to implement transactional aspects. – Ani Feb 13 '18 at 09:08
  • Putting all of the transactions for a user in one entity-group was my original idea, but I was recommended not to do that (see my original question https://stackoverflow.com/questions/46411694/is-there-an-entity-group-max-size). And Datastore's transactions would've worked, but the first thing that would happen in it is an HTTP call to Braintree, which the Datastore transaction is incapable of rolling back. I want to make sure that part doesn't get duplicated. – Alex Feb 13 '18 at 15:34
  • I remember your original question and I agree with the answer, especially since you mentioned corporate accounts with millions of transactions, assuming they were processed in real-time. However, in your question above it is obvious that hours or even weeks don't matter, so you could just queue them and process them in batches. Per account, one Datastore transaction per second with up to 500 financial transactions equals 1.8m per hour per account. Even with retries and outages, maybe that should be enough even for enterprise accounts? – Ani Feb 13 '18 at 18:18
  • @Ani, I find the info you're providing quite useful, could you please post it as an answer? Thank you. – Rodrigo C. Feb 20 '18 at 14:40
  • @RodrigoC. I have posted my info as an answer and also added a few other considerations and suggestions. Hope that was useful. – Ani Feb 21 '18 at 23:56
  • @Ani, it was indeed. Thank you once again for this and for all your hard work in this community. – Rodrigo C. Feb 22 '18 at 12:42

1 Answer

Design considerations regarding contention

In general, I completely agree with Dan's answer to your original question, although in your particular use-case it might be reasonable to go with large entity-groups.

Contention errors may occur at more than 1 write op per second to the same entity group, e.g. a particular wallet. This limitation also applies to entities that have been read from Datastore inside a Datastore transaction without writing them back explicitly (my understanding is that they are written together with the modified entities for serializability).

Although the 1-write-per-second rule is not a hard-enforced limit, and in my experience Cloud Datastore can typically handle short bursts slightly above it, there is no guarantee, and the general recommendation and best practice is to avoid large entity groups, especially where the write-ops don't originate from the same user. In contrast, it could be safe to store all comments posted by a user in the same entity-group (author = parent), because it is very unlikely, and perhaps even undesirable, that a user could post more than one comment per second. Another example could be background tasks that are not time-sensitive and not user-facing, where the write-ops are orchestrated per entity-group, or can at least back off significantly if contention occurs.
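
For illustration, a minimal sketch of such a throttled, retried write (the function name is illustrative); ndb transactions already retry on contention:

from google.appengine.ext import ndb

# Sketch: wrap the per-wallet write in a transaction and let ndb retry on
# contention; a background batch job can additionally sleep between batches
# to stay under roughly one write op per second per entity group.
@ndb.transactional(retries=5)
def write_wallet_entities(entities):
    ndb.put_multi(entities)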

In situations where new entities are added at a very high rate, contention errors can also be caused by monotonically increasing keys/IDs, or by indexed properties and composite indexes where the indexed values are too close to each other (e.g. timestamps). It is recommended to let Datastore automatically create IDs for new entities (e.g. the user IDs), because Datastore will spread the keys far enough, and to either avoid indexing properties where monotonically increasing values can occur or prefix the value with a hash.
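
A small sketch of the hash-prefix idea (the property and helper names are illustrative only):

import hashlib

from google.appengine.ext import ndb

def scatter(value):
    # Prefix a monotonically increasing value (e.g. an ISO timestamp string)
    # with a short hash so the index writes are spread across the keyspace.
    return hashlib.md5(value).hexdigest()[:4] + '_' + value

class IndexedEvent(ndb.Model):
    # Index the scattered value instead of the raw, ever-increasing one.
    scattered_ts = ndb.StringProperty()

event = IndexedEvent(scattered_ts=scatter('2018-02-13T09:05:00'))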

The Cloud Datastore article Best Practices includes a section about Designing for scale that provides very useful advice.

That said, it can be easier to design an app to work safely within the write limits and to rely on the transactional and strong-consistency support of Datastore (or another database) than to write application logic that tries to imitate transactional aspects. Race conditions, dead-locks, and much more can go wrong, making the system more fragile and prone to errors.

(A) Wallet activities in small entity-groups

In the original question you mentioned corporate accounts and payments, which suggested some real-time payment solution. Scenario: many thousands of users in that corporation could submit a new transaction for a single account, with the very high risk that many could do so at the same time. If every transaction went to the same entity-group (the corporate account), this could easily lead to contention errors. If you implement retries in your write-ops, those would lead to extended latency until the users get a response to their transaction request. But even with retries, it is likely that write-ops would often fail and users would frequently face server errors. This would make for a terrible user experience.

I would tend toward option (2), but with a Wallet kind. You expressed concerns about where to store last_wallet_activity. It would be possible to have your own Wallet kind that always has the same ID as the User. In that case, you have two separate entity groups and don't need to care about intermediate changes on the User object triggered by the user. I would also use Datastore transactions. A cross-group transaction allows up to 25 different entity-groups, that is maybe 23 events in one batch. With your current design, this would also be your max write rate per wallet. Not sure this is an acceptable limit for your app.
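
A rough sketch of that layout (the Wallet kind and function names are illustrative, not from your code):

from google.appengine.ext import ndb

class Wallet(ndb.Model):
    # Uses the same ID as the User it belongs to, so it can be fetched by key
    # (strongly consistent) without ever touching the User entity itself.
    balance = ndb.FloatProperty(default=0.0)
    last_wallet_activity = ndb.KeyProperty(kind=WalletActivity)

    @classmethod
    def get_for_user(cls, user_key):
        return ndb.Key(cls, user_key.id()).get()

# xg=True allows up to 25 entity groups per transaction: the Wallet plus a
# couple of dozen root WalletActivity entities (and possibly a Payment).
@ndb.transactional(xg=True)
def apply_batch(user_key, activities):
    wallet = Wallet.get_for_user(user_key)
    for activity in activities:
        wallet.balance += activity.balance_increment
    activity_keys = ndb.put_multi(activities)  # keys are assigned here
    wallet.last_wallet_activity = activity_keys[-1]
    wallet.put()
    return wallet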

However, if there are really many transactions for a certain wallet (each one updating last_wallet_activity), you could again face the risk of contention. To achieve a higher write rate per wallet and avoid such contention, you could also combine option (1) with some kind of sharding.
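
For completeness, a hedged sketch of what sharding the balance could look like, following the classic sharded-counter pattern (all names below are illustrative):

import random

from google.appengine.ext import ndb

NUM_SHARDS = 20  # tune to the expected write rate per wallet

class WalletBalanceShard(ndb.Model):
    # Key name: "<user id>-<shard number>"
    balance = ndb.FloatProperty(default=0.0)

@ndb.transactional()
def increment_balance(user_key, amount):
    shard_key = ndb.Key(
        WalletBalanceShard,
        '%s-%d' % (user_key.id(), random.randint(0, NUM_SHARDS - 1)))
    shard = shard_key.get()
    if shard is None:
        shard = WalletBalanceShard(key=shard_key)
    shard.balance += amount
    shard.put()

def get_balance(user_key):
    keys = [ndb.Key(WalletBalanceShard, '%s-%d' % (user_key.id(), i))
            for i in range(NUM_SHARDS)]
    # Gets by key are strongly consistent.
    return sum(s.balance for s in ndb.get_multi(keys) if s)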

Option (3) tries to implement transactional aspects in application code (see earlier). Option (1) indeed suffers from the fact that the queries are only eventually consistent.

(B) Wallet activities in a single large entity-group

However, the question here mentions that all these wallet activities are processed in the background (not by user requests), and that the events are not processed in real-time, but with a delay of hours or weeks. This would allow them to be processed in batches by background tasks. Assuming your app stays within the other Cloud Datastore limits, e.g. the maximum transaction size of 10 MiB, your app could batch up to 500 wallet activities into a single write-op per second for each corporate wallet (entity-group). That equals 1.8 million wallet activities per hour per corporate account. Even with retries and outages, would this be sufficient for even the largest corporate account? If yes, and if your product never changes to real-time user-facing wallet activities, I don't see why you shouldn't put the wallet activities into one entity-group per wallet. Another approach, of course, could be to simply have multiple wallets per corporate account.

In this scenario, your Option (1) should work, because an ancestor query (where the wallet is the ancestor in a WalletActivity query) is strongly consistent.
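
Roughly, a sketch of that combination (helper names are illustrative; here each WalletActivity is created with the wallet key as its parent, unlike the models in the question):

import datetime

from google.appengine.ext import ndb

def build_activity(wallet_key, obj, previous_balance):
    # Built with the wallet as parent, so all of a wallet's activities share
    # one entity group.
    return WalletActivity(
        parent=wallet_key,
        user=obj.user,
        post_date=datetime.datetime.now(),
        balance_increment=obj.amount,
        balance_result=previous_balance + obj.amount,
        object_key=obj.key)

@ndb.transactional()
def write_wallet_batch(activities):
    # One transactional write per batch; since everything is in the wallet's
    # entity group, this stays within the ~1 write-op per second guideline.
    ndb.put_multi(activities)

def last_activity(wallet_key):
    # Ancestor queries are strongly consistent, so Option (1) works here.
    return WalletActivity.query(ancestor=wallet_key).order(
        -WalletActivity.post_date).get()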

Use ndb.put_multi()

I've seen that you use multiple put() calls during the same request. You could instead collect the entities in a list and write them all together with a single ndb.put_multi() call. By doing so, you save instance runtime and also reduce the number of write ops to the same entity-group.
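
For example (variable names are illustrative):

# Instead of several individual put() calls in one request:
to_put = [payment]
to_put.extend(new_wallet_activities)
to_put.append(last_wallet_activity)
ndb.put_multi(to_put)  # one batched RPC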

How to avoid duplicate payment requests

Regarding the payment requests to Braintree or Stripe:

  1. Before a wallet activity is added to the next batch that will be written to Datastore, check if the wallet's balance will be sufficient.
  2. If the balance wouldn't be sufficient, stop adding more activities.
  3. Inside the Datastore transaction where you write the batch to Datastore, add a transactional push task (it will not be created if the transaction fails); see the sketch after this list. I believe GAE/NDB accepts up to 5 transactional tasks per HTTP request.
  4. This task would be responsible for sending the request to Braintree/Stripe and updating the wallet's balance. This should also happen inside a Datastore transaction, with a transactional task to continue processing the events where it left off before.
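
A rough sketch of steps 3 and 4 (the URL, parameters, and function name are illustrative; the actual Braintree/Stripe call would live in the task handler):

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.transactional(xg=True)
def commit_batch_and_maybe_replenish(entities, user_key, needs_replenish):
    ndb.put_multi(entities)
    if needs_replenish:
        # Transactional push task: only enqueued if this transaction commits,
        # so the payment request cannot be issued for a failed batch.
        taskqueue.add(
            url='/tasks/replenish-wallet',
            params={'user_id': user_key.id()},
            transactional=True)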

You'll need to handle cases where Braintree/Stripe rejects a payment request.

I also don't know how your events arrive in the app, so I'm not sure about the best way to orchestrate the batch tasks per wallet. The pattern above assumes that for every wallet there is only one task running at a time (i.e., not multiple tasks/batches in parallel for the same wallet). But there are different ways you can do this, depending on the way events arrive in your app.

Ani
  • Just a note - contention is not limited to just writing to the same entity group. I thought so as well initially and got burned; I had to do some significant re-design to avoid it. See https://stackoverflow.com/questions/32927068/contention-problems-in-google-app-engine/45496048#45496048. – Dan Cornilescu Feb 22 '18 at 08:48
  • Thank you @DanCornilescu (and BTW, also for all your contributions here on SO). Just to clarify whether your note is different from the serializability part I mentioned in my second paragraph: does it also apply to transactions that never write back to this group or any other group inside the transaction? For the serializability part, I know that as soon as a write happens to any entity-group, all the entity-groups of the transaction will get a write, even when unmodified by the transaction. Your answer suggests that the "marking" is causing the contention. Is my understanding correct? – Ani Feb 22 '18 at 09:06
  • I don't know the exact technical details, I just deduced it must be something alongside the marking, but yes, that's what I mean - transactions failing at read operations, they look like this: `suspended generator _get_tasklet(context.py:344) raised TransactionFailedError(too much contention on these datastore entities. please try again. entity group key: ...`, with the key pointing to a (config) entity not written by any of the operations executing at that time. – Dan Cornilescu Feb 22 '18 at 15:55
  • Yes, that was the same error I got for entities that work as service configs and were only read inside the transaction, while the writes were happening to other entity-groups. I remember that a Googler explained to me a few years ago that Cloud Datastore uses optimistic concurrency, but I can't remember where I was given the information. However, I've found this older article: https://cloud.google.com/appengine/articles/scaling/contention#keep-entity-groups-small that makes me believe that the contention isn't caused by transactions that are entirely read-only. – Ani Feb 22 '18 at 18:47