Buffer circular livre de bloqueio

Eu estou no processo de projetar um sistema que se conecta a um ou mais streams de feeds de dados e fazer algumas análises nos dados do que triggersr events com base no resultado. Em uma configuração típica de produtor / consumidor multiencadeado, terei vários encadeamentos de produtores colocando dados em uma fila e vários encadeamentos de consumidores lendo os dados, e os consumidores estão interessados ​​apenas no último ponto de dados mais n número de pontos. Os encadeamentos do produtor terão que bloquear se o consumidor lento não puder acompanhar e, é claro, os encadeamentos do consumidor bloquearão quando não houver atualizações não processadas. Usar uma fila simultânea típica com bloqueio de leitor / gravador funcionará muito bem, mas a taxa de dados chegando pode ser enorme, então eu queria reduzir minha sobrecarga de bloqueio, especialmente bloqueios de gravação para os produtores. Eu acho que um buffer circular livre de bloqueio é o que eu precisava.

Agora duas perguntas:

  1. O buffer sem bloqueio circular é a resposta?

  2. Se assim for, antes de eu rolar, você conhece alguma implementação pública que atenda à minha necessidade?

Quaisquer pointers na implementação de um buffer circular livre de bloqueio são sempre bem-vindos.

BTW, fazendo isso em C ++ no Linux.

Algumas informações adicionais:

O tempo de resposta é crítico para o meu sistema. O ideal é que os segmentos de consumidores desejem ver as atualizações chegando o mais rápido possível, pois um atraso extra de 1 milissegundo pode tornar o sistema sem valor ou valer muito menos.

A idéia de design que estou inclinando é um buffer circular semi-lock-free onde o thread produtor coloca os dados no buffer o mais rápido que puder, vamos chamar o head do buffer A, sem bloquear a menos que o buffer esteja cheio, quando A atende ao final do buffer Z. Os threads do consumidor terão cada um dois pointers para o buffer circular, P e P n , onde P é o cabeçote do buffer local do encadeamento, e P n é o nth item após o P. Cada encadeamento consumidor avançará seu P e P n uma vez que terminar de processar a corrente P e o final do ponteiro de buffer Z é avançado com o P n mais lento. Quando P alcança até A, o que significa que não há mais nova atualização para processar, o consumidor gira e espera ocupado por A para avançar novamente. Se o thread do consumidor girar por muito tempo, ele pode ser colocado em espera e esperar por uma variável de condição, mas estou bem com o consumidor ocupando o ciclo da CPU esperando por atualização porque isso não aumenta minha latência (terei mais núcleos de CPU de threads). Imagine que você tenha uma faixa circular, e o produtor está correndo na frente de um grupo de consumidores, a chave é ajustar o sistema de modo que o produtor esteja correndo apenas um passo à frente dos consumidores, e a maioria dessas operações pode ser feito usando técnicas livres de bloqueio. Eu entendo que obter os detalhes da implementação correta não é fácil … ok, muito difícil, é por isso que quero aprender com os erros dos outros antes de fazer alguns dos meus.

