Como o particionamento do Spark funciona em arquivos no HDFS?

Estou trabalhando com o Apache Spark em um cluster usando o HDFS. Tanto quanto eu entendo, o HDFS está distribuindo arquivos em nós de dados. Então, se colocar um “arquivo.txt” no sistema de arquivos, ele será dividido em partições. Agora eu estou chamando

rdd = SparkContext().textFile("hdfs://.../file.txt") 

do Apache Spark. Tem rdd agora automaticamente as mesmas partições como “file.txt” no sistema de arquivos? O que acontece quando eu chamo

 rdd.repartition(x) 

onde x> então as partições usadas pelo hdfs? O Spark irá reorganizar fisicamente os dados no hdfs para funcionar localmente?

Exemplo: eu coloquei um TextFile de 30GB no HDFS-System, que está distribuindo em 10 nós. Will Spark a) usa os mesmos 10 partitons? e b) embaralhar 30 GB no cluster quando eu chamar repartição (1000)?

Quando o Spark lê um arquivo do HDFS, ele cria uma única partição para uma única divisão de input. A divisão de input é definida pelo Hadoop InputFormat usado para ler esse arquivo. Por exemplo, se você usar textFile() seria TextInputFormat no Hadoop, que retornaria uma única partição para um único bloco de HDFS (mas a divisão entre as partições seria feita na divisão de linha, não na divisão de bloco exata), a menos que você tem um arquivo de texto compactado. No caso de arquivo compactado, você obteria uma única partição para um único arquivo (já que os arquivos de texto compactados não são divisíveis).

Quando você chama rdd.repartition(x) ele executa uma mistura dos dados de N partititons que você tem no rdd para x partições que você deseja ter, o particionamento seria feito na base de round robin.

Se você tiver um arquivo de texto descompactado de 30 GB armazenado no HDFS, com a configuração padrão de tamanho de bloco HDFS (128 MB), ele será armazenado em 235 blocos, o que significa que o RDD que você leu desse arquivo teria 235 partições. Quando você chama repartition(1000) seu RDD seria marcado como reparticionado , mas, na verdade, ele seria embaralhado para 1000 partições somente quando você executasse uma ação sobre esse RDD (conceito de execução lenta).

Aqui está o instantâneo de ” Como os blocos no HDFS são carregados nos trabalhadores do Spark como partições

Nestas imagens 4 blocos HDFS são carregados como partições Spark dentro da memory de 3 trabalhadores

Conjunto de dados no HDFS dividido em partições


Exemplo: eu coloquei um TextFile de 30GB no HDFS-System, que está distribuindo em 10 nós.

Will Spark

a) usar as mesmas 10 partições?

O Spark carrega os mesmos 10 blocos HDFS para a memory dos trabalhadores como partições. Eu suponho que o tamanho do bloco de 30 GB deve ser de 3 GB para obter 10 partições / blocos (com conf padrão)

b) embaralhe 30 GB no cluster quando eu chamar repartição (1000)?

Sim , faça uma reprodução aleatória dos dados entre os nós do trabalhador para criar 1000 partições na memory dos trabalhadores.

Nota:

 HDFS Block -> Spark partition : One block can represent as One partition (by default) Spark partition -> Workers : Many/One partitions can present in One workers 

Além de @ 0x0FFF Se tomar HDFS como arquivo de input, ele irá calcular como para este rdd = SparkContext().textFile("hdfs://.../file.txt") e quando você faz rdd.getNumPatitions isso irá resultar Max(2, Number of HDFS block) . Eu fiz muitos experimentos e achei isso como resultado. Mais uma vez explicitamente você pode fazer rdd = SparkContext().textFile("hdfs://.../file.txt", 400) para obter 400 como partições ou mesmo pode fazer re-partições por rdd.repartition ou diminuir para 10 por rdd.coalesce(10)