Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы push()
и wait_and_pop()
.
Листинг 4.4. Реализация функций push()
и wait_and_pop()
на основе кода из листинга 4.1
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue;
(1)
void data_preparation_thread() {
while (more_data_to_prepare()) {
data_chunk const data = prepare_data();
data_queue.push(data);
(2)
}
}
void data_processing_thread() {
while (true) {
data_chunk data;
data_queue.wait_and_pop(data);
(3)
process(data);
if (is_last_chunk(data))
break;
}
}
Теперь мьютекс и условная переменная находятся в экземпляре threadsafe_queue
, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции push()
(2). Кроме того, wait_and_pop()
берет на себя заботу об ожидании условной переменной (3).
Второй перегруженный вариант wait_and_pop()
тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.
Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut;
(1) Мьютекс должен быть изменяемым
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
value = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{ return !data_queue.empty(); });
std::shared_ptr<T>
res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T>
res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
Хотя empty()
константная функция-член, а параметр копирующего конструктораconst
-ссылка, другие потоки могут хранить неконстантные ссылки на объект и вызывать изменяющие функции-члены, которые захватывают мьютекс. Поэтому захват мьютексаэто изменяющая операция, следовательно, член mut
необходимо пометить как mutable
(1), чтобы его можно было захватить в функции empty()
и в копирующем конструкторе.
Условные переменные полезны и тогда, когда есть несколько потоков, ожидающих одного события. Если потоки используются для разделения работы и, следовательно, на извещение должен реагировать только один поток, то применима точно такая же структура программы, как в листинге 4.1; нужно только запустить несколько потоков обработки данных. При поступлении новых данных функция notify_one()
разбудит только один поток, который проверяет условие внутри wait()
, и этот единственный поток вернет управление из wait()
(в ответ на помещение нового элемента в очередь data_queue
). Заранее нельзя сказать, какой поток получит извещение и есть ли вообще ожидающие потоки (не исключено, что все они заняты обработкой ранее поступивших данных).
Альтернативный сценарийкогда несколько потоков ожидают одного события, и отреагировать должны все. Так бывает, например, когда инициализируются разделяемые данные, и все работающие с ними потоки должны ждать, пока инициализация завершится (хотя для этого случая существуют более подходящие механизмы, см. раздел 3.3.1 главы 3), или когда потоки должны ждать обновления разделяемых данных, например, в случае периодической повторной инициализации. В таких ситуациях поток, отвечающий за подготовку данных, может вызвать функцию-член notify_all()
условной переменной вместо notify_one()
. Эта функция извещает все потоки, ожидающие внутри функции wait()
, о том, что они должны проверить ожидаемое условие.
Если ожидающий поток собирается ждать условия только один раз, то есть после того как оно станет истинным, он не вернется к ожиданию той же условной переменной, то лучше применить другой механизм синхронизации. В особенности это относится к случаю, когда ожидаемое условиедоступность каких-то данных. Для такого сценария больше подходят так называемые будущие результаты (future).
4.2. Ожидание одноразовых событий с помощью механизма будущих результатов
Предположим, вы летите самолетом в отпуск за границу. Вы приехали в аэропорт, прошли регистрацию и прочие процедуры, но должны ждать объявления о посадкебыть может, несколько часов. Можно, конечно, найти себе занятиенапример, почитать книжку, побродить в Интернете, поесть в кафе за бешеные деньги, но суть от этого не меняется: вы ждете сигнала о том, что началась посадка в самолет. И есть еще одна особенностьданный рейс вылетает всего один раз; в следующий отпуск вы будете ждать посадки на другой рейс.
В стандартной библиотеке С++ такие одноразовые события моделируются с помощью будущего результата. Если поток должен ждать некоего одноразового события, то он каким-то образом получает представляющий его объект-будущее. Затем поток может периодически в течение очень короткого времени ожидать этот объект-будущее, проверяя, произошло ли событие (посмотреть на табло вылетов), а между проверками заниматься другим делом (вкушать в кафе аэропортовскую пищу по несуразным ценам). Можно поступить и иначевыполнять другую работу до тех пор, пока не наступит момент, когда без наступления ожидаемого события двигаться дальше невозможно, и вот тогда ждать готовности будущего результата. С будущим результатом могут быть ассоциированы какие-то данные (например, номер выхода в объявлении на посадку), но это необязательно. После того как событие произошло (то есть будущий результат готов), сбросить объект-будущее в исходное состояние уже невозможно.
В стандартной библиотеке С++ есть две разновидности будущих результатов, реализованные в форме двух шаблонов классов, которые объявлены в заголовке <future>
: уникальные будущие результаты (std::future<>
) и разделяемые будущие результаты (std::shared_future<>
). Эти классы устроены по образцу std::unique_ptr
и std::shared_ptr
. На одно событие может ссылаться только один экземпляр std::future
, но несколько экземпляров std::shared_future
. В последнем случае все экземпляры оказываются готовы одновременно и могут обращаться к ассоциированным с событием данным. Именно из-за ассоциированных данных будущие результаты представлены шаблонами, а не обычными классами; точно так же шаблоны std::unique_ptr
и std::shared_ptr
параметризованы типом ассоциированных данных. Если ассоциированных данных нет, то следует использовать специализации шаблонов std::future<void>
и std::shared_future<void>
. Хотя будущие результаты используются как механизм межпоточной коммуникации, сами по себе они не обеспечивают синхронизацию доступа. Если несколько потоков обращаются к единственному объекту-будущему, то они должны защитить доступ с помощью мьютекса или какого-либо другого механизма синхронизации, как описано в главе 3. Однако, как будет показано в разделе 4.2.5, каждый из нескольких потоков может работать с собственной копией std::shared_future<>
безо всякой синхронизации, даже если все они ссылаются на один и тот же асинхронно получаемый результат.
Самое простое одноразовое событиеэто результат вычисления, выполненного в фоновом режиме. В главе 2 мы видели, что класс std::thread
не предоставляет средств для возврата вычисленного значения, и я обещал вернуться к этому вопросу в главе 4. Исполняю обещание.
4.2.1. Возврат значения из фоновой задачи
Допустим, вы начали какое-то длительное вычисление, которое в конечном итоге должно дать полезный результат, но пока без него можно обойтись. Быть может, вы нашли способ получить ответ на «Главный возрос жизни, Вселенной и всего на свете» из книги Дугласа Адамса. Для вычисления можно запустить новый поток, но придётся самостоятельно позаботиться о передаче в основную программу результата, потому что в классе std::thread
такой механизм не предусмотрен. Тут-то и приходит на помощь шаблон функции std::async
(также объявленный в заголовке <future>
).
Функция std::async
позволяет запустить асинхронную задачу, результат которой прямо сейчас не нужен. Но вместо объекта std::thread
она возвращает объект std::future
, который будет содержать возвращенное значение, когда оно станет доступно. Когда программе понадобится значение, она вызовет функцию-член get()
объекта-будущего, и тогда поток будет приостановлен до готовности будущего результата, после чего вернет значение. В листинге ниже оказан простой пример.
Листинг 4.6. Использование std::future
для получения результата асинхронной задачи
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main() {
std::future<int> the_answer =
std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout << "Ответ равен " << the_answer.get() << std::endl;
}
Шаблон std::async
позволяет передать функции дополнительные параметры, точно так же, как std::thread
. Если первым аргументом является указатель на функцию-член, то второй аргумент должен содержать объект, от имени которого эта функция-член вызывается (сам объект, указатель на него или обертывающий его std::ref
), а все последующие аргументы передаются без изменения функции-члену. В противном случае второй и последующие аргументы передаются функции или допускающему вызов объекту, заданному в первом аргументе. Как и в std::thread
, если аргументы представляют собой r-значения, то создаются их копии посредством перемещения оригинала. Это позволяет использовать в качестве объекта-функции и аргументов типы, допускающие только перемещение. Пример см. в листинге ниже.
Листинг 4.7. Передача аргументов функции, заданной в std::async
#include <string>
#include <future>
struct X {
void foo(int, std::string const&);
std::string bar(std::string const&);
};
Вызывается
X x;
p->foo(42,"hello"),
auto f1 = std::async(&X::foo, &x, 42, "hello");
где p=&x
auto f2 = std::async(&X::bar, x, "goodbye");
вызывается
tmpx.bar("goodbye"),
struct Y {
где tmpxкопия x
double operator()(double);
};
Вызывается tmpy(3.141),
где tmpy создается
Y y;
из Y перемещающим
auto f3 = std::async(Y(), 3.141)
конструктором
auto f4 = std::async(std::ref(y), 2.718);
Вызывается y(2.718)
X baz(X&);
std::async(baz, std::ref(x);
Вызывается baz(x)
class move_only {
public:
move_only();
move_only(move_only&&);
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
Вызывается tmp(), где tmp
};
конструируется с помощью
auto f5 = std::async(move_only());
std::move(move_only())
По умолчанию реализации предоставлено право решать, запускает ли std::async
новый поток или задача работает синхронно, когда программа ожидает будущего результата. В большинстве случаев такое поведение вас устроит, но можно задать требуемый режим в дополнительном параметре std::async
перед вызываемой функцией. Этот параметр имеет тип std::launch
и может принимать следующие значения: std::launch::deferred
отложить вызов функции до того момента, когда будет вызвана функция-член wait()
или get()
объекта-будущего; std::launch::async
запускать функцию в отдельном потоке; std::launch::deferred | std::launch::async
оставить решение на усмотрение реализации. Последний вариант подразумевается по умолчанию. В случае отложенного вызова функция может вообще никогда не выполниться. Например:
auto f6 =
Выполнять в
std::async(std::launch::async, Y(), 1.2);
новом потоке
auto f7 =
std::async(
std::launch::deferred, baz, std::ref(x));
auto f8 = std::async(
Выполнять
std::launch::deferred | std::launch::async,
при вызове
baz, std::ref(x));
wait() или get()
auto f9 = std::async(baz, std::ref(x));
Оставить на
усмотрение реализации
f7.wait();
Вызвать отложенную функцию
Ниже в этой главе и далее в главе 8 мы увидим, что с помощью std::async
легко разбивать алгоритм на параллельно выполняемые задачи. Однако это не единственный способ ассоциировать объект std::future
с задачей; можно также обернуть задачу объектом шаблонного класса std::packaged_task<>
или написать код, который будет явно устанавливать значения с помощью шаблонного класса std::promise<>
. Шаблон std::packaged_task
является абстракцией более высокого уровня, чем std::promise
, поэтому начнем с него.