3

Context/Scenario

Let's say we have an immutable class called Transaction, where transaction.getAction() returns a TransactionAction enum, which can be DEPOSIT or WITHDRAW, and transaction.getAmount() returns an int that specifies the amount of money being deposited or withdrawn.

enum TransactionAction {
    WITHDRAW,
    DEPOSIT
}

public class Transaction {

    private final TransactionAction action;
    private final int amount;

    public Transaction(TransactionAction action, int amount) {
        this.action = action;
        this.amount = amount;
    }

    public TransactionAction getAction() {
        return action;
    }

    public int getAmount() {
        return amount;
    }

}

Question

We now have a Stream<Transaction>, a stream of Transaction objects, each of which is either a DEPOSIT or a WITHDRAW. We can imagine this Stream<Transaction> as the transaction history of one particular bank account.

What I am trying to achieve is to get the highest balance the account has ever reached, in the most efficient manner (thus using the Stream API).


Example

Bob's transaction history is:

// balance starts at 0
[DEPOSIT]   1200        // balance: 1200
[DEPOSIT]   500         // balance: 1700
[WITHDRAW]  700         // balance: 1000
[DEPOSIT]   300         // balance: 1300
[WITHDRAW]  800         // balance: 500
[WITHDRAW]  500         // balance: 0

Bob's highest balance is 1700.

Naman
Programer Beginner
  • A `Stream` would not be a good solution to this problem, as you need to be able to track a running balance while simultaneously tracking the highest-achieved balance. If I were you, I'd just use a for-loop. – Jacob G. Jun 01 '20 at 01:22
  • If you were to really do that in a `Stream` way, a better design would be including a `class Entry{}` consisting of a `balance` per transaction as an audit would be helpful and cleaner to implement. (`max` based on `balance` from the `List`) – Naman Jun 01 '20 at 02:04

5 Answers

3

What you need is to find the maximum value of a cumulative sum. In pseudo-code, this would be something like:

transactions = [1200, 500, -700, 300, -800, -500]
csum = cumulativeSum(transactions) // should be [1200,1700,1000,1300,500,0]
max(csum) // should be 1700

The imperative way:

The traditional for-loop is well suited for such cases. It should be fairly easy to write and is probably the most efficient alternative both in time and space. It does not require multiple iterations and it does not require extra lists.

int max = 0;
int csum = 0;
for (Transaction t: transactions) {
    int amount = (t.getAction() == TransactionAction.WITHDRAW ? -1 : 1) * t.getAmount();
    csum += amount;
    if (csum > max) max = csum;
}

Diving into functional:

Streams are a functional programming concept and, as such, they are free of side-effects and well suited for stateless operations. Keeping the cumulative state is considered a side-effect, and then we would have to talk about Monads to bring those side-effects under control and... we don't want to go that way.

Java, not being a functional language (although allowing for functional style), cares less about purity. You could simply have a control variable outside the stream to keep track of that external state within the current map or reduce operations. But that would also be giving up everything Streams are meant for.

So let's see how a language with more experience in this matter handles it. In pure Haskell, the cumulative sum can be achieved with a scan-left operation:

λ> scanl1 (+) [1200, 500, -700, 300, -800, -500] 
[1200,1700,1000,1300,500,0]

Finding the maximum of this would be as simple as:

λ> maximum ( scanl1 (+) [1200, 500, -700, 300, -800, -500] )
1700

A Java Streams solution:

Java does not have such an idiomatic way of expressing a scan left, but you may achieve a similar result with collect.

transactions.stream()
    .map(t -> (t.getAction() == TransactionAction.WITHDRAW ? -1 : 1) * t.getAmount())
    .collect(ArrayList<Integer>::new, (csum, amount) -> 
        csum.add(csum.size() > 0 ? csum.get(csum.size() - 1) + amount : amount), 
        ArrayList::addAll)
    .stream()
    .max(Integer::compareTo);
// returns Optional[1700]

EDIT: As correctly pointed out in the comments, this accumulator function is not associative and problems would appear if trying to use parallelStream instead of stream.

This can be further simplified. For example, if you enrich your TransactionAction enum with a multiplier (-1 for WITHDRAW and 1 for DEPOSIT), then map could be replaced with:

.map(t -> t.getAction().getMultiplier() * t.getAmount())
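
As a minimal sketch of what such an enriched enum might look like: the names MultiplierSketch, Action and signedAmount below are hypothetical and only stand in for the question's TransactionAction, and getMultiplier() is the assumed accessor used in the map call above.

```java
class MultiplierSketch {

    // Hypothetical variant of the question's enum: each constant carries the
    // sign that its amount contributes to the balance.
    enum Action {
        WITHDRAW(-1),
        DEPOSIT(1);

        private final int multiplier;

        Action(int multiplier) {
            this.multiplier = multiplier;
        }

        int getMultiplier() {
            return multiplier;
        }
    }

    // Signed amount of a single transaction, e.g. WITHDRAW 700 becomes -700.
    static int signedAmount(Action action, int amount) {
        return action.getMultiplier() * amount;
    }

    public static void main(String[] args) {
        System.out.println(signedAmount(Action.DEPOSIT, 1200)); // 1200
        System.out.println(signedAmount(Action.WITHDRAW, 700)); // -700
    }
}
```

Keeping the sign next to the enum constant means the ternary no longer has to be repeated in every pipeline that needs signed amounts.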

EDIT: Yet another approach: Parallel Prefix Sum

Since Java 8, arrays offer a parallelPrefix operation that could be used like:

Integer[] amounts = transactions.stream()
    .map(t -> (t.getAction() == TransactionAction.WITHDRAW ? -1 : 1) * t.getAmount())
    .toArray(Integer[]::new);

Arrays.parallelPrefix(amounts, Integer::sum);

Arrays.stream(amounts).max(Integer::compareTo); 
// returns Optional[1700]

Like Stream's collect, parallelPrefix requires an associative function, and Integer::sum satisfies that property. The downside is that it requires an array and can't be used with lists. Although parallelPrefix is very efficient, the cost of setting up the array to work with it might not pay off.
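
If boxing is a concern, the same idea can probably be expressed with a primitive int[] instead of an Integer[]. The sketch below assumes the signed amounts have already been extracted (for example with mapToInt(...).toArray() on the transaction stream); PrefixSumSketch and highestBalance are hypothetical names.

```java
import java.util.Arrays;
import java.util.OptionalInt;

class PrefixSumSketch {

    // Highest running balance for signed amounts (deposits positive,
    // withdrawals negative). Empty input yields an empty OptionalInt.
    static OptionalInt highestBalance(int[] signedAmounts) {
        int[] running = signedAmounts.clone();        // leave the caller's array untouched
        Arrays.parallelPrefix(running, Integer::sum); // running[i] = sum of amounts 0..i
        return Arrays.stream(running).max();
    }

    public static void main(String[] args) {
        int[] amounts = {1200, 500, -700, 300, -800, -500};
        System.out.println(highestBalance(amounts)); // OptionalInt[1700]
    }
}
```

The clone is only there so the input is not overwritten; if the array is a throwaway, parallelPrefix can work on it in place.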

Wrapping up:

Again, it's possible to achieve this with Java Streams, although it won't be as efficient as a traditional loop in either time or space. But you benefit from the composability of streams. As always, it's a trade-off.

rph
  • Doesn't the documentation for `collect` require that the accumulator function is associative? Is the accumulator here associative? – MelvinWM Jun 01 '20 at 13:26
  • Thanks for pointing that out. Forgot to add that this code is not parallelizable because the accumulator is not associative due to the "special first case". It should be enough to use `.parallelStream()` instead of `stream()` to verify that. – rph Jun 01 '20 at 14:03
  • I think your answer is very nice. The only thing I wonder about - can it also be an issue in the non-parallel case? I tried to search through the documentation, but I did not find confirmation on that. – MelvinWM Jun 01 '20 at 14:07
  • It should be ok with non-parallel streams. [Here](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collector.html) it states that "The associativity constraint says that splitting the computation must produce an equivalent result.". [Here](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html) it talks about "The importance of this to parallel evaluation". It's subtle, but it's there. – rph Jun 01 '20 at 14:29
  • Added another approach to the problem using `Arrays.parallelPrefix`. Of course, every mentioned approach has its pros and cons and should be evaluated carefully. – rph Jun 01 '20 at 14:31
  • The `parallelPrefix` solution is very nice. – MelvinWM Jun 01 '20 at 14:37
1

A stream would not help here. Use a list and a for-loop:

List<Transaction> transactions = ...;

int balance = 0;
int max = 0;
for (Transaction transaction : transactions) {
    balance += (transaction.getAction() == TransactionAction.DEPOSIT ? 1 : -1) 
                * transaction.getAmount();
    max = Math.max(max, balance);
}

The problem is that you need to keep track of some state while processing transactions, and you wouldn't be able to do this with streams without introducing complicated or mutable data structures that would make this code bug-prone.

ernest_k
  • A good solution, though, you can do this with streams if you both require that the stream respects the order of the original transactions and you use the `forEachOrdered`. That solution would look a lot like this answer. See also my answer. – MelvinWM Jun 01 '20 at 10:54
0

Here is another Stream solution:

    AtomicInteger balance = new AtomicInteger(0);
    int highestBalance = transactions
            .stream()
            .mapToInt(transaction -> {
                int amount = transaction.getAmount();
                if (transaction.getAction() == TransactionAction.WITHDRAW) {
                    amount = -amount;
                }
                return balance.accumulateAndGet(amount, Integer::sum);
            })
            .max()
            .orElse(0);
MartinBG
  • This requires that `mapToInt` executes in the order of the original transactions, since "current balance" is meaningless if not computed with the original order. See also my answer. – MelvinWM Jun 01 '20 at 10:56
  • @MelvinWM Of course `transactions` should produce an [ordered](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#Ordering) stream and that's what I assumed from OP's `a history of transactions`. Given that, the code above will produce correct result as `mapToInt` is applied on elements as they come in stream and for `max()` ordering doesn't matter. – MartinBG Jun 01 '20 at 12:55
  • But are you certain that a sequential stream will execute in the given order? Is there anything in the documentation for `map` that requires it to execute according to the order when the stream is ordered and it is sequential? What if there is an implementation of `map` that applies the transformation function from the end to start (and still produces correct results assuming no side-effects)? Would such an implementation obey the interface described in the Javadoc? `forEachOrdered` is guaranteed to execute in order for a stream that is ordered. – MelvinWM Jun 01 '20 at 13:25
  • From https://docs.oracle.com/en/java/javase/14/docs/api/java.base/java/util/stream/package-summary.html#SideEffects : "[...] no guarantees are made as to the order in which the mapper function is applied to individual elements, or in what thread any behavioral parameter is executed for a given element. ". This of course holds for parallel execution, but what about sequential execution? – MelvinWM Jun 01 '20 at 13:29
0

The cumulative sum at each position can be computed like this:

List<Integer> integers = Arrays.asList(1200, 500, -700, 300, -800, -500);

Stream<Integer[]> cumulativeSum = Stream.iterate(
        new Integer[]{0, integers.get(0)}, 
        p -> new Integer[]{p[0] + 1, p[1] + integers.get(p[0] + 1)}
        )
        .limit(integers.size());

With this you can get the max balance in this way:

Integer[] max = cumulativeSum
        .max(Comparator.comparing(p -> p[1]))
        .get();
System.out.println("Position: " + max[0]);
System.out.println("Value: " + max[1]);

Or with an iterator, although here the last cumulative sum is dropped, because iterator.hasNext() is already false by the time takeWhile tests that element:

Stream<Integer> integerStream = Arrays.stream(new Integer[]{
        1200, 500, -700, 300, -800, -500});
Iterator<Integer> iterator = integerStream.iterator();
Integer maxCumulativeSum = Stream.iterate(iterator.next(), p -> p + iterator.next())
        .takeWhile(p -> iterator.hasNext())
        .max(Integer::compareTo).get();
System.out.println(maxCumulativeSum);

The problem lies with takeWhile (available since Java 9), and it may be solved with takeWhileInclusive (from an external library).

lczapski
0

A wrong solution

    // Deposit is positive, withdrawal is negative.
    final Stream<Integer> theOriginalDepositWithdrawals = Stream.of(1200, 500, -700, 300, -800, -500);
    final Stream<Integer> sequentialDepositWithdrawals = theOriginalDepositWithdrawals.sequential();

    final CurrentBalanceMaximumBalance currentMaximumBalance = sequentialDepositWithdrawals.<CurrentBalanceMaximumBalance>reduce(

      // Identity.
      new CurrentBalanceMaximumBalance(0, Integer.MIN_VALUE),

      // Accumulator.
      (currentAccumulation, elementDepositWithdrawal) -> {

        final int newCurrentBalance =
          currentAccumulation.currentBalance +
          elementDepositWithdrawal;

        final int newMaximumBalance = Math.max(
          currentAccumulation.maximumBalance,
          newCurrentBalance
        );

        return new CurrentBalanceMaximumBalance(
          newCurrentBalance,
          newMaximumBalance
        );
      },

      // Combiner.
      (res1, res2) -> {

        final int newCurrentBalance =
          res1.currentBalance +
          res2.currentBalance;

        final int newMaximumBalance = Math.max(
          res1.maximumBalance,
          res2.maximumBalance
        );

        return new CurrentBalanceMaximumBalance(
          newCurrentBalance, newMaximumBalance
        );
      }

    );

    System.out.println("Maximum is: " + currentMaximumBalance.maximumBalance);

Helper class:

class CurrentBalanceMaximumBalance {

  public final int currentBalance;
  public final int maximumBalance;

  public CurrentBalanceMaximumBalance(
    int currentBalance,
    int maximumBalance
  ) {

    this.currentBalance = currentBalance;
    this.maximumBalance = maximumBalance;
  }
}

This is a wrong solution. It might happen to work, but there is no guarantee that it will.

It breaks the interface of reduce. The properties that are broken are associativity for both the accumulator function and the combiner function. It also doesn't require that the stream respects the order of the original transactions.

This makes it possibly dangerous to use, and might well give wrong results depending on what the implementation of reduce happens to be as well as whether the stream respects the original order of the deposits and withdrawals or not.
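
To make the failure mode concrete, here is a small sketch that simulates by hand what a parallel reduce is permitted to do: fold two chunks independently from the identity and then merge them with the combiner shown above. The class and method names are hypothetical, and the split point is chosen purely for illustration; where a real implementation splits is not specified.

```java
class ReduceCombinerSketch {

    // Immutable pair, mirroring CurrentBalanceMaximumBalance from the answer.
    static final class State {
        final int balance;
        final int max;

        State(int balance, int max) {
            this.balance = balance;
            this.max = max;
        }
    }

    // Folds one chunk with the answer's accumulator, starting from its identity.
    static State fold(int... amounts) {
        State s = new State(0, Integer.MIN_VALUE);
        for (int amount : amounts) {
            int newBalance = s.balance + amount;
            s = new State(newBalance, Math.max(s.max, newBalance));
        }
        return s;
    }

    // The combiner from the answer: sums the balances, keeps the larger maximum.
    static State combineAsInAnswer(State left, State right) {
        return new State(left.balance + right.balance, Math.max(left.max, right.max));
    }

    public static void main(String[] args) {
        State whole = fold(1200, 500, -700, 300, -800, -500);
        State split = combineAsInAnswer(fold(1200), fold(500, -700, 300, -800, -500));
        System.out.println(whole.max); // 1700
        System.out.println(split.max); // 1200
    }
}
```

The right-hand chunk computes its maximum as if the balance had started at 0, so its peak of 500 never gets the 1200 carried over from the left-hand chunk, and the true peak of 1700 is lost.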

Using sequential() here is not sufficient, since sequential() only controls sequential versus parallel execution, not ordering. An example of a stream that executes sequentially but has no defined encounter order is a stream created from a HashSet on which sequential() is then called.
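
One way to check whether a stream has a defined encounter order is to inspect the ORDERED characteristic of its spliterator. This is only a sketch: EncounterOrderSketch and isOrdered are hypothetical names, and since spliterator() is a terminal operation, the stream passed in cannot be used afterwards.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.stream.Stream;

class EncounterOrderSketch {

    // True if the stream reports a defined encounter order.
    // The stream is consumed by this check.
    static boolean isOrdered(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1200, 500, -700);
        Set<Integer> set = new HashSet<>(list);

        System.out.println(isOrdered(list.stream()));             // true
        System.out.println(isOrdered(set.stream().sequential())); // false
    }
}
```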

A correct solution

The problem uses the concept of a "current balance", and that is only meaningful when computed from the first transaction and then in order to the end. For instance, if you have the list [-1000, 10, 10, -1000], you cannot start in the middle and then say that the "current balance" was 20 at some point. You must apply the operations regarding the "current balance" in the order of the original transactions.

So, one straightforward solution is to:

  • Require that the stream respects the original order of transactions, with a defined "encounter order".
  • Apply forEachOrdered().
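
The two steps above could be sketched roughly as follows. BalanceTracker and highestBalance are hypothetical names, the input is assumed to be a stream of signed amounts (deposits positive, withdrawals negative) with a defined encounter order, and the maximum is assumed to start at the opening balance of 0.

```java
import java.util.Arrays;
import java.util.stream.Stream;

class ForEachOrderedSketch {

    // Mutable holder for the running balance and the highest balance seen so far.
    static final class BalanceTracker {
        int balance = 0;
        int max = 0; // the account starts at 0, so 0 is the initial maximum

        void apply(int signedAmount) {
            balance += signedAmount;
            max = Math.max(max, balance);
        }
    }

    // Expects a stream with a defined encounter order, e.g. one created from a List.
    static int highestBalance(Stream<Integer> signedAmounts) {
        BalanceTracker tracker = new BalanceTracker();
        // forEachOrdered processes elements one at a time in encounter order.
        signedAmounts.forEachOrdered(tracker::apply);
        return tracker.max;
    }

    public static void main(String[] args) {
        Stream<Integer> history =
            Arrays.asList(1200, 500, -700, 300, -800, -500).stream();
        System.out.println(highestBalance(history)); // 1700
    }
}
```

This is essentially the for-loop from the other answers with the iteration delegated to the stream; the mutable state stays confined to one method call.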
MelvinWM