
Can someone tell me how Pig instantiates UDF objects? I used Pig to construct a pipeline to process some data, and I deployed the pipeline on a multi-node Hadoop cluster. I want to save all the intermediate results that are produced after each step in the pipeline, so I wrote a UDF in Java that opens an HTTP connection at initialization and transmits data in exec. It closes the connection in the object's finalize.

My script can be simplified as follows:

REGISTER MyPackage.jar;
DEFINE InterStore test.InterStore('localhost', '58888');
DEFINE Clean      test.Clean();

raw = LOAD 'mydata';
cleaned = FILTER (FOREACH raw GENERATE FLATTEN(Clean(*))) BY NOT ($0 MATCHES '');
cleaned = FOREACH cleaned GENERATE FLATTEN(InterStore(*));
named = FOREACH cleaned GENERATE $1 AS LocationID, $2 AS AccessCount;
named = FOREACH named GENERATE FLATTEN(InterStore(*)) AS (LocationID, AccessCount);
grp = GROUP named BY LocationID;
grp = FOREACH grp GENERATE FLATTEN(InterStore(*)) AS (group, named:{(LocationID, AccessCount)});
sum = FOREACH grp GENERATE group AS LocationID, SUM(named.AccessCount) AS TotalAccesses;
sum = FOREACH sum GENERATE FLATTEN(InterStore(*)) AS (LocationID, TotalAccesses);
ordered = ORDER sum BY TotalAccesses DESC;
STORE ordered INTO 'result';

And the code for InterStore can be simplified as follows:

package test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class InterStore extends EvalFunc<Tuple> {
  // Kept open across calls to avoid re-establishing the connection in exec
  private HttpURLConnection con;

  public InterStore(String ip, String port) throws IOException {
    URL url = new URL("http://" + ip + ':' + port);
    con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("PUT");
    con.setDoOutput(true);
    con.setDoInput(true);
  }

  // Sends each tuple over the open connection, then passes it through unchanged
  @Override
  public Tuple exec(Tuple input) throws IOException {
    con.getOutputStream().write((input.toDelimitedString(",") + '\n').getBytes());
    return input;
  }

  // Intended to flush and close the connection when the object is collected
  @Override
  protected void finalize() throws Throwable {
    con.getOutputStream().close();
    int respcode = con.getResponseCode();
    BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
    System.out.printf("Resp Code:%d, %s\n", respcode, in.readLine());
    in.close();
  }
}

However, I found that the HTTP connection cannot transmit data successfully the way it does in local mode. How should I deal with that?

Trams

1 Answer


Is there a service listening on 'localhost', '58888'?
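If you are not sure, a throwaway receiver is an easy way to check. Below is a minimal sketch using the JDK's built-in com.sun.net.httpserver (the class name TestListener is made up for illustration): run it on the machine you expect to receive the data, and it will print whatever the UDF PUTs to it.

import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;

public class TestListener {
  public static void main(String[] args) throws IOException {
    // Listen on the port the UDF was DEFINEd with
    HttpServer server = HttpServer.create(new InetSocketAddress(58888), 0);
    server.createContext("/", exchange -> {
      // Print whatever body the client sends, then acknowledge with 200 and no body
      try (InputStream body = exchange.getRequestBody()) {
        byte[] buf = new byte[4096];
        int n;
        while ((n = body.read(buf)) != -1) {
          System.out.write(buf, 0, n);
        }
      }
      exchange.sendResponseHeaders(200, -1);
      exchange.close();
    });
    server.start();
    System.out.println("Listening on :58888");
  }
}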

Note that localhost refers to a different machine on each execution node, so you might want to do this:

%default LHOST 'localhost';

and use this variable as the parameter:

DEFINE InterStore test.InterStore('$LHOST', '58888');

In general, I would add some printouts in the UDF, double-check the parameters passed to it, and test the connection (e.g., ping the host and check whether the port is accessible from the Hadoop node).
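For the printout part, something along these lines would show where each task actually runs and whether it can reach the target. The ConnCheck helper below is hypothetical, not part of Pig, and on a cluster its output lands in the task logs, not on your client console.

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ConnCheck {
  // Returns true if host:port accepts a TCP connection from this node
  public static boolean probe(String host, int port) {
    try (Socket s = new Socket()) {
      s.connect(new InetSocketAddress(host, port), 2000);  // 2-second timeout
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void log(String host, int port) throws IOException {
    // In local mode this prints your own machine; on a cluster it prints the task node
    System.err.printf("UDF on %s, target %s:%d reachable: %b%n",
        InetAddress.getLocalHost().getHostName(), host, port, probe(host, port));
  }
}

Calling ConnCheck.log(ip, Integer.parseInt(port)) at the top of the InterStore constructor and then grepping the task logs would quickly show whether this is a reachability problem.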

kecso