Esquema de Particionamento Padrão no Spark

Quando executo o comando abaixo:

scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4).partitionBy(new HashPartitioner(10)).persist() rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[10] at partitionBy at :22 scala> rdd.partitions.size res9: Int = 10 scala> rdd.partitioner.isDefined res10: Boolean = true scala> rdd.partitioner.get res11: org.apache.spark.Partitioner = org.apache.spark.HashPartitioner@a 

Ele diz que existem 10 partições e o particionamento é feito usando o HashPartitioner . Mas quando eu executo o comando abaixo:

 scala> val rdd = sc.parallelize(List((1,2),(3,4),(3,6)),4) ... scala> rdd.partitions.size res6: Int = 4 scala> rdd.partitioner.isDefined res8: Boolean = false 

Ele diz que existem 4 partições e particionador não está definido. Então, qual é o esquema de particionamento padrão no Spark? Como os dados são particionados no segundo caso?

Você tem que distinguir entre duas coisas diferentes:

  • particionamento como distribuição de dados entre partições dependendo de um valor da chave que é limitado apenas ao PairwiseRDDs ( RDD[(T, U)] ). Isso cria um relacionamento entre a partição e o conjunto de chaves que podem ser encontradas em uma determinada partição.
  • particionamento como input de divisão em várias partições em que os dados são simplesmente divididos em partes contendo registros consecutivos para permitir a computação distribuída. A lógica exata depende de uma fonte específica, mas é o número de registros ou o tamanho de um bloco.

    No caso de dados de parallelize é distribuído uniformemente entre partições usando índices. No caso do HadoopInputFormats (como textFile ), ele depende de propriedades como mapreduce.input.fileinputformat.split.minsize / mapreduce.input.fileinputformat.split.maxsize .

Portanto, o esquema de particionamento padrão é simplesmente nenhum porque o particionamento não é aplicável a todos os RDDs. Para operações que requerem particionamento em um método padrão PairwiseRDD ( aggregateByKey , reduceByKey etc.) é usado o particionamento de hash.