Hadoop MapReduce: Como criar um Programa MapReduce Base

Veja neste artigo o MapReduce, ferramenta poderosa para o processamento de dados massivos. Veja também como criar um programa MapReduce Base

O Hadoop é um framework para processamento e armazenamento de dados massivos em clusters de computadores. Seu modelo de processamento, o MapReduce, é tido como uma grande ferramenta para análise de dados massivos paralelamente. A grande vantagem desse estilo de programação baseia-se no fato de que o programador não precisa se preocupar com detalhes importantes no processamento paralelo, como escalonamento de tarefas. Tudo isso é controlado intrinsecamente pelo Hadoop. Este artigo explica o modelo de programação do MapReduce, como funciona, vantagens e desvantagens do modelo.

Escalonamento de Processos

Primeiramente, antes de tratar especificamente do Hadoop MapReduce, é importante entender o escalonamento de processos. Esse conceito é essencial, uma vez que o MapReduce trabalha internamente com ele para realizar o processamento paralelo entre vários nós do cluster. E, com isso, é possível entender os desafios na hora de escalonar um programa de processamento de dados. Isso auxilia na observação dos benefícios do uso do modelo de programação MapReduce.

Para entender melhor esse conceito, vamos analisar um exemplo. No exemplo, será contado o número de vezes que cada palavra aparece em um conjunto de documentos de texto. Para uma tarefa simples assim, é possível escrever um programa sequencial, como o pseudocódigo da Listagem 1, que obterá o resultado.

Listagem 1. Contador de palavras sequencial

for each documento in conj { palavras = tokenize(document); for each p in palavras { contPalavra[p]++; } }

Como é possível observar, o programa percorre todos os documentos, extraindo as palavras uma a uma, em um processo de tokenização (salva todos os conjuntos de caracteres até um espaço), e incrementando a contagem de cada palavra em um. Observe que o índice no contador de palavras é a própria palavra, “p”. Isso é interessante, uma vez que, geralmente, se está acostumado com um índice numérico. Ao final, todas as entradas (palavras) podem ser mostradas em uma tabela, contendo o índice (a própria palavra) e o número de vezes que essa palavra apareceu.

É importante perceber que, embora o programa atinja o seu objetivo para pequenos conjuntos de documentos, conforme o conjunto aumenta é muito provável que sejam encontrados problemas com essa abordagem, como falta de memória. Para evitar esses problemas, é possível distribuir o trabalho entre várias máquinas, fazendo com que cada uma delas processe uma parte dos documentos e, ao final, realizar a junção desses resultados em um único arquivo, por exemplo. Porém, essa abordagem ignoraria alguns requisitos de performance (como o tempo de distribuição dos processos e junção dos resultados), além de possíveis problemas de falta de memória, falhas de servidor, entre outros. Para evitar esses problemas, a resposta é a utilização do MapReduce, onde o Hadoop tira esses requisitos da alçada do desenvolvedor, cuidando de todos esses problemas de escalabilidade, que tornam o desenvolvimento da aplicação muito complexo, a ponto de inviabilizá-la.

Hadoop MapReduce

Porém, o que é o MapReduce? É um modelo de programação para processamento de dados de forma paralela. O modelo é simples, embora não seja tão simples escrever programas úteis o utilizando. Isto ocorre porque o MapReduce trabalha com duas primitivas de processamento de dados, Mapper e Reducer, o que torna a programação não muito trivial, além de ser diferente do que a maior parte dos desenvolvedores está acostumada. Mas o mais importante a se ressaltar é que os programas MapReduce são inerentemente paralelos, o que o coloca como uma solução fantástica para problemas que envolvam análise de dados em larga escala. Com isso, uma vez que a aplicação MapReduce está escrita, escalar a mesma para rodar em cima de centenas, milhares ou milhões de dados é apenas uma questão de mudança de configuração.

Como é possível deduzir, o MapReduce possui duas fases: mapeamento e redução. Na fase de mapeamento, o MapReduce pega os dados de entrada e envia cada um dos elementos de dados para a função Mapper. Já na fase de redução, a função Reducer processa todas as saídas da função Mapper e chega a um resultado final. Em outros termos, a função Mapper é feita para filtrar e transformar os dados que serão agregados pela função Reducer.

O MapReduce foi desenvolvido após muitas experiências em escalonamento de processos, o que faz com que ele seja excelente para aplicações distribuídas. Também é daí que vem a similaridade do modelo de processamento MapReduce com muitos programas escalonáveis. Porém, a despeito da maior facilidade no uso, ainda é necessário um entendimento com relação à estrutura de dados que será processada. No caso do MapReduce, as estruturas utilizadas são listas e pares chave/valor.

Como sabemos, qualquer aplicação possui um fluxo de dados. No MapReduce, ele funciona da seguinte forma: a entrada da aplicação é, como já informado, uma lista de pares chave/valor. Então, esses pares são pegos um a um e processados, cada um gerando um par chave/lista de valores. Os detalhes dessa transformação é que normalmente definem o que o programa MapReduce faz. A partir daí, essa nova lista de pares é pega como entrada pela função Reducer e é agregada de alguma forma, gerando uma saída final.

