
I use CQ v5.16.11 (with OpenJDK 11) to save data with daily roll cycles. The process runs non-stop from Sunday to Friday, so I get 5 cq4 files per week. I ran the process for 1.5 weeks and have 8 files (3 for the 1st week and 5 for the 2nd).

So the files that I have are:

20181003.cq4 cycle=17807,
20181004.cq4 cycle=17808,
20181005.cq4 cycle=17809,
20181007.cq4 cycle=17811,
20181008.cq4 cycle=17812,
20181009.cq4 cycle=17813,
20181010.cq4 cycle=17814,
20181011.cq4 cycle=17815,

Note the missing file for 20181006.cq4 (cycle=17810) as the process does not run on Saturday.

I use this code to read data:

tailer.toEnd();
lastTailerIndex = tailer.index();
tailer.toStart();

while (tailer.index() <= lastTailerIndex) {
    // read data
    if (tailer.readBytes(data)) {
     /// do something with data bytes
    }
    if (tailer.index() == lastTailerIndex) {
        break;
    }
}

This correctly reads the 1st week's data but does not read the 2nd week's data, as the tailer does not auto-roll to the next cycle.

Any idea why this is happening or how to fix this?
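For background on why the `tailer.index() <= lastTailerIndex` comparison in the loop above is still well-ordered across a missing day: a Chronicle Queue index packs the roll cycle into the high bits and the sequence number into the low bits. The 32-bit split below is my assumption about the DAILY layout, not something stated in the question; this pure-Java sketch only illustrates the packing:

```java
public class IndexPacking {
    // Assumed layout for DAILY roll cycles: high 32 bits = cycle, low 32 bits = sequence.
    static final int CYCLE_SHIFT = 32;

    static long toIndex(int cycle, long seq) {
        return ((long) cycle << CYCLE_SHIFT) | seq;
    }

    static int toCycle(long index) {
        return (int) (index >>> CYCLE_SHIFT);
    }

    static long toSequence(long index) {
        return index & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long fri = toIndex(17809, 41); // a late entry in 20181005.cq4
        long sun = toIndex(17811, 0);  // first entry in 20181007.cq4
        // Indices stay totally ordered even though cycle 17810 (Saturday) is missing,
        // so the comparison itself is not the problem.
        System.out.println(sun > fri);       // true
        System.out.println(toCycle(sun));    // 17811
        System.out.println(toSequence(sun)); // 0
    }
}
```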

The issue is similar to this one, which was reported for an older version.

Logs:

2018-10-12 12:41:15,784 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/metadata.cq4t took 19.237 ms.
2018-10-12 12:41:15,876 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.063 ms.
2018-10-12 12:41:15,881 DEBUG [main] net.openhft.chronicle.queue.impl.single.PretoucherState - /site/data/20181011.cq4 - Reset pretoucher to pos 4835096 as the underlying MappedBytes changed.
2018-10-12 12:41:15,887 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181003.cq4 took 0.065 ms.
2018-10-12 12:41:15,995 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.082 ms.
2018-10-12 12:41:15,996 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:15,997 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181011.cq4
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181004.cq4 took 0.112 ms.
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181005.cq4 took 0.084 ms.
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181004.cq4

[Edit 1]: The same thing happened this past weekend, i.e., as expected there is no new file for Oct 13. Now I have files from Oct 7th to Oct 15th (with the Oct 13th file missing). If I do `tailer.toStart(); while (tailer.readBytes(data)) { ... }` it only reads the files from Oct 7th to Oct 12th and does not read Oct 14th and 15th.

[Edit 2]: Replicated the issue as below; also reported as Chronicle-Queue/issues/537

  1. Setup / Libs: jvm openjdk 11, Ubuntu 16.04, openhft.affinity/3.1.9, chronicle-map/3.16.0, chronicle-queue/5.16.11, chronicle-bytes/1.16.23, chronicle-core/1.16.20, chronicle-wire/1.16.16, chronicle-threads/1.16.3, jna/4.4.0
  2. Steps:
    • Start WriterProcess - let it finish.
    • Start ReaderProcess - see the 5 print statements.
    • Stop ReaderProcess
    • Wait for some time - 10 mins.
    • Start WriterProcess again - let it finish or keep running this process.
    • Start ReaderProcess - it prints only the first 5 statements and nothing after that. Even while the WriterProcess is running and writing to the queue, the tailer in this process does not move forward.

public class WriterProcess {
    public static void main(String[] args) throws InterruptedException {
        final String dir = "/tmp/demo/";
        final LocalTime localTime = LocalTime.of(17, 0);
        final ZoneId zoneID = ZoneId.of("America/New_York");
        final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
                .blockSize((long) Math.pow(2, 23))
                .rollCycle(RollCycles.MINUTELY)
                .rollTime(localTime, zoneID)
                .build();
        final ExcerptAppender appender = queue.acquireAppender();
        // pre touch
        scheduledExecutorService.scheduleAtFixedRate(appender::pretouch, 0, 30, TimeUnit.SECONDS);
        // write data
        System.out.println("writing data ...");
        writeData(appender, 5);
        // close queue
        System.out.println("shutting down now ...");
        queue.close();
        scheduledExecutorService.shutdown();
        scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void writeData(ExcerptAppender appender, int count) {
        int ctr = 0;
        String dateStr;
        Date date = new Date();
        while (true) {
            dateStr = date.toString();
            appender.writeText("[" + ctr + "] Written " + dateStr);
            System.out.println("[" + ctr + "] Written " + dateStr);
            ctr++;
            if (ctr >= count) {
                break;
            }
            try {
                Thread.sleep(65_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ReaderProcess {
    public static void main(String[] args) {
        final String dir = "/tmp/demo/";
        final LocalTime localTime = LocalTime.of(17, 0);
        final ZoneId zoneID = ZoneId.of("America/New_York");
        final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
                .blockSize((long) Math.pow(2, 23))
                .rollCycle(RollCycles.MINUTELY)
                .rollTime(localTime, zoneID)
                .build();
        final ExcerptTailer tailer = queue.createTailer();
        tailer.toStart();
        // read data
        System.out.println("reading data ...");
        readData(tailer, 25);
        // close
        System.out.println("shutting down now ...");
        queue.close();
    }

    public static void readData(ExcerptTailer tailer, int count) {
        int ctr = 0;
        Bytes data = Bytes.allocateDirect(new byte[500]);
        while (true) {
            if (tailer.readBytes(data)) {
                System.out.println("[" + ctr + "] Read {" + data + "}");
                ctr++;
                if (ctr >= count) {
                    break;
                }
            }
        }
    }
}
  • Why not use `tailer.toStart(); while(tailer.readBytes(data)) { /// do something with data bytes }` – Peter Lawrey Oct 13 '18 at 15:35
  • It doesn't create a file if nothing was written on that day. If the queue failed to roll on write, this wouldn't prevent the data being read. – Peter Lawrey Oct 13 '18 at 15:36
  • @PeterLawrey I want to read up to a specific point in the queue, then do some other stuff and start reading from the lastTailerIndex again. This is the use case mentioned in the wiki to recover processing a queue after a process is stopped/restarted. Also, I tried your suggestion `tailer.toStart(); while(tailer.readBytes(data)) { /// do something with data bytes }` and got the same error. – Sam Oct 15 '18 at 16:05
  • That's what the queue does by default so it's not clear to me what the extra code is doing. – Peter Lawrey Oct 15 '18 at 17:36
  • We have tests for this specific situation so are you able to create a test which reproduces the problem? – Peter Lawrey Oct 15 '18 at 17:37
  • Yes - this test ran ok on my machine (net.openhft.chronicle.queue.impl.single.SingleChronicleQueueTest#testReadingWritingWhenCycleIsSkipped). My only question now is how to mimic my prod setup: process runs from Sun 17hrs to Fri 17hrs with a daily roll cycle, data stored every minute, pre-touch every 30 secs on a different thread, one process writes and a separate process reads the queue. – Sam Oct 15 '18 at 19:43
  • @PeterLawrey If I do a simple `tailer.toStart(); while(tailer.readBytes(data)) { // print data }` even then the while-loop does not print after the last entry in Fri queue and never reads the Sunday queue file. I ran another test in qa where the queue Writer process does not stop/start on weekend. Because of pre-touch it created a file for Sat (which is expected) and then the above code works ok, i.e., I can read all the data correctly. I think the process stop (Fri) / start (Sun) maybe affecting the state of the queue. I do `close` all queues before the process exits. – Sam Oct 15 '18 at 19:53
  • 2
    @PeterLawrey I have replicated the issue. See [Edit 2] and ref code in the main question. – Sam Oct 15 '18 at 21:23
  • 1
    I just want to add that i encountered the same problem and noticed that it is possible to reach the index at which it's not possible to move foreward by iterating backwards from the end of the queue. So a workaround, until this issue is solved, may be to detect the situation by comparing the cycle or index id at the end of the queue with the current tailers position when `documentContext.isPresent()` returns `false` and then find the next index after the broken entry by iterating backward from the end and move the tailer to that index with `moveToIndex()`. – teano Jan 11 '19 at 11:06
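The backward-iteration workaround teano describes could be sketched against the Chronicle Queue 5.x API roughly as below. This is an untested illustration of the comment, not a verified fix; it assumes chronicle-queue on the classpath, and the recovery logic (the `stalledIndex` parameter and `recoverIndex` helper) is hypothetical:

```java
// Sketch: when the forward tailer stalls, walk backward from the end of the
// queue to find the earliest readable index after the stall point.
long recoverIndex(SingleChronicleQueue queue, long stalledIndex) {
    ExcerptTailer back = queue.createTailer()
            .direction(TailerDirection.BACKWARD)
            .toEnd();
    long candidate = -1;
    while (true) {
        try (DocumentContext dc = back.readingDocument()) {
            if (!dc.isPresent() || dc.index() <= stalledIndex)
                break;              // walked back past the broken entry
            candidate = dc.index(); // earliest readable index seen so far
        }
    }
    return candidate; // -1 if nothing found; else call tailer.moveToIndex(candidate)
}
```

`TailerDirection.BACKWARD`, `toEnd()`, `readingDocument()` and `moveToIndex()` are real Chronicle Queue APIs; how they interact with the broken entry in this bug is exactly what the workaround relies on.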

1 Answer


I have written a slightly simpler version which works with chronicle-bom 2.17 and the versions it uses. The biggest change I made was to clear the Bytes before each read: readBytes() appends to the buffer rather than overwriting it, so without clearing, each read accumulates the previous data.
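The same reuse pitfall exists with a plain java.nio.ByteBuffer, which may make the fix easier to see. This analogy is mine, not part of the original answer:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReusePitfall {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);

        // First "read" into the buffer.
        buf.put("first".getBytes(StandardCharsets.UTF_8));
        // Second read WITHOUT clear(): the bytes land after the first payload.
        buf.put("second".getBytes(StandardCharsets.UTF_8));

        buf.flip();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // firstsecond

        // With clear() between reads, only the latest payload remains.
        buf.clear();
        buf.put("second".getBytes(StandardCharsets.UTF_8));
        buf.flip();
        out = new byte[buf.remaining()];
        buf.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // second
    }
}
```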

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WriterProcess {
    static final String dir = OS.TMP + "/demo-" + System.nanoTime() + "/";

    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
                .testBlockSize()
                .rollCycle(RollCycles.TEST_SECONDLY)
                .build();
        final ExcerptAppender appender = queue.acquireAppender();
        // pre touch
        scheduledExecutorService.scheduleAtFixedRate(appender::pretouch, 3, 30, TimeUnit.SECONDS);
        new Thread(ReaderProcess::main).start();
        // write data
        System.out.println("writing data ...");
        writeData(appender, 100);
        // close queue
        System.out.println("shutting down now ...");
        queue.close();
        scheduledExecutorService.shutdown();
        scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void writeData(ExcerptAppender appender, int count) {
        int ctr = 0;
        while (true) {
            LocalDateTime date = LocalDateTime.now();
            appender.writeText("[" + ctr + "] Written " + date);
            System.out.println("[" + ctr + "] Written " + date);
            ctr++;
            if (ctr >= count) {
                break;
            }
            try {
                Thread.sleep(2_200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class ReaderProcess {
    public static void main(String... args) {
        final String dir = WriterProcess.dir;
        final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
                .testBlockSize()
                .rollCycle(RollCycles.TEST_SECONDLY)
                .build();
        final ExcerptTailer tailer = queue.createTailer();
        tailer.toStart();
        // read data
        System.out.println("reading data ...");
        readData(tailer, 100);
        // close
        System.out.println("shutting down now ...");
        queue.close();
    }

    public static void readData(ExcerptTailer tailer, int count) {
        int ctr = 0;
        Bytes data = Bytes.allocateDirect(64);
        while (true) {
            data.clear();
            if (tailer.readBytes(data)) {
                System.out.println("[" + ctr + "] Read {" + data + "}");
                ctr++;
                if (ctr >= count) {
                    break;
                }
            }
        }
    }
}
Peter Lawrey
  • If this is a long-running process, how would I go about polling for new messages and reading them as they come in, in real time? Any caveat if using index-of-last-read-message minus current-index-in-file? – z atef May 15 '21 at 13:44
  • Reading same files in my case. – z atef May 15 '21 at 13:46
  • @zatef You can poll in a busy loop, or pause whenever there are no messages, e.g. use a LongPauser to control this. It doesn't matter how long-running the process is. – Peter Lawrey May 16 '21 at 14:07
  • A busy loop is something like `while (tailer.nextIndex()) { ... }`. Will this keep the file open for read through the day? – z atef May 17 '21 at 14:48
  • @zatef it will, however `while (true) { try (DocumentContext dc = tailer.readingDocument()) { if (dc.isPresent()) { /* read message */ } else { /* no message */ } } }` – Peter Lawrey May 18 '21 at 08:12