Fifo Linux não bloqueante (registro sob demanda)

Eu gosto de registrar uma saída de programas ‘on demand’. Por exemplo. a saída é registrada no terminal, mas outro processo pode ligar-se à saída atual a qualquer momento.

O caminho clássico seria:

myprogram 2>&1 | tee /tmp/mylog 

e sob demanda

 tail /tmp/mylog 

No entanto, isso criaria um arquivo de log crescente, mesmo se não for usado até que a unidade fique sem espaço. Então minha tentativa foi:

 mkfifo /tmp/mylog myprogram 2>&1 | tee /tmp/mylog 

e sob demanda

 cat /tmp/mylog 

Agora posso ler / tmp / mylog a qualquer momento. No entanto, qualquer saída bloqueia o programa até que o / tmp / mylog seja lido. Eu gosto do fifo para liberar todos os dados recebidos não lidos de volta. Como fazer isso?

Inspirado em sua pergunta, escrevi um programa simples que permite fazer isso:

$ myprogram 2>&1 | ftee /tmp/mylog

Ele se comporta de forma semelhante ao tee mas clona o stdin para stdout e para um pipe nomeado (um requisito por enquanto) sem bloqueio. Isso significa que, se você quiser fazer o log dessa maneira, pode acontecer de perder seus dados de log, mas acho que é aceitável em seu cenário. O truque é bloquear o sinal SIGPIPE e ignorar o erro ao escrever em um fifo quebrado. Esta amostra pode ser otimizada de várias maneiras, é claro, mas até agora, o trabalho que eu acho.

 /* ftee - clone stdin to stdout and to a named pipe (c) racic@stackoverflow WTFPL Licence */ #include  #include  #include  #include  #include  #include  #include  #include  #include  int main(int argc, char *argv[]) { int readfd, writefd; struct stat status; char *fifonam; char buffer[BUFSIZ]; ssize_t bytes; signal(SIGPIPE, SIG_IGN); if(2!=argc) { printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a" " named pipe, required argument\n", argv[0]); exit(EXIT_FAILURE); } fifonam = argv[1]; readfd = open(fifonam, O_RDONLY | O_NONBLOCK); if(-1==readfd) { perror("ftee: readfd: open()"); exit(EXIT_FAILURE); } if(-1==fstat(readfd, &status)) { perror("ftee: fstat"); close(readfd); exit(EXIT_FAILURE); } if(!S_ISFIFO(status.st_mode)) { printf("ftee: %s in not a fifo!\n", fifonam); close(readfd); exit(EXIT_FAILURE); } writefd = open(fifonam, O_WRONLY | O_NONBLOCK); if(-1==writefd) { perror("ftee: writefd: open()"); close(readfd); exit(EXIT_FAILURE); } close(readfd); while(1) { bytes = read(STDIN_FILENO, buffer, sizeof(buffer)); if (bytes < 0 && errno == EINTR) continue; if (bytes <= 0) break; bytes = write(STDOUT_FILENO, buffer, bytes); if(-1==bytes) perror("ftee: writing to stdout"); bytes = write(writefd, buffer, bytes); if(-1==bytes);//Ignoring the errors } close(writefd); return(0); } 

Você pode compilar com este comando padrão:

$ gcc ftee.c -o ftee

Você pode verificá-lo rapidamente, por exemplo:

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

Observe também - este não é um multiplexador. Você só pode ter um processo fazendo $ cat /tmp/mylog por vez.

Este é um tópico (muito) antigo, mas acabei tendo um problema semelhante ultimamente. Na verdade, o que eu precisava era de uma clonagem de stdin para stdout com uma cópia para um pipe que não fosse de bloqueio. A resposta proposta na primeira resposta realmente ajudou, mas foi (para meu caso de uso) muito volátil. Significa que perdi dados que poderia ter processado se tivesse chegado a tempo.

O cenário que enfrentei é que tenho um processo (algum_processo) que agrega alguns dados e grava seus resultados a cada três segundos para o stdout. A configuração (simplificada) ficou assim (na configuração real estou usando um pipe nomeado):

 some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz 

Agora, o raw_data.gz precisa ser compactado e deve estar completo. Ftee faz esse trabalho muito bem. Mas o pipe que eu estou usando no meio era muito lento para pegar os dados liberados – mas era rápido o suficiente para processar tudo se pudesse chegar até ele, o que foi testado com um tee normal. No entanto, um tee normal bloqueia se alguma coisa acontecer com o pipe sem nome, e como eu quero ser capaz de me ligar por demanda, tee não é uma opção. Voltar ao tópico: Melhorou quando coloquei um buffer no meio, resultando em:

 some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz 

