C # produtor / consumidor

Eu recentemente encontrei uma implementação c # de padrão produtor / consumidor. é muito simples e (para mim pelo menos) muito elegante.

parece ter sido concebido por volta de 2006, então eu queria saber se essa implementação é
– seguro
– ainda aplicável

Código está abaixo (código original foi referenciado em http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375 )

using System; using System.Collections; using System.Threading; public class Test { static ProducerConsumer queue; static void Main() { queue = new ProducerConsumer(); new Thread(new ThreadStart(ConsumerJob)).Start(); Random rng = new Random(0); for (int i=0; i < 10; i++) { Console.WriteLine ("Producing {0}", i); queue.Produce(i); Thread.Sleep(rng.Next(1000)); } } static void ConsumerJob() { // Make sure we get a different random seed from the // first thread Random rng = new Random(1); // We happen to know we've only got 10 // items to receive for (int i=0; i < 10; i++) { object o = queue.Consume(); Console.WriteLine ("\t\t\t\tConsuming {0}", o); Thread.Sleep(rng.Next(1000)); } } } public class ProducerConsumer { readonly object listLock = new object(); Queue queue = new Queue(); public void Produce(object o) { lock (listLock) { queue.Enqueue(o); // We always need to pulse, even if the queue wasn't // empty before. Otherwise, if we add several items // in quick succession, we may only pulse once, waking // a single thread up, even if there are multiple threads // waiting for items. Monitor.Pulse(listLock); } } public object Consume() { lock (listLock) { // If the queue is empty, wait for an item to be added // Note that this is a while loop, as we may be pulsed // but not wake up before another thread has come in and // consumed the newly added object. In that case, we'll // have to wait for another pulse. while (queue.Count==0) { // This releases listLock, only reacquiring it // after being woken up by a call to Pulse Monitor.Wait(listLock); } return queue.Dequeue(); } } } 

    O código é mais antigo que isso – escrevi algum tempo antes do .NET 2.0 ser lançado. O conceito de uma fila de produtor / consumidor é bem mais antigo que isso 🙂

    Sim, esse código é seguro, tanto quanto eu sei – mas tem algumas deficiências:

    • Não é genérico. Uma versão moderna seria certamente genérica.
    • Não tem como parar a fila. Uma maneira simples de parar a fila (para que todos os segmentos de consumidores se aposentem) é ter um token “stop work” que pode ser colocado na fila. Você adiciona tantos tokens quanto os threads. Como alternativa, você tem um sinalizador separado para indicar que deseja parar. (Isso permite que os outros segmentos parem antes de terminar todo o trabalho atual na fila.)
    • Se os trabalhos forem muito pequenos, consumir um único trabalho de cada vez pode não ser a coisa mais eficiente a ser feita.

    As idéias por trás do código são mais importantes do que o próprio código, para ser honesto.

    Você poderia fazer algo como o seguinte trecho de código. Ele é genérico e tem um método para enfileirar nulos (ou qualquer sinalizador que você gostaria de usar) para instruir os threads de trabalho a serem encerrados.

    O código é tirado daqui: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

     using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace ConsoleApplication1 { public class TaskQueue : IDisposable where T : class { object locker = new object(); Thread[] workers; Queue taskQ = new Queue(); public TaskQueue(int workerCount) { workers = new Thread[workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (workers[i] = new Thread(Consume)).Start(); } public void Dispose() { // Enqueue one null task per worker to make each exit. foreach (Thread worker in workers) EnqueueTask(null); foreach (Thread worker in workers) worker.Join(); } public void EnqueueTask(T task) { lock (locker) { taskQ.Enqueue(task); Monitor.PulseAll(locker); } } void Consume() { while (true) { T task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait(locker); task = taskQ.Dequeue(); } if (task == null) return; // This signals our exit Console.Write(task); Thread.Sleep(1000); // Simulate time-consuming task } } } } 

    No passado aprendi como funciona o Monitor.Wait / Pulse (e muito sobre encadeamentos em geral) do código acima e da série de artigos de onde ele é. Então, como Jon diz, isso tem muito valor e é de fato seguro e aplicável.

    No entanto, a partir do .NET 4, há uma implementação de fila produtor-consumidor no framework . Eu só encontrei eu mesmo, mas até agora, ele faz tudo que eu preciso.

    Aviso: Se você ler os comentários, você entenderá que minha resposta está errada 🙂

    Existe um possível impasse no seu código.

    Imagine o seguinte caso, para maior clareza, usei uma abordagem de thread único, mas deveria ser fácil de converter para multi-thread com sleep:

     // We create some actions... object locker = new object(); Action action1 = () => { lock (locker) { System.Threading.Monitor.Wait(locker); Console.WriteLine("This is action1"); } }; Action action2 = () => { lock (locker) { System.Threading.Monitor.Wait(locker); Console.WriteLine("This is action2"); } }; // ... (stuff happens, etc.) // Imagine both actions were running // and there's 0 items in the queue // And now the producer kicks in... lock (locker) { // This would add a job to the queue Console.WriteLine("Pulse now!"); System.Threading.Monitor.Pulse(locker); } // ... (more stuff) // and the actions finish now! Console.WriteLine("Consume action!"); action1(); // Oops... they're locked... action2(); 

    Por favor, deixe-me saber se isso não faz sentido.

    Se isso for confirmado, a resposta à sua pergunta é “não, não é seguro”;) Espero que isso ajude.