Fiz um estudo particular de estruturas de dados sem bloqueio nos últimos dois anos. Eu li a maioria dos artigos no campo (há apenas cerca de quarenta ou mais – embora apenas cerca de dez ou quinze sejam de uso real 🙂

AFAIK, um buffer circular livre de bloqueio não foi inventado. O problema será lidar com a condição complexa em que um leitor ultrapassa um escritor ou vice-versa.

Se você não tiver passado pelo menos seis meses estudando estruturas de dados sem bloqueio, não tente escrever uma. Você entenderá errado e pode não ser óbvio para você que existem erros, até que seu código falhe, após a implantação, em novas plataformas.

Eu acredito no entanto, há uma solução para sua exigência.

Você deve emparelhar uma fila livre de bloqueio com uma lista livre sem bloqueio.

A lista livre lhe dará pré-alocação e evitará a exigência (fiscalmente cara) de um alocador sem bloqueio; quando a lista livre está vazia, você replica o comportamento de um buffer circular, desqualificando instantaneamente um elemento da fila e usando isso.

(Claro que, em um buffer circular baseado em bloqueio, uma vez que o bloqueio é obtido, obter um elemento é muito rápido – basicamente apenas uma referência de ponteiro – mas você não conseguirá isso em nenhum algoritmo livre de bloqueio; bem fora do seu caminho para fazer as coisas, a sobrecarga de falhar um pop de lista livre seguido por um desenfileiramento está em pé de igualdade com a quantidade de trabalho que qualquer algoritmo livre de bloqueio precisará estar fazendo).

Michael e Scott desenvolveram uma boa fila sem bloqueios em 1996. Um link abaixo lhe dará detalhes suficientes para rastrear o PDF de seu artigo; Michael e Scott, FIFO

Uma lista livre livre de bloqueios é o algoritmo mais simples e livre de bloqueios e, na verdade, acho que não vi um artigo para ele.

O termo de arte para o que você quer é uma fila livre de bloqueio . Há um excelente conjunto de notas com links para códigos e documentos de Ross Bencina. O cara cujo trabalho eu mais confio é Maurice Herlihy (para os americanos, ele pronuncia seu primeiro nome como “Morris”).

A exigência de que produtores ou consumidores bloqueiem se o buffer estiver vazio ou cheio sugere que você deve usar uma estrutura de dados de bloqueio normal, com semáforos ou variables ​​de condição para fazer com que os produtores e consumidores bloqueiem até que os dados estejam disponíveis. Código livre de bloqueio geralmente não bloqueia em tais condições – ele gira ou abandona as operações que não podem ser feitas em vez de bloquear usando o sistema operacional. (Se você pode esperar até que outro segmento produza ou consuma dados, por que esperar um bloqueio por outro segmento para concluir a atualização da estrutura de dados?)

No Linux (x86 / x64), a synchronization intra-thread usando mutexes é razoavelmente barata se não houver contenção. Concentre-se em minimizar o tempo que os produtores e consumidores precisam para manter seus bloqueios. Como você disse que só se importa com os últimos N pontos de dados registrados, acho que um buffer circular seria razoavelmente bom. No entanto, eu realmente não entendo como isso se encheckbox no requisito de bloqueio e na ideia de consumidores realmente consumindo (removendo) os dados que lêem. (Você quer que os consumidores olhem apenas os últimos N pontos de dados, e não os removam? Você quer que os produtores não se importem se os consumidores não conseguirem acompanhar e apenas sobregravar os dados antigos?)

Além disso, como Zan Lynx comentou, você pode agregar / armazenar em buffer seus dados em partes maiores quando tiver muita input. Você pode armazenar em buffer um número fixo de pontos ou todos os dados recebidos dentro de um determinado período de tempo. . Isso significa que haverá menos operações de synchronization. Ele introduz latência, mas se você não estiver usando Linux em tempo real, então terá que lidar com isso de qualquer maneira.

Há uma boa série de artigos sobre isso no DDJ . Como um sinal de quão difícil isso pode ser, é uma correção em um artigo anterior que errou. Certifique-se de compreender os erros antes de rolar o seu próprio) -;

A implementação na biblioteca de incentivo vale a pena considerar. É fácil de usar e de alto desempenho. Eu escrevi um teste & corri em um laptop quad core i7 (8 threads) e obtive ~ 4M enfileiramento / enfileiramento operações por segundo. Outra implementação não mencionada até agora é a fila do MPMC em http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue . Eu fiz alguns testes simples com esta implementação no mesmo laptop com 32 produtores e 32 consumidores. É, como anunciado, mais rápido que o aumento da fila sem bloqueios.

