
I have the following code to read from a Chronicle Queue (it is written in Kotlin, but that does not matter):

val queue = ChronicleQueueBuilder.single(path).build()
val tailer = queue.createTailer()

tailer.toEnd()

// // This code is wrong
//    val lastIndex = tailer.index()
//
//    val shift = lastIndex - 10
//    if (shift > 0) {
//        tailer.moveToIndex(lastIndex)
//    }

while (true) {
    val text = await(tailer)

    if (prefix == null) {
        println(text)
    } else {
        if (text.startsWith(prefix)) {
            // Would be nice without additional allocation ...
            println(text.substring(prefix.length + 1))
        }
    }
}

How can I modify the commented-out code so that it reads the previous 10 records from the queue and then continues with new records as they arrive?

Rationale: It is useful in situations where the queue is used for displaying logs. You want to see a few previous logging statements and see new logging statements as they come.

MartyIX
  • What is the issue with the commented code? Usually it should move back. – Devas Jun 27 '17 at 16:47
  • I'll test again. It threw some exception, I think. I'm not sure if it can work, as I don't know what exactly the index is. Is it a pointer to binary data? Is it a logical index of my records? I will look at the project's tests to better understand this. – MartyIX Jun 27 '17 at 18:41
  • I'm not a committer, but in my experience the index increments for each new document, so you can get the latest 10 documents by subtracting 10 from the index and reading from there. Also, the index should contain the pointer and the cycle count. – Devas Jun 28 '17 at 04:22

2 Answers


I have written a test for you. Please run it; it should work.

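import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import org.junit.Test;

public class ChronicleTest {

    private String chroniclePath = "/tmp/chronicle-test";

    private int msgCount = 10;

    // A field rather than a local variable so that the lambda below can capture it.
    private int i = 0;

    @Test
    public void writeToQ() {
        ChronicleQueue queue = ChronicleQueueBuilder.single(chroniclePath).build();
        ExcerptAppender appender = queue.acquireAppender();
        // Write msgCount messages, each holding a single int.
        for (i = 1; i <= msgCount; i++) {
            appender.writeBytes(b -> {
                b.writeInt(i);
            });
        }
        ExcerptTailer tailer = queue.createTailer();
        tailer.toEnd();
        long lastIndex = tailer.index();
        // Step back 5 entries. Plain subtraction assumes that those entries
        // are in the same cycle as the end of the queue.
        tailer.moveToIndex(lastIndex - 5);

        // Read forward until no more messages are available.
        while (tailer.readBytes(b -> {
            int value = b.readInt();
            System.out.println("Received:" + value);
        })) {
            // Nothing to do here; the lambda handles each message.
        }
        System.out.println("Completed");
    }
}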
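One caveat about the subtraction in the test above: as the comments on the question note, the index carries a cycle count as well as a position within that cycle. The following is a minimal sketch of why that matters. It uses plain arithmetic and does not call Chronicle Queue at all. The class `IndexSketch`, the constant `SEQUENCE_BITS` and the helper names are hypothetical, and the 32-bit split is an assumption made for illustration; the real layout depends on the roll cycle the queue was built with.

```java
// Illustrative model only: mimics a "cycle in the high bits, sequence number
// in the low bits" index with plain arithmetic. Not Chronicle Queue code.
final class IndexSketch {
    // Assumed split for this sketch; the real value depends on the roll cycle.
    static final int SEQUENCE_BITS = 32;
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    static long toIndex(int cycle, long sequence) {
        return ((long) cycle << SEQUENCE_BITS) | (sequence & SEQUENCE_MASK);
    }

    static int toCycle(long index) {
        return (int) (index >>> SEQUENCE_BITS);
    }

    static long toSequence(long index) {
        return index & SEQUENCE_MASK;
    }

    /** True if stepping back by n records stays inside the cycle of the given index. */
    static boolean canStepBackWithinCycle(long index, long n) {
        return toSequence(index) >= n;
    }

    public static void main(String[] args) {
        // End of the queue: cycle 17344, with only 3 records written in that cycle.
        long end = toIndex(17_344, 3);

        // Naive subtraction borrows from the cycle bits.
        long naive = end - 10;
        System.out.println(toCycle(naive) + " / " + toSequence(naive)); // prints 17343 / 4294967289

        // A guard like this tells the caller when subtraction is not enough.
        System.out.println(canStepBackWithinCycle(end, 10)); // prints false
    }
}
```

In this model, subtracting 10 from an index whose sequence number is 3 lands in the previous cycle at a sequence number that was never written, which may explain an exception or a failed move. If the records of interest can span a roll boundary, stepping backwards record by record is likely the safer route.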
Devas

In addition to directly using the index, you could use the direction property of the ExcerptTailer:

    final SingleChronicleQueue queue = createQueue();

    final int totalRecords = 20;
    final int tailRecords = 10;

    final ExcerptAppender appender = queue.acquireAppender();
    for (int i = 0; i < totalRecords; i++) {
        try(final DocumentContext ctx = appender.writingDocument()) {
            ctx.wire().writeText(Integer.toString(i));
        }
    }

    final ExcerptTailer tailer = queue.createTailer();
    tailer.direction(TailerDirection.BACKWARD).toEnd();

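    // With BACKWARD direction, toEnd() should leave the tailer on the last record,
    // so tailRecords - 1 backward reads are enough to reach the first record to print.
    int rewind = tailRecords;
    final int endCycle = tailer.cycle();
    while(--rewind != 0) {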
        try(final DocumentContext ctx = tailer.readingDocument()) {
            if (!ctx.isPresent()) {
                break;
            }

            if (endCycle != tailer.cycle()) {
                System.out.println("Rewound past beginning of cycle");
            }
        }
    }

    tailer.direction(TailerDirection.FORWARD);

    for (int i = 0; i < tailRecords; i++) {
        try(final DocumentContext ctx = tailer.readingDocument()) {
            if (!ctx.isPresent()) {
                break;
            }

            System.out.println(ctx.wire().readText());
        }
    }
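The loop bounds above are easy to misread, so here is a small model of the same rewind-then-read logic on a plain list. It is only a sketch and does not use Chronicle Queue. It assumes that a backward `toEnd()` leaves the reader on the last record and that each backward read moves one record towards the start. The class `RewindSketch`, the method `tail` and the handling of a queue shorter than `tailRecords` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: a list index stands in for the tailer position.
final class RewindSketch {

    /** Returns what the forward loop would print after the backward rewind. */
    static List<String> tail(List<String> records, int tailRecords) {
        List<String> out = new ArrayList<>();
        if (records.isEmpty()) {
            return out;
        }

        // Assumed position after a backward toEnd(): on the last record.
        int cursor = records.size() - 1;

        // Same bound as the answer: tailRecords - 1 backward reads.
        int rewind = tailRecords;
        while (--rewind != 0) {
            if (cursor < 0) {
                break; // models !ctx.isPresent(): nothing left before the start
            }
            cursor--; // a backward read consumes one record and steps back
        }

        // Assumption of this model: if we ran off the start, resume at record 0.
        cursor = Math.max(cursor, 0);

        // Forward pass: read up to tailRecords records.
        for (int i = 0; i < tailRecords && cursor < records.size(); i++) {
            out.add(records.get(cursor++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            records.add(Integer.toString(i));
        }
        System.out.println(tail(records, 10)); // prints [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    }
}
```

With 20 records and `tailRecords = 10`, nine backward steps move the cursor from record 19 to record 10, and the forward pass then yields records 10 through 19. After the forward pass the reader sits at the end of the queue, which is where a follow loop for new records would take over.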
Mark Price