Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - Энтони Уильямс 14 стр.


Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы push() и wait_and_pop().

Листинг 4.4. Реализация функций push() и wait_and_pop() на основе кода из листинга 4.1

#include <queue>

#include <mutex>

#include <condition_variable>

template<typename T>

class threadsafe_queue {

private:

 std::mutex mut;

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = data_queue.front();

  data_queue.pop();

 }

};

threadsafe_queue<data_chunk> data_queue; (1)

void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  data_queue.push(data); (2)

 }

}

void data_processing_thread() {

 while (true) {

  data_chunk data;

  data_queue.wait_and_pop(data); (3)

  process(data);

  if (is_last_chunk(data))

   break;

 }

}

Теперь мьютекс и условная переменная находятся в экземпляре threadsafe_queue, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции push() (2). Кроме того, wait_and_pop() берет на себя заботу об ожидании условной переменной (3).

Второй перегруженный вариант wait_and_pop() тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.

Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных

#include <queue>

#include <memory>

#include <mutex>

#include <condition_variable>

template<typename T>

class threadsafe_queue {

private:

 mutable std::mutex mut;(1) Мьютекс должен быть изменяемым

 std::queue<T> data_queue;

 std::condition_variable data_cond;

public:

 threadsafe_queue() {}

 threadsafe_queue(threadsafe_queue const& other) {

  std::lock_guard<std::mutex> lk(other.mut);

  data_queue = other.data_queue;

 }

 void push(T new_value) {

  std::lock_guard<std::mutex> lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }

 void wait_and_pop(T& value) {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  value = data_queue.front();

  data_queue.pop();

 }

 std::shared_ptr<T> wait_and_pop() {

  std::unique_lock<std::mutex> lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  std::shared_ptr<T>

   res(std::make_shared<T>(data_queue.front()));

  data_queue.pop();

  return res;

 }

 bool try_pop(T& value) {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return false;

  value = data_queue.front();

  data_queue.pop();

  return true;

 }

 std::shared_ptr<T> try_pop() {

  std::lock_guard<std::mutex> lk(mut);

  if (data_queue.empty())

   return std::shared_ptr<T>();

  std::shared_ptr<T>

   res(std::make_shared<T>(data_queue.front()));

  data_queue.pop();

  return res;

 }

 bool empty() const {

  std::lock_guard<std::mutex> lk(mut);

  return data_queue.empty();

 }

};

Хотя empty()константная функция-член, а параметр копирующего конструктораconst-ссылка, другие потоки могут хранить неконстантные ссылки на объект и вызывать изменяющие функции-члены, которые захватывают мьютекс. Поэтому захват мьютексаэто изменяющая операция, следовательно, член mut необходимо пометить как mutable (1), чтобы его можно было захватить в функции empty() и в копирующем конструкторе.

Условные переменные полезны и тогда, когда есть несколько потоков, ожидающих одного события. Если потоки используются для разделения работы и, следовательно, на извещение должен реагировать только один поток, то применима точно такая же структура программы, как в листинге 4.1; нужно только запустить несколько потоков обработки данных. При поступлении новых данных функция notify_one() разбудит только один поток, который проверяет условие внутри wait(), и этот единственный поток вернет управление из wait() (в ответ на помещение нового элемента в очередь data_queue). Заранее нельзя сказать, какой поток получит извещение и есть ли вообще ожидающие потоки (не исключено, что все они заняты обработкой ранее поступивших данных).

Альтернативный сценарийкогда несколько потоков ожидают одного события, и отреагировать должны все. Так бывает, например, когда инициализируются разделяемые данные, и все работающие с ними потоки должны ждать, пока инициализация завершится (хотя для этого случая существуют более подходящие механизмы, см. раздел 3.3.1 главы 3), или когда потоки должны ждать обновления разделяемых данных, например, в случае периодической повторной инициализации. В таких ситуациях поток, отвечающий за подготовку данных, может вызвать функцию-член notify_all() условной переменной вместо notify_one(). Эта функция извещает все потоки, ожидающие внутри функции wait(), о том, что они должны проверить ожидаемое условие.

