lunes, 22 de junio de 2015

Ejemplo Map Reduce

Vamos a ver el proceso necesario para crear un job de map reduce en hadoop.

Son dos los jars necesarios que importaremos en nuestro proyecto de eclipse y que se pueden descargar de :

common
mapreduce-client-core

Para implementar un Map Reduce se deben crear tres clases :

Map


public static class ExampleMapper extends Mapper



Ha de extender de la clase Mapper de hadoop y tendremos que sobreescribir el metodo map, que tiene como atributos key, value y context



public void map(Object key, Text value, Context context) throws IOException, InterruptedException {



Aquí se reciben los bloques de datos de información y la deberemos tratar linía a linía, para ello definiremos el comportamiento deseado de nuestro algoritmo y tambien determinaremos los valores de entrada ( key: un apuntador hacía nuestro fichero | value: cada una de las linias) y el formato de salida que se convertirá en la entrada del Reduce (context), por ejemplo :



context.write(new Text(valorTratado1), new DoubleWritable(Double.parseDouble(valorTratado2)));




Reduce



public static class ExampleReducer extends Reducer{



Al igual que en el caso de map, reduce debe sobreescibir el método reduce :



public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {



Dónde key sera valorTratado1 y values sera un Iterador del tipo de objecto que hayamos definido en el context de map (en nuestro caso DoubleWritable) que tendrá un array con todos aquellos valores que tengan la misma key

Aquí se recogen los datos agrupados por clave que nos envia la clase map definida anteriormente y se tratan de la manera deseada. Una vez realizado este tratamiento se prepara la salida mediante el objeto context  :



context.write(key, new Text(valorTratado3));




Veamos unas imágenes del proceso de map reduce para un algoritmo WordCount ( el "hello world" de hadoop) :







Driver


La clase driver es la encargada de ejecutar y configurar el Job de Map Reduce que hayamos creado. Para ello crearemos la clase y extenderemos de , lo que nos obligara a sobreescribir el método run dónde crearemos y configuraremos el job a ejectuar :


public class ExampleDriver extends Configured implements Tool{

@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
return 0;
}
}


Aquí definiremos todo lo relacionado a nuestra ejecución o job, para ello crearemos un nuevo job:



final Job job = Job.getInstance(getConf(), "NombreDelJob");

job.setJarByClass(ExampleDriver.class);


//Archivo de entrada HDFS
        job.setInputFormatClass(TextInputFormat.class);

//Archivo de salida HDFS
        job.setOutputFormatClass(TextOutputFormat.class);


//Definimos clase Map
        job.setMapperClass(ExampleMapper.class);
//Definimos clase Redice

        job.setReducerClass(ExampleReducer.class);
 //Sortida del map, definim tipus parametre
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
//Sortida Reduce, definim tipus parametre
        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

//Definimos los paths de entrada y salida segun los parametros recogidos por linia de comandos
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));


        job.waitForCompletion(true);

        return 0;


Es una buena práctica crear un método que borre el direcotrio de destino de los datos y ejecutarlo antes de crear la instancia de job ya que hadoop falla si ya existe esa carpeta y tiene contenido :


private void deleteOutputFileIfExists(String path) throws IOException {
       final Path output = new Path(path);
       FileSystem.get(output.toUri(), getConf()).delete(output, true);
}



Luego crearemos el punto de entrada con un método main donde, mediante el ToolRunner crearemos una instancia de nuestra clase ExampleDriver pasandole los argumentos que hayamos recogido des de la linia de comandos :



 public static void main(String[] args) throws Exception {
       ToolRunner.run(new ExampleDriver(), args);
}



Para realizar la ejecución del jar de Map reduce creado ejecutaremos :


hadoop jar MapReduceEx1.jar com.mapreduceex1.ExampleDriver /user/add/archivo.csv /user/add/salida

Donde /user/add/calidad.csv será la ruta de nuestro hdfs (en hadoop) previamente importado y
 /user/add/salida será el directorio donde se dejará el resultado (en hadoop)

Estos dos parámetros los hemos configurado por código