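Hadoop Oozie Study Notes: E0720: Fork/join mismatch, node [join_
Oozie is a workflow engine for Hadoop, and it has some syntax rules of its own. Over the past two days I ran into an exception, and only after reading the source did I understand that an Oozie join may only receive execution paths that come down from a fork; otherwise it reports the error below. The full exception is as follows: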
WARN CallableQueueService$CallableWrapper:528 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] exception callable [signal], E0720: Fork/join mismatch, node [join_node_name]
org.apache.oozie.command.CommandException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]
    at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:213)
    at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:305)
    at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:59)
    at org.apache.oozie.command.Command.call(Command.java:202)
    at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.oozie.workflow.WorkflowException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]
    at org.apache.oozie.workflow.lite.JoinNodeDef$JoinNodeHandler.loopDetection(JoinNodeDef.java:44)
    at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:203)
    at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:284)
    at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:120)
    ... 7 more
The source comes from org.apache.oozie.workflow.lite.JoinNodeDef. The code that checks this rule is as follows:
    public void loopDetection(Context context) throws WorkflowException {
        String flag = getLoopFlag(context.getNodeDef().getName());
        // The flag is set once the last branch has reached this join; seeing it again means re-entry (E0709).
        if (context.getVar(flag) != null) {
            throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
        }
        String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
        String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
        // No fork count stored for the parent path: this join was not reached from a fork (E0720).
        if (forkCount == null) {
            throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
        }
        int count = Integer.parseInt(forkCount) - 1;
        if (count == 0) {
            context.setVar(flag, "true");
        }
    }

    public boolean enter(Context context) throws WorkflowException {
        String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
        String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
        if (forkCount == null) {
            throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
        }
        int count = Integer.parseInt(forkCount) - 1;
        if (count > 0) {
            // Other branches are still pending: store the decremented count and end this path.
            context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
            context.deleteExecutionPath();
        }
        else {
            // The last branch has arrived: clear the counter.
            context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, null);
        }
        // Continue past the join only when the last branch arrives.
        return (count == 0);
    }
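
To make the counter logic easier to follow, here is a minimal standalone sketch of the same bookkeeping. It is not Oozie code: the class ForkJoinSketch, its fork and join methods, the "fork.count:" prefix, and the use of IllegalStateException are all hypothetical stand-ins, and a plain map replaces the workflow context variables. It only illustrates that a join reached without a fork having stored a count fails in the same way as E0720.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the join bookkeeping; not Oozie's real classes. */
class ForkJoinSketch {
    // Assumed prefix; plays the role of ForkNodeDef.FORK_COUNT_PREFIX.
    static final String FORK_COUNT_PREFIX = "fork.count:";

    // Stands in for the workflow variables read via context.getVar/setVar.
    private final Map<String, String> vars = new HashMap<>();

    /** A fork records how many branches it started, keyed by its execution path. */
    void fork(String parentPath, int branches) {
        vars.put(FORK_COUNT_PREFIX + parentPath, String.valueOf(branches));
    }

    /**
     * Mirrors the enter() logic: returns true only when the last branch arrives.
     * Throws if no fork stored a count for this parent path (the E0720 case).
     */
    boolean join(String parentPath, String joinName) {
        String key = FORK_COUNT_PREFIX + parentPath;
        String forkCount = vars.get(key);
        if (forkCount == null) {
            throw new IllegalStateException(
                    "E0720: Fork/join mismatch, node [" + joinName + "]");
        }
        int count = Integer.parseInt(forkCount) - 1;
        if (count > 0) {
            vars.put(key, String.valueOf(count)); // branches still pending
        } else {
            vars.remove(key);                     // last branch: clear the counter
        }
        return count == 0;
    }

    public static void main(String[] args) {
        ForkJoinSketch wf = new ForkJoinSketch();
        wf.fork("/", 2);
        System.out.println(wf.join("/", "my_join")); // false: one branch still pending
        System.out.println(wf.join("/", "my_join")); // true: last branch arrived

        try {
            // A join with no fork upstream: nothing was stored, so it fails.
            new ForkJoinSketch().join("/", "my_join");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // E0720: Fork/join mismatch, node [my_join]
        }
    }
}
```

In workflow terms, this suggests checking that every transition leading into the failing join node starts from a branch of a fork, rather than from an ordinary action that points directly at the join.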