Если ожидающий поток собирается ждать условия только один раз, то есть после того как оно станет истинным, он не вернется к ожиданию той же условной переменной, то лучше применить другой механизм синхронизации. В особенности это относится к случаю, когда ожидаемое условиедоступность каких-то данных. Для такого сценария больше подходят так называемые будущие результаты (future).

4.2. Ожидание одноразовых событий с помощью механизма будущих результатов

Предположим, вы летите самолетом в отпуск за границу. Вы приехали в аэропорт, прошли регистрацию и прочие процедуры, но должны ждать объявления о посадкебыть может, несколько часов. Можно, конечно, найти себе занятиенапример, почитать книжку, побродить в Интернете, поесть в кафе за бешеные деньги, но суть от этого не меняется: вы ждете сигнала о том, что началась посадка в самолет. И есть еще одна особенностьданный рейс вылетает всего один раз; в следующий отпуск вы будете ждать посадки на другой рейс.

В стандартной библиотеке С++ такие одноразовые события моделируются с помощью будущего результата. Если поток должен ждать некоего одноразового события, то он каким-то образом получает представляющий его объект-будущее. Затем поток может периодически в течение очень короткого времени ожидать этот объект-будущее, проверяя, произошло ли событие (посмотреть на табло вылетов), а между проверками заниматься другим делом (вкушать в кафе аэропортовскую пищу по несуразным ценам). Можно поступить и иначевыполнять другую работу до тех пор, пока не наступит момент, когда без наступления ожидаемого события двигаться дальше невозможно, и вот тогда ждать готовности будущего результата. С будущим результатом могут быть ассоциированы какие-то данные (например, номер выхода в объявлении на посадку), но это необязательно. После того как событие произошло (то есть будущий результат готов), сбросить объект-будущее в исходное состояние уже невозможно.

В стандартной библиотеке С++ есть две разновидности будущих результатов, реализованные в форме двух шаблонов классов, которые объявлены в заголовке <future>: уникальные будущие результаты (std::future<>) и разделяемые будущие результаты (std::shared_future<>). Эти классы устроены по образцу std::unique_ptr и std::shared_ptr. На одно событие может ссылаться только один экземпляр std::future, но несколько экземпляров std::shared_future. В последнем случае все экземпляры оказываются готовы одновременно и могут обращаться к ассоциированным с событием данным. Именно из-за ассоциированных данных будущие результаты представлены шаблонами, а не обычными классами; точно так же шаблоны std::unique_ptr и std::shared_ptr параметризованы типом ассоциированных данных. Если ассоциированных данных нет, то следует использовать специализации шаблонов std::future<void> и std::shared_future<void>. Хотя будущие результаты используются как механизм межпоточной коммуникации, сами по себе они не обеспечивают синхронизацию доступа. Если несколько потоков обращаются к единственному объекту-будущему, то они должны защитить доступ с помощью мьютекса или какого-либо другого механизма синхронизации, как описано в главе 3. Однако, как будет показано в разделе 4.2.5, каждый из нескольких потоков может работать с собственной копией std::shared_future<> безо всякой синхронизации, даже если все они ссылаются на один и тот же асинхронно получаемый результат.

Самое простое одноразовое событиеэто результат вычисления, выполненного в фоновом режиме. В главе 2 мы видели, что класс std::thread не предоставляет средств для возврата вычисленного значения, и я обещал вернуться к этому вопросу в главе 4. Исполняю обещание.

4.2.1. Возврат значения из фоновой задачи

Допустим, вы начали какое-то длительное вычисление, которое в конечном итоге должно дать полезный результат, но пока без него можно обойтись. Быть может, вы нашли способ получить ответ на «Главный возрос жизни, Вселенной и всего на свете» из книги Дугласа Адамса. Для вычисления можно запустить новый поток, но придётся самостоятельно позаботиться о передаче в основную программу результата, потому что в классе std::thread такой механизм не предусмотрен. Тут-то и приходит на помощь шаблон функции std::async (также объявленный в заголовке <future>).

Функция std::async позволяет запустить асинхронную задачу, результат которой прямо сейчас не нужен. Но вместо объекта std::thread она возвращает объект std::future, который будет содержать возвращенное значение, когда оно станет доступно. Когда программе понадобится значение, она вызовет функцию-член get() объекта-будущего, и тогда поток будет приостановлен до готовности будущего результата, после чего вернет значение. В листинге ниже оказан простой пример.

