2

I am building a Gradle Java project (see below) that uses Apache Beam, and I run it from Eclipse Oxygen.

package com.xxxx.beam;

import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaSparkContext;

public class ApacheBeamTestProject {

    public void modelExecution(){

        SparkContextOptions options = (SparkContextOptions) PipelineOptionsFactory.create();
         options.setSparkMaster("xxxxxxxxx"); 


         JavaSparkContext sc = options.getProvidedSparkContext();

         JavaLinearRegressionWithSGDExample.runJavaLinearRegressionWithSGDExample(sc);

         Pipeline p = Pipeline.create(options);


              p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
                     // withCompression can be omitted - by default compression is detected from the filename.
                     .apply(FileIO.readMatches())
                     .apply(MapElements
                         // uses imports from TypeDescriptors
                             .via(
                                     new SimpleFunction <ReadableFile, KV<String,String>>() {

                                        private static final long serialVersionUID = -5715607038612883677L;

                                                @SuppressWarnings("unused")
                                                public KV<String,String> createKV(ReadableFile f) {
                                                      String temp = null;
                                                      try{
                                                          temp = f.readFullyAsUTF8String();
                                                      }catch(IOException e){

                                                      }
                                                    return KV.of(f.getMetadata().resourceId().toString(), temp);
                                                  }
                                                }
                                     ))
                     .apply(FileIO.write())

                     ;

         SparkPipelineResult result = (SparkPipelineResult) p.run();

         result.getState();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Test log");

         PipelineOptions options = PipelineOptionsFactory.create();

         Pipeline p = Pipeline.create(options);

          p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
                 // withCompression can be omitted - by default compression is detected from the filename.
                 .apply(FileIO.readMatches())
                 .apply(MapElements
                     // uses imports from TypeDescriptors
                         .via(
                                 new SimpleFunction <ReadableFile, KV<String,String>>() {

                                    private static final long serialVersionUID = -5715607038612883677L;

                                            @SuppressWarnings("unused")
                                            public KV<String,String> createKV(ReadableFile f) {
                                                  String temp = null;
                                                  try{
                                                      temp = f.readFullyAsUTF8String();
                                                  }catch(IOException e){

                                                  }
                                                return KV.of(f.getMetadata().resourceId().toString(), temp);
                                              }
                                            }
                                 ))
                 .apply(FileIO.write());
         p.run();
    }
}

I am observing the following error when executing this project in Eclipse.

Test log
Exception in thread "main" java.lang.IllegalArgumentException: No Runner was specified and the DirectRunner was not found on the classpath.
Specify a runner by either:
    Explicitly specifying a runner by providing the 'runner' property
    Adding the DirectRunner to the classpath
    Calling 'PipelineOptions.setRunner(PipelineRunner)' directly
    at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:291)
    at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:281)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:591)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:532)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:155)
    at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:95)
    at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:49)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:150)

This project doesn't contain a pom.xml file; Gradle manages all the dependencies. I am not sure how to fix this error. Could someone advise?

Stella
  • 1,728
  • 5
  • 41
  • 95

2 Answers

4

It seems that you are trying to use the DirectRunner, but it is not on your application's classpath. You can supply it by adding the beam-runners-direct-java dependency to your application:

https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java

EDIT (answered in comment): you are trying to run this code on Spark but didn't specify the runner in PipelineOptions. By default Beam tries to run the pipeline on the DirectRunner, which is why you get this error. Calling `options.setRunner(SparkRunner.class);` before creating the pipeline sets the correct runner and fixes the issue.

gajos
  • 897
  • 2
  • 10
  • 21
  • 1
    Actually, the project's "Project and External Dependencies" already contain all the dependency JAR files, e.g. "org.apache.beam:beam-sdks-java-core:2.3.0", "org.apache.beam:beam-runners-spark:2.3.0" and "org.apache.spark:spark-core_2.11:2.2.1". I am not sure why I'm still getting the error about adding the DirectRunner. – Stella May 22 '18 at 15:51
  • ok, so you are trying to run this code on spark, but didn't specify it in PipelineOptions. Beam by default tries to run the code on DirectRunner, so I think this is why you get this error. Does specifying: `options.setRunner(SparkRunner.class);` before creating the pipeline help? – gajos May 22 '18 at 16:19
  • I added it, and that part works now. However, I now get this error: Exception in thread "main" java.lang.IllegalStateException: Subclass of SimpleFunction must override 'apply' method or pass a SerializableFunction to the constructor, usually via a lambda or method reference. at org.apache.beam.sdk.transforms.SimpleFunction.<init>(SimpleFunction.java:48) – Stella May 22 '18 at 16:23
  • This is yet another issue in this code: you need to override the apply method. See: https://beam.apache.org/documentation/sdks/javadoc/0.4.0/org/apache/beam/sdk/transforms/SimpleFunction.html – gajos May 22 '18 at 16:49
  • Also it could be helpful to check out the Beam's Word count examples: https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java – gajos May 22 '18 at 16:50
  • OK, thanks. Can you advise which method I should add to my code? I tried adding an apply method, but that didn't solve the problem. – Stella May 22 '18 at 16:59
  • /** A SimpleFunction that converts a Word and Count into a printable string. */ public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { /** * */ private static final long serialVersionUID = -5715607038612883677L; @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } } – Stella May 22 '18 at 17:03
0

Downloading beam-runners-direct-java-x.x.x.jar and adding it to the project classpath worked for me. The DirectRunner JAR is available from the Maven Central repository (artifact org.apache.beam:beam-runners-direct-java).

Furthermore, if you need a specific beam runner for your project, you can pass the runner name as a program argument (eg: --runner=DataflowRunner) and add the corresponding jar to the project classpath.