spark.ml StringIndexer lança ‘Rótulo não visto’ no ajuste ()

Estou preparando um exemplo de spark.ml brinquedo. Spark version 1.6.0 , em execução no Oracle JDK version 1.8.0_65 , pyspark, ipython notebook.

Primeiro, dificilmente tem algo a ver com Spark, ML, StringIndexer: manipular labels não vistos . A exceção é lançada ao ajustar um pipeline a um dataset, não a transformando. E suprimir a exceção pode não ser uma solução aqui, pois, infelizmente, o dataset fica muito confuso nesse caso.

Meu dataset é de cerca de 800 MB descompactado, por isso pode ser difícil de reproduzir (subconjuntos menores parecem evitar esse problema).

O dataset é assim:

 +--------------------+-----------+-----+-------+-----+--------------------+ | url| ip| rs| lang|label| txt| +--------------------+-----------+-----+-------+-----+--------------------+ |http://3d-detmold...|217.160.215|378.0| de| 0.0|homwillkommskip c...| | http://3davto.ru/| 188.225.16|891.0| id| 1.0|оформить заказ пе...| | http://404.szm.com/| 85.248.42| 58.0| cs| 0.0|kliknite tu alebo...| | http://404.xls.hu/| 212.52.166|168.0| hu| 0.0|honlapkészítés404...| |http://a--m--a--t...| 66.6.43|462.0| en| 0.0|back top archiv r...| |http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0| | |http://a-wrf.ru/s...| 78.108.80|214.0| ru| 1.0|установк фаркопна...| +--------------------+-----------+-----+-------+-----+--------------------+ 

O valor previsto é o label . O pipeline inteiro aplicado a ele:

 from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF from pyspark.ml.classification import LogisticRegression train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345) pipe_stages = [ StringIndexer(inputCol='lang', outputCol='lang_idx'), OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'), Tokenizer(inputCol='ip', outputCol='ip_tokens'), HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'), Tokenizer(inputCol='txt', outputCol='txt_tokens'), HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'), VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'), LogisticRegression(labelCol='label', featuresCol='features') ] pipe = Pipeline(stages=pipe_stages) pipemodel = pipe.fit(train) 

E aqui está o stacktrace:

 Py4JJavaError: An error occurred while calling o10793.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113) at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271) at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159) at org.apache.spark.ml.Predictor.fit(Predictor.scala:90) at org.apache.spark.ml.Predictor.fit(Predictor.scala:71) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.SparkException: Unseen label: pl-PL. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more 

A linha mais interessante é:

 org.apache.spark.SparkException: Unseen label: pl-PL. 

Não faço ideia de como pl-PL que é um valor da coluna lang , poderia ter sido misturado na coluna label , que é um float , não uma string editada: algumas coclusions apressadas, corrigidas graças a @ zero323

Eu procurei mais e descobri que pl-PL é um valor da parte de teste do dataset, não de treinamento. Então, agora eu nem sei onde procurar o culpado: pode facilmente ser o código randomSplit , não o StringIndexer , e quem sabe o que mais.

Como faço para investigar isso?

Unseen label é uma mensagem genérica que não corresponde a uma coluna específica . O problema mais provável é com o seguinte estágio:

 StringIndexer(inputCol='lang', outputCol='lang_idx') 

com pl-PL presente no train("lang") e não presente no test("lang") .

Você pode corrigi-lo usando setHandleInvalid com skip :

 from pyspark.ml.feature import StringIndexer train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"]) indexer = StringIndexer(inputCol="v", outputCol="vi") indexer.fit(train).transform(test).show() ## Py4JJavaError: An error occurred while calling o112.showString. ## : org.apache.spark.SparkException: Job aborted due to stage failure: ## ... ## org.apache.spark.SparkException: Unseen label: foobar. indexer.setHandleInvalid("skip").fit(train).transform(test).show() ## +---+---+---+ ## | k| v| vi| ## +---+---+---+ ## | 3|foo|1.0| ## +---+---+---+ 

Tudo bem, acho que entendi isso. Pelo menos eu tenho esse trabalho.

Armazenar em cache o dataframe (incluindo partes de teste / trem) resolve o problema. Foi o que encontrei nesta edição do JIRA: https://issues.apache.org/jira/browse/SPARK-12590 .

Portanto, não é um bug, apenas o fato de que randomSample pode gerar um resultado diferente no mesmo dataset, mas diferentemente particionado. E aparentemente, algumas das minhas funções de munging (ou Pipeline ) envolvem a repartição, portanto, os resultados da recomputação do conjunto de trem de sua definição podem divergir.

O que ainda me interessa – é a reprodutibilidade: é sempre a linha ‘pl-PL’ que fica misturada na parte errada do dataset, ou seja, não é uma distribuição aleatória. É determinista, apenas inconsistente. Eu me pergunto como exatamente isso acontece.