Como divido um RDD em dois ou mais RDDs?

Eu estou procurando uma maneira de dividir um RDD em dois ou mais RDDs. O mais próximo que já vi é o Scala Spark: dividir a coleção em vários RDD? que ainda é um único RDD.

Se você estiver familiarizado com o SAS, algo assim:

data work.split1, work.split2; set work.preSplit; if (condition1) output work.split1 else if (condition2) output work.split2 run; 

que resultou em dois conjuntos de dados distintos. Teria que ser imediatamente persistido para obter os resultados que pretendo …

Não é possível gerar múltiplos RDDs de uma única transformação *. Se você quiser dividir um RDD, deverá aplicar um filter para cada condição de divisão. Por exemplo:

 def even(x): return x % 2 == 0 def odd(x): return not even(x) rdd = sc.parallelize(range(20)) rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

Se você tem apenas uma condição binária e o cálculo é caro, você pode preferir algo assim:

 kv_rdd = rdd.map(lambda x: (x, odd(x))) kv_rdd.cache() rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys() rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys() 

Isso significa apenas um único cálculo de predicado, mas requer uma passagem adicional sobre todos os dados.

É importante observar que, desde que um RDD de input seja armazenado corretamente em cache e não haja suposições adicionais em relação à distribuição de dados, não há diferença significativa quando se trata de complexidade temporal entre o filtro repetido e o loop for com if-else nested.

Com N elementos e condições M, o número de operações que você deve realizar é claramente proporcional a N vezes M. No caso de for-loop, ele deve estar mais próximo de (N + MN) / 2 e o filtro repetido é exatamente NM, mas no final o dia não é nada mais que O (NM). Você pode ver minha discussão ** com Jason Lenderman para ler sobre alguns prós e contras.

No nível muito alto, você deve considerar duas coisas:

  1. Transformações Spark são preguiçosas, até que você execute uma ação que seu RDD não é materializado

    Por que isso Importa? Voltando ao meu exemplo:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even)) 

    Se mais tarde eu decidir que preciso apenas de rdd_odd então não há razão para materializar o rdd_even .

    Se você der uma olhada no seu exemplo do SAS para calcular o work.split2 precisará materializar os dados de input e o work.split1 .

  2. Os RDDs fornecem uma API declarativa. Quando você usa o filter ou o map , o Mecanismo do Spark está totalmente pronto para executar essa operação. Desde que as funções passadas para as transformações sejam livres de efeitos colaterais, ela cria múltiplas possibilidades para otimizar todo o pipeline.

No final do dia, este caso não é especial o suficiente para justificar sua própria transformação.

Esse mapa com padrão de filtro é realmente usado em um Spark central. Veja minha resposta para Como o Sparks RDD.randomSplit realmente divide o RDD e uma parte relevante do método randomSplit .

Se o único objective é conseguir uma divisão na input, é possível usar a cláusula partitionBy para o DataFrameWriter cujo formato de saída de texto é:

 def makePairs(row: T): (String, String) = ??? data .map(makePairs).toDF("key", "value") .write.partitionBy($"key").format("text").save(...) 

* Existem apenas 3 tipos básicos de transformações no Spark:

  • RDD [T] => RDD [T]
  • RDD [T] => RDD [U]
  • (RDD [T], RDD [U]) => RDD [W]

onde T, U, W podem ser tipos atômicos ou produtos / tuplas (K, V). Qualquer outra operação deve ser expressa usando alguma combinação dos itens acima. Você pode verificar o papel RDD original para obter mais detalhes.

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** Veja também Scala Spark: Dividir a coleção em vários RDD?

Como outros pôsteres mencionados acima, não há uma transformação RDD nativa única que divida os RDDs, mas aqui estão algumas operações “multiplex” que podem emular eficientemente uma ampla variedade de “divisão” em RDDs, sem ler várias vezes:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

Alguns methods específicos para divisão aleatória:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

Os methods estão disponíveis no projeto silex open source:

https://github.com/willb/silex

Uma postagem no blog explicando como eles funcionam:

http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

 def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } } } def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]], persist: StorageLevel): Seq[RDD[U]] = { val mux = self.mapPartitionsWithIndex { case (id, itr) => Iterator.single(f(id, itr)) }.persist(persist) Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } } } 

Como mencionado anteriormente, esses methods envolvem um trade-off de memory para velocidade, porque eles operam calculando resultados de partições inteiras “avidamente” em vez de “preguiçosamente”. Portanto, é possível que esses methods sejam executados em problemas de memory em partições grandes, em que as transformações lazy mais tradicionais não ocorrerão.

Se você dividir um RDD usando a chamada da API randomSplit , receberá uma matriz de RDDs.

Se você quiser 5 RDDs retornados, passe 5 valores de peso.

por exemplo

 val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4) val seedValue = 5 val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue) splitRDD(1).collect() res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100) 

Uma maneira é usar um particionador personalizado para particionar os dados, dependendo da condição do filtro. Isso pode ser feito estendendo o Partitioner e implementando algo semelhante ao RangePartitioner .

As partições de mapa podem então ser usadas para construir vários RDDs do RDD particionado sem ler todos os dados.

 val filtered = partitioned.mapPartitions { iter => { new Iterator[Int](){ override def hasNext: Boolean = { if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) { false } else { iter.hasNext } } override def next():Int = iter.next() } 

Apenas esteja ciente de que o número de partições nos RDDs filtrados será o mesmo que o número no RDD particionado, portanto, um coalesce deve ser usado para reduzir isso e remover as partições vazias.