Como melhorar o desempenho de tarefas lentas do Spark usando a conexão DataFrame e JDBC?

Eu estou tentando acessar uma tabela Teradata de tamanho médio (~ 100 milhões de linhas) via JDBC no modo autônomo em um único nó (local [*]).

Eu estou usando o Spark 1.4.1. e está configurado em uma máquina muito poderosa (2 cpu, 24 núcleos, 126G de RAM).

Eu tentei várias configurações de memory e opções de ajuste para fazer o trabalho mais rápido, mas nenhum deles teve um impacto enorme.

Tenho certeza de que há algo que estou perdendo e abaixo está minha tentativa final que levou cerca de 11 minutos para obter essa contagem simples, e levou apenas 40 segundos usando uma conexão JDBC através de R para obter as contagens.

bin/pyspark --driver-memory 40g --executor-memory 40g df = sqlContext.read.jdbc("jdbc:teradata://......) df.count() 

Quando tentei com a tabela BIG (registros 5B), nenhum resultado retornou após a conclusão da consulta.

Todas as operações de agregação são executadas depois que todo o dataset é recuperado na memory em uma coleção DataFrame . Portanto, fazer a contagem no Spark nunca será tão eficiente quanto seria diretamente no TeraData. Às vezes vale a pena empurrar alguns cálculos para o database criando visualizações e, em seguida, mapeando essas visualizações usando a API JDBC.

Toda vez que você usar o driver JDBC para acessar uma tabela grande, deverá especificar a estratégia de particionamento. Caso contrário, você criará um DataFrame / RDD com uma única partição e sobrecarregará a única conexão JDBC.

Em vez disso, você quer experimentar o seguinte AI (desde o Spark 1.4.0+):

 sqlctx.read.jdbc( url = "", table = "", columnName = "", lowerBound = minValue, upperBound = maxValue, numPartitions = 20, connectionProperties = new java.util.Properties() )

Há também uma opção para reduzir alguns filtros.

Se você não tiver uma coluna integral distribuída uniformemente, você deseja criar algumas partições personalizadas especificando predicados personalizados ( where instruções). Por exemplo, suponhamos que você tenha uma coluna de timestamp e deseje particionar por intervalos de datas:

  val predicates = Array( "2015-06-20" -> "2015-06-30", "2015-07-01" -> "2015-07-10", "2015-07-11" -> "2015-07-20", "2015-07-21" -> "2015-07-31" ) .map { case (start, end) => s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'" } predicates.foreach(println) // Below is the result of how predicates were formed //cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30' //cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10' //cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date //'2015-07-20' //cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31' sqlctx.read.jdbc( url = "", table = "", predicates = predicates, connectionProperties = new java.util.Properties() )

Ele irá gerar um DataFrame onde cada partição conterá os registros de cada subconsulta associada aos diferentes predicados.

Verifique o código-fonte em DataFrameReader.scala

A tabela não serializada se encheckbox em 40 GB? Se começar a trocar no desempenho do disco, diminuirá dramaticamente.

De qualquer forma, quando você usa um JDBC padrão com a syntax ansi SQL, você aproveita o mecanismo de database, portanto, se teradata (não sei teradata) contém statistics sobre sua tabela, um clássico “select count (*) from table” será muito rápido. Em vez de faísca, está carregando seus 100 milhões de linhas na memory com algo como “selecione * da tabela” e, em seguida, realizará uma contagem nas linhas RDD. É uma carga de trabalho bem diferente.