Mas isso ainda estava perdendo dados que eu poderia ter processado. Então fui em frente e ampliei o limite proposto antes para uma versão em buffer (bftee). Ele ainda tem todas as mesmas propriedades, mas usa um buffer interno (ineficiente?) No caso de uma gravação falhar. Ele ainda perde dados se o buffer ficar cheio, mas funciona perfeitamente no meu caso. Como sempre, há muito espaço para melhorias, mas como eu copiei o código para fora daqui, eu gostaria de compartilhá-lo de volta com pessoas que possam ter um uso para ele.

 /* bftee - clone stdin to stdout and to a buffered, non-blocking pipe (c) racic@stackoverflow (c) fabraxias@stackoverflow WTFPL Licence */ #include  #include  #include  #include  #include  #include  #include  #include  #include  // the number of sBuffers that are being held at a maximum #define BUFFER_SIZE 4096 #define BLOCK_SIZE 2048 typedef struct { char data[BLOCK_SIZE]; int bytes; } sBuffer; typedef struct { sBuffer *data; //array of buffers int bufferSize; // number of buffer in data int start; // index of the current start buffer int end; // index of the current end buffer int active; // number of active buffer (currently in use) int maxUse; // maximum number of buffers ever used int drops; // number of discarded buffer due to overflow int sWrites; // number of buffer written to stdout int pWrites; // number of buffers written to pipe } sQueue; void InitQueue(sQueue*, int); // initialized the Queue void PushToQueue(sQueue*, sBuffer*, int); // pushes a buffer into Queue at the end sBuffer *RetrieveFromQueue(sQueue*); // returns the first entry of the buffer and removes it or NULL is buffer is empty sBuffer *PeakAtQueue(sQueue*); // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer void ShrinkInQueue(sQueue *queue, int); // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty void DelFromQueue(sQueue *queue); // removes the first entry of the queue static void sigUSR1(int); // signal handled for SUGUSR1 - used for stats output to stderr static void sigINT(int); // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ? sQueue queue; // Buffer storing the overflow volatile int quit; // for quiting the main loop int main(int argc, char *argv[]) { int readfd, writefd; struct stat status; char *fifonam; sBuffer buffer; ssize_t bytes; int bufferSize = BUFFER_SIZE; signal(SIGPIPE, SIG_IGN); signal(SIGUSR1, sigUSR1); signal(SIGTERM, sigINT); signal(SIGINT, sigINT); /** Handle commandline args and open the pipe for non blocking writing **/ if(argc < 2 || argc > 3) { printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n" "FIFO - path to a named pipe, required argument\n" "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]); exit(EXIT_FAILURE); } fifonam = argv[1]; if (argc == 3) { bufferSize = atoi(argv[2]); if (bufferSize == 0) bufferSize = BUFFER_SIZE; } readfd = open(fifonam, O_RDONLY | O_NONBLOCK); if(-1==readfd) { perror("bftee: readfd: open()"); exit(EXIT_FAILURE); } if(-1==fstat(readfd, &status)) { perror("bftee: fstat"); close(readfd); exit(EXIT_FAILURE); } if(!S_ISFIFO(status.st_mode)) { printf("bftee: %s in not a fifo!\n", fifonam); close(readfd); exit(EXIT_FAILURE); } writefd = open(fifonam, O_WRONLY | O_NONBLOCK); if(-1==writefd) { perror("bftee: writefd: open()"); close(readfd); exit(EXIT_FAILURE); } close(readfd); InitQueue(&queue, bufferSize); quit = 0; while(!quit) { // read from STDIN bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data)); // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading if (bytes < 0 && errno == EINTR) continue; if (bytes <= 0) break; // save the number if read bytes in the current buffer to be processed buffer.bytes = bytes; // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less. bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes); queue.sWrites++; if(-1==bytes) { perror("ftee: writing to stdout"); break; } sBuffer *tmpBuffer = NULL; // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write // the buffered data to the pipe. This continues until the Buffer is empty or the write fails. // NOTE: bytes cannot be -1 (that would have failed just before) when the loop is entered. while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) { // write the oldest buffer to the pipe bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes); // the written bytes are equal to the buffer size, the write is successful - remove the buffer and continue if (bytes == tmpBuffer->bytes) { DelFromQueue(&queue); queue.pWrites++; } else if (bytes > 0) { // on a positive bytes value there was a partial write. we shrink the current buffer // and handle this as a write failure ShrinkInQueue(&queue, bytes); bytes = -1; } } // There are several cases here: // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped. if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes); // again, there are several cases what can happen here // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue if (bytes != buffer.bytes) PushToQueue(&queue, &buffer, bytes); else queue.pWrites++; } // once we are done with STDIN, try to flush the buffer to the named pipe if (queue.active > 0) { //set output buffer to block - here we wait until we can write everything to the named pipe // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. int saved_flags = fcntl(writefd, F_GETFL); int new_flags = saved_flags & ~O_NONBLOCK; int res = fcntl(writefd, F_SETFL, new_flags); sBuffer *tmpBuffer = NULL; //TODO: this does not handle partial writes yet while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) { int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes); if (bytes != -1) DelFromQueue(&queue); } } close(writefd); } /** init a given Queue **/ void InitQueue (sQueue *queue, int bufferSize) { queue->data = calloc(bufferSize, sizeof(sBuffer)); queue->bufferSize = bufferSize; queue->start = 0; queue->end = 0; queue->active = 0; queue->maxUse = 0; queue->drops = 0; queue->sWrites = 0; queue->pWrites = 0; } /** push a buffer into the Queue**/ void PushToQueue(sQueue *queue, sBuffer *p, int offset) { if (offset < 0) offset = 0; // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead if (offset == p->bytes) return; // in this case there are 0 bytes to add to the queue. Nothing to write // this should never happen - offset cannot be bigger than the buffer itself. Panic action if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);} // debug output on a partial write. TODO: remove this line // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n"); // copy the data from the buffer into the queue and remember its size memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset); queue->data[queue->end].bytes = p->bytes - offset; // move the buffer forward queue->end = (queue->end + 1) % queue->bufferSize; // there is still space in the buffer if (queue->active < queue->bufferSize) { queue->active++; if (queue->active > queue->maxUse) queue->maxUse = queue->active; } else { // Overwriting the oldest. Move start to next-oldest queue->start = (queue->start + 1) % queue->bufferSize; queue->drops++; } } /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/ sBuffer *RetrieveFromQueue(sQueue *queue) { if (!queue->active) { return NULL; } queue->start = (queue->start + 1) % queue->bufferSize; queue->active--; return &(queue->data[queue->start]); } /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/ sBuffer *PeakAtQueue(sQueue *queue) { if (!queue->active) { return NULL; } return &(queue->data[queue->start]); } /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/ void ShrinkInQueue(sQueue *queue, int bytes) { // cannot remove negative amount of bytes - this is an error case. Ignore it if (bytes <= 0) return; // remove the entry if the offset is equal to the buffer size if (queue->data[queue->start].bytes == bytes) { DelFromQueue(queue); return; }; // this is a partial delete if (queue->data[queue->start].bytes > bytes) { //shift the memory by the offset memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes); queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes; return; } // panic is the are to remove more than we have the buffer if (queue->data[queue->start].bytes < bytes) { perror("we wrote more than we had - this should never happen\n"); exit(EXIT_FAILURE); return; } } /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/ void DelFromQueue(sQueue *queue) { if (queue->active > 0) { queue->start = (queue->start + 1) % queue->bufferSize; queue->active--; } } /** Stats output on SIGUSR1 **/ static void sigUSR1(int signo) { fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops); } /** handle signal for terminating **/ static void sigINT(int signo) { quit++; if (quit > 1) exit(EXIT_FAILURE); } 

