saída de um programa mapreduce como input para outro programa mapreduce

Eu estou tentando um exemplo simples, em que a saída de um trabalho MapReduce deve ser a input de outro trabalho MapReduce.

O stream deve ser assim: Mapper1 --> Reducer1 --> Mapper2 --> Reducer2 (A saída do Mapper1 deve ser a input do Reducer1. A saída do Reducer1 deve ser a input do Mapper2. A saída do Mapper2 deve ser a input de Reducer2. A saída de Reducer2 deve ser armazenada no arquivo de saída).

Como posso adicionar vários mapeadores e redutores ao meu programa, de modo que o stream seja mantido como acima?

Preciso usar mapeadores de cadeia ou redutores de cadeia? Se sim, como posso usá-los?

Você precisa implementar dois trabalhos MapReduce separados para isso. O resultado do primeiro trabalho precisa ser gravado em algum armazenamento persistente (como HDFS) e será lido pelo segundo trabalho. O SequenceOutputFormat / InputFormat é geralmente usado para isso. Ambas as tarefas MapReduce podem ser executadas a partir do mesmo programa de driver.

Eu acho que o que você está procurando é ControlledJob e JobControl. Adapta-se apropriadamente ao seu propósito. Em uma única class Driver, você pode criar várias tarefas com dependencies uma da outra. O código a seguir pode ajudar você a entender.

  Job jobOne = Job(jobOneConf, "Job-1"); FileInputFormat.addInputPath(jobOne, jobOneInput); FileOutputFormat.setOutputPath(jobOne, jobOneOutput); ControlledJob jobOneControl = new ControlledJob(jobOneConf); jobOneControl.setJob(jobOne); Job jobTwo = Job(jobTwoConf, "Job-2"); FileInputFormat.addInputPath(jobTwo, jobOneOutput); // here we set the job-1's output as job-2's input FileOutputFormat.setOutputPath(jobTwo, jobTwoOutput); // final output ControlledJob jobTwoControl = new ControlledJob(jobTwoConf); jobTwoControl.setJob(jobTwo); JobControl jobControl = new JobControl("job-control"); jobControl.add(jobOneControl); jobControl.add(jobTwoControl); jobTwoControl.addDependingJob(jobOneControl); // this condition makes the job-2 wait until job-1 is done Thread jobControlThread = new Thread(jobControl); jobControlThread.start(); jobControlThread.join(); /* The jobControl.allFinished() can also be used to wait until all jobs are done */