Hello, I am wondering how to write an asynchronous table iterator. Suppose the input table consists of many rows and arrives in serialized form. Once the table is received, the iterator is called to retrieve the rows one at a time.
It reads and deserializes each row as follows:
1) It first reads and deserializes the integer that gives the size of the row.
2) It then reads and deserializes the contents of the row:
   a. the timestamp is read first by calling in.readLong();
   b. each key of the row is read and deserialized;
   c. the bitmap for the non-key columns is read by calling in.readInt();
   d. each non-key column whose bit is set in the bitmap is read and deserialized.
3) Finally, it reads and deserializes the file end marker, which indicates whether the end of the file has been reached.
Finally it returns the deserialized row.
Here is the code:
public Row next() throws IOException {
    /* First read the integer that gives the size of the row. */
    int size = in.readInt();
    /* Then read and deserialize the contents of the row. */
    Row row = Row.deserialize(descriptor, in);
    /* Finally read the file end marker, which indicates whether
       the end of the file has been reached. */
    int signal = in.readInt();
    if (signal == FILE.END) {
        file_end = true;
    }
    return row;
}
public Row deserialize(DataInput in) throws IOException {
    /* The timestamp is read first by calling in.readLong(). */
    long timestamp = in.readLong();
    Object[] Key = new Object[KeyColumns().size()];
    Map<Column, Object> columns = new HashMap<>();
    /* Then each key of the row is read and deserialized. */
    int i = 0;
    for (Column<?> col : KeyColumns()) {
        Key[i++] = col.type.deserialize(in);
    }
    /* Then the bitmap for the non-key columns is read by calling
       in.readInt(). */
    int bitstring = in.readInt();
    /* Then each non-key column whose bit is set in the bitmap is
       read and deserialized. */
    i = 0;
    for (Column<?> col : rowColumns()) {
        if ((bitstring & (1 << i)) != 0) {
            columns.put(col, col.type.deserialize(in));
        }
        i++;
    }
    return new Row(timestamp, Key, columns);
}
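To make the bitmap step concrete, here is a minimal, self-contained sketch of the same layout (timestamp, key, bitmap, then only the columns that are present). The class BitmapRowDemo, its methods, and the use of int for the key and for every non-key column are hypothetical simplifications for illustration; the real Row and Column types are not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class BitmapRowDemo {
    /** Writes timestamp, one int key, a presence bitmap, then only the non-null columns. */
    public static byte[] write(long timestamp, int key, Integer[] cols) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(timestamp);
        out.writeInt(key);
        int bitmap = 0;
        for (int i = 0; i < cols.length; i++) {
            if (cols[i] != null) {
                bitmap |= 1 << i; // bit i set means column i is present
            }
        }
        out.writeInt(bitmap);
        for (Integer c : cols) {
            if (c != null) {
                out.writeInt(c);
            }
        }
        return bos.toByteArray();
    }

    /** Reads the columns back; absent columns stay null. */
    public static Integer[] readColumns(byte[] raw, int columnCount) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
        in.readLong(); // timestamp, ignored in this demo
        in.readInt();  // key, ignored in this demo
        int bitmap = in.readInt();
        Integer[] cols = new Integer[columnCount];
        for (int i = 0; i < columnCount; i++) {
            if ((bitmap & (1 << i)) != 0) {
                cols[i] = in.readInt();
            }
        }
        return cols;
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = write(1L, 5, new Integer[] {7, null, 9});
        System.out.println(Arrays.toString(readColumns(raw, 3))); // prints [7, null, 9]
    }
}
```

Note that an int bitmap tested with 1 << i can only describe up to 32 non-key columns.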
To convert this iterator into an asynchronous iterator, I am thinking about using CompletableFuture in Java 8 and decoupling the read from the deserialization, that is, using a separate thread to handle the reading, as below:
public Row next() throws IOException {
    /* Read the size prefix on another thread. */
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        int size = 0;
        try {
            size = in.readInt();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
    /* Deserialize on the calling thread, using the same stream. */
    Row row = Row.deserialize(descriptor, in);
    int signal = in.readInt();
    if (signal == FILE.END) {
        file_end = true;
    }
    return row;
}
But it seems to me that, because the thread that runs “size = in.readInt();” and the main thread that runs “Row row = Row.deserialize(descriptor, in);” share the same stream, the two reads still have to happen one after the other, so no parallelism is achieved. Is there a better way to implement this asynchronous iterator? Thanks.
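One direction that might avoid the shared-stream problem, sketched here under assumptions rather than as a finished design: let a single reader thread own the stream and only copy out the raw bytes of each row, then hand those bytes to CompletableFuture.supplyAsync for deserialization, so that decoding row N can overlap with reading row N+1. This assumes the size prefix is the byte length of the serialized row and that an end marker follows every row. The class PrefetchingRowIterator, the FILE_END value, the queue capacity of 64, and the decode function (standing in for Row.deserialize) are all hypothetical names chosen for the sketch.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Sketch: one thread reads the stream; decoding runs on the common pool. */
public class PrefetchingRowIterator<T> implements Iterator<T> {
    public static final int FILE_END = 1; // hypothetical marker value

    // Bounded queue, so the reader cannot run arbitrarily far ahead of the consumer.
    private final BlockingQueue<CompletableFuture<T>> queue = new ArrayBlockingQueue<>(64);
    private final CompletableFuture<T> done = new CompletableFuture<>(); // end-of-stream sentinel
    private CompletableFuture<T> head; // next element, fetched lazily by hasNext()

    public PrefetchingRowIterator(DataInputStream in, Function<byte[], T> decode) {
        Thread reader = new Thread(() -> {
            try {
                pump(in, decode);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
    }

    /** Runs on the reader thread only, so stream access stays sequential. */
    private void pump(DataInputStream in, Function<byte[], T> decode) throws InterruptedException {
        try {
            int signal;
            do {
                byte[] raw = new byte[in.readInt()]; // size prefix, assumed to be a byte count
                in.readFully(raw);                   // raw row bytes, not yet decoded
                queue.put(CompletableFuture.supplyAsync(() -> decode.apply(raw)));
                signal = in.readInt();               // marker assumed to follow every row
            } while (signal != FILE_END);
        } catch (IOException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);         // surfaces from next() via join()
            queue.put(failed);
        }
        queue.put(done);
    }

    @Override
    public boolean hasNext() {
        if (head == null) {
            try {
                head = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return head != done;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        CompletableFuture<T> current = head;
        head = null;
        return current.join(); // waits for decoding; failures arrive as CompletionException
    }
}
```

Because the futures are queued in the order the rows were read, next() still returns rows in file order even if a later row finishes decoding first. Whether this helps in practice depends on deserialization being expensive relative to the read, which I have not measured.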