Esta versão aceita mais um argumento (opcional) que especifica o número de blocos que devem ser armazenados em buffer para o pipe. Minha chamada de amostra agora é assim:

 some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz 

resultando em 16384 blocos a serem armazenados antes que ocorram os descartes. isso usa cerca de 32 Mbytes mais memory, mas … quem se importa?

É claro que, no ambiente real, estou usando um pipe nomeado para que eu possa append e desconectar conforme necessário. Há parece com isso:

 mkfifo named_pipe some_process | bftee named_pipe 16384 | gzip > raw_data.gz & cat named_pipe | onlineAnalysis.pl > results 

Além disso, o processo reage aos sinais da seguinte forma: SIGUSR1 -> contadores de impressão para STDERR SIGTERM, SIGINT -> primeiro sai do loop principal e descarrega o buffer para o pipe, o segundo encerra o programa imediatamente.

Talvez isso ajude alguém no futuro … Aproveite

No entanto, isso criaria um arquivo de log crescente, mesmo se não for usado até que a unidade fique sem espaço.

Por que não periodicamente gira os logs? Existe até um programa para fazer isso para você logrotate .

Há também um sistema para gerar mensagens de log e fazer coisas diferentes com eles de acordo com o tipo. É chamado syslog .

Você poderia até combinar os dois. Faça seu programa gerar mensagens syslog, configure o syslog para colocá-las em um arquivo e use logrotate para garantir que elas não preencham o disco.


