spark-submit
--master yarn
--deploy-mode cluster
--num-executors=8
--executor-memory=4G
--driver-memory=10G
--conf spark.yarn.submit.waitAppCompletion=false
--conf spark.yarn.maxAppAttempts=1
--conf spark.default.parallelism=16
--jars jars/hive-warehouse-connector-assembly.jar
--class classnameMain jarname.jar TEST1
object KC {
def sendMsg(topic: String, key: String, value: String): Unit = {
val kafkaServer = configManager.getString("Kafka.Server")
val props = new Properties()
props.put("bootstrap.servers", kafkaServer)
props.put("key.serializer", )
props.put("value.serializer",)
props.put("batch.size", "1")
props.put("acks", "all")
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String](topic, key, value)
producer.send(message)
}
def completeMSG(Id: BigInt, tableName: String, topic: String, envi:String ): Unit = {
val kafkaServer = configManager.getString("Kafka.Server")
val props = new Properties()
props.put("bootstrap.servers", kafkaServer)
props.put("key.serializer", )
props.put("value.serializer",)
props.put("batch.size", "1")
props.put("acks", "all")
val producer = new KafkaProducer[String, String](props)
val key = key_names
val value = value_names
val message = new ProducerRecord[String, String](s"{$envi_$topic}", key, value)
producer.send(message)
}
}
def main(args: Array[String]): Unit = {
configManager.setup("application.conf")
val warehouseLocation = ""
val spark = SparkSession.builder()
.master("local[*]")
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("hive.metastore.uris", "")
.config("hive.exec.dynamic.partition", "")
.config("hive.exec.dynamic.partition.mode", "")
.enableHiveSupport()
.getOrCreate()
val Id = 12345
val tableName = employee
logManager.logInfo(spark, jobId, "start of Kafka message")
val topic = "XLEMP"
val key = "XL"
val value = "Id +valueId+"
sendMsg(topic, key, value)
logManager.logInfo(spark, jobId, "end of Kafka message")
}
import KC
employeedetail.scala --> this is a file under which we are calling completeMSG method
KC.completeMSG(Id, "tblename", topicname, envi = ?)
So I want (TEST1) which is envi in completeMSG method. TEST1 is a argument which nothing but environment name. It may be TEST2 , Q2 and so on. I want to pass this argument(environment) name along with kafka topic. So it should be like envi_topic how the value of envi comes ?