Листинг 4.6. Использование std::future для получения результата асинхронной задачи

#include <future>

#include <iostream>

int find_the_answer_to_ltuae();

void do_other_stuff();

int main() {

 std::future<int> the_answer =

  std::async(find_the_answer_to_ltuae);

 do_other_stuff();

 std::cout << "Ответ равен " << the_answer.get() << std::endl;

}

Шаблон std::async позволяет передать функции дополнительные параметры, точно так же, как std::thread. Если первым аргументом является указатель на функцию-член, то второй аргумент должен содержать объект, от имени которого эта функция-член вызывается (сам объект, указатель на него или обертывающий его std::ref), а все последующие аргументы передаются без изменения функции-члену. В противном случае второй и последующие аргументы передаются функции или допускающему вызов объекту, заданному в первом аргументе. Как и в std::thread, если аргументы представляют собой r-значения, то создаются их копии посредством перемещения оригинала. Это позволяет использовать в качестве объекта-функции и аргументов типы, допускающие только перемещение. Пример см. в листинге ниже.

Листинг 4.7. Передача аргументов функции, заданной в std::async

#include <string>

#include <future>

struct X {

 void foo(int, std::string const&);

 std::string bar(std::string const&);

};

                                                Вызывается

X x;                                            p->foo(42,"hello"),

auto f1 = std::async(&X::foo, &x, 42, "hello");где p=&x

auto f2 = std::async(&X::bar, x, "goodbye"); вызывается

                                             tmpx.bar("goodbye"),

struct Y {                                   где tmpxкопия x

 double operator()(double);

};                               Вызывается tmpy(3.141),

                                 где tmpy создается

Y y;                             из Y перемещающим

auto f3 = std::async(Y(), 3.141)конструктором

auto f4 = std::async(std::ref(y), 2.718);Вызывается y(2.718)

X baz(X&);

std::async(baz, std::ref(x); Вызывается baz(x)

class move_only {

public:

 move_only();

 move_only(move_only&&);

 move_only(move_only const&) = delete;

 move_only& operator=(move_only&&);

 move_only& operator=(move_only const&) = delete;

 void operator()();                Вызывается tmp(), где tmp

};                                 конструируется с помощью

auto f5 = std::async(move_only());std::move(move_only())

По умолчанию реализации предоставлено право решать, запускает ли std::async новый поток или задача работает синхронно, когда программа ожидает будущего результата. В большинстве случаев такое поведение вас устроит, но можно задать требуемый режим в дополнительном параметре std::async перед вызываемой функцией. Этот параметр имеет тип std::launch и может принимать следующие значения: std::launch::deferred  отложить вызов функции до того момента, когда будет вызвана функция-член wait() или get() объекта-будущего; std::launch::asyncзапускать функцию в отдельном потоке; std::launch::deferred | std::launch::asyncоставить решение на усмотрение реализации. Последний вариант подразумевается по умолчанию. В случае отложенного вызова функция может вообще никогда не выполниться. Например:

auto f6 =                                  Выполнять в

 std::async(std::launch::async, Y(), 1.2);новом потоке

auto f7 =

 std::async(

  std::launch::deferred, baz, std::ref(x));

auto f8 = std::async(                      Выполнять

 std::launch::deferred | std::launch::async,при вызове

 baz, std::ref(x));                         wait() или get()

auto f9 = std::async(baz, std::ref(x));    Оставить на

                                            усмотрение реализации

f7.wait();Вызвать отложенную функцию

Ниже в этой главе и далее в главе 8 мы увидим, что с помощью std::async легко разбивать алгоритм на параллельно выполняемые задачи. Однако это не единственный способ ассоциировать объект std::future с задачей; можно также обернуть задачу объектом шаблонного класса std::packaged_task<> или написать код, который будет явно устанавливать значения с помощью шаблонного класса std::promise<>. Шаблон std::packaged_task является абстракцией более высокого уровня, чем std::promise, поэтому начнем с него.

Назад Дальше