Criando um Programa MapReduce Base

A grande maioria dos programas MapReduce são escritos brevemente, como variações de um template. Ou seja, quando escreve-se um novo programa MapReduce, geralmente pega-se um existente e o modifica até que ele faça o que o desenvolvedor deseja. Por isso, é muito importante a criação de um programa básico, para o qual servirão de base todos os demais.

Para isso, a convenção é que uma única classe defina completamente os Jobs do MapReduce. Isso porque essa classe será executada apenas na máquina cliente, enquanto, quando rodando em um cluster de computadores, as classes Mapper e Reducer estarão executando em vários nós diferentes. Mas é importante ter em mente que essas classes são independentes, e não irão interagir muito com a classe do job. Além disso, outra grande vantagem dessa abordagem é que tudo cabe em apenas um arquivo, simplificando a administração do código.

O núcleo da classe do job é o método run(), também conhecido como driver, ou motorista. Esse método é o grande responsável por, como seu próprio nome sugere, executar um job do Hadoop MapReduce. Ele instancia, configura e passa um objeto de configuração do job (JobConf) para o método runJob(), do JobClient, para iniciar o job MapReduce. Esse objeto que é passado contém todas as configurações necessárias para que o job seja iniciado e consiga executar satisfatoriamente. Como é possível ver-se pela Listagem 2, a função especifica os caminhos de entrada e saída e as classes Mapper e Reducer, que são essenciais para qualquer job MapReduce. Há diversas outras configurações que podem ser modificadas de acordo com a necessidade, como também se vê abaixo. Entre essas configurações, é válido destacar o nome do job (job.setJobName()), que pode ser definido da forma que o desenvolvedor desejar. Além disso, o formato dos dados de entrada e saída também podem, e devem, ser definidos conforme a aplicação sendo desenvolvida. Como trata-se de um template genérico, que será utilizado para várias aplicações MapReduce, o formato de dados é o mais comum, o Text.

Listagem 2. Método especial run()

public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, TemplateHadoop.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPath(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("Teste"); job.setMapperClass(MapClass.class); job.setReducerClass(ReducerClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); JobClient.runJob(job); return 0; }

A configuração do job não precisa estar necessariamente no método run(). Na realidade, ele nem precisaria existir e tudo poderia estar dentro do método main. A importância do método run() está no fato de que ele permite que o usuário altere alguns dos parâmetros de configuração durante a chamada do job. Isso é interessante, por exemplo, para depuração de código, quando o desenvolvedor deseja que apenas a saída da função mapper (ou reducer) seja observada. E isso só é possível através desse método, que é capaz de definir seu próprio conjunto de comando e processar os argumentos do usuário. Por isso, o método main apenas utiliza a chamada para um ToolRunner, responsável por habilitar o job a entender opções de usuário, como a especificação de um arquivo de configuração, a definição de um valor para uma propriedade ou a especificação de um nó no qual o job irá ser executado. A Listagem 3 mostra o método main, utilizando um ToolRunner, do esqueleto de uma aplicação MapReduce.

Listagem 3. Método main()

public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new TemplateHadoop(), args); System.exit(res); }

Agora, falando das funções que dão nome ao MapReduce, temos as classes estáticas MapClass e ReducerClass. Ambas herdam da classe MapReduceBase, que é uma pequena classe que provê implementações de dois métodos necessários por ambas, configure() e close(). Esses métodos são utilizados para setar e limpar as tarefas map e reduce. Eles podem ser sobrescrevidos, mas nesse caso isso não é necessário; apenas em Jobs mais avançados. Ambas as classes possuem uma assinatura, que pode ser vista na Listagem 4. É importante observar que cada uma delas possui dois pares chave/valor, sendo que o par de saída da classe Mapper é o par de entrada da classe Reducer. Além disso, esses pares são utilizados como parâmetros nas funções map e reduce. É importante ressaltar a classe Context. Ela é utilizada para escrever o par chave/valor na saída. Outra classe utilizada com o mesmo fim é a OutputCollector<K, V>.

Listagem 4. Assinatura das classes Mapper e Reducer

public static class MapClass extends MapReduceBase implements Mapper<K1, V1, K2, V2> { public void map(K1 key, V1 value, Context context) throws IOException { } } public static class ReducerClass extends MapReduceBase implements Reducer<K2, V2, K3, V3> { public void reduce(K2 key, Iterator<V2> values, Context context) throws IOException { } }

O centro das ações em ambas as classes são suas funções map() e reduce(). Cada chamada da função map() recebe um par chave/valor de qualquer tipo K1 e V1. O par gerado por ela é escrito pelo função write() do objeto Context. Já cada chamada do método reduce() recebe uma chave do tipo K2 e um valor do tipo V2, que devem ser os mesmos da saída da função map(). Geralmente esse método possuirá um laço para percorrer todos os valores de tipo V2. E, novamente, o par gerado pela função será escrito pelo objeto Context através da função write(). Nesse caso, como está sendo criado um template genérico, todos os valores e chaves serão do tipo Text. A Listagem 5 mostra o código das classes Mapper e Reducer do template base de programas MapReduce.

