Aplicativo C # de multitarefa com chamadas de database do SQL Server

Eu tenho um database do SQL Server com 500.000 registros na tabela main . Existem também outras três tabelas chamadas child1 , child2 e child3 . Os vários relacionamentos entre child1 , child2 , child3 e main são implementados por meio das três tabelas de relacionamento: main_child1_relationship , main_child2_relationship e main_child3_relationship . Eu preciso ler os registros em main , atualizar main , e também inserir nas tabelas de relacionamento novas linhas, bem como inserir novos registros nas tabelas filho. Os registros nas tabelas filhas têm restrições de exclusividade, portanto, o pseudo-código para o cálculo real (CalculateDetails) seria algo como:

 for each record in main { find its child1 like qualities for each one of its child1 qualities { find the record in child1 that matches that quality if found { add a record to main_child1_relationship to connect the two records } else { create a new record in child1 for the quality mentioned add a record to main_child1_relationship to connect the two records } } ...repeat the above for child2 ...repeat the above for child3 } 

Isso funciona bem como um aplicativo único encadeado. Mas é muito lento. O processamento em C # é muito pesado e demora muito. Eu quero transformar isso em um aplicativo multi-threaded.

Qual é a melhor maneira de fazer isso? Estamos usando o Linq para Sql.

Até agora, minha abordagem foi criar um novo object DataContext para cada lote de registros do main e usar ThreadPool.QueueUserWorkItem para processá-lo. No entanto, esses lotes estão pisando nos dedos uns dos outros, porque um segmento adiciona um registro e, em seguida, o próximo thread tenta adicionar o mesmo e … Estou recebendo todos os tipos de bloqueios interessantes do SQL Server mortos.

Aqui está o código:

  int skip = 0; List thisBatch; Queue<List> allBatches = new Queue<List>(); do { thisBatch = allIds .Skip(skip) .Take(numberOfRecordsToPullFromDBAtATime).ToList(); allBatches.Enqueue(thisBatch); skip += numberOfRecordsToPullFromDBAtATime; } while (thisBatch.Count() > 0); while (allBatches.Count() > 0) { RRDataContext rrdc = new RRDataContext(); var currentBatch = allBatches.Dequeue(); lock (locker) { runningTasks++; } System.Threading.ThreadPool.QueueUserWorkItem(x => ProcessBatch(currentBatch, rrdc)); lock (locker) { while (runningTasks > MAX_NUMBER_OF_THREADS) { Monitor.Wait(locker); UpdateGUI(); } } } 

E aqui está o ProcessBatch:

  private static void ProcessBatch( List currentBatch, RRDataContext rrdc) { var topRecords = GetTopRecords(rrdc, currentBatch); CalculateDetails(rrdc, topRecords); rrdc.Dispose(); lock (locker) { runningTasks--; Monitor.Pulse(locker); }; } 

E

  private static List GetTopRecords(RecipeRelationshipsDataContext rrdc, List thisBatch) { List topRecords; topRecords = rrdc.Records .Where(x => thisBatch.Contains(x.Id)) .OrderBy(x => x.OrderByMe).ToList(); return topRecords; } 

CalculateDetails é melhor explicado pelo pseudo-código no topo.

Eu acho que deve haver uma maneira melhor de fazer isso. Por favor ajude. Muito Obrigado!

Aqui está minha opinião sobre o problema:

  • Ao usar vários encadeamentos para inserir / atualizar / consultar dados no SQL Server ou em qualquer database, os deadlocks são um fato. Você tem que assumir que eles irão ocorrer e lidar com eles de forma adequada.

  • Isso não quer dizer que não devemos tentar limitar a ocorrência de deadlocks. No entanto, é fácil ler as causas básicas dos deadlocks e tomar medidas para evitá-los, mas o SQL Server sempre irá surpreendê-lo 🙂

Alguma razão para deadlocks:

  • Muitos encadeamentos – tente limitar o número de encadeamentos a um mínimo, mas é claro que queremos mais encadeamentos para desempenho máximo.

  • Não há índices suficientes. Se as seleções e atualizações não forem seletivas o suficiente, o SQL removerá bloqueios de alcance maiores do que os que estão íntegros. Tente especificar os índices apropriados.

  • Muitos índices. A atualização de índices causa deadlocks, portanto, tente reduzir os índices ao mínimo necessário.

  • Nível de isolamento de transação muito alto. O nível de isolamento padrão ao usar o .NET é ‘Serializable’, enquanto o padrão usando o SQL Server é ‘Read Committed’. Reduzir o nível de isolamento pode ajudar muito (se for o caso, é claro).

