Spark ler arquivo do S3 usando sc.textFile (“s3n: //…)

Tentando ler um arquivo localizado no S3 usando o shell de ignição:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log") lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at :12 scala> myRdd.count java.io.IOException: No FileSystem for scheme: s3n at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) ... etc ... 

A IOException: No FileSystem para esquema: erro s3n ocorreu com:

  • Spark 1.31 ou 1.40 na máquina dev (sem libs do Hadoop)
  • Correndo a partir do Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60) que integra o Spark 1.2.1 fora da checkbox
  • Usando o esquema s3: // ou s3n: //

Qual é a causa desse erro? Falta dependência, falta de configuração ou uso sc.textFile() de sc.textFile() ?

Ou pode ser que isso se deva a um bug que afeta a criação do Spark específica do Hadoop 2.60, como este post parece sugerir. Vou tentar o Spark para o Hadoop 2.40 para ver se isso resolve o problema.

Confirmado que isso está relacionado à construção do Spark no Hadoop 2.60. Acabou de instalar o Spark 1.4.0 “Pré-construído para o Hadoop 2.4 e posterior” (em vez do Hadoop 2.6). E o código agora funciona bem.

sc.textFile("s3n://bucketname/Filename") agora gera outro erro:

 java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). 

O código abaixo usa o formato de URL S3 para mostrar que o Spark pode ler o arquivo S3. Usando a máquina dev (sem libs do Hadoop).

 scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt") lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at :21 scala> lyrics.count res1: Long = 9 

Melhor ainda : o código acima com as credenciais da AWS embutidas no URI do S3N será interrompido se a chave secreta da AWS tiver um encaminhamento “/”. A configuração de credenciais da AWS no SparkContext corrigirá isso. Código funciona se o arquivo S3 é público ou privado.

 sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/" val myRDD = sc.textFile("s3n://myBucket/MyFilePattern") myRDD.count 

Apesar de essa pergunta já ter uma resposta aceita, acho que os detalhes exatos de por que isso está acontecendo ainda estão faltando. Então eu acho que pode haver um lugar para mais uma resposta.

Se você adicionar a dependência necessária do hadoop-aws , seu código deverá funcionar.

Ao iniciar o Hadoop 2.6.0, o conector s3 FS foi movido para uma biblioteca separada chamada hadoop-aws. Há também um Jira para isso: Mova o código do conector FS relacionado ao s3 para hadoop-aws .

Isto significa que qualquer versão de faísca, que tenha sido construída contra o Hadoop 2.6.0 ou mais recente, terá que usar outra dependência externa para poder se conectar ao S3 File System.
Aqui está um exemplo sbt que eu tentei e está funcionando como esperado usando o Apache Spark 1.6.2 construído no Hadoop 2.6.0:

libraryDependencies + = “org.apache.hadoop”% “hadoop-aws”% “2.6.0”

No meu caso, encontrei alguns problemas de dependencies, então resolvi adicionando exclusão:

libraryDependencies + = “org.apache.hadoop”% “hadoop-aws”% “2.6.0” excluir (“tomcat”, “jasper-compilador”) excludeAll ExclusionRule (organization = “javax.servlet”)

Em outra nota relacionada, eu ainda tenho que tentar, mas é recomendado usar o sistema de arquivos “s3a” e não “s3n” iniciando o Hadoop 2.6.0.

A terceira geração, s3a: filesystem. Projetado para ser um switch em substituição ao s3n :, esta binding do sistema de arquivos suporta arquivos maiores e promete maior desempenho.

Você pode adicionar o parâmetro –packages com o jar apropriado: ao seu envio:

 bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py 

Este é um exemplo de código de ignição que pode ler os arquivos presentes no s3

 val hadoopConf = sparkContext.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", s3Key) hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret) var jobInput = sparkContext.textFile("s3://" + s3_location) 

Correu para o mesmo problema no Spark 2.0.2. Resolvido alimentando os potes. Aqui está o que eu corri:

 $ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar scala> val hadoopConf = sc.hadoopConfiguration scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem") scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId) scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey) scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) scala> sqlContext.read.parquet("s3://your-s3-bucket/") 

obviamente, você precisa ter os flasks no caminho onde você está executando o shell de ignição de

Para o Spark 1.4.x “Pré-construído para o Hadoop 2.6 e posterior”:

Acabei de copiar os pacotes S3, S3native do hadoop-aws-2.6.0.jar para o spark-assembly-1.4.1-hadoop2.6.0.jar.

Depois disso, reiniciei o cluster de centelhamento e ele funciona. Não esqueça de verificar o proprietário e o modo do jarro de assembly.

Há um Spark JIRA, SPARK-7481 , aberto a partir de hoje, 20 de outubro de 2016, para adicionar um módulo spark-cloud que inclui dependencies transitivas em tudo s3a e azure wasb: need, junto com testes.

E um Spark PR para combinar. É assim que obtenho suporte s3a nas minhas construções de faíscas

Se você fizer isso manualmente, você deve obter o JAR hadoop-aws da versão exata que o resto do seu hadoop JARS possui, e uma versão do AWS JARs 100% em sincronia com a qual o Hadoop aws foi compilado. Para o Hadoop 2.7. {1, 2, 3, …}

 hadoop-aws-2.7.x.jar aws-java-sdk-1.7.4.jar joda-time-2.9.3.jar + jackson-*-2.6.5.jar 

Cole tudo isso em SPARK_HOME/jars . Execute o faísca com suas credenciais configuradas em Env vars ou em spark-default.conf

o teste mais simples é você pode fazer uma contagem de linha de um arquivo CSV

 val landsatCSV = "s3a://landsat-pds/scene_list.gz" val lines = sc.textFile(landsatCSV) val lineCount = lines.count() 

Obter um número: tudo está bem. Obter um rastreamento de pilha Más notícias.

Eu tive que copiar os arquivos jar de um download do hadoop para o $SPARK_HOME/jars . Usando o sinalizador --jars ou o sinalizador --packages para spark-submit não funcionou.

Detalhes:

  • Spark 2.3.0
  • Hadoop baixado foi 2.7.6
  • Dois arquivos jar copiados eram de (hadoop dir)/share/hadoop/tools/lib/
    • aws-java-sdk-1.7.4.jar
    • hadoop-aws-2.7.6.jar

S3N não é um formato de arquivo padrão. Você precisa criar sua versão do Spark com uma versão do Hadoop que tenha as bibliotecas adicionais usadas para compatibilidade com a AWS. Informações adicionais que encontrei aqui, https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce

Você provavelmente tem que usar s3a: / scheme em vez de s3: / ou s3n: / No entanto, ele não está funcionando fora da checkbox (para mim) para o shell de centelha. Eu vejo o seguinte stacktrace:

 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:24) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:29) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:31) at $iwC$$iwC$$iwC$$iwC$$iwC.(:33) at $iwC$$iwC$$iwC$$iwC.(:35) at $iwC$$iwC$$iwC.(:37) at $iwC$$iwC.(:39) at $iwC.(:41) at (:43) at .(:47) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072) ... 68 more 

O que eu acho – você tem que adicionar manualmente manualmente a dependência do hadoop-aws http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar Mas não tenho ideia de como adicione-o a faísca-shell corretamente.

Eu estava enfrentando o mesmo problema. Ele funcionou bem depois de definir o valor para fs.s3n.impl e adicionar dependência de hadoop-aws.

 sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId) sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey) sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 

Usa s3a em vez de s3n. Eu tive um problema semelhante em um trabalho do Hadoop. Depois de mudar de s3n para s3a funcionou.

por exemplo

s3a: //myBucket/myFile1.log