Thanks to all the answerers who mentioned Collectors.groupingBy()
. This was key to setting up a stream where I could use reduce()
. I had erroneously believed I should be able to use reduce
on its own to solve the problem, without groupingBy
.
Thanks also to the suggestion to create a fluent API. I added IncomingFlatItem.getEmailAddress()
and IncomingFlatItem.getReport()
to fluently grab the domain objects from IncomingFlatItem
- and also a method to convert the whole flat item to a proper domain object with its email and report nested already:
public Client getClient() {
Client client = new Client();
client.setClientCode(clientCode);
client.setClientName(clientName);
client.setEmailAddresses(new ArrayList());
client.getEmailAddresses().add(this.getEmailAddress());
client.setReports(new ArrayList<>());
client.getReports().add(this.getReport());
return client;
}
I also created business ID-based .equals()
and .hashCode()
methods on Client
, EmailAddress
and Report
as recommended by @SamuelPhilip
Lastly for the domain objects, I created .addReport(Report r)
and .addEmail(EmailAddress e)
on my Client
class, which would add the child object to Client
if not already present. I ditched the Set
collection type for List
because the domain model standard is List
and Sets
would have meant lots of conversions to Lists
.
So with that, the stream code and lambdas look succinct.
There are 3 steps:
- map
IncomingFlatItems
to Clients
- group the
Clients
into a map by client (relying heavily on Client.equals()
)
- reduce each group to one
Client
So this is the functional algorithm:
List<Client> unflatten(List<IncomingFlatItem> flatItems) {
return flatItems.parallelStream()
.map(IncomingFlatItem::getClient)
.collect(Collectors.groupingByConcurrent(client -> client))
.entrySet().parallelStream()
.map(kvp -> kvp.getValue()
.stream()
.reduce(new Client(),
(client1, client2) -> {
client1.getReports()
.forEach(client2::addReport);
client1.getEmailAddresses()
.forEach(client2::addEmail);
return client2;
}))
.collect(Collectors.toList());
}
I took a long time due to going off on a tangent before I really understood reduce
- I found a solution which passed my tests while using .stream()
but totally failed with .parallelStream()
hence its usage here. I had to use CopyOnWriteArrayList
as well otherwise it would fall over randomly with ConcurrentModificationExceptions