Estou trabalhando com o Apache Spark em um cluster usando o HDFS. Tanto quanto eu entendo, o HDFS está distribuindo arquivos em nós de dados. Então, se colocar um “arquivo.txt” no sistema de arquivos, ele será dividido em partições. Agora eu estou chamando
rdd = SparkContext().textFile("hdfs://.../file.txt")
do Apache Spark. Tem rdd agora automaticamente as mesmas partições como “file.txt” no sistema de arquivos? O que acontece quando eu chamo
rdd.repartition(x)
onde x> então as partições usadas pelo hdfs? O Spark irá reorganizar fisicamente os dados no hdfs para funcionar localmente?
Exemplo: eu coloquei um TextFile de 30GB no HDFS-System, que está distribuindo em 10 nós. Will Spark a) usa os mesmos 10 partitons? e b) embaralhar 30 GB no cluster quando eu chamar repartição (1000)?
Quando o Spark lê um arquivo do HDFS, ele cria uma única partição para uma única divisão de input. A divisão de input é definida pelo Hadoop InputFormat
usado para ler esse arquivo. Por exemplo, se você usar textFile()
seria TextInputFormat
no Hadoop, que retornaria uma única partição para um único bloco de HDFS (mas a divisão entre as partições seria feita na divisão de linha, não na divisão de bloco exata), a menos que você tem um arquivo de texto compactado. No caso de arquivo compactado, você obteria uma única partição para um único arquivo (já que os arquivos de texto compactados não são divisíveis).
Quando você chama rdd.repartition(x)
ele executa uma mistura dos dados de N
partititons que você tem no rdd
para x
partições que você deseja ter, o particionamento seria feito na base de round robin.
Se você tiver um arquivo de texto descompactado de 30 GB armazenado no HDFS, com a configuração padrão de tamanho de bloco HDFS (128 MB), ele será armazenado em 235 blocos, o que significa que o RDD que você leu desse arquivo teria 235 partições. Quando você chama repartition(1000)
seu RDD seria marcado como reparticionado , mas, na verdade, ele seria embaralhado para 1000 partições somente quando você executasse uma ação sobre esse RDD (conceito de execução lenta).
Aqui está o instantâneo de ” Como os blocos no HDFS são carregados nos trabalhadores do Spark como partições ”
Nestas imagens 4 blocos HDFS são carregados como partições Spark dentro da memory de 3 trabalhadores
Exemplo: eu coloquei um TextFile de 30GB no HDFS-System, que está distribuindo em 10 nós.
Will Spark
a) usar as mesmas 10 partições?
O Spark carrega os mesmos 10 blocos HDFS para a memory dos trabalhadores como partições. Eu suponho que o tamanho do bloco de 30 GB deve ser de 3 GB para obter 10 partições / blocos (com conf padrão)
b) embaralhe 30 GB no cluster quando eu chamar repartição (1000)?
Sim , faça uma reprodução aleatória dos dados entre os nós do trabalhador para criar 1000 partições na memory dos trabalhadores.
Nota:
HDFS Block -> Spark partition : One block can represent as One partition (by default) Spark partition -> Workers : Many/One partitions can present in One workers
Além de @ 0x0FFF Se tomar HDFS como arquivo de input, ele irá calcular como para este rdd = SparkContext().textFile("hdfs://.../file.txt")
e quando você faz rdd.getNumPatitions
isso irá resultar Max(2, Number of HDFS block)
. Eu fiz muitos experimentos e achei isso como resultado. Mais uma vez explicitamente você pode fazer rdd = SparkContext().textFile("hdfs://.../file.txt", 400)
para obter 400 como partições ou mesmo pode fazer re-partições por rdd.repartition
ou diminuir para 10 por rdd.coalesce(10)