Como a maioria das outras respostas afirmam que a programação sem bloqueios é difícil. A maioria das implementações terá dificuldade em detectar casos de canto que exigem muitos testes e debugging para correção. Normalmente, eles são corrigidos com o posicionamento cuidadoso das barreiras de memory no código. Você também encontrará provas de correção publicadas em muitos dos artigos acadêmicos. Eu prefiro testar essas implementações com uma ferramenta de força bruta. Qualquer algoritmo sem trava que você planeja usar na produção deve ser verificado quanto à exatidão usando uma ferramenta como http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html .

Uma técnica útil para reduzir a contenção é transformar os itens em várias filas e ter cada consumidor dedicado a um “tópico”.

Para o número mais recente de itens em que seus consumidores estão interessados ​​- você não deseja bloquear toda a fila e iterá-la para encontrar um item a ser substituído – basta publicar itens em N-tuplas, ou seja, todos os N itens recentes. Pontos de bônus para implementação em que o produtor bloquearia na fila completa (quando os consumidores não conseguem acompanhar) com um tempo limite, atualizando seu cache de tupla local – dessa forma, você não coloca pressão de volta na fonte de dados.

Eu concordaria com este artigo e recomendaria contra o uso de estruturas de dados livres de bloqueio. Um artigo relativamente recente sobre filas de fifo livres de bloqueio é este , busque outros artigos do (s) mesmo (s) autor (es); Há também uma tese de doutorado sobre Chalmers sobre estruturas de dados sem bloqueio (eu perdi o link). No entanto, você não disse o tamanho de seus elementos – estruturas de dados livres de bloqueio funcionam eficientemente somente com itens do tamanho de uma palavra, então você terá que alocar dinamicamente seus elementos se eles forem maiores que uma palavra de máquina (32 ou 64 bits). Se você alocar dinamicamente elementos, você desloca o (supostamente, já que você não projetou seu programa e você está basicamente fazendo otimização prematura) afunilamento ao alocador de memory, então você precisa de um alocador de memory livre de bloqueio, por exemplo, Streamflow , e integrar com o seu aplicativo.

A fila de Sutter é sub-ótima e ele sabe disso. A arte da programação Multicore é uma ótima referência, mas não confie nos caras do Java em modelos de memory, ponto final. Os links de Ross não lhe darão uma resposta definitiva, porque eles tinham suas bibliotecas em tais problemas e assim por diante.

Fazer uma programação sem bloqueios é pedir problemas, a menos que você queira gastar muito tempo em algo que você está claramente superengenharia antes de resolver o problema (a julgar pela descrição disso, é uma loucura comum “procurar a perfeição”. ’em coerência de cache). Leva anos e leva a não resolver os problemas primeiro e otimizar depois, uma doença comum.

Eu não sou especialista em modelos de memory de hardware e bloqueio de estruturas de dados livres e tenho a tendência de evitar o uso desses em meus projetos e vou com estruturas tradicionais de dados bloqueados.

No entanto, notei recentemente que o vídeo: fila SPSC sem bloqueio baseada no buffer de toque

Isso é baseado em uma biblioteca Java de software livre de alto desempenho chamada distruptor LMAX usada por um sistema de negociação: LMAX Distruptor

Com base na apresentação acima, você faz os pointers da cabeça e da cauda atômicos e atômicos verificam a condição em que a cabeça pega a cauda por trás ou vice-versa.

Abaixo você pode ver uma implementação muito básica do C ++ 11:

