I'm new to the Spring framework and I'm trying to use the @Service
annotation. I've read about it online but I'm still confused.
Here are my three classes:
ProcessingApplication class
@SpringBootApplication
public class ProcessingApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingApplication.class);

    public ProcessingApplication() {
    }

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(ProcessingApplication.class);
        app.run(args);
    }

    public void run(String... args) {
    }
}
ProcessingService class
@Service
public class ProcessingService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingService.class);

    public ProcessingService() {
    }

    public void processing() {
        try {
            SparkSession ss = SparkServiceUtil.getSparkSession(ServiceConstants.RUN_LOCAL_WITH_AVAILABLE_CORES);
        } catch (Exception e) {
            LOGGER.info(e.getMessage());
        }
    }
}
SparkServiceUtil class
@Component
public class SparkServiceUtil implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkServiceUtil.class);

    @Autowired
    private static DatabaseProperties dbProperties;

    @Autowired
    private static HdfsProperties hdfsProperties;

    private static SparkConf sparkConf;
    private static SparkSession sparkSession;

    private static UDF1<String, String> uuid = str -> UUID.randomUUID().toString();
    private static void getSparkConf(String master) {
        LOGGER.info("Getting Spark Conf");
        sparkConf = new SparkConf().setMaster(master)
                .setAppName(ServiceConstants.SPARK_APP_NAME)
                .set(ServiceConstants.SPARK_DB_CON_HOST, dbProperties.getHost())
                .set(ServiceConstants.SPARK_DB_CON_PORT, dbProperties.getNativePort())
                .set(ServiceConstants.SPARK_DB_CON_USERNAME, dbProperties.getAuthUsername())
                .set(ServiceConstants.SPARK_DB_CON_PASSWORD, dbProperties.getAuthPassword())
                .set("spark.sql.caseSensitive", ServiceConstants.CASE_SENSITIVE);
        sparkConf.setExecutorEnv(sparkConf.getAll());
    }
    public static synchronized SparkSession getSparkSession(String master) {
        // Build the session only once; subsequent calls return the cached instance.
        if (sparkSession == null) {
            if (sparkConf == null) {
                getSparkConf(master);
            }
            sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
            sparkSession.udf().register("uuid", uuid, DataTypes.StringType);
        }
        return sparkSession;
    }
    public static Dataset<Row> startRSVPStream() {
        return sparkSession.readStream()
                .format(KafkaConstants.STREAM_FORMAT)
                .option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
                .option("subscribe", KafkaConstants.KAFKA_TOPIC)
                .option("failOnDataLoss", false)
                .load();
        /* Earlier attempt, kept for reference:
        .selectExpr("CAST(value AS STRING)")
        .writeStream()
        .format(KafkaConstants.STREAM_FORMAT)
        .option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
        .option("subscribe", KafkaConstants.KAFKA_TOPIC)
        .option("checkpointLocation", KafkaConstants.CHECKPOINT_LOCATION)
        .trigger(Trigger.Continuous("1 second"))
        .start();
        .selectExpr(col("timestamp"),
                from_json(col("value").cast("string"), RSVpSchema.RSVP_SCHEMA)
                        .alias("rsvp"))
                .alias("meetup")
        .select("meetup.*")) */
    }
    public static Configuration getHDFSConf() {
        LOGGER.info("Getting Hadoop Config");
        Configuration conf = new Configuration();
        conf.set(ServiceConstants.HDFS_FS_DEFAULTS, hdfsProperties.getFsDefaultFs());
        conf.set(ServiceConstants.HDFS_FS_IMPL, hdfsProperties.getFsHdfsImpl());
        conf.set(ServiceConstants.HDFS_DFS_NAMENODE_KERBORSE_PRINCIPAL, hdfsProperties.getDfsNameNodeKerberosePrincipal());
        conf.set(ServiceConstants.HDFS_HADOOP_SECURITY_AUTH, hdfsProperties.getHadoopSecurityAuth());
        return conf;
    }
}
I'm using ProcessingService as a @Service
class because I'm thinking it will act as the service layer that handles all the Spark initialization and starts processing the messages. Does it make sense to use it as a @Service
class?
If I'm wrong, can anyone please help me understand, in simple terms, when I should use the @Service
annotation, and what the appropriate annotation would be for the ProcessingService
class?
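To make my intent clearer, this is roughly how I imagine the service being invoked once the application starts. This is only a sketch of what I have in mind, not working code; the ProcessingRunner class and the CommandLineRunner wiring are my own assumptions:
@Component
public class ProcessingRunner implements CommandLineRunner {

    private final ProcessingService processingService;

    // Constructor injection: Spring should supply the @Service bean here.
    public ProcessingRunner(ProcessingService processingService) {
        this.processingService = processingService;
    }

    @Override
    public void run(String... args) {
        // Kick off the Spark initialization and message processing after startup.
        processingService.processing();
    }
}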