
I have a large Java 8 Stream (Stream<MyObject>) with objects that looks like this:

class MyObject {
   private String string;
   private Date timestamp;

   // Getters and setters removed for brevity
}

I know that all timestamps for day 1 will arrive before those for day 2, but within each day the timestamps could be out of order. I'd like to sort the MyObject instances in timestamp order on a per-day basis using the Stream API. Since the stream is large, I have to do this as lazily as possible, i.e. it would be OK to hold one day's worth of MyObject instances in memory, but not much more than that.

How can I achieve this?

Update 2017-04-29:

A requirement is that I want to continue working on the same stream after the sorting! I'd like something like this (pseudo code):

Stream<MyObject> sortedStream = myStreamUnsorted().sort(onADailyBasis());
Johan
  • is it the question more about scheduling (when to do) or processing (how to do)? are you using the Spring stack? – Andrew Tobilko Apr 23 '17 at 12:48
  • I think most likely you'll have to work with the Stream's iterator in order to group the elements day by day before sorting. I don't think the Stream API can help you much with this kind of requirement. – Didier L Apr 23 '17 at 12:53
  • @AndrewTobilko It's about processing and I'm not using Spring. – Johan Apr 23 '17 at 12:56
  • What is the smallest TimeUnit between your timestamps? Are we talking about seconds or even less? –  Apr 23 '17 at 13:00
  • I'm afraid that Java streams are not well suited for sorting, especially if there are a lot of elements. – Konstantin Pribluda Apr 23 '17 at 13:03
  • @DiabolicWords it's millisecond resolution – Johan Apr 23 '17 at 13:28
  • `Stream is large I have to do this as lazily as possible` Java 8 streams are lazy by nature: you can construct an arbitrarily long pipeline and it won't be processed until a terminal operation is invoked. Also, streams are not data structures, so using them to store your items doesn't make sense – Trash Can Apr 23 '17 at 19:48
  • @Dummy Sure, but given that there are built-in collectors to do "group by" (which I would count as part of the Stream API, but this is where I might be wrong?) I would have guessed that something like this might be possible by making use of other data structures. – Johan Apr 24 '17 at 05:00
  • `Collectors.groupingBy()` and every other static method inside that class are part of the Java 8 Stream package API. These static methods return implementations of the `Collector` interface, which **collects** stream elements into a result container, so they are used as part of a terminal operation, specifically the `collect` stream method. Also, as an optimization, instead of giving the `timestamp` property the `Date` type, use the primitive type `long` to store the milliseconds since the epoch, because `Date` objects are compared using the same logic (milliseconds since the epoch) – Trash Can Apr 24 '17 at 05:13

3 Answers


I'd suggest the following solution:

Store each element of your stream in a TreeMap, keyed by the object's timestamp, so that it is sorted immediately.

 Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>();
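One caveat with keying the map by timestamp: with millisecond resolution two objects can share a timestamp, and `put()` then silently replaces the earlier one. A minimal sketch of a duplicate-safe alternative, assuming it is acceptable to bucket values in a `TreeMap<Long, List<T>>` keyed by epoch milliseconds (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimestampBuckets<T>
{
    // Keys stay sorted; each key holds every value seen with that timestamp, in arrival order.
    private final TreeMap<Long, List<T>> buckets = new TreeMap<>();

    public void add(long epochMillis, T value)
    {
        buckets.computeIfAbsent(epochMillis, k -> new ArrayList<>()).add(value);
    }

    // Flattens the buckets into one list ordered by timestamp (ties keep insertion order).
    public List<T> inOrder()
    {
        List<T> result = new ArrayList<>();
        for (Map.Entry<Long, List<T>> e : buckets.entrySet())
        {
            result.addAll(e.getValue());
        }
        return result;
    }

    public static void main(String[] args)
    {
        TimestampBuckets<String> b = new TimestampBuckets<>();
        b.add(2000L, "c");
        b.add(1000L, "a");
        b.add(1000L, "b"); // same millisecond as "a": kept, not overwritten
        System.out.println(b.inOrder()); // [a, b, c]
    }
}
```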

We need to know which object has to be removed from the map at the end. It will be only one object, but the variable that stores it has to be (effectively) final to be used inside a lambda. So I chose a one-element list.

 List<MyObject> lastObject = Arrays.asList(new MyObject[1]); // one (null) slot, so set(0, ...) works
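Note that `set(0, ...)` only works on a list that already has an element at index 0; on an empty `ArrayList` it throws `IndexOutOfBoundsException`. A small sketch of two holders that work inside lambdas: a one-slot mutable list and `java.util.concurrent.atomic.AtomicReference` (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

public class LastElementHolder
{
    // One-slot list: set(0, x) is valid because index 0 already exists (it starts as null).
    public static <T> T lastViaList(Stream<T> stream)
    {
        List<T> holder = new ArrayList<>(Collections.<T>singletonList(null));
        stream.forEach(x -> holder.set(0, x));
        return holder.get(0);
    }

    // AtomicReference expresses the same "mutable box" intent without list indexing.
    public static <T> T lastViaReference(Stream<T> stream)
    {
        AtomicReference<T> holder = new AtomicReference<>();
        stream.forEach(holder::set);
        return holder.get();
    }

    // Shows why an empty ArrayList does not work: there is no index 0 to overwrite.
    public static boolean setOnEmptyListFails()
    {
        try
        {
            new ArrayList<String>().set(0, "x");
            return false;
        }
        catch (IndexOutOfBoundsException e)
        {
            return true;
        }
    }
}
```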

Store the current day as an integer.

 // just an example
 int currentDay = 23;

Use a predicate that determines whether the day of a passing object differs from currentDay.

 Predicate<MyObject> predicate = myObject -> myObject.getTimestamp()
                    .toInstant()
                    .atZone(ZoneId.systemDefault())
                    .toLocalDate()
                    .getDayOfMonth() != currentDay;
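Because `getDayOfMonth()` repeats every month (a point raised in the comments below), comparing whole `LocalDate` values is more robust. A minimal sketch, assuming the timestamps are `java.util.Date` and that a "day" is defined by a caller-supplied `ZoneId` (the class and method names are hypothetical):

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;
import java.util.function.Predicate;

public class DayPredicates
{
    // Converts a java.util.Date to the calendar day it falls on in the given zone.
    public static LocalDate dayOf(Date timestamp, ZoneId zone)
    {
        return timestamp.toInstant().atZone(zone).toLocalDate();
    }

    // True for timestamps that do NOT fall on 'day'. Unlike getDayOfMonth(),
    // this also distinguishes April 23 from May 23.
    public static Predicate<Date> notOn(LocalDate day, ZoneId zone)
    {
        return ts -> !dayOf(ts, zone).equals(day);
    }
}
```

In the pipeline above this could be used as `.anyMatch(o -> DayPredicates.notOn(day, ZoneId.systemDefault()).test(o.getTimestamp()))`, with `day` a `LocalDate` instead of an `int`.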

Now process the stream. Use peek() twice: first to put the object into the map, second to overwrite the object in the list. Use anyMatch() as the terminal operation and pass in the predicate created above. As soon as the first object from the next day appears, anyMatch() terminates the stream and returns true.

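 boolean reachedNextDay = stream
       .peek(myObject -> objectsOfTheDaySorted.put(myObject.getTimestamp(), myObject))
       .peek(myObject -> lastObject.set(0, myObject))
       .anyMatch(predicate);

Now you only have to remove the last object that passed by, which already belongs to the next day and therefore does not belong in your map. If the stream ended without reaching the next day (anyMatch() returned false), the last object belongs to the current day and must stay.

 if (reachedNextDay) {
     objectsOfTheDaySorted.remove(lastObject.get(0).getTimestamp());
 }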

Done. You now have a sorted map of objects that all belong to just one day. I hope this matches your expectations. Here is the entire code in one block for easier copying:

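 import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.function.Predicate;

 Map<Date, MyObject> objectsOfTheDaySorted = new TreeMap<>();
 // fixed-size list with one (null) slot, so set(0, ...) works inside the lambda
 List<MyObject> lastObject = Arrays.asList(new MyObject[1]);

 // just an example
 int currentDay = 23;

 Predicate<MyObject> predicate = myObject -> myObject.getTimestamp()
                    .toInstant()
                    .atZone(ZoneId.systemDefault())
                    .toLocalDate()
                    .getDayOfMonth() != currentDay;

 // true if an object from the next day was reached before the stream ended
 boolean reachedNextDay = stream
       .peek(myObject -> objectsOfTheDaySorted.put(myObject.getTimestamp(), myObject))
       .peek(myObject -> lastObject.set(0, myObject))
       .anyMatch(predicate);

 if (reachedNextDay) {
     // the last object seen belongs to the next day, so drop it again
     objectsOfTheDaySorted.remove(lastObject.get(0).getTimestamp());
 }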
  • You might want to use `set(0, myObject)` instead of `add`. Otherwise, the `lastObject` list gets quite large. Alternatively, make `lastObject` an array of length 1 (`lastObject[0] = myObject`). – Malte Hartwig Apr 24 '17 at 12:35
  • @Malte Hartwig: Thanks for the hint. I totally overlooked this, although I actually intended to use the set() method. I've edited my post. –  Apr 24 '17 at 12:40
  • This will not work if the stream starts with objects from before the specified day, as the predicate will match the first one and terminate the stream right away. Furthermore, you use dayOfMonth, which causes errors if the stream contains objects from more than 30 days. – Malte Hartwig Apr 24 '17 at 12:41
  • Well, I was aware of this. I only focused on the description above, which says that the stream starts with one day, delivers all the objects of this day and eventually reaches the next day. There is certainly room for optimization. –  Apr 24 '17 at 12:44
  • Ah, I see what you mean. Anyway, the use of peek is nice. Have not had the chance to use it myself in the real world yet. – Malte Hartwig Apr 24 '17 at 12:50
  • @MalteHartwig That's because “_[This method](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#peek-java.util.function.Consumer-) exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline_”. See also [is peek really only for debugging?](http://stackoverflow.com/questions/33635717/in-java-streams-is-peek-really-only-for-debugging) – Didier L Apr 25 '17 at 09:26

It depends whether you need to process the objects of all days or one specific day.

Building on DiabolicWords's answer, this is an example to process all days:

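// note: a TreeSet ordered only by timestamp keeps just one object per distinct timestamp
TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate[] currentDay = new LocalDate[1];
incoming.peek(o -> {
    LocalDate date = o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    if (!date.equals(currentDay[0]))
    {
        if (currentDay[0] != null)
        {
            // a new day starts: hand over the completed previous day
            processOneDaysObjects(currentDaysObjects);
            currentDaysObjects.clear();
        }
        currentDay[0] = date;
    }
}).forEach(currentDaysObjects::add);
// the last day is never followed by a new day, so process it explicitly
if (!currentDaysObjects.isEmpty())
{
    processOneDaysObjects(currentDaysObjects);
}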

This will collect the objects for one day, process them, reset the collection and continue with the next day.
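Flushing inside `peek()` works because, in a sequential stream without stateful operations such as `sorted()`, each element passes through the whole pipeline (`peek`, then `forEach`) before the next element is pulled. A small sketch that records the order of calls; this reflects how the JDK's sequential implementation evaluates such a pipeline and is not something to rely on with parallel streams (the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class PipelineOrder
{
    // Records the order in which peek() and forEach() see each element.
    public static List<String> trace(Stream<Integer> stream)
    {
        List<String> log = new ArrayList<>();
        stream.peek(i -> log.add("peek " + i))
              .forEach(i -> log.add("forEach " + i));
        return log;
    }

    public static void main(String[] args)
    {
        System.out.println(trace(Stream.of(1, 2, 3)));
        // [peek 1, forEach 1, peek 2, forEach 2, peek 3, forEach 3]
    }
}
```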

If you only want one specific day:

TreeSet<MyObject> currentDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate specificDay = LocalDate.now();
incoming.filter(o -> !o.getTimestamp()
                       .toInstant()
                       .atZone(ZoneId.systemDefault())
                       .toLocalDate()
                       .isBefore(specificDay))
        .peek(o -> currentDaysObjects.add(o))
        .anyMatch(o -> {
            if (o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate().isAfter(specificDay))
            {
                currentDaysObjects.remove(o);
                return true;
            }
            return false;
        });

The filter will skip objects from before the specificDay, and the anyMatch will terminate the stream after the specificDay.

I have read that Java 9 will add methods like takeWhile and dropWhile (what other libraries often call skipWhile) to streams. These would make this a lot easier.
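Java 9 did add `Stream.takeWhile` and `Stream.dropWhile`. A sketch of the specific-day case with them, assuming Java 9+ and, as in the question, that days arrive in order (so earlier days form a prefix and later days a suffix). Plain `LocalDateTime` values stand in for `MyObject`, and the names are hypothetical:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OneDay
{
    // Requires Java 9+: skips earlier days, discards everything from the first later day on,
    // then sorts the remaining elements of 'day'.
    public static List<LocalDateTime> sortedDay(Stream<LocalDateTime> incoming, LocalDate day)
    {
        return incoming
                .dropWhile(t -> t.toLocalDate().isBefore(day))  // skip the prefix of earlier days
                .takeWhile(t -> !t.toLocalDate().isAfter(day))  // stop at the first later day
                .sorted()                                       // only this day's elements are buffered
                .collect(Collectors.toList());
    }
}
```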

Edit after the OP specified the goal in more detail

Wow, this is a nice exercise, and quite a tough nut to crack. The problem is that the obvious solution (collecting the stream) always goes through the whole stream. You cannot take the next x elements, sort them, stream them, and then repeat; collecting does it for the whole stream (i.e. all days) at once. For the same reason, calling sorted() on a stream goes through it completely (especially as the stream does not know that the elements are already grouped by day). For reference, see this comment: https://stackoverflow.com/a/27595803/7653073.
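The effect of `sorted()` is easy to observe by counting how many source elements are pulled to produce just the first result. A minimal sketch; the counts reflect the JDK's sequential implementation, and the class name is hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class SortedIsABarrier
{
    // Returns how many source elements were pulled to obtain the first result.
    public static int pulledForFirst(boolean withSort)
    {
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> s = Stream.of(5, 4, 3, 2, 1).peek(i -> pulled.incrementAndGet());
        if (withSort)
        {
            s = s.sorted(); // stateful: must see every element before emitting any
        }
        s.findFirst();
        return pulled.get();
    }

    public static void main(String[] args)
    {
        System.out.println(pulledForFirst(false)); // 1
        System.out.println(pulledForFirst(true));  // 5
    }
}
```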

As recommended there, here is an Iterator implementation wrapped in a stream that looks ahead in the original stream, takes the elements of one day, sorts them, and gives you the whole thing as a new stream (without keeping all days in memory!). The implementation is more complicated because there is no fixed chunk size: it always has to find the first element of the following day to know when to stop.

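import java.time.LocalDate;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Assumes MyObject.getTimestamp() returns a LocalDateTime (as in the usage example below);
// for a java.util.Date, convert with .toInstant().atZone(ZoneId.systemDefault()).toLocalDate().
// Note: a TreeSet ordered only by timestamp keeps just one object per distinct timestamp.
public class DayByDayIterator implements Iterator<MyObject>
{
    private Iterator<MyObject> incoming;
    private MyObject next;

    private Iterator<MyObject> currentDay;

    private MyObject firstOfNextDay;
    private Set<MyObject> nextDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));

    public static Stream<MyObject> streamOf(Stream<MyObject> incoming)
    {
        Iterable<MyObject> iterable = () -> new DayByDayIterator(incoming);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private DayByDayIterator(Stream<MyObject> stream)
    {
        this.incoming = stream.iterator();
        firstOfNextDay = incoming.next(); // throws NoSuchElementException for an empty stream
        nextDaysObjects.add(firstOfNextDay);
        next(); // loads the first day; the returned null is discarded
    }

    @Override
    public boolean hasNext()
    {
        return next != null;
    }

    @Override
    public MyObject next()
    {
        // switch to the buffered next day once the current day is used up; checking
        // nextDaysObjects (not incoming) keeps a lone element of the last day from being lost
        if (currentDay == null || !currentDay.hasNext() && !nextDaysObjects.isEmpty())
        {
            nextDay();
        }

        MyObject result = next;
        this.next = currentDay.hasNext() ? currentDay.next() : null;
        return result;
    }

    private void nextDay()
    {
        LocalDate day = firstOfNextDay.getTimestamp().toLocalDate();
        firstOfNextDay = null;
        // read until the first element of the following day (or the end of the stream)
        while (incoming.hasNext())
        {
            MyObject candidate = incoming.next();
            if (!candidate.getTimestamp().toLocalDate().isEqual(day))
            {
                firstOfNextDay = candidate;
                break;
            }
            nextDaysObjects.add(candidate);
        }
        currentDay = nextDaysObjects.iterator();

        nextDaysObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
        if (firstOfNextDay != null)
        {
            nextDaysObjects.add(firstOfNextDay);
        }
    }
}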

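Use it like this (this test assumes MyObject has a constructor taking a LocalDateTime and a toString() that prints the timestamp):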

public static void main(String[] args)
{
    Stream<MyObject> stream = Stream.of(
            new MyObject(LocalDateTime.now().plusHours(1)),
            new MyObject(LocalDateTime.now()),
            new MyObject(LocalDateTime.now().plusDays(1).plusHours(2)),
            new MyObject(LocalDateTime.now().plusDays(1)),
            new MyObject(LocalDateTime.now().plusDays(1).plusHours(1)),
            new MyObject(LocalDateTime.now().plusDays(2)),
            new MyObject(LocalDateTime.now().plusDays(2).plusHours(1)));

    DayByDayIterator.streamOf(stream).forEach(System.out::println);
}

------------------- Output -----------------

2017-04-30T17:39:46.353
2017-04-30T18:39:46.333
2017-05-01T17:39:46.353
2017-05-01T18:39:46.353
2017-05-01T19:39:46.353
2017-05-02T17:39:46.353
2017-05-02T18:39:46.353

Explanation: currentDay and next are the basis for the iterator, while firstOfNextDay and nextDaysObjects already look at the first element of the next day. When currentDay is exhausted, nextDay() is called and keeps adding incoming's elements to nextDaysObjects until the day after that is reached, then turns nextDaysObjects into currentDay.

One caveat: if the incoming stream is null or empty, this fails. You can test for null, but handling the empty case requires catching an exception in the factory method. I left this out for readability.
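For comparison, a more compact sketch of the same look-ahead idea built on `Spliterators.AbstractSpliterator`. It is generic over a group-key function and a comparator, copes with an empty source, and sorts a `List` per group, so objects with identical timestamps are kept rather than collapsed as in a `TreeSet`. Assumptions: the source is sequential and already grouped by day, as in the question; `DailySort` and `sortWithinGroups` are illustrative names, not a library API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public final class DailySort
{
    // Lazily re-emits 'source', sorting each run of consecutive elements with an equal groupKey.
    // Only one group plus one look-ahead element is held in memory at a time.
    public static <T, K> Stream<T> sortWithinGroups(Stream<T> source,
                                                     Function<? super T, ? extends K> groupKey,
                                                     Comparator<? super T> order)
    {
        Iterator<T> in = source.iterator();
        Spliterator<T> split = new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
        {
            private Iterator<T> ready = Collections.emptyIterator(); // sorted current group
            private T lookahead;          // first element of the next group, once read
            private boolean hasLookahead;

            @Override
            public boolean tryAdvance(Consumer<? super T> action)
            {
                if (!ready.hasNext() && !fillNextGroup())
                {
                    return false; // source exhausted (also covers an empty source)
                }
                action.accept(ready.next());
                return true;
            }

            // Reads one complete group, sorts it and makes it the current group.
            private boolean fillNextGroup()
            {
                if (!hasLookahead && !in.hasNext())
                {
                    return false;
                }
                T first = hasLookahead ? lookahead : in.next();
                hasLookahead = false;
                K key = groupKey.apply(first);
                List<T> group = new ArrayList<>();
                group.add(first);
                while (in.hasNext())
                {
                    T t = in.next();
                    if (!Objects.equals(key, groupKey.apply(t)))
                    {
                        lookahead = t; // belongs to the next group
                        hasLookahead = true;
                        break;
                    }
                    group.add(t);
                }
                group.sort(order); // stable sort, so ties keep arrival order
                ready = group.iterator();
                return true;
            }
        };
        return StreamSupport.stream(split, false).onClose(source::close);
    }
}
```

Applied to the question, this might look like `DailySort.sortWithinGroups(myStreamUnsorted(), o -> o.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), Comparator.comparing(MyObject::getTimestamp))`, which has the shape of the pseudo code in the question's update.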

I hope this is what you need, let me know how it goes.

Malte Hartwig
  • My problem is that I'd like to continue working with the same stream afterwards, which I'm not quite sure how to achieve with your suggested solution. I've updated the question to state this more clearly. – Johan Apr 29 '17 at 07:34
  • @Johan I have added an alternative solution. It basically uses an Iterator instead of a Stream, but I added a factory utility method that wraps that iterator into a stream again. Use it as follows: `Stream sortedStream = DayByDayIterator.streamOf(unsortedStream)` – Malte Hartwig Apr 30 '17 at 16:04

If you consider an iterative approach, I think it becomes much simpler:

TreeSet<MyObject> currentDayObjects = new TreeSet<>(Comparator.comparing(MyObject::getTimestamp));
LocalDate currentDay = null;
// a method reference needs a target type, hence the cast to Iterable
for (MyObject m : (Iterable<MyObject>) stream::iterator) {
    LocalDate objectDay = m.getTimestamp().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
    if (currentDay == null) {
        currentDay = objectDay;
    } else if (!currentDay.equals(objectDay)) {
        // process a whole day of objects at once
        process(currentDayObjects);
        currentDay = objectDay;
        currentDayObjects.clear();
    }
    currentDayObjects.add(m);
}
// process the data of the last day (skipped if the stream was empty)
if (!currentDayObjects.isEmpty()) {
    process(currentDayObjects);
}
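The same loop can be packaged as a reusable helper. A sketch in which a caller-supplied `Consumer<List<T>>` stands in for `process(...)` and a day-extractor function stands in for the `LocalDate` conversion (`DayLoop` and `forEachGroup` are hypothetical names); it sorts a fresh `List` per group, so equal timestamps are not collapsed and the consumer may keep the list it receives:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

public final class DayLoop
{
    // Calls 'process' once per run of consecutive elements with an equal groupKey (e.g. one day),
    // passing that run sorted by 'order'.
    public static <T, K> void forEachGroup(Stream<T> stream,
                                           Function<? super T, ? extends K> groupKey,
                                           Comparator<? super T> order,
                                           Consumer<? super List<T>> process)
    {
        List<T> current = new ArrayList<>();
        K currentKey = null;
        for (T item : (Iterable<T>) stream::iterator)
        {
            K key = groupKey.apply(item);
            if (!current.isEmpty() && !Objects.equals(key, currentKey))
            {
                current.sort(order);
                process.accept(current);
                current = new ArrayList<>(); // fresh list, so 'process' may keep the previous one
            }
            currentKey = key;
            current.add(item);
        }
        if (!current.isEmpty()) // no call at all for an empty stream
        {
            current.sort(order);
            process.accept(current);
        }
    }
}
```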
Didier L