Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - Энтони Уильямс 15 стр.


4.2.2. Ассоциирование задачи с будущим результатом

Шаблон класса std::packaged_task<> связывает будущий результат с функцией или объектом, допускающим вызов. При вызове объекта std::packaged_task<> ассоциированная функция или допускающий вызов объект вызывается и делает будущий результат готовым, сохраняя возвращенное значение в виде ассоциированных данных. Этот механизм можно использовать для построение пулов потоков (см. главу 9) и иных схем управления, например, запускать каждую задачу в отдельном потоке или запускать их все последовательно в выделенном фоновом потоке. Если длительную операцию можно разбить на автономные подзадачи, то каждую из них можно обернуть объектом std::packaged_task<> и передать этот объект планировщику задач или пулу потоков. Таким образом, мы абстрагируем специфику задачипланировщик имеет дело только с экземплярами std::packaged_task<>, а не с индивидуальными функциями.

Параметром шаблона класса std::packaged_task<> является сигнатура функции, например void() для функции, которая не принимает никаких параметров и не возвращает значения, или int(std::string&, double*) для функции, которая принимает неконстантную ссылку на std::string и указатель на double и возвращает значение типа int. При конструировании экземпляра std::packaged_task вы обязаны передать функцию или допускающий вызов объект, который принимает параметры указанных типов и возвращает значение типа, преобразуемого в указанный тип возвращаемого значения. Точного совпадения типов не требуется; можно сконструировать объект std::packaged_task<double (double)> из функции, которая принимает int и возвращает float, потому что между этими типами существуют неявные преобразования.

Тип возвращаемого значения, указанный в сигнатуре функции, определяет тип объекта std::future<>, возвращаемого функцией-членом get_future(), а заданный в сигнатуре список аргументов используется для определения сигнатуры оператора вызова в классе упакованной задачи. Например, в листинге ниже приведена часть определения класса std::packaged_task<std::string(std::vector<char>*, int)>.

Листинг 4.8. Определение частичной специализации std::packaged_task

template<>

class packaged_task<std::string(std::vector<char>*, int)> {

public:

 template<typename Callable>

 explicit packaged_task(Callable&& f);

 std::future<std::string> get_future();

 void operator()(std::vector<char>*, int);

};

Таким образом, std::packaged_taskдопускающий вызов объект, и, значит, его можно обернуть объектом std::function, передать std::thread в качестве функции потока, передать любой другой функции, которая ожидает допускающий вызов объект, или даже вызвать напрямую. Если std::packaged_task вызывается как объект-функция, то аргументы, переданные оператору вызова, без изменения передаются обернутой им функции, а возвращенное значение сохраняется в виде асинхронного результата в объекте std::future, полученном от get_future(). Следовательно, мы можем обернуть задачу в std::packaged_task и извлечь будущий результат перед тем, как передавать объект std::packaged_task в то место, из которого он будет в свое время вызван. Когда результат понадобится, нужно будет подождать готовности будущего результата. В следующем примере показано, как всё это делается на практике.

Передача задач между потоками

Во многих каркасах для разработки пользовательского интерфейса требуется, чтобы интерфейс обновлялся только в специально выделенных потоках. Если какому-то другому потоку потребуется обновить интерфейс, то он должен послать сообщение одному из таких выделенных потоков, чтобы тот выполнил операцию. Шаблон std::packaged_task позволяет решить эту задачу, не заводя специальных сообщений для каждой относящейся к пользовательскому интерфейсу операции.

Листинг 4.9. Выполнение кода в потоке пользовательского интерфейса с применением std::packaged_task

#include <deque>

#include <mutex>

#include <future>

#include <thread>

#include <utility>

std::mutex m;

std::deque<std::packaged_task<void()>> tasks;

bool gui_shutdown_message_received();

void get_and_process_gui_message();

void gui_thread() {                         (1)

 while (!gui_shutdown_message_received()) { (2)

  get_and_process_gui_message();            (3)

  std::packaged_task<void()> task; {

   std::lock_guard<std::mutex> lk(m);

   if (tasks empty())                       (4)

    continue;

   task = std::move(tasks.front());         (5)

   tasks.pop_front();

  }

 task();                                    (6)

 }

}

std::thread gui_bg_thread(gui_thread);

template<typename Func>

std::future<void> post_task_for_gui_thread(Func f) {

 std::packaged_task<void()> task(f);       (7)

 std::future<void> res = task.get_future();(8)

 std::lock_guard<std::mutex> lk(m);

 tasks.push_back(std::move(task));         (9)

 return res;                               (10)

}

