Spark – repartition () vs coalesce ()

De acordo com o Learning Spark

Tenha em mente que reparticionar seus dados é uma operação bastante cara. O Spark também tem uma versão otimizada de repartition () chamada coalesce () que permite evitar a movimentação de dados, mas somente se você estiver diminuindo o número de partições RDD.

Uma diferença que eu vejo é que com o repartition () o número de partições pode ser aumentado / diminuído, mas com o coalesce () o número de partições só pode ser diminuído.

Se as partições estão espalhadas por várias máquinas e a coalescência () é executada, como ela pode evitar a movimentação de dados?

Evita um shuffle completo . Se soubermos que o número está diminuindo, o executor poderá manter os dados com segurança no número mínimo de partições, apenas movendo os dados dos nós extras para os nós que mantivemos.

Então, seria algo como isto:

Node 1 = 1,2,3 Node 2 = 4,5,6 Node 3 = 7,8,9 Node 4 = 10,11,12 

Em seguida, coalesce se a duas partições:

 Node 1 = 1,2,3 + (10,11,12) Node 3 = 7,8,9 + (4,5,6) 

Observe que o Nó 1 e o Nó 3 não exigiram que seus dados originais fossem movidos.

A resposta de Justin é incrível e essa resposta é mais profunda.

O algoritmo de repartition faz um shuffle completo e cria novas partições com dados distribuídos uniformemente. Vamos criar um DataFrame com os números de 1 a 12.

 val x = (1 to 12).toList val numbersDf = x.toDF("number") 

numbersDf contém 4 partições na minha máquina.

 numbersDf.rdd.partitions.size // => 4 

Aqui está como os dados são divididos nas partições:

 Partition 00000: 1, 2, 3 Partition 00001: 4, 5, 6 Partition 00002: 7, 8, 9 Partition 00003: 10, 11, 12 

Vamos fazer um shuffle completo com o método de repartition e obter esses dados em dois nós.

 val numbersDfR = numbersDf.repartition(2) 

Aqui está como os dados numbersDfR são particionados na minha máquina:

 Partition A: 1, 3, 4, 6, 7, 9, 10, 12 Partition B: 2, 5, 8, 11 

O método de repartition cria novas partições e distribui uniformemente os dados nas novas partições (a distribuição de dados é mais uniforme para conjuntos de dados maiores).

Diferença entre coalesce e repartition

coalesce usa partições existentes para minimizar a quantidade de dados embaralhados. repartition cria novas partições e faz um shuffle completo. coalesce resulta em partições com diferentes quantidades de dados (às vezes, partições que têm tamanhos muito diferentes) e a partição resulta em partições de tamanho aproximadamente igual.

coalesce ou repartition mais rápido?

coalesce pode ser mais rápida do que a repartition , mas partições de tamanho desigual geralmente são mais lentas para se trabalhar do que partições de tamanho igual. Você geralmente precisará reparticionar os conjuntos de dados depois de filtrar um grande dataset. Descobri que a repartition é mais rápida no geral, porque o Spark é construído para funcionar com partições de tamanho igual.

Leia esta postagem do blog se você quiser mais detalhes.

Um ponto adicional a ser observado aqui é que, como o princípio básico do Spark RDD é a imutabilidade. A repartição ou coalesce criará novo RDD. O RDD base continuará a existir com o seu número original de partições. No caso de o caso de uso exigir a persistência do RDD no cache, o mesmo deve ser feito para o RDD recém-criado.

 scala> pairMrkt.repartition(10) res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at :26 scala> res16.partitions.length res17: Int = 10 scala> pairMrkt.partitions.length res20: Int = 2 

Todas as respostas estão adicionando um grande conhecimento a essa pergunta muito freqüente.

Então, seguindo a tradição do cronograma desta questão, aqui estão meus 2 centavos.

Eu achei a repartição mais rápida do que a coalescência , em um caso muito específico.

No meu aplicativo, quando o número de arquivos que estimamos é menor do que o limite determinado, a repartição funciona mais rápido.

Aqui é o que eu quero dizer

 if(numFiles > 20) df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) else df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) 

No snippet acima, se meus arquivos fossem menores que 20, a união estava demorando para terminar, enquanto a divisão era muito mais rápida e o código acima.

Naturalmente, esse número (20) dependerá do número de trabalhadores e da quantidade de dados.

Espero que ajude.

De uma forma simples COALESCE: – é apenas para diminuir o não de partições, sem embaralhar de dados apenas comprimir as partições

REPARTIÇÃO: – é para aumentar e diminuir o número de partições, mas o embaralhamento ocorre

Exemplo:-

 val rdd = sc.textFile("path",7) rdd.repartition(10) rdd.repartition(2) 

Ambos funcionam bem

Mas geralmente vamos para essas duas coisas quando precisamos ver a saída em um cluster, vamos com isso.

Mas também você deve certificar-se de que os dados que estão chegando nós coalescentes devem ter altamente configurado, se você está lidando com dados enormes. Porque todos os dados serão carregados para esses nós, podem levar a exceção de memory. Embora a reparação seja cara, prefiro usá-la. Já que embaralha e distribui os dados igualmente.

Seja sábio para escolher entre coalescer e reparticionar.

repartition – é recomendado usar a repartição enquanto aumenta o número de partições, porque envolve o embaralhamento de todos os dados.

coalesce- Recomenda-se o uso de coalesce reduzindo o número de partições. Por exemplo, se você tiver 3 partições e quiser reduzi-las a 2 partições, Coalesce moverá os dados da 3ª partição para as partições 1 e 2. As partições 1 e 2 permanecerão no mesmo Container.but, a repartição embaralhará os dados em todas as partições para uso de rede entre o executor será alto e impactará o desempenho.

O desempenho combina melhor o desempenho do que a repartição, reduzindo o número de partições.