Se descobriu que você estava escrevendo para um pequeno sistema embarcado e a saída do programa é pesada, há uma variedade de técnicas que você pode considerar.

  • Syslog remoto: envia as mensagens do syslog para um servidor syslog na rede.
  • Use os níveis de severidade disponíveis no syslog para fazer coisas diferentes com as mensagens. Por exemplo, descarte “INFO”, mas registre e envie “ERR” ou superior. Por exemplo, para consolar
  • Use um manipulador de sinal em seu programa para reler a configuração no HUP e variar a geração de registros “on demand” dessa maneira.
  • Peça ao seu programa para escutar em um soquete unix e escrever mensagens quando aberto. Você poderia até implementar e console interativo em seu programa dessa maneira.
  • Usando um arquivo de configuração, forneça controle granular da saída de log.

BusyBox frequentemente usado em dispositivos embarcados pode criar um log de memory RAM

 syslogd -C 

que pode ser preenchido por

 logger 

e lido por

 logread 

Funciona muito bem, mas fornece apenas um log global.

Parece que o operador de redirecionamento bash <> ( 3.6.10 Abrindo Descritores de Arquivo para Leitura e Escrita ) faz a gravação no arquivo / fifo aberta com ele sem bloqueio. Isso deve funcionar:

 $ mkfifo /tmp/mylog $ exec 4<>/tmp/mylog $ myprogram 2>&1 | tee >&4 $ cat /tmp/mylog # on demend 

Solução dada por gniourf_gniourf no canal IRC #bash.

Se você puder instalar a canvas no dispositivo incorporado, poderá executar o ‘myprogram’, desanexá-lo e reconectá-lo sempre que quiser ver o log. Algo como:

 $ screen -t sometitle myprogram Hit Ctrl+A, then d to detach it. 

Sempre que você quiser ver a saída, reconecte-a:

 $ screen -DR sometitle Hit Ctrl-A, then d to detach it again. 

Desta forma, você não terá que se preocupar com a saída do programa usando o espaço em disco.

O problema com a abordagem dada pelo fifo é que a coisa toda irá travar quando o buffer do pipe estiver sendo preenchido e nenhum processo de leitura estiver ocorrendo.

Para a abordagem fifo funcionar, acho que você teria que implementar um modelo cliente-servidor de pipe nomeado semelhante ao mencionado em BASH: Melhor arquitetura para leitura de dois streams de input (consulte código ligeiramente modificado abaixo, código de amostra 2).

Para uma solução alternativa, você também pode usar uma construção while ... read vez de colocar o stdout em um pipe nomeado implementando um mecanismo de contagem dentro do while ... read loop de while ... read que sobrescreverá o arquivo de log periodicamente por um número especificado de linhas. Isso evitaria um arquivo de log crescente (código de amostra 1).

 # sample code 1 # terminal window 1 rm -f /tmp/mylog touch /tmp/mylog while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | while IFS="" read -r line; do lno=$((lno+1)) #echo $lno array[${lno}]="${line}" if [[ $lno -eq 10 ]]; then lno=$((lno+1)) array[${lno}]="-------------" printf '%s\n' "${array[@]}" > /tmp/mylog unset lno array fi printf '%s\n' "${line}" done # terminal window 2 tail -f /tmp/mylog #------------------------ # sample code 2 # code taken from: # https://stackoverflow.com/questions/6702474/bash-best-architecture-for-reading-from-two-input-streams # terminal window 1 # server ( rm -f /tmp/to /tmp/from mkfifo /tmp/to /tmp/from while true; do while IFS="" read -r -d $'\n' line; do printf '%s\n' "${line}" done /tmp/from & bgpid=$! exec 3>/tmp/to exec 4/tmp/to; exec 4 /dev/null else printf 'line from fifo: %s\n' "$line" > /dev/null fi done & trap "kill -TERM $"'!; exit' 1 2 3 13 15 while IFS="" read -r -d $'\n' line; do # can we make it atomic? # sleep 0.5 # dd if=/tmp/to iflag=nonblock of=/dev/null # flush fifo printf '\177%s\n' "${line}" done >&3 ) & # kill -TERM $! # terminal window 2 # tests echo hello > /tmp/to yes 1 | nl > /tmp/to yes 1 | nl | tee /tmp/to while sleep 2; do date '+%Y-%m-%d_%H.%M.%S'; done 2>&1 | tee -a /tmp/to # terminal window 3 cat /tmp/to | head -n 10 

Se o seu processo gravar em qualquer arquivo de log e, em seguida, limpar o arquivo e iniciar novamente de vez em quando, ele não ficará muito grande ou usará o logrotate.

tail –follow = name –retry my.log

É tudo o que você precisa. Você terá o máximo de rolagem do seu terminal.

Nada não padrão é necessário.

Eu não tentei com pequenos arquivos de log, mas todos os nossos logs rodam assim e eu nunca notei perder linhas.