Код очень простой: поток пользовательского интерфейса (1) повторяет цикл, пока не будет получено сообщение о необходимости завершить работу (2). На каждой итерации проверяется, есть ли готовые для обработки сообщения GUI (3), например события мыши, или новые задачи в очереди. Если задач нет (4), программа переходит на начало цикла; в противном случае извлекает задачу из очереди (5), освобождает защищающий очередь мьютекс и исполняет задачу (6). По завершении задачи будет готов ассоциированный с ней будущий результат.

Помещение задачи в очередь ничуть не сложнее: по предоставленной функции создается новая упакованная задача (7), для получения ее будущего результата вызывается функция-член get_future() (8), после чего задача помещается в очередь (9) еще до того, как станет доступен будущий результат (10). Затем часть программы, которая отправляла сообщение потоку пользовательского интерфейса, может дождаться будущего результата, если хочет знать, как завершилась задача, или отбросить его, если это несущественно.

В этом примере мы использовали класс std::packaged_task<void()> для задач, обертывающих функцию или иной допускающий вызов объект, который не принимает параметров и возвращает void (если он вернет что-то другое, то возвращенное значение будет отброшено). Это простейшая из всех возможных задач, но, как мы видели ранее, шаблон std::packaged_task применим и в более сложных ситуацияхзадав другую сигнатуру функции в качестве параметра шаблона, вы сможете изменить тип возвращаемого значения (и, стало быть, тип данных, которые хранятся в состоянии, ассоциированном с будущим объектом), а также типы аргументов оператора вызова. Наш пример легко обобщается на задачи, которые должны выполняться в потоке GUI и при этом принимают аргументы и возвращают в std::future какие-то данные, а не только индикатор успешности завершения.

А как быть с задачами, которые нельзя выразить в виде простого вызова функции, или такими, где результат может поступать из нескольких мест? Эти проблемы решаются с помощью еще одного способа создания будущего результата: явного задания значения с помощью шаблона класса std::promise.

4.2.3. Использование std::promise

При написании сетевых серверных программ часто возникает искушение обрабатывать каждый запрос на соединение в отдельном потоке, поскольку при такой структуре порядок коммуникации становится нагляднее и проще для программирования. Этот подход срабатывает, пока количество соединений (и, следовательно, потоков) не слишком велико. Но с ростом числа потоков увеличивается и объем потребляемых ресурсов операционной системы, а равно частота контекстных переключений (если число потоков превышает уровень аппаратного параллелизма), что негативно сказывается на производительности. В предельном случае у операционной системы могут закончиться ресурсы для запуска новых потоков, хотя пропускная способность сети еще не исчерпана. Поэтому в приложениях, обслуживающих очень большое число соединений, обычно создают совсем немного потоков (быть может, всего один), каждый из которых одновременно обрабатывает несколько запросов.

Рассмотрим один из таких потоков. Пакеты данных приходят по разным соединениям в случайном порядке, а потому и порядок помещения исходящих пакетов в очередь отправки тоже непредсказуем. Часто будет складываться ситуация, когда другие части приложения ждут либо успешной отправки данных, либо поступления нового пакета по конкретному сетевому соединению.

Шаблон std::promise<T> дает возможность задать значение (типа T), которое впоследствии можно будет прочитать с помощью ассоциированного объекта std::future<T>. Пара std::promise/std::future реализует один из возможных механизмов такого рода; ожидающий поток приостанавливается в ожидании будущего результата, тогда как поток, поставляющий данные, может с помощью promise установить ассоциированное значение и сделать будущий результат готовым.

Чтобы получить объект std::future, ассоциированный с данным обещанием std::promise, мы должны вызвать функцию-член get_future()так же, как в случае std::packaged_task. После установки значения обещания (с помощью функции-члена set_value()) будущий результат становится готовым, и его можно использовать для получения установленного значения. Если уничтожить объект std::promise, не установив значение, то в будущем результате будет сохранено исключение. О передаче исключений между потоками см. раздел 4.2.4.

В листинге 4.10 приведен код потока обработки соединений, написанный по только что изложенной схеме. В данном случае для уведомления об успешной передаче блока исходящих данных применяется пара std::promise<bool>/std::future<bool>; ассоциированное с будущим результатом значениеэто просто булевский флаг успех/неудача. Для входящих пакетов в качестве ассоциированных данных могла бы выступать полезная нагрузка пакета.

Листинг 4.10. Обработка нескольких соединений в одном потоке с помощью объектов-обещаний

#include <future>

void process_connections(connection_set& connections) {

 while(!done(connections)) {             (1)

  for (connection_iterator               (2)

   connection = connections.begin(), end = connections.end();

   connection != end;

   ++connection) {

   if (connection->has_incoming_data()) {(3)

    data_packet data = connection->incoming();

    std::promise<payload_type>& p =

     connection->get_promise(data.id);   (4)

    p.set_value(data.payload);

   }

   if (connection->has_outgoing_data()) {(5)

    outgoing_packet data =

     connection->top_of_outgoing_queue();

    connection->send(data.payload);

    data.promise.set_value(true);        (6)

   }

  }

 }

}

