Hadoop Oozie学习笔记E0720: Fork/join mismatch, node [join_

Oozie是个针对Hadoop的工作流,有些自己的语法. 这两天碰到一个异常,查看源码才明白Oozie的join只允许承接fork下来的任务,否则会报以下错误.整个异常如下:

  WARN CallableQueueService$CallableWrapper:528 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] exception callable [signal], E0720: Fork/join mismatch, node [join_node_name]

org.apache.oozie.command.CommandException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]

 at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:213)

 at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:305)

 at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:59)

 at org.apache.oozie.command.Command.call(Command.java:202)

 at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:128)

 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)

 at java.lang.Thread.run(Thread.java:662)

Caused by: org.apache.oozie.workflow.WorkflowException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]

 at org.apache.oozie.workflow.lite.JoinNodeDef$JoinNodeHandler.loopDetection(JoinNodeDef.java:44)

 at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:203)

 at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:284)

 at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:120)

 ... 7 more

源码来自org.apache.oozie.workflow.lite.JoinNodeDef,检测这个语法的代码如下:

  1. public void loopDetection(Context context) throws WorkflowException {   
  2.     String flag = getLoopFlag(context.getNodeDef().getName());   
  3.     if (context.getVar(flag) != null) {   
  4.         throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());   
  5.     }   
  6.     String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());   
  7.     String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);   
  8.     if (forkCount == null) {   
  9.         throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());   
  10.     }   
  11.     int count = Integer.parseInt(forkCount) - 1;   
  12.     if (count == 0) {   
  13.         context.setVar(flag, "true");   
  14.     }   
  15. }   
  16.   
  17. public boolean enter(Context context) throws WorkflowException {   
  18.     String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());   
  19.     String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);   
  20.     if (forkCount == null) {   
  21.         throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());   
  22.     }   
  23.     int count = Integer.parseInt(forkCount) - 1;   
  24.     if (count > 0) {   
  25.         context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, "" + count);   
  26.         context.deleteExecutionPath();   
  27.     }   
  28.     else {   
  29.         context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, null);   
  30.     }   
  31.     return (count == 0);   
  32. }  

相关推荐