Почему этот код асинхронного / ожидающего TAP медленнее, чем версия TPL?

18

Мне пришлось написать консольное приложение, которое называлось веб-службой Microsoft Dynamics CRM, для выполнения действия над более чем восемью тысячами CRM-объектов. Подробности вызова веб-службы не имеют значения и не показаны здесь, но мне нужен многопоточный клиент, чтобы я мог выполнять вызовы параллельно. Я хотел бы иметь возможность контролировать количество потоков, используемых в настройках конфигурации, а также для приложения, чтобы отменить всю операцию, если количество ошибок службы достигло порога, определенного в конфигурации.

Я написал его с помощью Task Parallel Library Task.Run и ContinueWith, отслеживая количество вызовов (потоков), количество ошибок, которые мы получили, и был ли пользователь отменен с клавиатуры. Все отлично работало, и у меня была обширная регистрация, чтобы убедиться, что потоки заканчиваются чисто и все в порядке. Я мог видеть, что программа использовала максимальное количество потоков параллельно и, если наш максимальный предел был достигнут, до тех пор, пока запущенная задача не завершится, прежде чем запускать другую.

Во время моего обзора кода мой коллега предположил, что было бы лучше сделать это с помощью async / wait вместо задач и продолжений, поэтому я создал ветвь и переписал ее таким образом. Результаты были интересными - версия async / await была почти в два раза медленнее и никогда не достигала максимального количества разрешенных параллельных операций / потоков. В TPL всегда было до 10 потоков параллельно, тогда как версия async / await никогда не превышала 5.

Мой вопрос: допустил ли я ошибку в том, как я написал код async / await (или код TPL даже)? Если я не закодировал это неправильно, можете ли вы объяснить, почему async / await менее эффективен, и значит ли это, что лучше использовать TPL для многопоточного кода.

Обратите внимание, что код, который я тестировал, на самом деле не вызывал CRM - класс CrmClient просто нить-спал в течение продолжительности, указанной в конфигурации (пять секунд), а затем генерирует исключение. Это означало отсутствие внешних переменных, которые могли бы повлиять на производительность.

Для целей этого вопроса я создал урезанную программу, которая объединяет обе версии; который вызывается, определяется настройкой конфигурации. Каждый из них начинается с загрузочного бегуна, который настраивает среду, создает класс очереди, а затем использует объект TaskCompletion для ожидания завершения. A CancellationTokenSource используется для сообщения об аннулировании с пользователя. Список идентификаторов для процесса считывается из встроенного файла и помещается в ConcurrentQueue. Они оба начинают вызов StartCrmRequest столько раз, сколько max-threads; впоследствии, каждый раз, когда результат обрабатывается, метод ProcessResult снова вызывает StartCrmRequest, сохраняя ход, пока все наши идентификаторы не будут обработаны.

Вы можете клонировать / загружать полную программу отсюда: Ссылка

Вот соответствующая конфигурация:

<appSettings>
    <add key="TellUserAfterNCalls" value="5"/>
    <add key="CrmErrorsBeforeQuitting" value="20"/>
    <add key="MaxThreads" value="10"/>
    <add key="CallIntervalMsecs" value="5000"/>
    <add key="UseAsyncAwait" value="True" />
</appSettings>

Начиная с версии TPL, вот бегун бутстрапа, который запускает диспетчер очереди:

public static class TplRunner
{
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
    {
        Console.CancelKeyPress += (s, args) =>
        {
            CancelCrmClient();
            args.Cancel = true;
        };

        var start = DateTime.Now;
        Program.TellUser("Start: " + start);

        var taskCompletionSource = new TplQueue(parameters)
            .Start(CancellationTokenSource.Token, idList);

        while (!taskCompletionSource.Task.IsCompleted)
        {
            if (Console.KeyAvailable)
            {
                if (Console.ReadKey().Key != ConsoleKey.Q) continue;
                Console.WriteLine("When all threads are complete, press any key to continue.");
                CancelCrmClient();
            }
        }

        var end = DateTime.Now;
        Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
    }

