I am building a Flink pipeline and, based on live input data, need to read records from archive files inside a RichFlatMapFunction (e.g. each day I want to read the files from the previous day and the previous week). I'm wondering what the best way to do that is.
I could use the Hadoop APIs directly, so that is what I'm trying next.
It would look something like this:
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

class LoadHistory(
  var basePath: String,
  var pathTemplate: String,
) extends RichFlatMapFunction[(TypeAlias.GridId, TypeAlias.Timestamp), ArchiveRecord] {
  // see
  // https://programmerall.com/article/34422316834/
  // https://stackoverflow.com/questions/37085528/hadoop-with-binary-files
  // https://data-flair.training/blogs/hdfs-data-read-operation

  // Hadoop FileSystem built from a plain default Configuration -- this is the part
  // I suspect needs to change. Marked @transient lazy so the function stays serializable
  // and the FileSystem is created on the task manager, not on the client.
  @transient lazy val fileSystem = FileSystem.get(new Configuration())

  // builds the concrete file path for a given grid id and archive date from pathTemplate
  def formatPath(pathTemplate: String, gridId: TypeAlias.GridId, archiveDate: TypeAlias.Timestamp): String = ???

  override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
    val pathStr = formatPath(pathTemplate, value._1, value._2)
    val path = new Path(pathStr)
    if (!fileSystem.exists(path)) {
      return
    }
    val in: FSDataInputStream = fileSystem.open(path)
    if (pathStr.endsWith(".protobuf")) {
      // TODO read file
    } else {
      assert(pathStr.endsWith(".lz4"))
      // TODO read file
    }
  }
}
I'm new to Hadoop, so I figure I'll need to configure the filesystem before reading data from cloud storage (i.e. replace new Configuration() with something meaningful). I know Flink uses Hadoop internally to read files, so I'm wondering whether I can access the Hadoop configuration or the configured HadoopFileSystem object that Flink itself is using at runtime.
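For reference, what I'm hoping for is something along these lines, using Flink's own FileSystem abstraction (org.apache.flink.core.fs.FileSystem) instead of building a Hadoop FileSystem myself. I haven't verified that this actually picks up the same configuration and filesystem plugins that Flink's sources use, so treat it as a sketch of the intent:

import org.apache.flink.core.fs.{FSDataInputStream => FlinkInputStream, FileSystem => FlinkFileSystem, Path => FlinkPath}

// Sketch: resolve the filesystem via Flink's abstraction, which (as I understand it)
// delegates to whatever Hadoop / cloud-storage implementation Flink has loaded for
// the path's scheme. Unverified assumption on my part.
def openViaFlinkFs(pathStr: String): Option[FlinkInputStream] = {
  val path = new FlinkPath(pathStr)
  val fs = FlinkFileSystem.get(path.toUri) // picks the scheme-specific implementation
  if (fs.exists(path)) Some(fs.open(path)) else None
}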
Previously I tried launching a Flink batch job from inside the FlatMapFunction (ending with env.collect), but it seems to have caused the jobs to block each other (job 2 won't start until job 1 is done).
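Roughly, that earlier attempt looked like the sketch below (simplified from memory; readTextFile and parseArchiveRecord are stand-ins for whatever the real job did, not part of the code above):

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Simplified sketch of the earlier attempt: inside the same RichFlatMapFunction,
// spin up a batch job per incoming element and pull its results back with collect().
// This is the version that appeared to block -- the enclosing job and this batch job
// would not run concurrently.
override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val records: Seq[ArchiveRecord] = env
    .readTextFile(formatPath(pathTemplate, value._1, value._2))
    .map(line => parseArchiveRecord(line)) // hypothetical parser
    .collect()                             // runs the batch job and blocks until it finishes
  records.foreach(out.collect)
}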