É assim que eu posso resolver seu problema:

  • Eu não iria rolar minha própria solução de threading, eu usaria a biblioteca TaskParallel. Meu método principal seria algo como isto:

     using (var dc = new TestDataContext()) { // Get all the ids of interest. // I assume you mark successfully updated rows in some way // in the update transaction. List ids = dc.TestItems.Where(...).Select(item => item.Id).ToList(); var problematicIds = new List(); // Either allow the TaskParallel library to select what it considers // as the optimum degree of parallelism by omitting the // ParallelOptions parameter, or specify what you want. Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8}, id => CalculateDetails(id, problematicIds)); } 
  • Executar o método CalculateDetails com novas tentativas de falhas de deadlock

     private static void CalculateDetails(int id, List problematicIds) { try { // Handle deadlocks DeadlockRetryHelper.Execute(() => CalculateDetails(id)); } catch (Exception e) { // Too many deadlock retries (or other exception). // Record so we can diagnose problem or retry later problematicIds.Add(new ErrorType(id, e)); } } 
  • O método principal CalculateDetails

     private static void CalculateDetails(int id) { // Creating a new DeviceContext is not expensive. // No need to create outside of this method. using (var dc = new TestDataContext()) { // TODO: adjust IsolationLevel to minimize deadlocks // If you don't need to change the isolation level // then you can remove the TransactionScope altogether using (var scope = new TransactionScope( TransactionScopeOption.Required, new TransactionOptions {IsolationLevel = IsolationLevel.Serializable})) { TestItem item = dc.TestItems.Single(i => i.Id == id); // work done here dc.SubmitChanges(); scope.Complete(); } } } 
  • E, claro, a minha implementação de um auxiliar de repetição de deadlock

     public static class DeadlockRetryHelper { private const int MaxRetries = 4; private const int SqlDeadlock = 1205; public static void Execute(Action action, int maxRetries = MaxRetries) { if (HasAmbientTransaction()) { // Deadlock blows out containing transaction // so no point retrying if already in tx. action(); } int retries = 0; while (retries < maxRetries) { try { action(); return; } catch (Exception e) { if (IsSqlDeadlock(e)) { retries++; // Delay subsequent retries - not sure if this helps or not Thread.Sleep(100 * retries); } else { throw; } } } action(); } private static bool HasAmbientTransaction() { return Transaction.Current != null; } private static bool IsSqlDeadlock(Exception exception) { if (exception == null) { return false; } var sqlException = exception as SqlException; if (sqlException != null && sqlException.Number == SqlDeadlock) { return true; } if (exception.InnerException != null) { return IsSqlDeadlock(exception.InnerException); } return false; } } 
  • Uma outra possibilidade é usar uma estratégia de particionamento

Se as tabelas puderem ser particionadas naturalmente em vários conjuntos de dados distintos, você poderá usar tabelas e índices particionados do SQL Server ou dividir manualmente as tabelas existentes em vários conjuntos de tabelas. Eu recomendaria usar o particionamento do SQL Server, já que a segunda opção seria confusa. O particionamento interno também está disponível apenas no SQL Enterprise Edition.

Se o particionamento for possível para você, você poderia escolher um esquema de parções que quebrasse seus dados em, digamos, 8 conjuntos distintos. Agora você pode usar o seu único código de thread original, mas tem 8 threads cada segmentando uma partição separada. Agora não haverá nenhum (ou pelo menos um número mínimo de) deadlocks.

Espero que faça sentido.

Visão geral

A raiz do problema é que o L2S DataContext, como o ObjectContext do Entity Framework, não é thread-safe. Conforme explicado nesta troca do fórum MSDN , o suporte para operações assíncronas nas soluções .NET ORM ainda está pendente a partir do .NET 4.0; você terá que rolar sua própria solução, o que, como você descobriu, nem sempre é fácil de fazer quando sua estrutura assume o thread único.

Aproveito a oportunidade para observar que o L2S é construído sobre o ADO.NET, que suporta totalmente a operação assíncrona – pessoalmente, eu preferiria lidar diretamente com essa camada inferior e escrever o SQL sozinho, só para ter certeza de que Eu entendi perfeitamente o que estava acontecendo na rede.

Solução do SQL Server?

Dito isto, eu tenho que perguntar – isso deve ser uma solução C #? Se você puder compor sua solução a partir de um conjunto de instruções insert / update, você pode simplesmente enviar o SQL diretamente e seus problemas de desempenho e encadeamento desaparecem. * Parece-me que seus problemas estão relacionados não às transformações de dados reais a serem feito, mas centrado em torno de torná-los performant de .net. Se o .NET for removido da equação, sua tarefa se tornará mais simples. Afinal, a melhor solução é muitas vezes aquela que você está escrevendo a menor quantidade de código, certo? 😉

Mesmo que sua lógica de atualização / inserção não possa ser expressa de maneira estritamente relacionada à configuração, o SQL Server possui um mecanismo integrado para iterar registros e executar lógica – enquanto eles são justamente difamados para muitos casos de uso, os cursores podem seja apropriado para a sua tarefa.

Se esta é uma tarefa que tem que acontecer repetidamente, você pode se beneficiar muito de codificá-lo como um procedimento armazenado.

* É claro que o SQL de longa duração traz seus próprios problemas, como escalonamento de bloqueios e uso de índice, com os quais você terá de lidar.

Solução C #

É claro, pode ser que fazer isso em SQL esteja fora de questão – talvez as decisões do seu código dependam de dados que vêm de outro lugar, por exemplo, ou talvez seu projeto tenha uma convenção rígida ‘não permitida pelo SQL’. Você menciona alguns erros típicos de multithreading, mas sem ver seu código, eu não posso realmente ser útil com eles especificamente.

Fazer isso do C # é obviamente viável, mas você precisa lidar com o fato de que uma quantidade fixa de latência existirá para cada chamada que você fizer. Você pode atenuar os efeitos da latência da rede usando conexões em pool, habilitando vários conjuntos de resultados ativos e usando os methods Begin / End asynchronouss para executar suas consultas. Mesmo com todos eles, você ainda terá que aceitar que há um custo para enviar dados do SQL Server para o seu aplicativo.

Uma das melhores maneiras de evitar que o seu código seja posta em andamento é evitar o compartilhamento de dados mutáveis ​​entre os encadeamentos o máximo possível. Isso significaria não compartilhar o mesmo DataContext em vários segmentos. A próxima melhor abordagem é bloquear seções críticas de código que tocam os dados compartilhados – lock blocos em todo o access a DataContext, desde a primeira leitura até a gravação final. Essa abordagem pode apenas evitar os benefícios do multithreading inteiramente; você pode provavelmente fazer o seu bloqueio mais refinado, mas seja advertido de que este é um caminho de dor.

Muito melhor é manter suas operações separadas umas das outras por completo. Se você pode particionar sua lógica através de registros ‘principais’, isso é ideal – isto é, contanto que não haja relacionamentos entre as várias tabelas filhas, e desde que um registro em ‘main’ não tenha implicações para outro, você pode dividir suas operações em vários segmentos como este:

 private IList GetMainIds() { using (var context = new MyDataContext()) return context.Main.Select(m => m.Id).ToList(); } private void FixUpSingleRecord(int mainRecordId) { using (var localContext = new MyDataContext()) { var main = localContext.Main.FirstOrDefault(m => m.Id == mainRecordId); if (main == null) return; foreach (var childOneQuality in main.ChildOneQualities) { // If child one is not found, create it // Create the relationship if needed } // Repeat for ChildTwo and ChildThree localContext.SaveChanges(); } } public void FixUpMain() { var ids = GetMainIds(); foreach (var id in ids) { var localId = id; // Avoid closing over an iteration member ThreadPool.QueueUserWorkItem(delegate { FixUpSingleRecord(id) }); } } 

Obviamente, isso é tanto um exemplo de brinquedo quanto o pseudocódigo na sua pergunta, mas espero que você pense em como escopo de suas tarefas de tal forma que não há (ou mínimo) estado compartilhado entre eles. Isso, eu acho, será a chave para uma solução C # correta.

EDIT Respondendo a atualizações e comentários

Se você está vendo problemas de consistência de dados, eu aconselho impor a semântica de transação – você pode fazer isso usando um System.Transactions.TransactionScope (adicionar uma referência a System.Transactions). Como alternativa, você pode fazer isso em um nível ADO.NET acessando a conexão interna e chamando BeginTransaction nela (ou qualquer que seja o método DataConnection chamado).

Você também menciona deadlocks. O fato de você estar lutando contra os deadlocks do SQL Server indica que as consultas SQL reais estão pisando nos dedos uns dos outros. Sem saber o que realmente está sendo enviado pela rede, é difícil dizer em detalhes o que está acontecendo e como corrigi-lo. Basta dizer que deadlocks SQL resultam de consultas SQL e não necessariamente de construções de threading C # – é necessário examinar exatamente o que está acontecendo. Meu instinto me diz que se cada registro ‘principal’ é verdadeiramente independente dos outros, então não deve haver necessidade de bloqueios de linha e tabela, e que o Linq para SQL é provavelmente o culpado aqui.

Você pode obter um despejo do SQL bruto emitido por L2S em seu código, definindo a propriedade DataContext.Log para algo, por exemplo, Console.Out. Embora eu nunca tenha usado isso pessoalmente, eu entendo que o LINQPad oferece resources L2S e você pode conseguir o SQL lá também.

O SQL Server Management Studio vai te levar até o fim – usando o Monitor de Atividades, você pode observar o escalonamento de bloqueios em tempo real. Usando o Query Analyzer, você pode ver exatamente como o SQL Server executará suas consultas. Com esses, você deve ter uma boa noção do que seu código está fazendo no lado do servidor e, por sua vez, como corrigi-lo.

Eu recomendaria mover todo o processamento XML para o servidor SQL também. Não apenas todos os seus deadlocks desaparecerão, mas você verá um aumento no desempenho que nunca mais vai querer voltar.

Será melhor explicado por um exemplo. Neste exemplo, suponho que o blob XML já esteja entrando na sua tabela principal (chamo-o de closet). Eu assumirei o seguinte esquema:

 CREATE TABLE closet (id int PRIMARY KEY, xmldoc ntext) CREATE TABLE shoe(id int PRIMARY KEY IDENTITY, color nvarchar(20)) CREATE TABLE closet_shoe_relationship ( closet_id int REFERENCES closet(id), shoe_id int REFERENCES shoe(id) ) 

E eu espero que seus dados (apenas na tabela principal) se pareçam assim:

 INSERT INTO closet(id, xmldoc) VALUES (1, 'blue') INSERT INTO closet(id, xmldoc) VALUES (2, 'red') 

Então, toda a sua tarefa é tão simples quanto a seguinte:

 INSERT INTO shoe(color) SELECT DISTINCT CAST(CAST(xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) AS color from closet INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(CAST(closet.xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) = shoe.color 

Mas, dado que você fará muito processamento semelhante, poderá tornar sua vida mais fácil declarando seu blob principal como tipo XML e simplificando ainda mais:

 INSERT INTO shoe(color) SELECT DISTINCT CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) FROM closet INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) = shoe.color 

Há otimizações de desempenho adicionais possíveis, como a pré-computação repetidamente chamada de resultados do Xpath em uma tabela temporária ou permanente, ou a conversão da população inicial da tabela principal em um BULK INSERT, mas não espero que você realmente precise deles para ter sucesso .

deadlocks do servidor sql são normais e esperados neste tipo de cenário – a recomendação da MS é que eles devem ser manipulados no lado do aplicativo em vez do lado do db.

No entanto, se você precisar certificar-se de que um procedimento armazenado seja chamado apenas uma vez, poderá usar um bloqueio de mutex sql usando sp_getapplock. Aqui está um exemplo de como implementar isso

 BEGIN TRAN DECLARE @mutex_result int; EXEC @mutex_result = sp_getapplock @Resource = 'CheckSetFileTransferLock', @LockMode = 'Exclusive'; IF ( @mutex_result < 0) BEGIN ROLLBACK TRAN END -- do some stuff EXEC @mutex_result = sp_releaseapplock @Resource = 'CheckSetFileTransferLock' COMMIT TRAN 

Esse problema pode ser resolvido com a ajuda de um LimitedConcurrencyLevelTaskScheduler

 public class InOutMessagesController { private static LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1); private TaskFactory taskFactory = new TaskFactory(scheduler); private TaskFactory> taskFactoryWithResult = new TaskFactory>(scheduler); private ConcurrentBag tasks = new ConcurrentBag(); private ConcurrentBag> tasksWithResult = new ConcurrentBag>(); private ConcurrentBag endedTaskIds = new ConcurrentBag(); private ConcurrentBag endedTaskWithResultIds = new ConcurrentBag(); private Task TaskForgetEndedTasks; private static object taskForgetLocker = new object(); #region Conveyor private async void AddTaskVoidToQueue(Task task) { try { tasks.Add(task); await taskFactory.StartNew(() => task.Start()); if (TaskForgetEndedTasks == null) { ForgetTasks(); } } catch (Exception ex) { NLogger.Error(ex); } } private async Task AddTaskWithResultToQueue(MyTask task) { ForgetTasks(); tasksWithResult.Add(task); return await taskFactoryWithResult.StartNew(() => { task.Start(); return task; }).Result; } private Object[] GetEqualTaskWithResult(string methodName) { var equalTask = tasksWithResult.FirstOrDefault(x => x.MethodName == methodName); if (equalTask == null) { return null; } else { return equalTask.Result; } } private void ForgetTasks() { Task.WaitAll(tasks.Where(x => x.Status == TaskStatus.Running || x.Status == TaskStatus.Created || x.Status == TaskStatus.WaitingToRun).ToArray()); lock (taskForgetLocker) { if (TaskForgetEndedTasks == null) { TaskForgetEndedTasks = new Task(ForgetEndedTasks); TaskForgetEndedTasks.Start(); } TaskForgetEndedTasks.Wait(); TaskForgetEndedTasks = null; } } private void ForgetEndedTasks() { try { var completedTasks = tasks.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled); var completedTasksWithResult = tasksWithResult.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled); if (completedTasks.Count() > 0) { foreach (var ts in completedTasks) { if (ts.Exception != null) { NLogger.Error(ts.Exception); if (ts.Exception.InnerException != null) { NLogger.Error(ts.Exception.InnerException); } } endedTaskIds.Add(ts.Id); } if (endedTaskIds.Count != 0) { foreach (var t in endedTaskIds) { Task ct = completedTasks.FirstOrDefault(x => x.Id == t); tasks.TryTake(out ct); } } endedTaskIds = new ConcurrentBag(); } if (completedTasksWithResult.Count() > 0) { foreach (var ts in completedTasksWithResult) { if (ts.Exception != null) { NLogger.Error(ts.Exception); if (ts.Exception.InnerException != null) { NLogger.Error(ts.Exception.InnerException); } } endedTaskWithResultIds.Add(ts.Id); } foreach (var t in endedTaskWithResultIds) { var ct = tasksWithResult.FirstOrDefault(x => x.Id == t); tasksWithResult.TryTake(out ct); } endedTaskWithResultIds = new ConcurrentBag(); } } catch(Exception ex) { NLogger.Error(ex); } } #endregion Conveyor internal void UpdateProduct(List products) { var updateProductDataTask = new Task(() => ADOWorker.UpdateProductData(products)); AddTaskVoidToQueue(updateProductDataTask); } internal async Task> GetProduct() { string methodName = "GetProductData"; Product_Data[] result = GetEqualTaskWithResult(methodName) as Product_Data[]; if (result == null) { var task = new MyTask(ADOWorker.GetProductData, methodName); result = await AddTaskWithResultToQueue(task) as Product_Data[]; } return result; } } public class ADOWorker { public Object[] GetProductData() { entities = new DataContext(); return entities.Product_Data.ToArray(); } public void UpdateProductData(List products) { entities = new DataContext(); foreach (Product_Data pr_data in products) { entities.sp_Product_Data_Upd(pr_data); } } } 

Isso pode ser óbvio, mas fazer um loop por cada tupla e executar seu trabalho no contêiner de servlet envolve muita sobrecarga por registro.

Se possível, mova parte ou todo o processamento para o servidor SQL, reescrevendo sua lógica como um ou mais stored procedures.

E se

  • Você não tem muito tempo para gastar com esse problema e precisa corrigi-lo agora
  • Você tem certeza de que seu código é feito para que diferentes threads NÃO modifiquem o mesmo registro
  • Você não está com medo

Então … você pode simplesmente adicionar “WITH NO LOCK” às suas consultas para que o MSSQL não aplique os bloqueios.

Para usar com caucanvas 🙂

Mas de qualquer forma, você não nos disse onde o tempo é perdido (na versão mono-threaded). Porque se estiver no código, aconselho você a escrever tudo diretamente no database para evitar a troca contínua de dados. Se estiver no database, eu aconselho a verificar o índice (demais?), I / o, cpu etc.