// USING SEQUENTIAL MEMORY #include #include #include  using namespace std; #define RING_BUFFER_SIZE 1024 // power of 2 for efficient % class lockless_ring_buffer_spsc { public : lockless_ring_buffer_spsc() { write.store(0); read.store(0); } bool try_push(int64_t val) { const auto current_tail = write.load(); const auto next_tail = increment(current_tail); if (next_tail != read.load()) { buffer[current_tail] = val; write.store(next_tail); return true; } return false; } void push(int64_t val) { while( ! try_push(val) ); // TODO: exponential backoff / sleep } bool try_pop(int64_t* pval) { auto currentHead = read.load(); if (currentHead == write.load()) { return false; } *pval = buffer[currentHead]; read.store(increment(currentHead)); return true; } int64_t pop() { int64_t ret; while( ! try_pop(&ret) ); // TODO: exponential backoff / sleep return ret; } private : std::atomic write; std::atomic read; static const int64_t size = RING_BUFFER_SIZE; int64_t buffer[RING_BUFFER_SIZE]; int64_t increment(int n) { return (n + 1) % size; } }; int main (int argc, char** argv) { lockless_ring_buffer_spsc queue; std::thread write_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.push(i); } } // End of lambda expression ); std::thread read_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.pop(); } } // End of lambda expression ); write_thread.join(); read_thread.join(); return 0; } 

Este é um thread antigo, mas como ainda não foi mencionado, há um FIFO livre de bloqueio, circular, 1 produtor -> 1, disponível no framework JUCE C ++.

https://www.juce.com/doc/classAbstractFifo#details

Confira Disruptor ( Como usá-lo ), que é um buffer de anel que vários segmentos podem se inscrever:

Embora essa seja uma pergunta antiga, ninguém mencionou o buffer de campainha sem trava do DPDK. É um buffer de anel de alto rendimento que suporta vários produtores e vários consumidores. Ele também fornece modos de consumidor único e produtor único, e o buffer de anel está livre de espera no modo SPSC. Está escrito em C e suporta várias arquiteturas.

Além disso, ele suporta os modos Bulk e Burst nos quais os itens podem ser enfileirados / desenfileirados em massa. O design permite que vários consumidores ou vários produtores gravem na fila ao mesmo tempo simplesmente reservando o espaço por meio da movimentação de um ponteiro atômico.

Aqui está como eu faria:

  • mapear a fila em um array
  • manter o estado com uma próxima leitura e na próxima próxima escrever índices
  • manter um vetor de bit cheio vazio

Inserção consiste em usar um CAS com um incremento e rolar na próxima gravação. Depois de ter um slot, adicione seu valor e defina o bit vazio / completo que corresponda a ele.

As remoções requerem uma verificação do bit antes para testar em underflows, mas, além disso, são as mesmas que para a gravação, mas usando o índice de leitura e limpando o bit vazio / completo.

Esteja avisado,

  1. Eu não sou especialista nessas coisas
  2. atômicos ASM ops parecem ser muito lentos quando eu os usei assim se você acabar com mais que alguns deles, você poderia ser mais rápido usar fechaduras embutidas dentro das funções de inserção / remoção. A teoria é que uma única opção atômica para agarrar a trava seguida por (muito) poucas operações ASM não atômicas pode ser mais rápida do que a mesma coisa feita por várias operações atômicas. Mas para fazer esse trabalho seria necessário inlineing manual ou automático, então é tudo um pequeno bloco de ASM.

Apenas para completar: há buffer circular livre de bloqueio bem testado em OtlContainers , mas está escrito em Delphi (TOmniBaseBoundedQueue é buffer circular e TOmniBaseBoundedStack é uma pilha limitada). Há também uma fila ilimitada na mesma unidade (TOmniBaseQueue). A fila ilimitada é descrita na fila de bloqueio dynamic – fazendo isso da maneira certa . A implementação inicial da fila limitada (buffer circular) foi descrita em uma fila livre de bloqueio, finalmente! mas o código foi atualizado desde então.

Você pode tentar lfqueue

É simples de usar, é design circular livre de bloqueio

 int *ret; lfqueue_t results; lfqueue_init(&results); /** Wrap This scope in multithread testing **/ int_data = (int*) malloc(sizeof(int)); assert(int_data != NULL); *int_data = i++; /*Enqueue*/ while (lfqueue_enq(&results, int_data) != 1) ; /*Dequeue*/ while ( (ret = lfqueue_deq(&results)) == NULL); // printf("%d\n", *(int*) ret ); free(ret); /** End **/ lfqueue_clear(&results);