
I'm developing a Spark application, and I'm used to Spring as a dependency injection framework. Now I'm stuck with the problem that the processing part uses Spring's @Autowired functionality, but the processor is serialized and deserialized by Spark.

So the following code gets me into trouble:

Processor processor = ...; // a Spring-constructed object,
                           // and the source of all the trouble
JavaRDD<Txn> rdd = ...; // some data for Spark
rdd.foreachPartition(processor);

The Processor looks like this:

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired // This will not work if the object is deserialized
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        ... // do some fancy stuff
        db.store(txns);
    }
}

So my question is: Is it even possible to use something like Spring in combination with Spark? If not, what is the most elegant way to do something like that? Any help is appreciated!

itsme
  • If the problem is that you deserialize the object and `@Autowired` injection runs only on first initialization, then you *could* technically get the ApplicationContext and force it to inject your transient fields manually. – EpicPandaForce May 05 '15 at 12:56
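
A minimal sketch of that idea, assuming a hypothetical SpringContextHolder that statically exposes the ApplicationContext on each executor JVM (the holder is not part of the question's code and has to be initialized there yourself):

import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;

// Hypothetical holder: set once per JVM, e.g. from a lazily bootstrapped context.
public final class SpringContextHolder {
    private static volatile ApplicationContext context;

    public static void set(ApplicationContext ctx) {
        context = ctx;
    }

    public static AutowireCapableBeanFactory beanFactory() {
        return context.getAutowireCapableBeanFactory();
    }
}

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        if (db == null) {
            // the transient field is null after deserialization on the executor,
            // so re-run Spring's field injection manually
            SpringContextHolder.beanFactory().autowireBean(this);
        }
        db.store(txns);
    }
}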

1 Answer


FROM THE QUESTION ASKER: Added: To hook into the deserialization step directly, without modifying your own classes, use the following spring-spark project by parapluplu. This project autowires your bean when it gets deserialized by Spark.
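
The original repository was later removed (see the comments below), but the general idea, re-injecting dependencies during deserialization rather than in call(), can be sketched with a readObject hook, again using the hypothetical SpringContextHolder from the sketch above:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Iterator;

public class Processor implements VoidFunction<Iterator<Txn>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Autowired
    private transient DatabaseConnection db;

    @Override
    public void call(Iterator<Txn> txns) {
        db.store(txns);
    }

    // Invoked by Java serialization when Spark restores the closure on an executor;
    // after the default state is read, Spring's field injection is re-run.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        SpringContextHolder.beanFactory().autowireBean(this);
    }
}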


EDIT:

In order to use Spark, you need the following setup (also seen in this repository):

  • Spring Boot + Spark:


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
    <relativePath/>
    <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- fix java.lang.ClassNotFoundException: org.codehaus.commons.compiler.UncheckedCompileException -->
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.slf4j/log4j-over-slf4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.6.4</version>
    </dependency>

</dependencies>

Then you need the application class, as usual with Spring Boot:

@SpringBootApplication
public class SparkExperimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(SparkExperimentApplication.class, args);
    }
}

And then a configuration class that binds it all together:

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {

    @Autowired
    private Environment env;

    @Value("${app.name:jigsaw}")
    private String appName;

    @Value("${spark.home}")
    private String sparkHome;

    @Value("${master.uri:local}")
    private String masterUri;

    @Bean
    public SparkConf sparkConf() {
        SparkConf sparkConf = new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);

        return sparkConf;
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        return SparkSession
                .builder()
                .sparkContext(javaSparkContext().sc())
                .appName("Java Spark SQL basic example")
                .getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
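
The @Value placeholders above expect matching keys in application.properties on the classpath. A minimal sketch with assumed values (spark.home must point at your own Spark installation; app.name and master.uri fall back to their declared defaults if omitted):

app.name=jigsaw
spark.home=/usr/local/spark
master.uri=local[*]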

Then you can use the SparkSession class to communicate with Spark SQL:

/**
 * Created by achat1 on 9/23/15.
 * Just an example to see if it works.
 */
@Component
public class WordCount {
    @Autowired
    private SparkSession sparkSession;

    public List<Count> count() {
        String input = "hello world hello hello hello";
        List<Word> words = Arrays.stream(input.split(" "))
                .map(Word::new)
                .collect(Collectors.toList());
        Dataset<Row> dataFrame = sparkSession.createDataFrame(words, Word.class);
        dataFrame.show();

        // col() comes from the static import org.apache.spark.sql.functions.col
        RelationalGroupedDataset groupedDataset = dataFrame.groupBy(col("word"));
        groupedDataset.count().show();
        List<Row> rows = groupedDataset.count().collectAsList();
        return rows.stream()
                .map(row -> new Count(row.getString(0), row.getLong(1)))
                .collect(Collectors.toList());
    }
}

Referring to these two classes:

public class Word {
    private String word;

    public Word() {
    }

    public Word(String word) {
        this.word = word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getWord() {
        return word;
    }
}

public class Count {
    private String word;
    private long count;

    public Count() {
    }

    public Count(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}

Then you can run it and see that it returns the right data:

@RequestMapping("api")
@Controller
public class ApiController {
    @Autowired
    WordCount wordCount;

    @RequestMapping("wordcount")
    public ResponseEntity<List<Count>> words() {
        return new ResponseEntity<>(wordCount.count(), HttpStatus.OK);
    }
}

It returns:

[{"word":"hello","count":4},{"word":"world","count":1}]
EpicPandaForce
  • I really would like to keep the Processor class untouched. Isn't there some way to hook into the deserializer to inject dependencies there? – itsme May 05 '15 at 13:51
  • I don't know, this is the only solution I could think of based on what I know - and it might not be the best one :) – EpicPandaForce May 05 '15 at 15:30
  • OK, but this helps me anyway. I'll just use it in a slightly different way: I'll inject the dependencies automatically after deserialization. To do that, I'll extend the Kryo serialization class. I'll edit your answer when I've done that, because this solution will be based on your answer. – itsme May 06 '15 at 05:55
  • OK, I created a project on GitHub for that and linked it in your post. I hope that is OK for you. Thank you very much for the hint! – itsme May 06 '15 at 16:55
  • You're welcome! I didn't know you could autowire an `AutowireCapableBeanFactory`; that does indeed make it simpler than hacking it out of the `servlet context`. I checked the project, nice work! – EpicPandaForce May 06 '15 at 17:02
  • @user3290983 that's awkward, I found this fork of it though: https://github.com/lgscofield/spring-spark – EpicPandaForce Jan 04 '17 at 21:44
  • @EpicPandaForce good example of how to use it with Boot. A question: does your code run from the client machine, or does it need to be packaged and submitted to the Spark master? – selvinsource Dec 17 '17 at 17:15
  • @selvinsource unfortunately I put this together and was happy with it, but in the end we didn't *need* Spark and so I am actually not sure about that part as I didn't have to figure it out. – EpicPandaForce Dec 17 '17 at 18:46
  • @EpicPandaForce I am doing some tests and it seems you can do this by setting .config("spark.jars", "target/simple-project-1.0.jar") in addition to the master URL as you suggested (see the sketch after these comments). Thanks anyway. – selvinsource Dec 18 '17 at 08:39
  • @EpicPandaForce do you have an example of using the spring-spark project you mentioned in your answer? Thanks – Popeye Mar 28 '18 at 20:00
  • No, I don't; that was added by the original question asker, and he has since even removed that repository for whatever reason – EpicPandaForce Mar 28 '18 at 20:18
  • @EpicPandaForce is it recommended to run Spark with Spring Boot in this embedded mode? Any issues that you can foresee? – unnik Apr 26 '20 at 13:13
  • @unnik "recommended" I just punched the code until it compiled and ran. We never ended up using Spark (as I had previously mentioned above 2.5 years ago), so I don't know if it's "recommended". The only benefit was that it *worked* to the degree of a "hello world", which was already more than any sample I could find. – EpicPandaForce Apr 26 '20 at 13:16
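
Picking up selvinsource's comment about spark.jars: a minimal sketch of pointing the session at a remote master and shipping the application jar along with it (the jar path and master URL are placeholder values, not taken from the answer):

// Hypothetical values; adjust the master URL and jar path to your cluster and build output.
SparkSession spark = SparkSession
        .builder()
        .appName("spark-experiment")
        .master("spark://spark-master:7077")
        .config("spark.jars", "target/simple-project-1.0.jar")
        .getOrCreate();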