Introdução ao Apache Spark
Este artigo mostrará as principais funcionalidades do Apache Spark, que é uma ferramenta Big Data para o processamento de grandes conjuntos de dados de forma distribuída e paralela.
O Apache Spark é uma ferramenta Big Data que tem o objetivo de processar grandes conjuntos de dados de forma paralela e distribuída. Ela estende o modelo de programação MapReduce popularizado pelo Apache Hadoop, facilitando bastante o desenvolvimento de aplicações de processamento de grandes volumes de dados. Além do modelo de programação estendido, o Spark também apresenta uma performance muito superior ao Hadoop, chegando em alguns casos a apresentar uma performance quase 100x maior.
Outra grande vantagem do Spark, é que todos os componentes funcionam integrados na própria ferramenta, como o Spark Streamming, o Spark SQL e o GraphX, diferentemente do Hadoop, onde é necessário utilizar ferramentas que se integram a ele, mas que são distribuídas separadamente, como o Apache Hive. Além disso, outro aspecto importante é que ele permite a programação em três linguagens: Java, Scala e Python.
O Spark tem diversos componentes para diferentes tipos de processamentos, todos construídos sobre o Spark Core, que é o componente que disponibiliza as funções básicas para o processamento como as funções map, reduce, filter e collect. Entre estes destacam-se s presentes na Figura 1:
- O Spark Streamming, que possibilita o processamento de fluxos em tempo real;
- O GraphX, que realiza o processamento sobre grafos;
- O SparkSQL para a utilização de SQL na realização de consultas e processamento sobre os dados no Spark;
- A MLlib, que é a biblioteca de aprendizado de máquina, com deferentes algoritmos para as mais diversas atividades, como clustering.
Esse artigo mostrará as principais funcionalidades do Spark Core como as transformações, que são métodos para realizar operações como filtros e mapeamentos, além das ações que são operações para a realização de contagens e somatórios.
Arquitetura do Spark
Nessa seção serão explicadas as principais funcionalidades do Spark Core. Primeiro, será mostrada a arquitetura das aplicações e depois veremos os conceitos básicos no modelo de programação para o processamento de conjuntos de dados.
A arquitetura de uma aplicação Spark é constituída por três partes principais:
- O Driver Program, que é a aplicação principal que gerencia a criação e é quem executará o processamento definido pelo programados;
- O Cluster Manager é um componente opcional que só é necessário se o Spark for executado de forma distribuída. Ele é responsável por administrar as máquinas que serão utilizadas como workers;
- Os Workers, que são as máquinas que realmente executarão as tarefas que são enviadas pelo Driver Program. Se o Spark for executado de forma local, a máquina desempenhará tanto o papel de Driver Program como de Worker.
A Figura 2 mostra a arquitetura do Spark e seus principais componentes.
Além da arquitetura, é importante conhecer os principais componentes do modelo de programação do Spark. Existem três conceitos fundamentais que serão utilizados em todas as aplicações desenvolvidas:
- Resilient Distributed Datasets (RDD): abstraem um conjunto de objetos distribuídos no cluster, geralmente executados na memória principal. Estes podem estar armazenados em sistemas de arquivo tradicional, no HDFS (HadoopDistributed File System) e em alguns Banco de Dados NoSQL, como Cassandra e HBase. Ele é o objeto principal do modelo de programação do Spark, pois são nesses objetos que serão executados os processamentos dos dados.
- Operações: representam transformações (como agrupamentos, filtros e mapeamentos entre os dados) ou ações (como contagens e persistências) que são realizados em um RDD. Um programa Spark normalmente é definido como uma sequência de transformações ou ações que são realizadas em um conjunto de dados.
- Spark Context: o contexto é o objeto que conecta o Spark ao programa que está sendo desenvolvido. Ele pode ser acessado como uma variável em um programa que para utilizar os seus recursos.
O objetivo desse artigo é apresentar apenas o modelo de programação do Spark utilizando os RDDs e as operações executando apenas em uma máquina local. Você pode configurar e criar um cluster Spark para execução distribuída e paralela das aplicações, mas boa notícia é que as aplicações criadas para rodar localmente funcionam da mesma forma em um cluster.
Desenvolvimento de Aplicações
Configurar uma aplicação Spark é bastante simples: basta adicionar a dependência da ferramenta no Maven. Para o desenvolvimento das aplicações desse artigo foi utilizado o Maven na IDE Eclipse, porém é possível desenvolver os mesmos exemplos em qualquer IDE. A Listagem 1 mostra o arquivo pom.xml do projeto.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.santana.devmedia</groupId>
<artifactId>spark-examples</artifactId>
<version>0.0.1</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
</dependency>
</dependencies>
</project>
Os RDDs, que foram descritos na seção arquitetura do Spark, são o principal componente para a programação no Spark, pois eles armazenam os dados na memória, podendo fazer diversas operações. Além dos comandos para carregar os dados, o Spark tem dois tipos de operações principais: as transformações e as ações.
As transformações são responsáveis por transformar um RDD em outro RDD como, por exemplo, filtros e mapeamentos. Já as ações são operações para manipular diretamente os dados como, por exemplo, realizar algum cálculo com os valores do RDD ou salvar os dados em um arquivo.
Todos os exemplos desse artigo utilizarão como dado de entrada um arquivo com as leituras das posições dos ônibus que é disponibilizada pela prefeitura de São Paulo para o acompanhamento do transporte público da cidade. Caso tenha interesse nesses dados, na seção Links há o endereço da API OlhoVivo, ou download pelo site.
A Listagem 2 mostra um exemplo desse arquivo com algumas das leituras dos dados, que são o código do ônibus, o código da linha do ônibus, o nome da linha, o horário da leitura da posição do ônibus, e a latitude e longitude do ônibus na hora da leitura.
546 1745 SHOP.CENTER.NORTE 18:40 -23.511788000000003 -46.62516575
33314 1745 VL.NOVA.CACHOEIRINHA 18:40 -23.479581500000002 -46.65016075
673 174M MUSEU.DO.IPIRANGA 18:40 -23.500357 -46.615757
33431 715M JD.MARIA.LUIZA 18:40 -23.534662124999997 -46.62369675
33441 775A JD.ADALGIZA 18:40 -23.5346621253459997 -46.6546369675
33441 174M JD.BRASIL 18:40 -23.534662124999997 -46.64562369675
Como vimos, o código apenas conta o número de linhas de registros de ônibus que existem no arquivo. A Listagem 3 mostra o código desse exemplo. Todo código do Spark é um processo que pode ser todo programado no método main de uma classe Java: as duas primeiras linhas são a configuração do Spark na aplicação com as classes SparkConf e JavaSparkContext. O parâmetro “local” do método setMaster indica que o processo será executado apenas na máquina local, e o parâmetro “BusProcessor” do método setAppName apenas indica um nome para a aplicação. A classe JavaRDD é a que contém um conjunto de dados que será processado 3 os dados são carregados de um arquivo texto no método textFile, onde o parâmetro é o caminho para um arquivo texto no formato apresentado na Listagem 2. Na linha seguinte é utilizada a ação vcount para contar o número de registros que existe no RDD linhas e depois é apenas apresentado esse número.
package com.devmedia.transformation;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class Exemplo2 {
public static void main(String[] args) {
// configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
// carrega os dados dos ônibus de sp
JavaRDD<String> linhas = ctx.textFile("c:/dev/teste7.log");
long numeroLinhas = linhas.count();
// escreve o número de ônibus que existem no arquivo
System.out.println(numeroLinhas);
ctx.close();
}
}
Uma operação bastante útil é a filter, que filtra apenas os RDDs que contenham a parte de uma String passada como parâmetro, como mostra a Listagem 4. Nesse exemplo, a configuração do Spark e o criação do RDD são iguais às do exemplo anterior, a diferença, é que agora a partir do RDD original que tem todos os registros dos ônibus são filtrados apenas aqueles que contenham a String “JD.BONFIGLIOLI”. Depois disso é utilizada a operação collect, que transforma um RDD em um ArrayList que pode ser manipulado normalmente.
package com.devmedia.transformation;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class Exemplo3 {
public static void main(String[] args) {
// configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
// carrega os dados dos ônibus de sp
JavaRDD<String> linhas = ctx.textFile("c:/dev/teste7.log");
// filtra os registros de ônibus pelo nome da linha
JavaRDD<String> linhasFiltradas = linhas.filter(s -> s.contains("JD.BONFIGLIOLI"));
// mostra todos os ônibus filtrados
List<String> resultados = linhasFiltradas.collect();
for (String linha : resultados) {
System.out.println(linha);
}
ctx.close();
}
}
Outra operação interessante é a Union, que une os dados de dois RDDs, como por exemplo, caso seja necessário juntar os registros dos ônibus de dois dias diferentes para fazer alguma análise, como mostra a Listagem 5.
Novamente, a configuração da aplicação é a mesma, porém, ao carregar os dados são criados dois RDDs: um com os dados de sábado e outro com os dados de domingo. Ambos são filtrados com apenas os registros que sejam da linha “JD.BONFIGLIOLI”, e por fim, utilizando o método union, os RDD são unidos em apenas um RDD chamado linhasUniao.
package com.devmedia.transformation;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class Exemplo4 {
public static void main(String[] args) {
// configuração do Spark
SparkConf conf = new SparkConf().setMaster("local")
.setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
// carrega os dados dos ônibus de sp de sábado e domingo
JavaRDD<String> linhasSabado = ctx.textFile("c:/dev/teste7.log");
JavaRDD<String> linhasDomingo = ctx.textFile("c:/dev/teste8.log");
// filtra os ônibus pelo nome da linha
JavaRDD<String> linhasFiltradasSabado = linhasSabado.filter(s ->
s.contains("JD.BONFIGLIOLI"));
JavaRDD<String> linhasFiltradasDomingo = linhasDomingo.filter(s ->
s.contains("JD.BONFIGLIOLI"));
// une os dados de sábado e domingo
JavaRDD<String> linhasUniao =
linhasFiltradasSabado.union(linhasFiltradasDomingo);
// mostra os ônibus resultantes da união
List<String> resultados = linhasUniao.collect();
for (String linha : resultados) {
System.out.println(linha);
}
ctx.close();
}
}
Assim como é possível carregar os dados de um arquivo texto, também é possível salvar os resultados. Os dados do exemplo anterior que une os registros dos ônibus de dois dias podem ser salvos utilizando o método saveAsTextFile e passando como parâmetro o caminho do arquivo que será salvo. A Listagem 6 mostra o código dessa operação.
kage com.devmedia.load.save;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class Exemplo1 {
public static void main(String[] args) {
// configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
// carrega os dados dos ônibus de sp
JavaRDD<String> linhasSabado = ctx.textFile("c:/dev/teste7.log");
JavaRDD<String> linhasDomingo = ctx.textFile("c:/dev/teste8.log");
// filtra os ônibus pelo nome da linha
JavaRDD<String> linhasFiltradasSabado = linhasSabado.filter
(s -> s.contains("JD.BONFIGLIOLI"));
JavaRDD<String> linhasFiltradasDomingo = linhasDomingo.filter
(s -> s.contains("JD.BONFIGLIOLI"));
// une os dados de sábado e domingo
JavaRDD<String> linhasUniao = linhasFiltradasSabado.union(linhasFiltradasDomingo);
// salva os dados da união em um arquivo
linhasUniao.saveAsTextFile("c:/dev/onibus-uniao.txt");
ctx.close();
}
}
Por último, a operação mais conhecida desse tipo de ferramenta que é o map reduce, como mostra a Listagem 7.
Inicialmente os dados do arquivo são carregados em um RDD e em seguida, utilizando o método mapToPair, as Strings do arquivo são mapeados para o nome da linha do ônibus. Por isso o s.split(“ “)[2], que divide a String em tokens separados por um espaço em branco. O terceiro dado dos tokens é o nome da linha do ônibus e o número 1, que indica que é um registro de uma linha. Depois é executado o método reduceByKey, que agrupa todos os resultados que tem a mesma chave, isso é, a mesma linha de ônibus, e soma os valores das linhas (que são todos 1). Por fim, todos os registros iguais serão agrupados e o somatório de quantos ônibus da mesma linha existem é feito.
package com.devmedia.map.reduce;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class Exemplo1 {
public static void main(String[] args) {
// configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
// carrega os dados dos ônibus de sp
JavaRDD<String> onibus = ctx.textFile("c:/dev/teste7.log");
// faz o map com as linhas de ônibus
JavaPairRDD<String, Integer> agrupaOnibus = onibus
.mapToPair(s -> new Tuple2<String, Integer>(s.split(" ")[2], 1));
JavaPairRDD<String, Integer> numeroOnibus = agrupaOnibus.reduceByKey((x, y) -> x + y);
List<Tuple2<String, Integer>> lista = numeroOnibus.collect();
// mostra as linhas e o número de ônibus da linha
for (Tuple2<String, Integer> onibusNumero : lista) {
System.out.println("Linha: " + onibusNumero._1());
System.out.println("Número de ônibus: " + onibusNumero._2());
}
}
}
Já o método collect retorna uma lista com objetos do tipo Tuple2, que é um mapa onde a chave é o nome da linha dos ônibus e o valor é o número de ônibus de uma determinada linha. No fim do código é feita uma iteração por essa lista. Para mostrar o nome da linha é utilizado o método onibusNumero._1(), e para mostra o número de ônibus da linha é utilizado o método onibusNumero._2().
O resultado da execução desse código, visto na Listagem 8, mostra o nome da linha de ônibus e a quantidade de ônibus que realizam essa rota.
Linha: TATUAPE
Número de ônibus: 7
Linha: JD.PERY.ALTO
Número de ônibus: 11
Linha: METRO.TUCURUVI
Número de ônibus: 8
Linha: SAO.MIGUEL
Número de ônibus: 6
Linha: JD.BONFIGLIOLI
Número de ônibus: 8
Linha: TERMINAL.PINHEIROS
Número de ônibus: 14
Linha: TERM.PRINCESA.ISABEL
Número de ônibus: 2
Linha: PEDRA.BRANCA
Número de ônibus: 4
Linha: Jardim.Pery
Número de ônibus: 4
Linha: COHAB.BRASILANDIA
Número de ônibus: 13
Linha: JD.MARIA.LUIZA
Número de ônibus: 10
Linha: COHAB.CHRIS
Número de ônibus: 12
Linha: JD.ADALGIZA
Número de ônibus: 11
Como vimos, é mostrado o nome da linha do ônibus e o número de ônibus que existem no arquivo que fazem essa linha.
Esse artigo mostrou os conceitos introdutórios do Apache Spark, que é uma das principais ferramentas Big Data para o processamento de grandes conjuntos de dados. Existem muito mais detalhes e usos dessa ferramenta que não foi possível cobrir nesse artigo, mas para quem quiser ir mais a fundo no Spark verifiquem os links que são apresentados na seção Links, como a documentação oficial do mesmo, onde podem ser encontradas outras funcionalidades da ferramenta, como o processamento em tempo real e outras operações disponíveis.
Artigos relacionados
-
Artigo
-
Artigo
-
Artigo
-
Artigo
-
Vídeo