Essentially, what is the best way to calculate a rolling balance with Google Cloud Datastore to determine when it is time to replenish the user's wallet?
Payments & Transactions
I maintain a payment platform where our users can pay for various items from third-party agencies. Unfortunately, the nature of my industry is that these payment events are not sent to us in real time; they get bundled into a batch and sent to us a few hours to a few weeks later.
These are the primary objects that would affect a user's wallet balance:
from google.appengine.ext import ndb


class Transaction(ndb.Model):
    user = ndb.KeyProperty(User, required=True)
    amount = ndb.FloatProperty(required=True)
    # ... other fields


class Payment(ndb.Model):
    user = ndb.KeyProperty(User, required=True)
    amount = ndb.FloatProperty(required=True)
    # ... other fields

    @classmethod
    def charge(cls, user, amount):
        # ... make a call to braintree/stripe & save result if successful
(not shown are Refunds, 'Store Credit', Adjustments, etc)
The Wallet
However, a large percentage of the transaction amounts are < $1. Since we have to pass the cost of credit-card processing on to the user, our users maintain wallets with us to minimize these fees.
They can load up $10-$200. Transactions deduct from that balance, and when their balance is low (less than $2), we charge their card to replenish their account.
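A minimal sketch of the replenish rule described above, using plain integers instead of Datastore entities. The function name `settle`, the use of integer cents, and the default threshold and reload amounts are assumptions for illustration; the models in this post store floats.

```python
def settle(balance_cents, transaction_cents, threshold_cents=200, reload_cents=1000):
    """Deduct one transaction and report how much to charge the card, if anything.

    All amounts are integer cents (an assumption; the models here use floats).
    Returns (new_balance_cents, charge_cents).
    """
    balance_cents -= transaction_cents
    charge_cents = 0
    if balance_cents < threshold_cents:
        # Top up by the reload amount. A real system would only credit the
        # wallet after the card charge has succeeded.
        charge_cents = reload_cents
        balance_cents += charge_cents
    return balance_cents, charge_cents


print(settle(500, 75))  # balance stays above the threshold, so no charge
print(settle(250, 75))  # balance would fall below the threshold, so a reload is triggered
```

Keeping the rule in a pure function like this makes it easy to test separately from the Datastore reads and the card charge.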
This is how I envision the model for wallet activity working:
import datetime


class WalletActivity(ndb.Model):
    user = ndb.KeyProperty(User, required=True)
    post_date = ndb.DateTimeProperty(required=True)
    balance_increment = ndb.FloatProperty(required=True)
    balance_result = ndb.FloatProperty(required=True)
    # the key to the Transaction or Payment object that this is for
    object_key = ndb.KeyProperty(required=True)

    @classmethod
    def create(cls, obj, previous_balance):
        return WalletActivity(
            user=obj.user,
            post_date=datetime.datetime.now(),
            balance_increment=obj.amount,
            balance_result=previous_balance + obj.amount,
            object_key=obj.key)

    @classmethod
    def fetch_last_wallet_activity(cls, user_key):
        return cls.query(cls.user == user_key).order(-cls.post_date).get()
Calculating Balance
To determine the balance, the two ends of the spectrum seem to be:
- calculating on the fly by summing an account's entire wallet history
- storing a precomputed value (WalletActivity.fetch_last_wallet_activity().balance_result)
The right answer here sounds like a combination of the two: store some kind of BalanceUpdate / WalletDaySummary object at the end of each day on each account. You then only sum up today's activity and add it to yesterday's BalanceUpdate. https://stackoverflow.com/a/4376221/4458510
class BalanceUpdate(ndb.Model):
    user = ndb.KeyProperty(User)
    cut_off_date = ndb.DateTimeProperty()
    balance = ndb.FloatProperty()

    @classmethod
    def current_balance(cls, user_key):
        last_balance_update = cls.query(cls.user == user_key).order(
            -cls.cut_off_date).get()
        # Sum only the wallet activity posted after the last checkpoint
        recent_wallet_activity = WalletActivity.query(
            WalletActivity.user == user_key,
            WalletActivity.post_date > last_balance_update.cut_off_date).fetch()
        return (last_balance_update.balance +
                sum(i.balance_increment for i in recent_wallet_activity))
However, this may not work for a corporate account that generates a lot of transactions in a single day. It may be better to use the balance_result of the most recent WalletActivity.
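To make the trade-off concrete, here is a small in-memory sketch of both approaches. The `Activity` tuple and the two helper functions are hypothetical stand-ins for the Datastore models, and amounts are integer cents for simplicity.

```python
import datetime
from collections import namedtuple

Activity = namedtuple("Activity", ["post_date", "balance_increment", "balance_result"])


def balance_from_checkpoint(checkpoint_balance, cut_off_date, activities):
    """Checkpoint balance plus every increment posted after the cut-off."""
    recent = [a for a in activities if a.post_date > cut_off_date]
    return checkpoint_balance + sum(a.balance_increment for a in recent)


def balance_from_last_activity(activities):
    """Running balance stored on the most recent activity row."""
    return max(activities, key=lambda a: a.post_date).balance_result


day = datetime.datetime(2020, 1, 2)
activities = [
    Activity(day + datetime.timedelta(hours=1), -150, 850),
    Activity(day + datetime.timedelta(hours=2), -300, 550),
    Activity(day + datetime.timedelta(hours=3), 1000, 1550),
]
# Yesterday's checkpoint was 1000 cents at midnight.
print(balance_from_checkpoint(1000, day, activities))
print(balance_from_last_activity(activities))
```

Both return the same balance when the rows are written in order. The checkpoint version reads every row since the cut-off, while the running-balance version reads a single row but depends on each row having been written with the correct previous balance.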
How to Process Transactions
Option 1
To process a batch of transactions we would:
1. Get the user's balance
2. Replenish their account if the existing balance is low
3. Add the transactions to their wallet
Code:
def _process_transactions(user, transactions, last_wallet_activity):
    transactions_amount = sum([i.amount for i in transactions])
    # 2. Replenish their account if the existing balance is low
    if last_wallet_activity.balance_result - transactions_amount < user.wallet_bottom_threshold:
        payment = Payment.charge(
            user=user,
            amount=user.wallet_replenish_amount + transactions_amount)
        payment.put()
        last_wallet_activity = WalletActivity.create(
            obj=payment,
            previous_balance=last_wallet_activity.balance_result)
        last_wallet_activity.put()
    # 3. Add the transactions to their wallet
    new_objects = []
    for transaction in transactions:
        last_wallet_activity = WalletActivity.create(
            obj=transaction,
            previous_balance=last_wallet_activity.balance_result)
        new_objects.append(last_wallet_activity)
    ndb.put_multi(new_objects)
    return new_objects


def process_transactions_1(user, transactions):
    # 1. Get the user's balance from the last WalletActivity
    last_wallet_activity = WalletActivity.fetch_last_wallet_activity(user_key=user.key)
    return _process_transactions(user, transactions, last_wallet_activity)
An issue with both WalletActivity.fetch_last_wallet_activity().balance_result and BalanceUpdate.current_balance() is that Datastore queries are eventually consistent.
I thought about using entity groups and ancestor queries; however, it sounds like you'd face contention errors.
Option 2 - Fetch Last WalletActivity by key
We can track the key of the last WalletActivity, since fetching by key is strongly consistent:
class LastWalletActivity(ndb.Model):
    last_wallet_activity = ndb.KeyProperty(WalletActivity, required=True)

    @classmethod
    def get_for_user(cls, user_key):
        # LastWalletActivity has the same key as the user it is for
        return ndb.Key(cls, user_key.id()).get(use_cache=False, use_memcache=False)


def process_transactions_2(user, transactions):
    # 1. Get the user's balance from the last WalletActivity
    last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
    # The pointer stores a key, so fetch the WalletActivity entity it refers to
    new_objects = _process_transactions(
        user, transactions, last_wallet_activity.last_wallet_activity.get())
    # update LastWalletActivity
    last_wallet_activity.last_wallet_activity = new_objects[-1].key
    last_wallet_activity.put()
    return new_objects
Alternatively, I could have stored last_wallet_activity on the User object, but I didn't want to worry about race conditions where a user updates their email and wipes out my new value for last_wallet_activity.
Option 3 - Payment Lock
But what about a race condition where two jobs try to process transactions for the same user at the same time? We could add another object to 'lock' an account:
class UserPaymentLock(ndb.Model):
    lock_time = ndb.DateTimeProperty(auto_now_add=True)

    @classmethod
    @ndb.transactional()
    def lock_user(cls, user_key):
        # UserPaymentLock has the same key as the user it is for
        key = ndb.Key(cls, user_key.id())
        lock = key.get(use_cache=False, use_memcache=False)
        if lock:
            # If the lock is older than a minute, still return False, but delete it
            # There are situations where the instance can crash and a user may never get unlocked
            if datetime.datetime.utcnow() - lock.lock_time > datetime.timedelta(seconds=60):
                lock.key.delete()
            return False
        # A Key has no put(); create the lock entity under that key instead
        cls(key=key).put()
        return True

    @classmethod
    def unlock_user(cls, user_key):
        ndb.Key(cls, user_key.id()).delete()
def process_transactions_3(user, transactions):
    # Attempt to lock the account, abort & try again if already locked
    if not UserPaymentLock.lock_user(user_key=user.key):
        raise Exception("Unable to acquire payment lock")
    try:
        # 1. Get the user's balance from the last WalletActivity
        last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
        # The pointer stores a key, so fetch the WalletActivity entity it refers to
        new_objects = _process_transactions(
            user, transactions, last_wallet_activity.last_wallet_activity.get())
        # update LastWalletActivity
        last_wallet_activity.last_wallet_activity = new_objects[-1].key
        last_wallet_activity.put()
    finally:
        # unlock the account, even if processing raised
        UserPaymentLock.unlock_user(user_key=user.key)
    return new_objects
I thought of trying to wrap this whole thing in a transaction, but I need to avoid making two HTTP calls to Braintree/Stripe if the transaction is retried.
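One way to reason about the double-charge concern is an idempotency key: the same key sent twice should result in only one charge. The sketch below uses a hypothetical `FakeGateway` class to show the behaviour; it assumes the real processor accepts a client-supplied idempotency key, which would need to be confirmed in the Braintree or Stripe documentation. The key format is also made up for illustration.

```python
import uuid


class FakeGateway(object):
    """Hypothetical stand-in for a card processor that honours idempotency keys."""

    def __init__(self):
        self._results = {}
        self.charges_made = 0

    def charge(self, amount_cents, idempotency_key):
        # A repeated key returns the stored result instead of charging again.
        if idempotency_key not in self._results:
            self.charges_made += 1
            self._results[idempotency_key] = {
                "id": str(uuid.uuid4()),
                "amount": amount_cents,
            }
        return self._results[idempotency_key]


gateway = FakeGateway()
key = "replenish-user-1-batch-42"  # hypothetical: derived from the user and the batch
first = gateway.charge(1200, key)
retry = gateway.charge(1200, key)  # e.g. the job was retried after a failure
print(gateway.charges_made)
```

With a key derived deterministically from the user and the batch, a retried job would repeat the request but not the charge.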
I'm leaning towards option 3, but the system feels increasingly fragile with each new model I introduce.