
I am building a Flink pipeline and, based on live input data, need to read records from archive files inside a RichFlatMapFunction (e.g. each day I want to read the files from the previous day and the previous week). I'm wondering what the best way to do that is?

I could use the Hadoop APIs directly, so that is what I'm trying next.

That would be something like this:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FSDataInputStream

class LoadHistory(
  var basePath: String,
  var pathTemplate: String,
) extends RichFlatMapFunction[(TypeAlias.GridId, TypeAlias.Timestamp), ArchiveRecord] {

  // see
  // https://programmerall.com/article/34422316834/
  // https://stackoverflow.com/questions/37085528/hadoop-with-binary-files
  // https://data-flair.training/blogs/hdfs-data-read-operation

  val fileSystem = FileSystem.get(new Configuration())

  def formatPath(pathTemplate: String, gridId: TypeAlias.GridId, archiveDate: TypeAlias.Timestamp): String = ???

  override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
    val pathStr = formatPath(pathTemplate, value._1, value._2)
    val path = new Path(pathStr)

    if (!fileSystem.exists(path)) {
      return
    }
    
    val in: FSDataInputStream = fileSystem.open(path)
    if (pathStr.endsWith(".protobuf")) {
      // TODO read file
    } else {
      assert(pathStr.endsWith(".lz4"))
      // TODO read file
    }
  }
}

I'm new to Hadoop, so I figure I'll need to configure it before reading data from cloud storage (e.g. replace new Configuration() with something meaningful). I know Flink uses Hadoop to read files internally, so I am wondering whether I can access the configuration or the configured HadoopFileSystem object that Flink is already using at runtime.
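
To make that concrete, I think the configured version of the class above would end up looking roughly like this. It's only a sketch: the URI-based FileSystem.get call and creating the file system in open() (so the non-serializable FileSystem object is built on the task manager instead of being serialized with the function) are my assumptions about the right wiring, not something I have working yet:

import java.net.URI

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

class LoadHistory(
  var basePath: String,
  var pathTemplate: String,
) extends RichFlatMapFunction[(TypeAlias.GridId, TypeAlias.Timestamp), ArchiveRecord] {

  // Created in open() so the non-serializable FileSystem lives only on the task manager.
  @transient private var fileSystem: FileSystem = _

  override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {
    // Passing the base URI picks the FileSystem implementation for its scheme
    // (s3a://, hdfs://, file://, ...) instead of fs.defaultFS. Credentials for cloud
    // storage would still have to be set on this Configuration or in core-site.xml.
    fileSystem = FileSystem.get(new URI(basePath), new Configuration())
  }

  def formatPath(gridId: TypeAlias.GridId, archiveDate: TypeAlias.Timestamp): String = ???

  override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
    val path = new Path(formatPath(value._1, value._2))
    if (fileSystem.exists(path)) {
      val in = fileSystem.open(path)
      try {
        // TODO parse the file and emit records via out.collect(...)
      } finally in.close()
    }
  }
}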

Previously I tried starting a nested Flink batch job inside the FlatMapFunction (ending with env.collect), but that seems to have resulted in the jobs blocking each other (job 2 won't start until job 1 has finished).

  • I feel like you'd be better off with Flink SQL to create a table from those files, which you could join/filter/query against datetime fields. In the stream, you can try joining against the created table... Without seeing your code or your file content, it's hard to really know what you're trying to do – OneCricketeer Jan 11 '23 at 07:23
  • The pipeline is a streaming pipeline and there are petabytes of archived data, so the Table API doesn't work for my use case. – Ivan Webber Jan 11 '23 at 18:30
  • I did find this article (https://programmerall.com/article/34422316834/). It's a little poorly written, but I think the author is trying to do the same thing I am: using Hadoop to read files within a Flink job, and using Flink's Hadoop configuration. – Ivan Webber Jan 11 '23 at 18:36
  • As long as you know the exact file you want to read, rather than consulting the Hive metastore for the same, I guess that's fine. The only problem with this is that HDFS files are not typically just "one file", e.g. if you use other ETL tools to write to HDFS rather than just copying files in directly – OneCricketeer Jan 12 '23 at 14:45

1 Answer


I dug into the Flink source code a bit and found a way to get an initialized org.apache.flink.core.fs.FileSystem object from an org.apache.flink.core.fs.Path. That can then be used to read the files:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.core.fs.Path
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.fs.FSDataInputStream
import org.apache.flink.util.Collector

class LoadHistory(
  var basePath: String,
  var pathTemplate: String,
) extends RichFlatMapFunction[(TypeAlias.GridId, TypeAlias.Timestamp), ArchiveRecord] {
  
  // Lazy and transient so the (non-serializable) FileSystem is created on the task manager,
  // using the configuration the Flink cluster was started with.
  @transient lazy val fileSystem = new Path(basePath).getFileSystem()

  def formatPath(gridId: TypeAlias.GridId, archiveDate: TypeAlias.Timestamp): String = ???

  override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
    val pathStr = formatPath(value._1, value._2)
    val path = new Path(pathStr)

    if (!fileSystem.exists(path)) {
      return
    }
    
    val in: FSDataInputStream = fileSystem.open(path)
    
    if (pathStr.endsWith(".protobuf")) {
      // TODO read file
    } else {
      assert(pathStr.endsWith(".lz4"))
      // TODO read file
    }
  }
}
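
The TODO branches still depend on the archive format, but as a rough sketch of the reading step (my assumptions: each file fits in memory, and the protobuf/lz4 decoding is a placeholder, since that depends on your schema and compression library), something like this could replace the fileSystem.open(path) line and the TODOs:

val in: FSDataInputStream = fileSystem.open(path)
val bytes =
  try in.readAllBytes() // FSDataInputStream extends java.io.InputStream, so the Java 9+ API is available
  finally in.close()

if (pathStr.endsWith(".protobuf")) {
  // hypothetical: ArchiveRecord.parseFrom(bytes), if ArchiveRecord is a generated protobuf class
} else {
  assert(pathStr.endsWith(".lz4"))
  // the bytes would first need to be decompressed with an lz4 library before decoding
}
// emit each decoded record with out.collect(...)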