    private static void CancelCrmClient()
    {
        CancellationTokenSource.Cancel();
        Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
    }
}

Вот сам менеджер очереди TPL:

public class TplQueue
{
    private readonly RuntimeParameters parameters;
    private readonly object locker = new object();
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
    private readonly CrmClient crmClient;
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
    private int threadCount;
    private int crmErrorCount;
    private int processedCount;
    private CancellationToken cancelToken;

    public TplQueue(RuntimeParameters parameters)
    {
        this.parameters = parameters;
        crmClient = new CrmClient();
    }

    public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
    {
        cancelToken = cancellationToken;

        foreach (var id in ids)
        {
            idQueue.Enqueue(id);
        }

        threadCount = 0;

        // Prime our thread pump with max threads.
        for (var i = 0; i < parameters.MaxThreads; i++)
        {
            Task.Run((Action) StartCrmRequest, cancellationToken);
        }

        return taskCompletionSource;
    }

    private void StartCrmRequest()
    {
        if (taskCompletionSource.Task.IsCompleted)
        {
            return;
        }

        if (cancelToken.IsCancellationRequested)
        {
            Program.TellUser("Crm client cancelling...");
            ClearQueue();
            return;
        }

        var count = GetThreadCount();

        if (count >= parameters.MaxThreads)
        {
            return;
        }

        string id;
        if (!idQueue.TryDequeue(out id)) return;

        IncrementThreadCount();
        crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);

        processedCount += 1;
        if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
        {
            ShowProgress(processedCount);
        }
    }

    private void ProcessResult(Task<CrmResultMessage> response)
    {
        if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
        {
            Program.TellUser(
                "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
                crmErrorCount);
            ClearQueue();
        }

        var count = DecrementThreadCount();

        if (idQueue.Count == 0 && count == 0)
        {
            taskCompletionSource.SetResult(true);
        }
        else
        {
            StartCrmRequest();
        }
    }

    private int GetThreadCount()
    {
        lock (locker)
        {
            return threadCount;
        }
    }

    private void IncrementThreadCount()
    {
        lock (locker)
        {
            threadCount = threadCount + 1;
        }
    }

    private int DecrementThreadCount()
    {
        lock (locker)
        {
            threadCount = threadCount - 1;
            return threadCount;
        }
    }

    private void ClearQueue()
    {
        idQueue = new ConcurrentQueue<string>();
    }

    private static void ShowProgress(int processedCount)
    {
        Program.TellUser("{0} activities processed.", processedCount);
    }
}

Обратите внимание, что я знаю, что несколько счетчиков не являются потокобезопасными, но они не критичны; переменная threadCount является единственной критической.

Вот пример метода фиктивного клиента CRM:

public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
{
    // Here we would normally call a CRM web service.
    return Task.Run(() =>
    {
        try
        {
            if (callIntervalMsecs > 0)
            {
                Thread.Sleep(callIntervalMsecs);
            }
            throw new ApplicationException("Crm web service not available at the moment.");
        }
        catch
        {
            return new CrmResultMessage(activityId, CrmResult.Failed);
        }
    });
}

И вот те же классы async / await (для краткости с удалением общих методов):

public static class AsyncRunner
{
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
    {
        var start = DateTime.Now;
        Program.TellUser("Start: " + start);

        var taskCompletionSource = new AsyncQueue(parameters)
            .StartAsync(CancellationTokenSource.Token, idList).Result;

        while (!taskCompletionSource.Task.IsCompleted)
        {
            ...
        }

        var end = DateTime.Now;
        Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
    }
}

Менеджер очередей async / wait:

public class AsyncQueue
{
    private readonly RuntimeParameters parameters;
    private readonly object locker = new object();
    private readonly CrmClient crmClient;
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
    private CancellationToken cancelToken;
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
    private int threadCount;
    private int crmErrorCount;
    private int processedCount;

