Como os registros do processo do Hadoop se dividem entre limites de bloco?

De acordo com o Hadoop - The Definitive Guide

Os registros lógicos que FileInputFormats definem normalmente não se encheckboxm perfeitamente nos blocos do HDFS. Por exemplo, os registros lógicos de TextInputFormat são linhas, que ultrapassam os limites do HDFS com mais freqüência do que não. Isso não tem relação com o funcionamento do seu programa – as linhas não são perdidas ou quebradas, por exemplo – mas vale a pena conhecer, pois significa que os mapas de dados locais (ou seja, mapas que estão sendo executados no mesmo host de seus computadores) dados de input) executará algumas leituras remotas. A pequena sobrecarga que isso causa normalmente não é significativa.

Suponha que uma linha de registro seja dividida em dois blocos (b1 e b2). O mapeador que processa o primeiro bloco (b1) notará que a última linha não possui um separador EOL e busca o restante da linha do próximo bloco de dados (b2).

Como o mapeador que processa o segundo bloco (b2) determina que o primeiro registro está incompleto e deve ser processado a partir do segundo registro no bloco (b2)?

Pergunta interessante, passei algum tempo olhando o código para os detalhes e aqui estão os meus pensamentos. As divisões são tratadas pelo cliente por InputFormat.getSplits , portanto, uma olhada em FileInputFormat fornece as seguintes informações:

  • Para cada arquivo de input, obtenha o tamanho do arquivo, o tamanho do bloco e calcule o tamanho da divisão como max(minSize, min(maxSize, blockSize)) onde maxSize corresponde a mapred.max.split.size e minSize é mapred.min.split.size .
  • Divida o arquivo em diferentes FileSplit base no tamanho da divisão calculado acima. O importante aqui é que cada FileSplit é inicializado com um parâmetro start correspondente ao deslocamento no arquivo de input . Ainda não há manipulação das linhas nesse ponto. A parte relevante do código se parece com isso:

     while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } 

Depois disso, se você olhar para o LineRecordReader que é definido pelo TextInputFormat , é onde as linhas são manipuladas:

  • Quando você inicializa seu LineRecordReader ele tenta instanciar um LineReader que é uma abstração para poder ler linhas sobre FSDataInputStream . Existem dois casos:
  • Se houver um CompressionCodec definido, esse codec é responsável por manipular os limites. Provavelmente não é relevante para sua pergunta.
  • Se não houver nenhum codec, no entanto, é aí que as coisas são interessantes: se o start do seu InputSplit for diferente de 0, você recua 1 caractere e pula a primeira linha identificada por \ n ou \ r \ n (Windows) ! O backtrack é importante porque, caso os limites da linha sejam os mesmos que os limites divididos, isso garante que você não pule a linha válida. Aqui está o código relevante:

     if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; 

Então, como os splits são calculados no cliente, os mapeadores não precisam executar em sequência, cada mapeador já sabe se é necessário descartar a primeira linha ou não.

Então, basicamente, se você tem 2 linhas de cada 100Mb no mesmo arquivo, e para simplificar, digamos que o tamanho da divisão é de 64MB. Então, quando as divisões de input forem calculadas, teremos o seguinte cenário:

  • Divisão 1 contendo o caminho e os hosts para esse bloco. Inicializado no início 200-200 = 0Mb, comprimento 64Mb.
  • Split 2 inicializado no início 200-200 + 64 = 64Mb, comprimento 64Mb.
  • Split 3 inicializado no início 200-200 + 128 = 128Mb, comprimento 64Mb.
  • Split 4 inicializado no início 200-200 + 192 = 192Mb, comprimento 8Mb.
  • O Mapeador A processará a divisão 1, a inicial será 0, portanto, não ignore a primeira linha e leia uma linha completa que ultrapasse o limite de 64 MB, para que seja necessária a leitura remota.
  • O Mapeador B processará a divisão 2, o início será! = 0, então ignore a primeira linha após 64Mb-1byte, que corresponde ao final da linha 1 a 100Mb que ainda está na divisão 2, temos 28Mb da linha na divisão 2, então remoto ler os restantes 72Mb.
  • O Mapeador C irá processar a divisão 3, o início é! = 0, então pula a primeira linha após 128Mb-1byte, que corresponde ao final da linha 2 a 200Mb, que é o final do arquivo, portanto não faça nada.
  • O Mapeador D é o mesmo que o mapeador C, exceto que ele procura por uma nova linha após 192Mb-1byte.

