(Por que) precisamos chamar o cache ou persistir em um RDD

Quando um dataset distribuído resiliente (RDD) é criado a partir de um arquivo de texto ou coleção (ou de outro RDD), precisamos chamar “cache” ou “persistir” explicitamente para armazenar os dados RDD na memory? Ou os dados RDD são armazenados de maneira distribuída na memory por padrão?

val textFile = sc.textFile("/user/emp.txt") 

De acordo com o meu entendimento, após a etapa acima, textFile é um RDD e está disponível em todas / algumas da memory do nó.

Em caso afirmativo, por que precisamos chamar “cache” ou “persist” no textFile RDD?

A maioria das operações de RDD é preguiçosa. Pense em um RDD como uma descrição de uma série de operações. Um RDD não é um dado. Então esta linha:

 val textFile = sc.textFile("/user/emp.txt") 

Não faz nada. Cria um RDD que diz “precisaremos carregar este arquivo”. O arquivo não está carregado neste momento.

As operações de RDD que exigem a observação do conteúdo dos dados não podem ser preguiçosas. (Essas ações são chamadas.) Um exemplo é RDD.count – para informar o número de linhas no arquivo, o arquivo precisa ser lido. Portanto, se você escrever textFile.count , nesse ponto o arquivo será lido, as linhas serão contadas e a contagem será retornada.

E se você chamar textFile.count novamente? A mesma coisa: o arquivo será lido e contado novamente. Nada é armazenado. Um RDD não é um dado.

Então, o que o RDD.cache faz? Se você adicionar textFile.cache ao código acima:

 val textFile = sc.textFile("/user/emp.txt") textFile.cache 

Não faz nada. RDD.cache também é uma operação lenta. O arquivo ainda não é lido. Mas agora o RDD diz “leia este arquivo e depois armazene em cache o conteúdo”. Se você executar textFile.count pela primeira vez, o arquivo será carregado, armazenado em cache e contado. Se você chamar textFile.count uma segunda vez, a operação usará o cache. Ele apenas pegará os dados do cache e contará as linhas.

O comportamento do cache depende da memory disponível. Se o arquivo não couber na memory, por exemplo, textFile.count voltará ao comportamento usual e textFile.count o arquivo.

Eu acho que a pergunta seria melhor formulada como:

Quando precisamos chamar o cache ou persistir em um RDD?

Os processos do Spark são preguiçosos, isto é, nada acontecerá até que seja necessário. Para responder rapidamente a questão, após val textFile = sc.textFile("/user/emp.txt") é emitido, nada acontece com os dados, apenas um HadoopRDD é construído, usando o arquivo como fonte.

Digamos que nós transformamos esses dados um pouco:

 val wordsRDD = textFile.flatMap(line => line.split("\\W")) 

Mais uma vez, nada acontece com os dados. Agora há um novo RDD wordsRDD que contém uma referência ao testFile e uma function a ser aplicada quando necessário.

Somente quando uma ação é chamada em um RDD, como as wordsRDD.count , a cadeia RDD, denominada linhagem , será executada. Ou seja, os dados, divididos em partições, serão carregados pelos executores do cluster do Spark, a function flatMap será aplicada e o resultado será calculado.

Em uma linhagem linear, como a deste exemplo, o cache() não é necessário. Os dados serão carregados para os executores, todas as transformações serão aplicadas e, finalmente, a count será computada, tudo na memory – se os dados couberem na memory.

cache é útil quando a linhagem do RDD se expande. Digamos que você queira filtrar as palavras do exemplo anterior para uma contagem de palavras positivas e negativas. Você poderia fazer isso assim:

 val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count() 

Aqui, cada filial emite um recarregamento dos dados. Adicionar uma instrução de cache explícita garantirá que o processamento feito anteriormente seja preservado e reutilizado. O trabalho ficará assim:

 val textFile = sc.textFile("/user/emp.txt") val wordsRDD = textFile.flatMap(line => line.split("\\W")) wordsRDD.cache() val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count() val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count() 

Por essa razão, o cache é dito “quebrar a linhagem”, pois cria um ponto de verificação que pode ser reutilizado para processamento posterior.

Regra geral: Use o cache quando a linhagem do seu RDD se ramifica ou quando um RDD é usado várias vezes, como em um loop.

Precisamos chamar “cache” ou “persistir” explicitamente para armazenar os dados RDD na memory?

Sim, somente se necessário.

Os dados RDD armazenados de forma distribuída na memory por padrão?

Não!

E estas são as razões pelas quais:

  • O Spark oferece suporte a dois tipos de variables ​​compartilhadas: variables ​​de difusão, que podem ser usadas para armazenar em cache um valor na memory em todos os nós e acumuladores, que são variables ​​que são “adicionadas” apenas, como contadores e sums.

  • Os RDDs suportam dois tipos de operações: transformações, que criam um novo dataset a partir de um existente, e ações, que retornam um valor para o programa do driver após executar uma computação no dataset. Por exemplo, map é uma transformação que passa cada elemento do dataset por meio de uma function e retorna um novo RDD representando os resultados. Por outro lado, reduzir é uma ação que agrega todos os elementos do RDD usando alguma function e retorna o resultado final ao programa do driver (embora também haja um reduceByKey paralelo que retorna um dataset distribuído).

  • Todas as transformações no Spark são preguiçosas, pois não calculam os resultados imediatamente. Em vez disso, eles apenas lembram as transformações aplicadas a algum dataset de base (por exemplo, um arquivo). As transformações são calculadas apenas quando uma ação exige que um resultado seja retornado ao programa do driver. Esse design permite que o Spark seja executado com mais eficiência. Por exemplo, podemos perceber que um dataset criado por meio do mapa será usado em uma redução e retornará apenas o resultado da redução para o driver, em vez do dataset mapeado maior.

  • Por padrão, cada RDD transformado pode ser recalculado toda vez que você executar uma ação nele. No entanto, você também pode persistir um RDD na memory usando o método persist (ou cache). Nesse caso, o Spark manterá os elementos no cluster para access muito mais rápido na próxima vez que você consultá-lo. Também há suporte para persistir RDDs no disco ou replicados em vários nós.

Para mais detalhes, consulte o guia de programação do Spark .

Adicionando outro motivo para adicionar (ou adicionar temporariamente) chamada de método de cache .

para problemas de memory de debugging

Com o método cache , o Spark fornecerá informações de debugging referentes ao tamanho do RDD. Assim, na interface integrada do Spark, você obterá informações de consumo de memory RDD. e isso se mostrou muito útil para diagnosticar problemas de memory.

Abaixo estão as três situações em que você deve armazenar seus RDDs em cache:

usando um RDD muitas vezes

executando várias ações no mesmo RDD

para cadeias longas de (ou muito caras) transformações