    public AsyncQueue(RuntimeParameters parameters)
    {
        this.parameters = parameters;
        crmClient = new CrmClient();
    }

    public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken,
        IEnumerable<string> ids)
    {
        cancelToken = cancellationToken;

        foreach (var id in ids)
        {
            idQueue.Enqueue(id);
        }
        threadCount = 0;

        // Prime our thread pump with max threads.
        for (var i = 0; i < parameters.MaxThreads; i++)
        {
            await StartCrmRequest();
        }

        return taskCompletionSource;
    }

    private async Task StartCrmRequest()
    {
        if (taskCompletionSource.Task.IsCompleted)
        {
            return;
        }

        if (cancelToken.IsCancellationRequested)
        {
            ...
            return;
        }

        var count = GetThreadCount();

        if (count >= parameters.MaxThreads)
        {
            return;
        }

        string id;
        if (!idQueue.TryDequeue(out id)) return;

        IncrementThreadCount();
        var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
        ProcessResult(crmMessage);

        processedCount += 1;
        if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
        {
            ShowProgress(processedCount);
        }
    }

    private async void ProcessResult(CrmResultMessage response)
    {
        if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
        {
            Program.TellUser(
                "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
                crmErrorCount);
            ClearQueue();
        }

        var count = DecrementThreadCount();

        if (idQueue.Count == 0 && count == 0)
        {
            taskCompletionSource.SetResult(true);
        }
        else
        {
            await StartCrmRequest();
        }
    }
}

Итак, при установке MaxThreads на 10 и CrmErrorsBeforeQuitting на 20, версия TPL на моей машине завершается через 19 секунд, а версия async / await занимает 35 секунд. Учитывая, что у меня более 8000 звонков, чтобы сделать это, это значительная разница. Любые идеи?

    
задан Rob Kent 24.06.2014 в 16:41
источник
  • Рассмотрите возможность использования ConfigureAwait (false) во всех ваших ожидаемых задачах, где вам не нужно возвращаться к исходному SynchronizationContext. –  Kirill Shlenskiy 24.06.2014 в 16:49
  • @RobKent Вы заметите, что обсуждение Джона Скита - это микро-оптимизация, второстепенная проблема. –  GregC 24.06.2014 в 17:19
  • По моему опыту, async / wait может работать лучше, чем ContinueWith: stackoverflow.com/a/23878905/1768303 –  Noseratio 24.06.2014 в 23:52
  • @Noseratio, я понял, что ExecuteSynchronously не был преступником после прочтения вашего (правда, довольно интересного) теста. –  Kirill Shlenskiy 25.06.2014 в 01:46
  • @CountZero: «Если мне нужно пробираться через сотни строк кода, чтобы найти проблему, я делаю работу, которую вы должны делать. Часто, если вы много работаете, чтобы уменьшить проблему до короткой, но полной программы, вы сами найдете эту проблему ». - Джон Скит –  Stephen Cleary 26.06.2014 в 19:00
Показать остальные комментарии

3 ответа

10

Я думаю, что вижу здесь проблему или, по крайней мере, ее часть. Посмотрите внимательно на два бита кода ниже; они не эквивалентны.

// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
    Task.Run((Action) StartCrmRequest, cancellationToken);
}

и

// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
    await StartCrmRequest();
}

В исходном коде (я воспринимаю его как заданный, что он функционально звучит) есть один вызов ContinueWith . Именно столько await -операторов, которые я ожидал бы увидеть в тривиальной перезаписи, если она предназначена для сохранения исходного поведения.