O algoritmo Map Reduece não funciona em blocos físicos do arquivo. Funciona em divisões de inputs lógicas. A divisão de input depende de onde o registro foi gravado. Um registro pode abranger dois mappers.

A forma como o HDFS foi configurado, ele divide arquivos muito grandes em blocos grandes (por exemplo, medindo 128 MB) e armazena três cópias desses blocos em diferentes nós no cluster.

O HDFS não tem conhecimento do conteúdo desses arquivos. Um registro pode ter sido iniciado no Bloco-a, mas o final desse registro pode estar presente no Bloco-b .

Para resolver esse problema, o Hadoop usa uma representação lógica dos dados armazenados em blocos de arquivos, conhecidos como divisões de input. Quando um cliente de trabalho MapReduce calcula as divisões de input , ele descobre onde o primeiro registro inteiro em um bloco começa e onde o último registro no bloco termina .

O ponto chave:

Nos casos em que o último registro em um bloco está incompleto, a divisão de input inclui informações de localização para o próximo bloco e o deslocamento de byte dos dados necessários para concluir o registro.

Dê uma olhada no diagrama abaixo.

insira a descrição da imagem aqui

Dê uma olhada neste artigo e na questão SE relacionada: Sobre a divisão de arquivos do Hadoop / HDFS

Mais detalhes podem ser lidos na documentação

A estrutura Map-Reduce depende do InputFormat da tarefa para:

  1. Valide a especificação de input do trabalho.
  2. Dividir o (s) arquivo (s) de input em InputSplits lógicos, cada um dos quais é atribuído a um Mapeador individual.
  3. Cada InputSplit é então atribuído a um Mapeador individual para processamento. Split pode ser tuple . InputSplit[] getSplits(JobConf job,int numSplits ) é a API para cuidar dessas coisas.

FileInputFormat , que estende o método getSplits () implementado pelo InputFormat . Dê uma olhada nos componentes internos deste método em grepcode

Eu vejo isso da seguinte forma: InputFormat é responsável por dividir os dados em divisões lógicas, levando em conta a natureza dos dados.
Nada impede que isso aconteça, embora possa adicionar uma latência significativa ao trabalho – toda a lógica e a leitura em torno dos limites de tamanho de divisão desejados ocorrerão no jobtracker.
O formato de input mais simples é o TextInputFormat. Ele está funcionando da seguinte maneira (até onde eu entendi do código) – o formato de input cria divisões por tamanho, independentemente das linhas, mas o LineRecordReader sempre:
a) Saltar a primeira linha na divisão (ou parte dela), se não for a primeira divisão
b) Leia uma linha após o limite da divisão no final (se os dados estiverem disponíveis, não é a última divisão).

Pelo que entendi, quando o FileSplit é inicializado para o primeiro bloco, o construtor padrão é chamado. Portanto, os valores para início e comprimento são zero inicialmente. No final do processamento do primeiro bloco, se a última linha estiver incompleta, o valor do comprimento será maior que o comprimento da divisão e também será lida a primeira linha do próximo bloco. Devido a isso, o valor de início para o primeiro bloco será maior que zero e, sob essa condição, o LineRecordReader irá pular a primeira linha do segundo bloco. (Veja fonte )

Caso a última linha do primeiro bloco esteja completa, o valor do comprimento será igual ao comprimento do primeiro bloco e o valor do início do segundo bloco será zero. Nesse caso, o LineRecordReader não irá pular a primeira linha e ler o segundo bloco do início.

Faz sentido?

Os mapeadores não precisam se comunicar. Os blocos de arquivos estão no HDFS e o mapeador atual (RecordReader) pode ler o bloco que possui a parte restante da linha. Isso acontece nos bastidores.

Do código-fonte hadoop de LineRecordReader.java o construtor: Eu encontro alguns comentários:

 // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; 

A partir disso, acredito que hadoop lerá uma linha extra para cada divisão (no final da divisão atual, leia a próxima linha na próxima divisão), e se não for a primeira divisão, a primeira linha será descartada. para que nenhum registro de linha seja perdido e incompleto