Listagem 5. Classes Mapper e Reducer

public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> { public void map(Text key, Text value, Context context) throws IOException { context.write(value, key); } } public static class ReducerClass extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> values, Context context) throws IOException { context.write(key, new Text()); } }

Vale lembrar que todos os valores e chaves devem ser de subtipos de Writable. Isso garante a interface de serialização para o Hadoop enviar os dados entre os nós de um cluster de computadores. De fato, as chaves implementam WritableComparable, uma subinterface da primeira. Isso porque as chaves precisam dar suporte adicional ao método compareTo(), pois são utilizadas para ordenação em vários lugares do framework.

Modificando o Programa Base para a Necessidade: Contador de Palavras

Raramente escreve-se um programa MapReduce do zero; o template é modificado para que atenda as especificações do novo programa. Como o template já está construído, basicamente o que precisa ser feito é ajustar o programa para que ele conte o número de palavras de um documento qualquer. Para isso, é preciso ter atenção com o formato do arquivo, e a forma como os dados estão dispostos no mesmo.

Como se trata de um programa simples, a partir do template não há muito a ser feito. É necessário ter atenção com os tipos de dados, entretanto. Como já é sabido, o MapReduce trabalha com pares key/value. Primeiramente, é interessante focar no par de entrada da função Mapper. Como a chave desse par não será utilizada para nenhum tipo de processamento, é interessante que ela seja do tipo Object, para ficar o mais genérica possível. Já o valor terá o tipo Text, pois conterá as linhas dos arquivos que serão processados. A partir daí, a linha é processada para que seja obtido cada palavra, que será a chave do par de saída da função map (e, como sabemos, entrada da função reduce). A partir do fato de que será realizada uma contagem, o valor do par de saída é um IntWritable de valor 1 (um), o que irá facilitar a contagem durante a execução da função reduce().

No exemplo do contador de palavras da Listagem 6, observa-se que a contagem de palavras chega ao mesmo resultado do contador sequencial discutido anteriormente, porém com uma abordagem totalmente diferente. No caso a seguir, a função map pega o valor (do par key/value), uma das linhas do documento, e faz a tokenização da mesma, através da classe StringTokenizer. Assim, enquanto houver palavras na linha, elas serão adicionadas ao parâmetro context, gerando um novo par chave/valor, que será utilizado como entrada na função reduce. A chave será do tipo texto, enquanto o valor será um IntWritable de valor 1. Já a função reduce é utilizada apenas para realizar a contagem das vezes que as palavras foram encontradas, gerando um resultado, novamente em um par chave/valor, cuja chave é o nome da palavra e o valor é o número de vezes que a mesma foi encontrada.

Listagem 6. Contador de palavras com MapReduce

private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word, one); } } ... private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{ int sum = 0; for (IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); }

Como é possível observar analisando o código, o programa na Listagem 6 diferencia palavras antes de pontuação e com letras maiúsculas. Deste modo, se houvesse no texto as frases: “Almocei ontem, mas...”, “Ontem fiz...” e “Saí ontem e...”, as três palavras “ontem” seriam contadas como três palavras diferentes (“ontem,”, “Ontem” e “ontem”). Isto acontece porque a classe StringTokenizer, por padrão, seleciona conjuntos de caracteres separados por espaço. Para resolver esse pequeno problema, basta adicionar a pontuação à lista de caracteres delimitadores do objeto itr e colocar todas as palavras para minúsculas (ou maiúsculas, é uma questão de escolha) antes de adicioná-las à lista de valores. Além disso, vale ressaltar as classes IntWritable e Text, definidas no pacote org.apache.hadoop.io para inteiros e strings.

Este artigo procurou abordar em detalhes o funcionamento do Hadoop MapReduce e seu modelo de programação. Trata-se de uma ferramenta muito interessante para todos que precisam trabalhar com dados massivos de qualquer natureza, pois fornece diversos elementos excelentes para processamento paralelo. Porém, é preciso estar atento à aplicação desse tipo de programação. O Hadoop MapReduce pode não ser uma boa para situações que não envolvam muitos dados, devido à dificuldades intrínsecas à programação distribuída.


A grande vantagem do MapReduce é que ele é fácil de ser utilizado. O desenvolvedor não tem necessidade de aprender toda a teoria que envolve o processamento de dados massivos, bem como sistemas de arquivos distribuídos. O Hadoop já lida com isso, e muito bem. Além disso, ele não é totalmente independente, no sentido de que não se sabe o que está acontecendo durante o processamento. Há uma série de ferramentas de administração, entre as quais a interface Reporter, que funciona para fornecer relatórios do funcionamento dos Jobs do MapReduce, principalmente. E a programação é simples, desde que sejam entendidos os conceitos que envolvem o paradigma. Raramente serão encontradas funções map e reduce muito grandes em um programa, o que mostra a simplicidade do modelo de programação MapReduce, e como ele pode ser útil em diversas aplicações de processamento de dados.

Artigos relacionados