Threads: paralelizando tarefas com os diferentes recursos do Java
Saiba como criar threads utilizando desde os recursos da primeira versão até as novidades do Java 8 e garanta mais desempenho ao seu software.
A plataforma Java disponibiliza diversas APIs para implementar o paralelismo desde as suas primeiras versões, e estas veem evoluindo até hoje, trazendo novos recursos e frameworks de alto nível que auxiliam na programação. No entanto, deve-se lembrar que a tecnologia não é tudo. É importante, também, conhecer os conceitos desse tipo de programação e boas práticas no desenvolvimento voltado para esse cenário.
O processamento paralelo, ou concorrente, tem como base um hardware multicore, onde dispõe-se de vários núcleos de processamento. Estas arquiteturas, no início do Java, não eram tão comuns. No entanto, atualmente já se encontram amplamente difundidas, tanto no contexto comercial como doméstico. Diante disso, para que não haja desperdício desses recursos de hardware e possamos extrair mais desempenho do software desenvolvido, é recomendado que alguma técnica de paralelismo seja utilizada.
Como sabemos, existem diversas formas de criar uma aplicação que implemente paralelismo, formas estas que se diferem tanto em técnicas como em tecnologias empregadas. Em vista disso, no decorrer deste artigo serão contextualizadas as principais APIs Java, desde as threads “clássicas” a modernos frameworks de alto nível, visando otimizar a construção, a qualidade e o desempenho do software.
Arquitetura Multicore
Uma arquitetura multicore consiste em uma CPU que possui mais de um núcleo de processamento. Este tipo de hardware permite a execução de mais de uma tarefa simultaneamente, ao contrário das CPUs singlecore, que eram constituídas por apenas um núcleo, o que significa, na prática, que nada era executado efetivamente em paralelo.
A partir do momento em que se tornou inviável desenvolver CPUs com frequências (GHz) mais altas, devido ao superaquecimento, partiu-se para outra abordagem: criar CPUs multicore, isto é, inserir vários núcleos no mesmo chip, com a premissa base de dividir para conquistar.
Ao contrário do que muitos pensam, no entanto, os processadores multicore não somam a capacidade de processamento, e sim possibilitam a divisão das tarefas entre si. Deste modo, um processador de dois núcleos com clock de 2.0 GHz não equivale a um processador com um núcleo de 4.0 GHz. A tecnologia multicore simplesmente permite a divisão de tarefas entre os núcleos de tal forma que efetivamente se tenha um processamento paralelo e, com isso, seja alcançado o tão almejado ganho de performance.
Contudo, este ganho é possível apenas se o software implementar paralelismo. Neste contexto, os Sistemas Operacionais, há anos, já possuem suporte a multicore, mas isso somente otimiza o desempenho do próprio SO, o que não é suficiente. O ideal é cada software desenvolvido esteja apto a usufruir de todos os recursos de hardware disponíveis para ele.
Ademais, considerando o fato de que hoje já nos deparamos com celulares com processadores de quatro ou oito núcleos, os softwares a eles disponibilizados devem estar preparados para lidar com esta arquitetura. Desde um simples projeto de robótica a um software massivamente paralelo para um supercomputador de milhões de núcleos, a opção por paralelizar ou não, pode significar a diferença entre passar dias processando uma determinada tarefa ou apenas alguns minutos.
Multitasking
O multitasking, ou multitarefa, é a capacidade que sistemas possuem de executar várias tarefas ou processos ao mesmo tempo, compartilhando recursos de processamento como a CPU. Esta habilidade permite ao sistema operacional intercalar rapidamente os processos ativos para ocuparem a CPU, dando a impressão de que estão sendo executados simultaneamente, conforme a Figura 1.
No caso de uma arquitetura singlecore, é possível executar apenas uma tarefa por vez. Mas com o multitasking esse problema é contornado gerenciando as tarefas a serem executadas através de uma fila, onde cada uma executa por um determinado tempo na CPU. Nos sistemas operacionais isto se chama escalonamento de processos.
Em arquiteturas multicore, efetivamente os processos podem ser executados simultaneamente, conforme a Figura 2, mas ainda depende do escalonamento no sistema operacional, pois geralmente temos mais processos ativos do que núcleos disponíveis para processar.
Desta forma, mais núcleos de processamento significam que mais tarefas simultâneas podem ser desempenhadas. Contudo, vale ressaltar que isto só é possível se o software que está sendo executado sobre tal arquitetura implementa o processamento concorrente. De nada adianta um processador de oito núcleos se o software utiliza apenas um.
Multithreading
De certo modo, podemos compreender multithreading como uma evolução do multitasking, mas em nível de processo. Ele, basicamente, permite ao software subdividir suas tarefas em trechos de código independentes e capazes de executar em paralelo, chamados de threads. Com isto, cada uma destas tarefas pode ser executada em paralelo caso haja vários núcleos, conforme demonstra a Figura 3.
Diversos benefícios são adquiridos com este recurso, mas, sem dúvida, o mais procurado é o ganho de performance. Além deste, no entanto, também é válido destacar o uso mais eficiente da CPU. Sabendo dessa importância, nosso próximo passo é entender o que são as threads e como criá-las para subdividir as tarefas do software.
Threads
Na plataforma Java, as threads são, de fato, o único mecanismo de concorrência suportado. De forma simples, podemos entender esse recurso como trechos de código que operam independentemente da sequência de execução principal. Como diferencial, enquanto os processos de software não dividem um mesmo espaço de memória, as threads, sim, e isso lhes permite compartilhar dados e informações dentro do contexto do software.
Cada objeto de thread possui um identificador único e inalterável, um nome, uma prioridade, um estado, um gerenciador de exceções, um espaço para armazenamento local e uma série de estruturas utilizadas pela JVM e pelo sistema operacional, salvando seu contexto enquanto ela permanece pausada pelo escalonador.
Na JVM, as threads são escalonadas de forma preemptiva seguindo a metodologia “round-robin”. Isso quer dizer que o escalonador pode pausá-las e dar espaço e tempo para outra thread ser executada, conforme a Figura 4. O tempo que cada thread recebe para processar se dá conforme a prioridade que ela possui, ou seja, threads com prioridade mais alta ganham mais tempo para processar e são escalonadas com mais frequência do que as outras.
Também é possível observar na Figura 4 que apenas uma thread é executada por vez. Isto normalmente acontece em casos onde só há um núcleo de processamento, o software implementa um sincronismo de threads que não as permite executar em paralelo ou quando o sistema não faz uso de threads. Na Figura 5, por outro lado, temos um cenário bem diferente, com várias threads executando paralelamente e otimizando o uso da CPU.
Desde seu início a plataforma Java foi projetada para suportar programação concorrente. De lá para cá, principalmente a partir da versão 5, foram incluídas APIs de alto nível que nos fornecem cada vez mais recursos para a implementação de tarefas paralelas, como as APIs presentes nos pacotes java.util.concurrent.*.
Saiba que toda aplicação Java possui, no mínimo, uma thread. Esta é criada e iniciada pela JVM quando iniciamos a aplicação e sua tarefa é executar o método main() da classe principal. Ela, portanto, executará sequencialmente os códigos contidos neste método até que termine, quando a thread encerrará seu processamento e a aplicação poderá ser finalizada.
Em Java, existem basicamente duas maneiras de criar threads:
- Estender a classe Thread (java.lang.Thread); e
- Implementar a interface Runnable (java.lang.Runnable).
Na Listagem 1, de forma simples e objetiva, é apresentado um exemplo de como implementar uma Thread para executar uma subtarefa em paralelo. Para isso, primeiramente é necessário codificar um Runnable, o que pode ser feito diretamente na criação da Thread, como demonstrado na Listagem 1, ou implementar uma classe própria que estenda Runnable. Posteriormente, basta executá-lo com um objeto Thread através do método start().
public class ExemploThread {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
//código para executar em paralelo
System.out.println("ID: " + Thread.currentThread().getId());
System.out.println("Nome: " + Thread.currentThread().getName());
System.out.println("Prioridade: " + Thread.currentThread().getPriority());
System.out.println("Estado: " + Thread.currentThread().getState());
}
}).start();
}
}
Neste exemplo pode-se observar também o código utilizado para buscar alguns dados da thread atual, tais como ID, nome, prioridade, estado e até mesmo capturar o código que ela está executando. Além de tais informações que podem ser capturadas, é possível manipular as threads utilizando alguns dos seguintes métodos:
- O método estático Thread.sleep(), por exemplo, faz com que a thread em execução espere por um período de tempo sem consumir muito (ou possivelmente nenhum) tempo de CPU;
- O método join() congela a execução da thread corrente e aguarda a conclusão da thread na qual esse método foi invocado;
- Já o método wait() faz a thread aguardar até que outra invoque o método notify() ou notifyAll(); e
- O método interrupt() acorda uma thread que está dormindo devido a uma operação de sleep() ou wait(), ou foi bloqueada por causa de um processamento longo de I/O.
A forma clássica de se criar uma thread é estendendo a classe Thread, como demonstrado na Listagem 2. Neste código, temos a classe Tarefa estendendo a Thread. A partir disso, basta sobrescrever o método run(), o qual fica encarregado de executar o código da thread.
Na prática, nossa classe Tarefa é responsável por realizar o somatório do intervalo de valores recebido no momento em que ela é criada e armazená-lo em uma variável para que possa ser lido posteriormente.
public class Tarefa extends Thread {
private final long valorInicial;
private final long valorFinal;
private long total = 0;
//método construtor que receberá os parâmetros da tarefa
public Tarefa(int valorInicial, int valorFinal) {
this.valorInicial = valorInicial;
this.valorFinal = valorFinal;
}
//método que retorna o total calculado
public long getTotal() {
return total;
}
/*
Este método se faz necessário para que possamos dar start() na Thread
e iniciar a tarefa em paralelo
*/
@Override
public void run() {
for (long i = valorInicial; i <= valorFinal; i++) {
total += i;
}
}
}
public class Exemplo {
public static void main(String[] args) {
//cria três tarefas
Tarefa t1 = new Tarefa(0, 1000);
t1.setName("Tarefa1");
Tarefa t2 = new Tarefa(1001, 2000);
t2.setName("Tarefa2");
Tarefa t3 = new Tarefa(2001, 3000);
t3.setName("Tarefa3");
//inicia a execução paralela das três tarefas, iniciando três novas threads no programa
t1.start();
t2.start();
t3.start();
//aguarda a finalização das tarefas
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//Exibimos o somatório dos totalizadores de cada Thread
System.out.println("Total: " + (t1.getTotal() + t2.getTotal() + t3.getTotal()));
}
}
Para testarmos o paralelismo com a classe da Listagem 2, criamos a classe Exemplo com o método main(), responsável por executar o programa (vide Listagem 3). Neste exemplo, após criar as threads, chama-se o método start() de cada uma delas, para que iniciem suas tarefas. Logo após, em um bloco try-catch, temos a invocação dos métodos join(). Este faz com que o programa aguarde a finalização de cada thread para que depois possa ler o valor totalizado por cada tarefa.
Observe, na Listagem 3, que cada tarefa recebe seu intervalo de valores a calcular, sendo somado, ao todo, de 0 a 3000, mas e se tivéssemos uma única lista de valores que gostaríamos de somar para obter o valor total? Neste caso, as threads precisariam concorrer pela lista. Isso é o que chamamos de concorrência de dados e geralmente traz consigo diversos problemas.
Concorrência de dados
A concorrência de dados é um dos principais problemas a se enfrentar quando empregamos multithreading em uma aplicação. Ela é capaz de gerar desde inconsistência nos dados compartilhados até erros em tempo de execução. No entanto, felizmente isto pode ser evitado, sendo necessário, portanto, se precaver para que nosso aplicativo não apresente tais problemas.
Uma boa forma de evitar problemas de concorrência é sincronizar as threads que compartilham dados entre si. A partir disso, estas threads passam a executar em sincronia com outras, e assim, uma por vez acessará o recurso. O sincronismo previne que duas ou mais threads acessem o mesmo recurso simultaneamente. Por outro lado, temos as threads assíncronas, que executam independentemente umas das outras e geralmente não compartilham recursos, como é o caso do exemplo das Listagens 2 e 3.
No exemplo da Listagem 4, por sua vez, é possível visualizar três threads disputando a mesma variável varCompartilhada para incrementá-la de forma assíncrona. Basicamente, a ideia desse código é incrementar uma variável com diferentes valores e, a cada valor gerado, adicioná-lo em uma lista (ArrayList).
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ExemploAssincrono1 {
private static int varCompartilhada = 0;
private static final Integer QUANTIDADE = 10000;
private static final List<Integer> VALORES = new ArrayList<>();
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
VALORES.add(++varCompartilhada);
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
VALORES.add(++varCompartilhada);
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
VALORES.add(++varCompartilhada);
}
}
});
t1.start();
t2.start();
t3.start();
try {
t1.join();
t2.join();
t3.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
int soma = 0;
for (Integer valor : VALORES) {
soma += valor;
}
System.out.println("Soma: " + soma);
}
}
No entanto, ao executar este algoritmo é provável que seja gerada a exceção java.lang.ArrayIndexOutOfBoundsException, devido à concorrência pela lista, visto que há mais de uma thread tentando inserir dados nela. Como o “ponto fraco” desta estrutura de dados é seu mecanismo dinâmico de tamanho variável, a cada novo valor a ser inserido é preciso expandir a lista. Desta forma, a thread perde tempo para fazer esta operação, aumentando assim a possibilidade de ser pausada pelo escalonador. Quando isto acontece e alguma outra thread tenta realizar a mesma operação de add(), a exceção é gerada. Com o intuito de solucionar esse problema, uma das opções é adotar uma lista sincronizada, conforme o código a seguir:
private static final List<Integer> VALORES = Collections.synchronizedList(new ArrayList<>());
Apesar de solucionar o problema anterior, ainda é possível que a thread sofra interrupção durante o incremento da variável varCompartilhada e passe a gerar valores inconsistentes. Isto porque no processo atual de incremento da variável, primeiramente deve ser pego o valor atual desta, somá-lo com 1 e então obter o novo valor a ser armazenado.
Esse problema acontece porque nesse código existem três threads alterando o valor da mesma variável (nesse caso, com o operador ++) e o escalonador, quando aloca uma thread ao processador, permite que ela execute seu código por um determinado período de tempo e depois a interrompe, possibilitando que outra thread ocupe seu lugar e opere sobre os mesmos dados. Assim, quando a thread anterior voltar a processar, trabalhará com valores desatualizados.
Para aferir o resultado deste algoritmo, toda atualização de valor da variável varCompartilhada é adicionada a uma lista e ao final é realizada a soma de todos esses valores. Por causa das situações supracitadas, no entanto, o resultado gerado a cada execução pode ser diferente. Isto demonstra que o incremento de uma variável assíncrona em threads é, sem dúvidas, um problema.
Nota: É preciso destacar que nem sempre ocorrerá esse problema, ou seja, nem sempre uma thread será interrompida durante o seu processamento. Para aumentar as chances desse problema acontecer, foi utilizado um intervalo de 10.000 repetições e três threads. Com um número baixo de iterações, coincidentemente pode ser gerado o mesmo resultado em quase todas as execuções.
O exemplo apresentado na Listagem 5 traz uma derivação do código da Listagem 4. Neste caso, o List foi substituído por um Set, que suporta a inserção de valores de modo assíncrono e ainda garante a unicidade dos valores inseridos. Assim, não mais teremos problemas com o ArrayList e poderemos dar sequência à demonstração do problema de concorrência com a varCompartilhada.
import java.util.HashSet;
import java.util.Set;
public class ExemploAssincrono2 {
private static int varCompartilhada = 0;
private static final Integer QUANTIDADE = 10000;
private static final Set<Integer> VALORES = new HashSet<>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
boolean novo = VALORES.add(++varCompartilhada);
if (!novo) {
System.out.println("Já existe: " + varCompartilhada);
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
boolean novo = VALORES.add(++varCompartilhada);
if (!novo) {
System.out.println("Já existe: " + varCompartilhada);
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
boolean novo = VALORES.add(++varCompartilhada);
if (!novo) {
System.out.println("Já existe: " + varCompartilhada);
}
}
}
}).start();
}
}
Ao executar este algoritmo diversas vezes é possível observar (vide Figura 6) que ele imprime no console alguns valores a serem inseridos que já existem no Set, o que demonstra que as threads estão incrementando a variável, mas em algum momento geram o mesmo valor. Isso acontece por causa da concorrência pela variável varCompartilhada de maneira assíncrona, onde ao incrementar esta variável, mais de uma thread acaba gerando o mesmo valor.
Sincronização de Threads
Caso não seja uma opção substituir o ArrayList, uma alternativa para solucionar o problema obtido na Listagem 4 é sincronizar o objeto concorrido; neste caso, a lista (vide Listagem 6). Isso é possível porque todo objeto Java possui um lock associado, que pode ser disputado por qualquer trecho de código sincronizado e em qualquer thread.
import java.util.ArrayList;
import java.util.List;
public class ExemploBlocoSincronizado {
//declaração das variáveis - vide Listagem 4
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
synchronized (VALORES) {
VALORES.add(++varCompartilhada);
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
synchronized (VALORES) {
VALORES.add(++varCompartilhada);
}
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
synchronized (VALORES) {
VALORES.add(++varCompartilhada);
}
}
}
});
//Idem Listagem 4...
}
}
Um bloco sincronizado previne que mais de uma thread consiga executá-lo simultaneamente. Para isso, a thread que for utilizar esse bloco adquire o lock associado ao objeto sincronizado e as demais que tentarem acessá-lo entrarão em estado de BLOCKED, até que o objeto seja liberado. Na Figura 7 é possível observar o ciclo de vida de uma thread, da sua criação à sua finalização.
A seguir são descritos os possíveis estados que elas podem assumir:
- New: A thread dica neste estado após criar sua instância e antes de invocar o método start();
- Runnable: Indica que ela está executando na máquina virtual Java;
- Blocked: Ainda está ativa, mas está à espera por algum recurso que está em uso por outra thread;
- Waiting: Quando neste estado, ela está à espera por tempo indeterminado pelo fato de outra thread ter executado uma determinada ação. Isto ocorre quando se invoca o método wait() ou join(), por exemplo;
- Timed_Waiting: Neste estado a thread está à espera de uma operação por um tempo pré-determinado. Por exemplo, esta situação ocorre ao invocar métodos como Thread.sleep(sleeptime), wait(timeout) ou join(timeout); e
- Terminated: Este estado sinaliza que o método run() finalizou.
Nota: Ao sincronizar operações, prefira sempre o uso de métodos sincronizados no lugar de blocos desse tipo. Isso porque os bytecodes gerados para um método sincronizado são relativamente menores do que os gerados para um bloco sincronizado.
Outra forma de acessar um dado compartilhado entre threads é criando um método sincronizado. Essa técnica é muito parecida com a anterior, mas ao invés de sincronizar o mesmo bloco de código em cada thread, ele é transferido para um método que contém a notação synchronized na assinatura. Assim, as threads terão que invocá-lo para realizar a operação sobre o dado concorrente. Veja um exemplo na Listagem 7.
import java.util.ArrayList;
import java.util.List;
public class ExemploMetodoSincronizado {
//Idem Listagem 4...
public static void main(String[] args) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
incrementaEAdd();
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
incrementaEAdd();
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < QUANTIDADE; i++) {
incrementaEAdd();
}
}
});
//Idem Listagem 4
}
private synchronized static void incrementaEAdd() {
VALORES.add(++varCompartilhada);
}
}
Nota: O ato de adquirir bloqueios para sincronizar threads consome tempo, mesmo quando nenhuma precisa aguardar a liberação do objeto sincronizado. Esse processo é uma faca de dois gumes: se por um lado ele resolve problemas de concorrência, por outro serializa o processamento das threads sobre esse bloco; ou seja, as threads nunca estarão processando esse código simultaneamente, o que pode degradar o desempenho. Portanto, esse recurso deve ser usado com moderação e somente onde for necessário.
Variáveis atômicas
Quando é preciso utilizar tipos primitivos de forma concorrente uma boa opção é adotar seu respectivo tipo atômico, presente no pacote java.util.concurrent.atomic. Este tipo de objeto disponibiliza operações como incremento através de métodos próprios e são executadas em baixo nível de hardware, de forma que a thread não será interrompida durante o processo. Deste modo não é necessário sincronizar o objeto, gerando um algoritmo sem bloqueios e muito mais rápido. Veja o código a seguir:
private static AtomicInteger varCompartilhada = new AtomicInteger(0);
Neste caso, ao invés de utilizar um Integer para armazenar o valor, foi instanciado um AtomicInteger. Com isso, pode-se trocar o varCompartilhada++ pela chamada varCompartilhada.incrementAndGet(), que realizará uma função semelhante de forma atômica, o que garantirá que a thread não seja interrompida no meio do processo de incremento da variável.
Nota: Em tipos atômicos, métodos que não modificam seu valor são sincronizados.
Interface Callable
A interface Runnable é utilizada desde as primeiras versões da plataforma Java e como todos já sabem, ela fornece um único método – run() – que não aceita parâmetros e não retorna valor, assim como não pode lançar qualquer tipo de exceção. No entanto, e se precisássemos executar uma tarefa em paralelo e ao final obter um resultado como retorno? Para solucionar esse problema, você poderia criar um método na classe que implementa Thread ou Runnable e esperar pela conclusão da tarefa para acessar o resultado, assim como no cenário da Listagem 8.
ThreadTarefa t = new ThreadTarefa();
t.start(); //inicia o trabalho da thread
t.join(); //aguarda a thread finalizar
String valor = t.getRetornoTarefa(); //acessa o resultado do processamento da tarefa.
Basicamente não há nada de errado com esse código, mas a partir do Java 5 este processo pode ser feito de forma diferente, graças à interface Callable. Deste modo, em vez de ter um método run(), a interface Callable oferece um método call(), que pode retornar um objeto qualquer, além da grande vantagem de poder capturar uma exceção gerada pela tarefa da thread.
Para tirar proveito dos benefícios de um objeto Callable, é altamente recomendável não utilizar um objeto Thread para executá-lo, e sim alguma outra API, como:
- ExecutorService: É uma API de alto nível para trabalhar diretamente com threads. Permite criar um pool de threads, reutilizá-las e gerenciá-las; e
- ExecutorCompletionService: É uma implementação da interface CompletionService que, associada a um ExecutorService, permite, através do método take(), receber o resultado de cada tarefa conforme elas vão finalizando, independente da ordem em que as tarefas foram criadas.
As implementações apresentadas nas Listagens 9 e 10 demonstram uma boa prática no uso de Callables. Este código cria três tarefas que levam um determinado tempo para concluir e, ao terminar, retornam o nome da thread que a realizou. O código da tarefa se encontra na classe ExemploCallable, que implementa a interface Callable, com retorno do tipo String. Com esta interface a tarefa que se deseja executar deve ser implementada no método call() (vide Listagem 9), o qual é invocado ao executar o objeto Callable.
import java.util.concurrent.Callable;
public class ExemploCallable implements Callable<String> {
private final long tempoDeEspera;
public ExemploCallable(int time) {
this.tempoDeEspera = time;
}
@Override
public String call() throws Exception {
Thread.sleep(tempoDeEspera);
return Thread.currentThread().getName();
}
}
package javamagazine.threads;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExemploRetornoDeTarefa {
public static void main(String[] args) {
List<ExemploCallable> tarefas = Arrays.asList(
new ExemploCallable(8000),
new ExemploCallable(4000),
new ExemploCallable(6000));
ExecutorService threadPool = Executors.newFixedThreadPool(3);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(threadPool);
//executa as tarefas
for (ExemploCallable tarefa : tarefas) {
completionService.submit(tarefa);
}
System.out.println("Tarefas iniciadas, aguardando conclusão");
//aguarda e imprime o retorno de cada uma
for (int i = 0; i < tarefas.size(); i++) {
try {
System.out.println(completionService.take().get());
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
}
threadPool.shutdown();
}
}
O código da Listagem 10 tem o objetivo de criar e executar três tarefas armazenadas em uma lista. Para simular uma diferença no tempo de execução das threads, cada uma foi desenvolvida para aguardar um certo tempo em milissegundos, que lhe é fornecido no método construtor. Antes de executá-las, no entanto, note que é criado um pool de threads com um ExecutorService, o qual posteriormente é utilizado para criar um ExecutorCompletionService, que será encarregado de executar as tarefas e também nos será útil para receber o retorno de cada uma delas conforme forem concluindo.
Dito isso, uma a uma as tarefas são executadas através do método submit() e, por fim, é utilizado o método take(), para buscar a tarefa concluída, e o método get(), que lê o retorno dela e o imprime no console (Figura 8).
Nota: Note, pelo resultado da Figura 8, que, por mais que a tarefa que tinha duração de oito segundos seja executada primeiro, seu resultado aparece por último. Esse resultado é obtido porque a leitura dos retornos de cada tarefa não tem relação com a ordem de execução, e sim com sua conclusão, graças ao ExecutorCompletionService.
Coleções concorrentes vs Coleções sincronizadas
Um recurso bastante utilizado no desenvolvimento de software são as coleções de dados. Na plataforma Java estas estruturas estão disponíveis em uma série de implementações para os mais diversos fins. Como sabemos, não há nenhum “mistério” em declará-las, no entanto, como é comum nos depararmos com bugs ao acessar essas estruturas de maneira concorrente, vale dedicar um tópico deste artigo para explorar suas peculiaridades.
Dentre as coleções disponíveis no Java, existem variados tipos de estruturas de dados, como, listas, pilhas e filas, e estas, por sua vez, ainda se subdividem quanto a forma de implementação, que compreende:
- Coleções sem suporte a threads: São as coleções normalmente utilizadas. Encontradas no pacote java.util, como ArrayList, HashMap, HashSet, não devem ser utilizadas de forma concorrente, a menos que seja feito um sincronismo externo sobre a coleção;
- Coleções sincronizadas: Podem ser criadas a partir de métodos estáticos disponíveis na classe java.util.Collections, por exemplo: java.util.Collections.synchronizedList(objetoLista). Como estes métodos retornam uma coleção sincronizada, isto significa que seu acesso para modificações ocorre de forma serializada, ou seja, somente uma thread por vez pode acessá-la; e
- Coleções concorrentes: Não necessitam de nenhum sincronismo adicional, como sincronizar seu objeto ou algum método, pois possuem um sofisticado suporte para concorrência. Estas coleções, livres de problemas advindos da concorrência entre threads, podem ser encontradas no pacote java.util.concurrent.
Sabendo disso, preferencialmente, opte por utilizar coleções concorrentes, ao invés das sincronizadas, pois as coleções concorrentes possuem maior escalabilidade e suportam modificações simultâneas de diversas threads sem precisar estabelecer um bloqueio. Já as coleções sincronizadas têm sua performance degradada devido ao bloqueio que precisam estabelecer quando uma thread as acessa. Logo, isso também significa que somente uma thread por vez pode modificá-las.
Um detalhe que costuma passar despercebido nas entrelinhas da programação concorrente é que não existe a garantia de execução paralela ou de que cada thread vai executar em um núcleo diferente. Criar threads apenas sugere à JVM que aquilo seja paralelizado. Por exemplo, você pode ter um processador de quatro núcleos e criar um aplicativo com quatro threads que processem exaustivamente, mas isso não lhe garante que cada uma das quatro threads serão executadas por um núcleo diferente, tão pouco consumirão 100% de processamento. Portanto, não basta criar threads pensando que isto é a solução dos seus problemas. Neste caso, ao criar threads em demasia estar-se-ia degradando a performance, já que a JVM gastaria muito tempo com o escalonamento delas, se comparado ao tempo total de processamento utilizado pelas threads.
Primeiramente, a aplicação deve ser inteligente o bastante para criar o número ideal de threads, ou seja, deve ser levada em consideração a quantidade de processadores/núcleos disponíveis no sistema. Criar um número de threads menor do que o número de núcleos disponíveis gera desperdício. Por outro lado, gerar um número excessivamente maior de threads, causará outro problema. Será perdido mais tempo com o escalonamento das threads do que com as próprias tarefas que elas precisam executar, e assim, por mais que se esteja consumindo 100% da CPU, não se tem o desempenho máximo que se pode atingir.
Para amenizar este problema, um recurso muito útil da plataforma Java pode ser verificado no código apresentado a seguir, que permite ler a quantidade de núcleos disponíveis. A partir disso, podemos calcular o número ideal de threads necessárias para atingir os 100% de processamento sem desperdícios, quando temos uma aplicação que precisa realizar um cálculo exaustivo:
int nucleos = Runtime.getRuntime().availableProcessors();
FrameworkFork/Join
O frameworkFork/Join, introduzido na versão 7 da plataforma Java, é uma implementação da interface ExecutorService que auxilia o desenvolvedor a tirar proveito das arquiteturas multicore. Esta API foi projetada para as tarefas que podem ser quebradas em pequenas partes recursivamente, com o objetivo de usar todo o poder de processamento disponível para melhorar o desempenho da aplicação.
O exemplo apresentado nas Listagens 11 e 12 demonstra um cenário onde o objetivo é buscar, recursivamente em um sistema de arquivos, os arquivos com determinada extensão. Ao iniciar, a tarefa recebe um diretório base onde o algoritmo começa as buscas. O conteúdo do diretório é então analisado e caso haja outra pasta dentro desta, é criada outra tarefa para analisar aquele diretório, e assim recursivamente o algoritmo realiza a busca pelos arquivos e retorna os resultados à tarefa pai.
Tecnicamente, para realizar este processo foi implementada uma classe que estende RecursiveTask e recebe um List de String, o qual é utilizado para informar o tipo de retorno da tarefa (vide Listagem 11). Ao criar a tarefa, ou seja, uma instância da classe ProcessadorDePastas, é necessário informar por parâmetros o diretório base onde se iniciará a busca e a extensão de arquivo pela qual se dará a busca.
Quando se estende a classe RecursiveTask, deve ser implementado o método compute(), que é responsável por desempenhar a tarefa desejada, assim como devemos codificar o método run(), quando se implementa a interface Runnable. É neste método que está especificada a busca pelos arquivos. Nele, o ponto mais importante pode ser verificado na recursividade, local que cria as tarefas paralelas com a chamada ao método fork() para cada pasta localizada dentro da pasta na qual se está pesquisando. Ao final, cada subtarefa retorna os dados de sua busca à tarefa que a criou, e esta, por sua vez, adiciona estes dados na lista “tarefas”. Este é o processo de desempilhar a recursão, que é realizado até chegar à primeira tarefa criada na classe ForkJoinMain, momento este em que os dados são retornados para a lista resultados pelo método join() (vide Listagem 12).
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class ProcessadorDePastas extends RecursiveTask<List<String>> {
private final String diretorio;
private final String extensao;
public ProcessadorDePastas(String diretorio, String extension) {
this.diretorio = diretorio;
this.extensao = extension;
}
@Override
protected List<String> compute() {
List<String> lista = new ArrayList<>();
List<ProcessadorDePastas> tarefas = new ArrayList<>();
File arquivo = new File(diretorio);
File conteudo[] = arquivo.listFiles();
if (conteudo != null) {
for (int i = 0; i < conteudo.length; i++) {
if (conteudo[i].isDirectory()) {
ProcessadorDePastas tarefa = new ProcessadorDePastas(conteudo[i].getAbsolutePath(), extensao);
tarefa.fork();
tarefas.add(tarefa);
} else if (verificaArquivo(conteudo[i].getName())) {
lista.add(conteudo[i].getAbsolutePath());
}
}
}
if (tarefas.size() > 50) {
System.out.printf("%s: %d tarefas executando.\n", arquivo.getAbsolutePath(), tarefas.size());
}
addResultadosDaTarefa(lista, tarefas);
return lista;
}
private void addResultadosDaTarefa(List<String> lista, List<ProcessadorDePastas> tarefas) {
for (ProcessadorDePastas item : tarefas) {
lista.addAll(item.join());
}
}
private boolean verificaArquivo(String nome) {
return nome.endsWith(extensao);
}
}
Na Listagem 12 temos o código responsável por iniciar a tarefa principal, ler e exibir os resultados. Para tal, foram criadas três tarefas base que farão as buscas em três pastas distintas, e a fim de executá-las, foi instanciado um pool de threads com um ForkJoinPool. Este tipo de pool gerencia de forma mais eficiente o trabalho das threads, pois utiliza uma técnica chamada de “roubo de tarefa” para executar as tarefas em espera. Nesta abordagem cada thread possui uma fila de tarefas em espera e no momento em que uma thread não tiver mais nada em sua fila, poderá “roubar” o trabalho de outra, possibilitando mais uma melhoria na performance.
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class ForkJoinMain {
public static void main(String[] args) {
ProcessadorDePastas sistema = new ProcessadorDePastas("C:/Windows", ".exe");
ProcessadorDePastas aplicativos = new ProcessadorDePastas("C:/Program Files", ".exe");
ProcessadorDePastas documentos = new ProcessadorDePastas("C:/users", ".doc");
ForkJoinPool pool = new ForkJoinPool();
pool.execute(sistema);
pool.execute(aplicativos);
pool.execute(documentos);
do {
System.out.printf("----------------------------------------\n");
System.out.printf("-> Paralelismo: %d\n", pool.getParallelism());
System.out.printf("-> Threads Ativas: %d\n", pool.getActiveThreadCount());
System.out.printf("-> Tarefas: %d\n", pool.getQueuedTaskCount());
System.out.printf("-> Roubos: %d\n", pool.getStealCount());
System.out.printf("----------------------------------------\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while ((!sistema.isDone()) || (!aplicativos.isDone()) || (!documentos.isDone()));
pool.shutdown();
List<String> resultados;
resultados = sistema.join();
System.out.printf("Sistema: %d aplicativos encontrados.\n", resultados.size());
resultados = aplicativos.join();
System.out.printf("Aplicativos: %d encontrados.\n", resultados.size());
resultados = documentos.join();
System.out.printf("Documentos: %d encontrados.\n", resultados.size());
}
}
Por fim, saiba que enquanto o aplicativo processa é possível extrair algumas informações úteis, a fim de monitorar o trabalho do framework e do pool de threads. Estes dados podem ser obtidos com o próprio objeto do pool, através dos seguintes métodos:
- getParallelism(): Retorna o nível do paralelismo. Por default e por recomendação, é a quantidade de núcleos do processador;
- getActiveThreadCount(): Retorna a quantidade de threads ativas;
- getQueuedTaskCount(): Retorna a quantidade total de tarefas na fila de espera; e
- getStealCount(): Retorna a quantidade de roubos que ocorreram. Um roubo ocorre quando uma thread fica sem trabalho. Então ela rouba tarefas da fila de espera de outra thread.
Java 8 – Lambdas e Streams
A versão 8 da plataforma Java tem como uma das suas principais características o suporte a expressões Lambda, recurso que foi projetado com o intuito de facilitar a programação funcional e reduzir o tempo de desenvolvimento. Isso pode ser exemplificado criando um objeto Thread, como expõe o código da Listagem 13, onde é possível notar que a criação do objeto Runnable se torna implícita, reduzindo de cinco para duas a quantidade de linhas necessárias para a criação de uma thread.
new Thread(() -> {
//Código da tarefa a ser executada
}).start();
Ainda no Java 8, uma nova abstração, chamada Stream, foi desenvolvida. Esta permite processar dados de forma declarativa, assim como possibilita a execução de tarefas utilizando vários núcleos sem que seja necessário implementar uma linha de código multithreading, através da função parallelStream. Quando um stream é executado em paralelo, a JVM o particiona em vários substreams, os quais são iterados individualmente por threads e, por fim, seus resultados são combinados (veja Listagem 14).
Além de ser extremamente simples e funcional, em poucas linhas é possível extrair várias informações de uma lista numérica, como valor máximo, mínimo, soma total e média, sem ter que se preocupar em desenvolver estas funções. E mesmo que sua lista não seja numérica, ainda assim se tornou mais fácil transformar ou extrair informações por meio das expressões lambda.
import java.util.ArrayList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Random;
public class ExemploParallelStream {
public static void main(String[] args) {
List<Long> numeros = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 10000000; i++) {
numeros.add(random.nextLong());
}
LongSummaryStatistics stats = numeros.parallelStream().mapToLong((x) -> x ).summaryStatistics();
System.out.println("Maior número na lista: " + stats.getMax());
System.out.println("Menor número na lista: " + stats.getMin());
System.out.println("Soma de todos os números: " + stats.getSum());
System.out.println("Média de todos os números: " + stats.getAverage());
}
}
O Java foi uma das primeiras plataformas a fornecer suporte a multithreading no nível de linguagem e agora é uma das primeiras a padronizar utilitários e APIs de alto nível para lidar com threads, como a introdução do framework Fork/Join na versão 7, e a API de streams e o suporte a expressões lambda na versão 8.
Atualmente, qualquer computador ou smartphone tem mais de um núcleo de processamento e a cada novo lançamento esta quantidade só aumenta, assim como a importância do software ser desenvolvido em multithreading. Atendendo a esse cenário, o Java fornece uma base sólida para a criação de uma ampla variedade de soluções paralelas.
Para finalizar, note que é possível alcançar bons resultados com as técnicas aqui demonstradas. Contudo, sempre utilize a programação concorrente com bastante atenção, pois ao manipular dados compartilhados entre threads poderá cair em alguns cenários de depuração bem difíceis.
Artigos relacionados
-
Artigo
-
Artigo
-
Artigo
-
Artigo
-
Vídeo