
Hello, I wonder how to write an asynchronous table iterator. Suppose the input table consists of many rows and arrives in serialized form. Once the table is received, the iterator is called to retrieve it one row at a time.

It performs the reading and deserialization as follows:
1) It first reads and deserializes the integer giving the size of the row.
2) It then reads and deserializes the contents of the row:
   a. the timestamp is read first, by calling in.readLong();
   b. then each key of the row is read and deserialized;
   c. then the bitmap for the non-key columns is read and deserialized;
   d. then in.readInt() is called to read the integer giving the number of non-key columns, after which each non-key column is read and deserialized.
3) Finally it reads and deserializes the file end marker, which indicates whether the end of the file has been reached.

Finally it returns the deserialized row.
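To make steps 1) to 3) concrete, here is a minimal round-trip sketch of one framed row. The layout is an assumption for illustration only: a single int key, int-valued non-key columns and, like the asker's code rather than step d., no explicit column count (the bitmap alone says which columns are present). The class `RowFormatSketch` and the marker values `MORE`/`END` are hypothetical stand-ins for the real descriptor and `FILE.END`.

```java
import java.io.*;

// Hypothetical simplified layout (not the asker's real format):
//   int size | long timestamp | int key | int bitmap | int per present column | int marker
final class RowFormatSketch {
    static final int MORE = 0;  // assumed marker value: another row follows
    static final int END = -1;  // assumed marker value: end of file reached

    static final class Row {
        final long timestamp;
        final int key;
        final Integer[] columns; // null entry = column absent (bit cleared)

        Row(long timestamp, int key, Integer[] columns) {
            this.timestamp = timestamp;
            this.key = key;
            this.columns = columns;
        }
    }

    // Writes one framed row. The payload is serialized first so its size is known.
    static byte[] write(Row row, boolean last) {
        try {
            ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
            DataOutputStream payload = new DataOutputStream(payloadBytes);
            payload.writeLong(row.timestamp);               // 2a) timestamp
            payload.writeInt(row.key);                      // 2b) key
            int bitmap = 0;
            for (int i = 0; i < row.columns.length; i++) {
                if (row.columns[i] != null) bitmap |= 1 << i;
            }
            payload.writeInt(bitmap);                       // 2c) bitmap
            for (Integer value : row.columns) {
                if (value != null) payload.writeInt(value); // 2d) present columns only
            }
            ByteArrayOutputStream frame = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(frame);
            out.writeInt(payloadBytes.size());              // 1) row size in bytes
            payloadBytes.writeTo(out);                      // 2) row contents
            out.writeInt(last ? END : MORE);                // 3) end-of-file marker
            return frame.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads steps 1) and 2); columnCount would come from the table descriptor.
    static Row readRow(DataInput in, int columnCount) {
        try {
            in.readInt();                                   // 1) row size (unused here)
            long timestamp = in.readLong();
            int key = in.readInt();
            int bitmap = in.readInt();
            Integer[] columns = new Integer[columnCount];
            for (int i = 0; i < columnCount; i++) {
                if ((bitmap & (1 << i)) != 0) columns[i] = in.readInt();
            }
            return new Row(timestamp, key, columns);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 3): returns true when the marker says the end of the file was reached.
    static boolean readMarker(DataInput in) {
        try {
            return in.readInt() == END;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Serializing the payload into a buffer first is one way to know the size up front; that size prefix is also what would let a reader pull a whole row's bytes off the stream without deserializing them.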

Here is the code:

public Row next() {
    try {
        // 1) Read and deserialize the integer giving the size of the row.
        int size = in.readInt();
        // 2) Read and deserialize the contents of the row.
        Row row = Row.deserialize(descriptor, in);
        // 3) Read the file end marker, which indicates whether the end of the file is reached.
        int signal = in.readInt();
        if (signal == FILE.END) {
            file_end = true;
        }
        return row;
    } catch (IOException e) {
        // Wrap the I/O error, since an iterator's next() cannot throw checked exceptions.
        throw new UncheckedIOException(e);
    }
}

public Row deserialize(DataInput in) throws IOException {
    // a. The timestamp is read first, with in.readLong().
    long timestamp = in.readLong();

    Object[] key = new Object[KeyColumns().size()];
    Map<Column, Object> columns = new HashMap<>();

    // b. Then each key column of the row is read and deserialized.
    int i = 0;
    for (Column<?> col : KeyColumns()) {
        key[i++] = col.type.deserialize(in);
    }

    // c. Then the bitmap of present non-key columns is read.
    int bitmap = in.readInt();

    // d. Then each non-key column whose bit is set in the bitmap is deserialized.
    //    (No explicit column count is read here.)
    i = 0;
    for (Column<?> col : rowColumns()) {
        if ((bitmap & (1 << i)) != 0) {
            columns.put(col, col.type.deserialize(in));
        }
        i++;
    }
    return new Row(timestamp, key, columns);
}
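The `next()` above has no matching `hasNext()`, and `in.readInt()` throws a checked `IOException`. Here is a minimal sketch of a synchronous `java.util.Iterator` driven by the end marker, assuming a stripped-down frame (size, timestamp, one int key, marker), hypothetical marker values, and a table with at least one row. All class names are made up for illustration.

```java
import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Simplified, hypothetical frame: int size | long timestamp | int key | int marker
final class RowIteratorSketch {
    static final int MORE = 0, END = -1; // assumed marker values

    static final class Row {
        final long timestamp;
        final int key;
        Row(long timestamp, int key) { this.timestamp = timestamp; this.key = key; }
    }

    // Synchronous iterator; assumes the table contains at least one row.
    static final class RowIterator implements Iterator<Row> {
        private final DataInput in;
        private boolean fileEnd = false; // set once the END marker has been read

        RowIterator(DataInput in) { this.in = in; }

        @Override
        public boolean hasNext() { return !fileEnd; }

        @Override
        public Row next() {
            if (fileEnd) throw new NoSuchElementException();
            try {
                in.readInt();                                   // row size (not needed here)
                Row row = new Row(in.readLong(), in.readInt()); // arguments evaluate left to right
                fileEnd = in.readInt() == END;                  // end-of-file marker
                return row;
            } catch (IOException e) {
                throw new UncheckedIOException(e); // next() cannot throw checked exceptions
            }
        }
    }

    // Test helper: encodes n >= 1 rows with keys 0..n-1 and timestamps 1000+key.
    static byte[] encode(int n) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int k = 0; k < n; k++) {
                out.writeInt(Long.BYTES + Integer.BYTES); // payload size: 12 bytes
                out.writeLong(1000L + k);
                out.writeInt(k);
                out.writeInt(k == n - 1 ? END : MORE);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```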

To convert this iterator into an asynchronous one, I am thinking about using CompletableFuture in Java 8 and decoupling reading from deserialization, that is, using a separate thread to handle the reading, like below:

public Row next() {
    // Read the row size on another thread...
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        int size = 0;
        try {
            size = in.readInt();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    // ...while this thread deserializes the row from the same stream `in`.
    Row row = Row.deserialize(descriptor, in);
    int signal = in.readInt();

    if (signal == FILE.END) {
        file_end = true;
    }
    return row;
}

But it seems to me that the thread that does "size = in.readInt();" and the main thread that does "Row row = Row.deserialize(descriptor, in);" share the same stream, so they have to run one after the other and no parallelism is achieved. Is there a better way to implement this asynchronous iterator? Thanks.
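One way to get real overlap with `CompletableFuture`, sketched here under assumptions: let a single reader thread do all of the I/O for a row (the size, then `readFully` of that many bytes, then the marker), and start reading row n+1 before deserializing row n on the caller's thread. Only one thread ever touches `in`, so there is no race, yet I/O and deserialization overlap. This relies on the size prefix giving the byte length of the row contents; the frame layout, class names and marker values are hypothetical.

```java
import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified, hypothetical frame: int size | long timestamp | int key | int marker
final class PrefetchSketch {
    static final int MORE = 0, END = -1; // assumed marker values

    static final class Row {
        final long timestamp;
        final int key;
        Row(long timestamp, int key) { this.timestamp = timestamp; this.key = key; }
    }

    // Raw bytes of one row plus whether the END marker followed them.
    private static final class Frame {
        final byte[] bytes;
        final boolean last;
        Frame(byte[] bytes, boolean last) { this.bytes = bytes; this.last = last; }
    }

    static final class PrefetchingIterator implements Iterator<Row>, AutoCloseable {
        private final DataInput in;
        // A single reader thread: the stream is never read by two threads at once.
        private final ExecutorService reader = Executors.newSingleThreadExecutor();
        private CompletableFuture<Frame> pending; // null after the last frame

        PrefetchingIterator(DataInput in) {
            this.in = in;
            this.pending = CompletableFuture.supplyAsync(this::readFrame, reader);
        }

        // Runs on the reader thread: I/O only, no deserialization.
        private Frame readFrame() {
            try {
                byte[] bytes = new byte[in.readInt()]; // the size prefix makes this possible
                in.readFully(bytes);
                return new Frame(bytes, in.readInt() == END);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public boolean hasNext() { return pending != null; }

        @Override
        public Row next() {
            if (pending == null) throw new NoSuchElementException();
            Frame frame = pending.join(); // wait for the prefetched bytes
            // Start reading the next row before deserializing this one.
            pending = frame.last ? null : CompletableFuture.supplyAsync(this::readFrame, reader);
            return deserialize(frame.bytes); // runs while the reader thread does I/O
        }

        @Override
        public void close() { reader.shutdownNow(); }
    }

    static Row deserialize(byte[] bytes) {
        try {
            DataInputStream row = new DataInputStream(new ByteArrayInputStream(bytes));
            return new Row(row.readLong(), row.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test helper: encodes n >= 1 rows with keys 0..n-1 and timestamps 1000+key.
    static byte[] encode(int n) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int k = 0; k < n; k++) {
                out.writeInt(Long.BYTES + Integer.BYTES);
                out.writeLong(1000L + k);
                out.writeInt(k);
                out.writeInt(k == n - 1 ? END : MORE);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Closing the iterator (for example with try-with-resources) stops the reader thread if iteration ends early.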

  • This is a good question, but I think it is better suited to the Code Review Stack Exchange. – Sam Orozco Jul 04 '18 at 05:40
  • It is a hard question and I have found no clues. Is it possible to improve the efficiency here? – Code Knight Jul 04 '18 at 06:47
  • If we can overlap the I/O with the deserialization, efficiency improves. Are AsynchronousFileChannel and network channels used for this kind of problem? Also, there are in.readInt() calls in both next() and deserialize(), so how to group the reads is a challenge. Can we use a queue? – Code Knight Jul 04 '18 at 20:11
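The queue idea from the last comment can be sketched as a bounded producer/consumer pipeline: a dedicated thread reads raw row bytes (again using the size prefix) into an `ArrayBlockingQueue`, and the iterator takes them out and deserializes them on the caller's thread. Compared with prefetching a single row, the reader can run up to `capacity` rows ahead. The frame layout, the class names and the `POISON` sentinel are assumptions of this sketch, and errors on the reader thread are simply treated as end of input here.

```java
import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified, hypothetical frame: int size | long timestamp | int key | int marker
final class QueueSketch {
    static final int MORE = 0, END = -1;              // assumed marker values
    private static final byte[] POISON = new byte[0]; // end sentinel, compared by identity

    static final class Row {
        final long timestamp;
        final int key;
        Row(long timestamp, int key) { this.timestamp = timestamp; this.key = key; }
    }

    static final class QueuedRowIterator implements Iterator<Row> {
        private final BlockingQueue<byte[]> queue;
        private byte[] nextBytes; // null = not yet taken from the queue

        QueuedRowIterator(DataInput in, int capacity) {
            queue = new ArrayBlockingQueue<>(capacity);
            Thread producer = new Thread(() -> readAll(in), "row-reader");
            producer.setDaemon(true);
            producer.start();
        }

        // Producer thread: I/O only, running up to `capacity` rows ahead.
        private void readAll(DataInput in) {
            try {
                boolean last = false;
                while (!last) {
                    byte[] bytes = new byte[in.readInt()]; // size prefix
                    in.readFully(bytes);
                    last = in.readInt() == END;            // end-of-file marker
                    queue.put(bytes);                      // blocks while the queue is full
                }
            } catch (IOException | InterruptedException e) {
                // Sketch only: errors end the iteration; real code should report them.
            } finally {
                try {
                    queue.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public boolean hasNext() {
            if (nextBytes == null) {
                try {
                    nextBytes = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
            return nextBytes != POISON;
        }

        @Override
        public Row next() {
            if (!hasNext()) throw new NoSuchElementException();
            byte[] current = nextBytes;
            nextBytes = null;
            return deserialize(current); // consumer thread: deserialization only
        }
    }

    static Row deserialize(byte[] bytes) {
        try {
            DataInputStream row = new DataInputStream(new ByteArrayInputStream(bytes));
            return new Row(row.readLong(), row.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test helper: encodes n >= 1 rows with keys 0..n-1 and timestamps 1000+key.
    static byte[] encode(int n) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int k = 0; k < n; k++) {
                out.writeInt(Long.BYTES + Integer.BYTES);
                out.writeLong(1000L + k);
                out.writeInt(k);
                out.writeInt(k == n - 1 ? END : MORE);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```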

1 Answer


First of all, you have a blocking resource (DataInput) at the heart of this. So no matter what you do, reading from the DataInput will have to be synchronized.

In Java 8 I would definitely implement this with streams. See the following question:

How to implement a Java stream?

The easiest way would be to implement a Spliterator and create a stream from it with StreamSupport.stream(...). In a Spliterator you will primarily need to implement only the tryAdvance method, which is basically your "read next row" routine. There you'll need to synchronize reading from the DataInput.
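A minimal sketch of that approach: extending `Spliterators.AbstractSpliterator` means only `tryAdvance` has to be written, and `StreamSupport.stream` turns it into a `Stream<Row>`. The read happens inside a `synchronized (in)` block as suggested; the frame layout, class names and marker values are hypothetical.

```java
import java.io.*;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Simplified, hypothetical frame: int size | long timestamp | int key | int marker
final class RowStreamSketch {
    static final int MORE = 0, END = -1; // assumed marker values

    static final class Row {
        final long timestamp;
        final int key;
        Row(long timestamp, int key) { this.timestamp = timestamp; this.key = key; }
    }

    // tryAdvance is the "read next row" routine.
    static final class RowSpliterator extends Spliterators.AbstractSpliterator<Row> {
        private final DataInput in;
        private boolean fileEnd = false;

        RowSpliterator(DataInput in) {
            super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL); // size unknown
            this.in = in;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Row> action) {
            Row row;
            synchronized (in) { // serialize all reads from the shared DataInput
                if (fileEnd) return false;
                try {
                    in.readInt(); // row size
                    row = new Row(in.readLong(), in.readInt());
                    fileEnd = in.readInt() == END;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            action.accept(row); // hand the row on outside the lock
            return true;
        }
    }

    static Stream<Row> rows(DataInput in) {
        return StreamSupport.stream(new RowSpliterator(in), false);
    }

    // Test helper: encodes n >= 1 rows with keys 0..n-1 and timestamps 1000+key.
    static byte[] encode(int n) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int k = 0; k < n; k++) {
                out.writeInt(Long.BYTES + Integer.BYTES);
                out.writeLong(1000L + k);
                out.writeInt(k);
                out.writeInt(k == n - 1 ? END : MORE);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Once wrapped, `rows(in).map(...)` or `rows(in).forEach(...)` works like any other stream.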

Once you have your Stream<Row>, you will be able to apply different functions to it using map, forEach, etc.

To achieve parallelism you'll need to implement the trySplit method in the Spliterator. Here is the problem: if you can't read from your DataInput in parallel, splitting won't gain you much. Still, I think it would make sense to create a new Spliterator instance for the same DataInput and synchronize the instances on reading. Reading will not be parallelized, but further processing may be (in a parallel stream).
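A related way to get "sequential reads, parallel processing" without writing trySplit by hand, sketched under assumptions: `Spliterators.AbstractSpliterator` already implements `trySplit` by pulling a batch of elements through `tryAdvance` into an array that another worker can process. This swaps in that inherited batching in place of several Spliterator instances sharing one DataInput. Here `tryAdvance` only reads raw row bytes, and deserialization runs in the parallel `map`. The frame layout and all names are hypothetical.

```java
import java.io.*;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Simplified, hypothetical frame: int size | long timestamp | int key | int marker
final class ParallelRowsSketch {
    static final int MORE = 0, END = -1; // assumed marker values

    static final class Row {
        final long timestamp;
        final int key;
        Row(long timestamp, int key) { this.timestamp = timestamp; this.key = key; }
    }

    // Sequential I/O: each tryAdvance reads one row's raw bytes and nothing more.
    static final class FrameSpliterator extends Spliterators.AbstractSpliterator<byte[]> {
        private final DataInput in;
        private boolean fileEnd = false;

        FrameSpliterator(DataInput in) {
            super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
            this.in = in;
        }

        @Override
        public boolean tryAdvance(Consumer<? super byte[]> action) {
            byte[] bytes;
            synchronized (in) { // guards the stream if other readers share this DataInput
                if (fileEnd) return false;
                try {
                    bytes = new byte[in.readInt()];
                    in.readFully(bytes);
                    fileEnd = in.readInt() == END;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            action.accept(bytes);
            return true;
        }
        // trySplit is inherited: it buffers a batch of frames into an array
        // that another worker thread can process.
    }

    static Row deserialize(byte[] bytes) {
        try {
            DataInputStream row = new DataInputStream(new ByteArrayInputStream(bytes));
            return new Row(row.readLong(), row.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reading stays sequential; deserialization runs in the parallel stream.
    static List<Row> readAll(DataInput in) {
        return StreamSupport.stream(new FrameSpliterator(in), true)
                .map(ParallelRowsSketch::deserialize)
                .collect(Collectors.toList()); // keeps encounter order
    }

    // Test helper: encodes n >= 1 rows with keys 0..n-1 and timestamps 1000+key.
    static byte[] encode(int n) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int k = 0; k < n; k++) {
                out.writeInt(Long.BYTES + Integer.BYTES);
                out.writeLong(1000L + k);
                out.writeInt(k);
                out.writeInt(k == n - 1 ? END : MORE);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For small tables the inherited trySplit may never split, so the pipeline effectively runs sequentially; any benefit shows up only when deserialization is expensive relative to reading.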

lexicore