
We are trying to run an Oozie workflow with three sub-workflows running in parallel using a fork. Each sub-workflow contains a node running a native MapReduce job, followed by two nodes running some complex Pig jobs. Finally, the three sub-workflows are joined into a single end node.

When we run this workflow, we get a LeaseExpiredException. The exception occurs at random points while the Pig jobs are running; there is no definite place where it occurs, but it happens every time we run the workflow.

Also, if we remove the fork and run the sub-workflows sequentially, everything works fine. However, our expectation is to run them in parallel and save some execution time.

Can you please help me understand this issue and give some pointers on where we could be going wrong? We are just starting with Hadoop development and haven't faced such an issue before.

It looks like, with several tasks running in parallel, one of the threads closed a part file, and when another thread tried to close the same file, the error was thrown.
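If that is what is happening, one idea we are considering is making sure that no two forked branches ever write under the same HDFS directory. Below is a rough sketch of the Proc1 action with per-branch overrides of the output paths; the property names come from the sub-workflow sample further down, but the "/proc1" suffix and the choice of properties are only an illustration, we have not confirmed that these are the colliding paths.

<!-- Hypothetical variant of the Proc1 action: besides propagating the parent
     configuration, each branch overrides its output paths with a branch-specific
     sub-directory so the three sub-workflows never share an output location.
     The "/proc1" suffix is for illustration only. -->
<action name="Proc1">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc1_workflow.xml</app-path>
        <propagate-configuration/>
        <configuration>
            <property>
                <name>step1JoinOutputPath</name>
                <value>${step1JoinOutputPath}/proc1</value>
            </property>
            <property>
                <name>tbValidQuerysOutputPath</name>
                <value>${tbValidQuerysOutputPath}/proc1</value>
            </property>
            <property>
                <name>tbCacheDataPath</name>
                <value>${tbCacheDataPath}/proc1</value>
            </property>
        </configuration>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>

Depending on the Oozie version, the propagated parent configuration may take precedence over the inline one; if these overrides are ignored, the same effect should be possible by dropping <propagate-configuration/> and passing every property each sub-workflow needs explicitly in its <configuration> block.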

Following is the stack trace of the exception from the Hadoop logs.

2013-02-19 10:23:54,815 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher: 57% complete 
2013-02-19 10:26:55,361 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher: 59% complete
2013-02-19 10:27:59,666 ERROR org.apache.hadoop.hdfs.DFSClient: Exception closing file <hdfspath>/oozie-oozi/0000105-130218000850190-oozie-oozi-W/aggregateData--pig/output/_temporary/_attempt_201302180007_0380_m_000000_0/part-00000 : org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on <hdfspath>/oozie-oozi/0000105-130218000850190-oozie-oozi-W/aggregateData--pig/output/_temporary/_attempt_201302180007_0380_m_000000_0/part-00000 File does not exist. Holder DFSClient_attempt_201302180007_0380_m_000000_0 does not have any open files.
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1664)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1655)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1710)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1698)
                at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:793)
                at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
                at java.lang.reflect.Method.invoke(Method.java:597)
                at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
                at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1439)
                at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1435)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:396)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1278)
                at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1433)

Following is a sample of the main workflow and one of the sub-workflows.

Main workflow:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="MainProcess">
<start to="forkProcessMain"/>
<fork name="forkProcessMain">
    <path start="Proc1"/>
    <path start="Proc2"/>
    <path start="Proc3"/>
</fork>
<join name="joinProcessMain" to="end"/>
<action name="Proc1">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc1_workflow.xml</app-path>
        <propagate-configuration/>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>   
<action name="Proc2">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc2_workflow.xml</app-path>
        <propagate-configuration/>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>   
<action name="Proc3">
    <sub-workflow>
        <app-path>${nameNode}${wfPath}/proc3_workflow.xml</app-path>
        <propagate-configuration/>
    </sub-workflow>
    <ok to="joinProcessMain"/>
    <error to="fail"/>
</action>   
<kill name="fail">
    <message>WF Failure, [${wf:lastErrorNode()}] failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

Sub-workflow:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="Sub Process">
<start to="Step1"/>
<action name="Step1">
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
           <delete path="${step1JoinOutputPath}"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>com.absd.mr.step1</main-class>
        <arg>${wf:name()}</arg>
        <arg>${wf:id()}</arg>
        <arg>${tbMasterDataOutputPath}</arg>
        <arg>${step1JoinOutputPath}</arg>
        <arg>${tbQueryKeyPath}</arg>
        <capture-output/>
    </java>
    <ok to="generateValidQueryKeys"/>
    <error to="fail"/>
</action>
<action name="generateValidQueryKeys">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
           <delete path="${tbValidQuerysOutputPath}"/>
        </prepare>
        <configuration>
            <property>
                <name>pig.tmpfilecompression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.tmpfilecompression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.map.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.map.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>mapred.compress.map.output</name>
                <value>true</value>
            </property>
        </configuration>
        <script>${pigDir}/tb_calc_valid_accounts.pig</script>
        <param>csvFilesDir=${csvFilesDir}</param>
        <param>step1JoinOutputPath=${step1JoinOutputPath}</param>
        <param>tbValidQuerysOutputPath=${tbValidQuerysOutputPath}</param>
        <param>piMinFAs=${piMinFAs}</param>
        <param>piMinAccounts=${piMinAccounts}</param>
        <param>parallel=80</param>
    </pig>
    <ok to="aggregateAumData"/>
    <error to="fail"/>
</action>
<action name="aggregateAumData">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
           <delete path="${tbCacheDataPath}"/>
        </prepare>
        <configuration>
            <property>
                <name>pig.tmpfilecompression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.tmpfilecompression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.map.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.map.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>pig.output.compression</name>
                <value>true</value>
            </property>
            <property>
                <name>pig.output.compression.codec</name>
                <value>lzo</value>
            </property>
            <property>
                <name>mapred.compress.map.output</name>
                <value>true</value>
            </property>
        </configuration>
        <script>${pigDir}/aggregationLogic.pig</script>
        <param>csvFilesDir=${csvFilesDir}</param>
        <param>tbValidQuerysOutputPath=${tbValidQuerysOutputPath}</param>
        <param>tbCacheDataPath=${tbCacheDataPath}</param>
        <param>currDate=${date}</param>
        <param>udfJarPath=${nameNode}${wfPath}/lib</param>
        <param>parallel=150</param>
      </pig>
    <ok to="loadDataToDB"/>
    <error to="fail"/>
</action>   
<kill name="fail">
    <message>WF Failure, [${wf:lastErrorNode()}] failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>

Danish Khan
  • I have the same problem; if someone can answer this question... – Sebastien Feb 20 '13 at 15:40
  • We have the exact same problem, with Hive queries (oozie actions and beeline) and Streaming. The result: a lot of data is not processed. Some (between 5 and 20%) of the Streaming processes are killed by a SIGTERM. We use YARN and CDH-4.1.1, and also tried on CDH-4.1.2 and CDH-4.2.0-rc. – Alex F Feb 20 '13 at 15:47
  • @DanishKhan A similar issue is discussed here: http://stackoverflow.com/questions/14170186/leaseexpiredexception-in-hive . What Hadoop/Oozie version do you use? Do you have any custom OutputFormat in this scenario? – Lorand Bendig Feb 20 '13 at 19:09
  • @LorandBendig Thanks for pointing out the thread. Yes, we are running parallel MR jobs, as is evident from the `fork` structure, but I thought that, since they are spawned by the same workflow, `Oozie` would take care of the parallelism and temp files. We are not using `Hive` with this, and there is no custom OutputFormat being used. – Danish Khan Feb 21 '13 at 06:42

1 Answer


We got the same error when we were running three Pig actions in parallel and one of them failed. That error message is a consequence of an unexpected workflow stop: because one action failed, the workflow is stopped while the other actions are still trying to continue. You must look at the failed action, the one with status ERROR, to find out what happened; don't look at the actions with status KILLED.

chech0x