Функция process_connections() повторяет цикл, пока done() возвращает true (1). На каждой итерации поочередно проверяется каждое соединение (2); если есть входящие данные, они читаются (3), а если в очереди имеются исходящие данные, они отсылаются (5). При этом предполагается, что в каждом входящем пакете хранится некоторый идентификатор и полезная нагрузка, содержащая собственно данные. Идентификатору сопоставляется объект std::promise (возможно, путем поиска в ассоциативном контейнере) (4), значением которого является полезная нагрузка пакета. Исходящие пакеты просто извлекаются из очереди отправки и передаются но соединению. После завершения передачи в обещание, ассоциированное с исходящими данными, записывается значение true, обозначающее успех (6). Насколько хорошо эта схема ложится на фактический сетевой протокол, зависит от самого протокола; в конкретном случае схема обещание/будущий результат может и не подойти, хотя структурно она аналогична поддержке асинхронного ввода/вывода в некоторых операционных системах.

В коде выше мы полностью проигнорировали возможные исключения. Хотя мир, в котором всё всегда работает правильно, был бы прекрасен, действительность не так радужна. Переполняются диски, не находятся искомые данные, отказывает сеть, «падает» база данных  всякое бывает. Если бы операция выполнялась в том потоке, которому нужен результат, программа могла бы просто сообщить об ошибке с помощью исключения. Но было бы неоправданным ограничением требовать, чтобы всё работало правильно только потому, что мы захотели воспользоваться классами std::packaged_task или std::promise.

Поэтому в стандартной библиотеке С++ имеется корректный способ учесть возникновение исключений в таком контексте и сохранить их как часть ассоциированного результата.

4.2.4. Сохранение исключения в будущем результате

Рассмотрим следующий коротенький фрагмент. Если передать функции square_root() значение -1, то она возбудит исключение, которое увидит вызывающая программа:

double square_root(double x) {

 if (x<0) {

  throw std::out_of_range("x<0");

 }

 return sqrt(x);

}

А теперь предположим, что вместо вызова square_root() в текущем потоке

double y = square_root(-1);

мы вызываем ее асинхронно:

std::future<double> f = std::async(square_root,-1);

double y = f.get();

В идеале хотелось бы получить точно такое же поведение: чтобы поток, вызывающий f.get(), мог увидеть не только нормальное значение y, но и исключениекак в однопоточной программе.

Что ж, именно так на самом деле и происходит: если функция, вызванная через std::async, возбуждает исключение, то это исключение сохраняется в будущем результате вместо значения, а когда будущий результат оказывается готовым, вызов get() повторно возбуждает сохраненное исключение. (Примечание: стандарт ничего не говорит о том, возбуждается ли исходное исключение или его копия; различные компиляторы и библиотеки вправе решать этот вопрос по-разному.) То же самое происходит, когда функция обернута объектом std::packaged_task,если при вызове задачи обернутая функция возбуждает исключение, то объект исключения сохраняется в будущем результате вместо значения, и это исключение повторно возбуждается при обращении к get().

Разумеется, std::promise обеспечивает те же возможности в случае явного вызова функции. Чтобы сохранить исключение вместо значения, следует вызвать функцию-член set_exception(), а не set_value(). Обычно это делается в блоке catch:

extern std::promise<double> some_promise;

try {

 some_promise.set_value(calculate_value());

} catch (...) {

 some_promise.set_exception(std::current_exception());

}

Здесь мы воспользовались функцией std::current_exception(), которая возвращает последнее возбужденное исключение, но могли вызвать std::copy_exception(), чтобы поместить в объект-обещание новое исключение, которое никем не возбуждалось:

some_promise.set_exception(

 std::copy_exception(std::logic_error("foo"));

Если тип исключения заранее известен, то это решение гораздо чище, чем использование блока try/catch; мы не только упрощаем код, но и оставляем компилятору возможности для его оптимизации.

Есть еще один способ сохранить исключение в будущем результате: уничтожить ассоциированный с ним объект std::promise или std::packaged_task, не вызывая функцию установки значения в случае обещания или не обратившись к упакованной задаче. В любом случае деструктор std::promise или std::packaged_task сохранит в ассоциированном состоянии исключение типа std::future_error, в котором код ошибки равен std::future_errc::broken_promise, если только будущий результат еще не готов; создавая объект-будущее, вы даете обещание предоставить значение или исключение, а, уничтожая объект, не задав ни того, ни другого, вы это обещание нарушаете. Если бы компилятор в этом случае не сохранил ничего в будущем результате, то ожидающие потоки могли бы никогда не выйти из состояния ожидания.

Назад Дальше