Не сложное и быстрое правило и применимо только в простых случаях, но, тем не менее, хорошо следить за тем.

    
ответ дан Kirill Shlenskiy 24.06.2014 в 17:05
  • Просто начал понимать это. Это было мое (вероятно, ошибочное) понимание того, что основной код кода продолжает работать после ожидания, чтобы цикл продолжался. Просто хочу подтвердить это сейчас. –  Rob Kent 24.06.2014 в 17:23
  • @RobKent, это на самом деле выглядит довольно простой перепиской - вы просто переусердствовали в некоторых частях, изменяя вещи, которые не нужно было менять (опять же, я предполагаю, что исходный код был правильным для начала) , Я внес изменения в число ожиданий. –  Kirill Shlenskiy 24.06.2014 в 17:26
  • Ỳes, просто протестировал его, и цикл с ожиданием в нем синхронный. –  Rob Kent 24.06.2014 в 17:28
  • @RobKent, он по-прежнему асинхронный, но последовательный, а не параллельный. –  Kirill Shlenskiy 24.06.2014 в 17:36
  • Кстати, вы вынимаете этот бит Task.Run также меняет вещи, потому что в перезаписывании StartCrmRequest будет работать синхронно, пока он не достигнет первого ожидания, тогда как Task.Run выгружает часть метода, предшествующего вашему ожиданию в другой поток с самого начала (делая асинхронный вызов). Это тонкая разница, но стоит упомянуть. Это не окажет отрицательного влияния на производительность, если метод работает быстро, прежде чем нажать кнопку ожидания (что я уверен, что вы это делаете), но если вам случится существенная синхронная работа перед ожиданием, это приведет к дальнейшей задержке. –  Kirill Shlenskiy 24.06.2014 в 17:38
Показать остальные комментарии
4

Я думаю, вы слишком усложнили свое решение и оказались не в том месте, где захотите, в реализации.

Прежде всего, соединения с любым хостом HTTP ограничены диспетчером точек обслуживания , ограничение по умолчанию для клиентских сред - 2, но вы можете увеличить его самостоятельно.

Независимо от того, сколько потоков вы создаете, не будет более активных запросов, чем ожидалось.

Затем, как заметил кто-то, await логически блокирует поток выполнения.

И, наконец, вы потратили свое время на создание AsyncQueue , когда вы должны были использовать Потоки данных TPL .

    
ответ дан Paulo Morgado 25.06.2014 в 00:55
  • Да, спасибо, указав ограничение ServicePointManager - я не знал об этом. Хотя это определенно имеет значение, когда я реализую фактический веб-вызов, для целей этого вопроса это не имеет значения, потому что нам нужно только знать, почему асинхронный / ожидающий работает медленнее, не делая фактически веб-вызов. Спасибо за ваш комментарий. –  Rob Kent 25.06.2014 в 12:31
  • Если вы действительно хотите это доказать, вам понадобится гораздо менее сложный пример. –  Paulo Morgado 25.06.2014 в 12:43
0

Когда реализовано с помощью async / await, я ожидаю, что алгоритм привязки ввода-вывода будет работать в одном потоке. В отличие от @KirillShlenskiy, я считаю, что бит, ответственный за «возвращение» в контекст звонящего, не несет ответственности за замедление. Я думаю, что вы переполняете пул потоков, пытаясь использовать его для операций с привязкой к I / O. Он предназначен в основном для операций с привязкой к вычислению.

Посмотрите на ForEachAsync. Я чувствую, что это то, что вы ищете (обсуждение Стивена Тууба, вы также найдете видео Wischik):

Ссылка

(Используйте степень параллелизма для уменьшения объема памяти)

Ссылка Ссылка

    
ответ дан GregC 24.06.2014 в 17:01
  • Спасибо, я посмотрю на эти ссылки. –  Rob Kent 24.06.2014 в 17:29
  • В этой статье есть один комментарий, в котором я согласен: «Эти новые ожидания и асинхронные ключевые слова действительно разрушают мой мозг. В какой-то момент я думаю, что понимаю их, а другой понимаю, что я этого не делаю». Мне нужно изучить его больше. –  Rob Kent 24.06.2014 в 17:35
  • @RobKent Я рад поделиться ссылками на этот чудесно богатый контент. Счастливое чтение и просмотр! –  GregC 24.06.2014 в 17:38