2

I am reading data from Kafka and trying to write it to the HDFS file system in ORC format. I have used the below link reference from their official website. But I can see that Flink write exact same content for all data and make so many files and all files are ok 103KB

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#orc-format

Please find my code below.

object BeaconBatchIngest extends StreamingBase {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  def getTopicConfig(configs: List[Config]): Map[String, String]  = (for (config: Config <- configs) yield (config.getString("sourceTopic"), config.getString("destinationTopic"))).toMap

  def setKafkaConfig():Unit ={
    val kafkaParams = new Properties()
    kafkaParams.setProperty("bootstrap.servers","")
    kafkaParams.setProperty("zookeeper.connect","")
    kafkaParams.setProperty("group.id", DEFAULT_KAFKA_GROUP_ID)
    kafkaParams.setProperty("auto.offset.reset", "latest")
    
    val kafka_consumer:FlinkKafkaConsumer[String] = new FlinkKafkaConsumer[String]("sourceTopics", new SimpleStringSchema(),kafkaParams)
    kafka_consumer.setStartFromLatest()
    val stream: DataStream[DataParse] = env.addSource(kafka_consumer).map(new temp)
    val schema: String = "struct<_col0:string,_col1:bigint,_col2:string,_col3:string,_col4:string>"
    val writerProperties = new Properties()

    writerProperties.setProperty("orc.compress", "ZLIB")
    val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema),writerProperties,new org.apache.hadoop.conf.Configuration);
    val sink: StreamingFileSink[DataParse] = StreamingFileSink
          .forBulkFormat(new Path("hdfs://warehousestore/hive/warehouse/metrics_test.db/upp_raw_prod/hour=1/"), writerFactory)
          .build()
    stream.addSink(sink)
  }


  def main(args: Array[String]): Unit = {
    setKafkaConfig()
    env.enableCheckpointing(5000)
    env.execute("Kafka_Flink_HIVE")
  }
}
class temp extends MapFunction[String,DataParse]{

  override def map(record: String): DataParse = {
    new DataParse(record)
  }
}

class DataParse(data : String){
  val parsedJason = parse(data)
  val timestamp = compact(render(parsedJason \ "timestamp")).replaceAll("\"", "").toLong
  val event = compact(render(parsedJason \ "event")).replaceAll("\"", "")
  val source_id = compact(render(parsedJason \ "source_id")).replaceAll("\"", "")
  val app = compact(render(parsedJason \ "app")).replaceAll("\"", "")
  val json = data
}
class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {

  override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
    val eventColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val timeColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    val sourceIdColVector = batch.cols(2).asInstanceOf[BytesColumnVector]
    val appColVector = batch.cols(3).asInstanceOf[BytesColumnVector]
    val jsonColVector = batch.cols(4).asInstanceOf[BytesColumnVector]
    timeColVector.vector(batch.size + 1) = element.timestamp
    eventColVector.setVal(batch.size + 1, element.event.getBytes(StandardCharsets.UTF_8))
    sourceIdColVector.setVal(batch.size + 1, element.source_id.getBytes(StandardCharsets.UTF_8))
    appColVector.setVal(batch.size + 1, element.app.getBytes(StandardCharsets.UTF_8))
    jsonColVector.setVal(batch.size + 1, element.json.getBytes(StandardCharsets.UTF_8))
  }

}
David Anderson
  • 39,434
  • 4
  • 33
  • 60

1 Answers1

0

With bulk formats (such as ORC), the StreamingFileSink rolls over to new files with every checkpoint. If you reduce the checkpointing interval (currently 5 seconds), it won't write so many files.

David Anderson
  • 39,434
  • 4
  • 33
  • 60
  • Yes, correct. I changed the time to 3mins but still, Flink generates 103B files only. Is there any problem with code? I can read data from Kafka as I checked it by printing it. – patel akash Jul 10 '20 at 17:57
  • Have you tried printing `stream`? I'm not convinced that `.map(new temp)` is doing what you expect. – David Anderson Jul 10 '20 at 18:28
  • Yes, I tried printing it and it is printing, Maybe I am creating class wrongly. My stream will come in JSON String format and wanted to convert in class so I can use Flink's ORC lib. Can you please look it if correct in code? – patel akash Jul 10 '20 at 18:33
  • I can't tell if your code will work or not, but I have some doubts. You might want to run it in a debugger and see what it's doing. I'm not sure I understand "But I can see that Flink write exact same content for all data and make so many files and all files are ok 103KB" but it sounds like you are getting many files, all of which contain the same thing. If that's the case, then something is probably wrong with `DataParse` or `PersonVectorizer`. – David Anderson Jul 10 '20 at 18:41
  • By the way, it would be better to use a DeserializationSchema or a KafkaDeserializationSchema rather than a SimpleStringSchema -- but reading the stream as a string and then parsing can be made to work, it's just less efficient. – David Anderson Jul 10 '20 at 18:53
  • `PersonVectorizer` is just copy-paste from the official website and `DataParse` is just to create new objects. Can you point me to any sample code for this? this will be really helpful. – patel akash Jul 10 '20 at 19:19
  • https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#orc-format Just want to point out that here in sample code for `PersonVectorizer`, in the java code sample, `int row = batch.size++;` which increments batch.size value and same increment is not done in scala code, is it correct? – patel akash Jul 10 '20 at 19:22
  • And I tried to removed `DataParse` and kept data in String and changed `class PersonVectorizer(schema: String) extends Vectorizer[String](schema)` still getting the same result, files are generated after checkPointInterval but with same size and content – patel akash Jul 10 '20 at 19:47
  • I don't see anything wrong. You might compare with https://github.com/apache/flink/blob/release-1.11/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java – David Anderson Jul 10 '20 at 20:13