Son dos los jars necesarios que importaremos en nuestro proyecto de eclipse y que se pueden descargar de :
- common
- mapreduce-client-core
Para implementar un Map Reduce se deben crear tres clases :
Map
public static class ExampleMapper extends Mapper
Ha de extender de la clase Mapper de hadoop y tendremos que sobreescribir el metodo map, que tiene como atributos key, value y context
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
Aquí se reciben los bloques de datos de información y la deberemos tratar linía a linía, para ello definiremos el comportamiento deseado de nuestro algoritmo y tambien determinaremos los valores de entrada ( key: un apuntador hacía nuestro fichero | value: cada una de las linias) y el formato de salida que se convertirá en la entrada del Reduce (context), por ejemplo :
context.write(new Text(valorTratado1), new DoubleWritable(Double.parseDouble(valorTratado2)));
Reduce
public static class ExampleReducer extends Reducer
Al igual que en el caso de map, reduce debe sobreescibir el método reduce :
public void reduce(Text key, Iterable
Dónde key sera valorTratado1 y values sera un Iterador del tipo de objecto que hayamos definido en el context de map (en nuestro caso DoubleWritable) que tendrá un array con todos aquellos valores que tengan la misma key
Aquí se recogen los datos agrupados por clave que nos envia la clase map definida anteriormente y se tratan de la manera deseada. Una vez realizado este tratamiento se prepara la salida mediante el objeto context :
context.write(key, new Text(valorTratado3));
Veamos unas imágenes del proceso de map reduce para un algoritmo WordCount ( el "hello world" de hadoop) :
Driver
La clase driver es la encargada de ejecutar y configurar el Job de Map Reduce que hayamos creado. Para ello crearemos la clase y extenderemos de , lo que nos obligara a sobreescribir el método run dónde crearemos y configuraremos el job a ejectuar :
public class ExampleDriver extends Configured implements Tool{
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
return 0;
}
}
Aquí definiremos todo lo relacionado a nuestra ejecución o job, para ello crearemos un nuevo job:
final Job job = Job.getInstance(getConf(), "NombreDelJob");
job.setJarByClass(ExampleDriver.class);
//Archivo de entrada HDFS
job.setInputFormatClass(TextInputFormat.class);
//Archivo de salida HDFS
job.setOutputFormatClass(TextOutputFormat.class);
//Definimos clase Map
job.setMapperClass(ExampleMapper.class);
//Definimos clase Redice
job.setReducerClass(ExampleReducer.class);
//Sortida del map, definim tipus parametre
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
//Sortida Reduce, definim tipus parametre
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//Definimos los paths de entrada y salida segun los parametros recogidos por linia de comandos
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
Es una buena práctica crear un método que borre el direcotrio de destino de los datos y ejecutarlo antes de crear la instancia de job ya que hadoop falla si ya existe esa carpeta y tiene contenido :
private void deleteOutputFileIfExists(String path) throws IOException {
final Path output = new Path(path);
FileSystem.get(output.toUri(), getConf()).delete(output, true);
}
Luego crearemos el punto de entrada con un método main donde, mediante el ToolRunner crearemos una instancia de nuestra clase ExampleDriver pasandole los argumentos que hayamos recogido des de la linia de comandos :
public static void main(String[] args) throws Exception {
ToolRunner.run(new ExampleDriver(), args);
}
Para realizar la ejecución del jar de Map reduce creado ejecutaremos :
hadoop jar MapReduceEx1.jar com.mapreduceex1.ExampleDriver /user/add/archivo.csv /user/add/salida
Donde /user/add/calidad.csv será la ruta de nuestro hdfs (en hadoop) previamente importado y
/user/add/salida será el directorio donde se dejará el resultado (en hadoop)
Estos dos parámetros los hemos configurado por código