Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - Энтони Уильямс 6 стр.


Предположим, что требуется написать функцию для создания потока, который должен работать в фоновом режиме, но при этом мы не хотим ждать его завершения, а хотим, чтобы владение новым потоком было передано вызывающей функции. Или требуется сделать обратноесоздать поток и передать владение им некоторой функции, которая будет ждать его завершения. В обоих случаях требуется передать владение из одного места в другое.

Именно здесь и оказывается полезной поддержка классом std::thread семантики перемещения. В предыдущем разделе отмечалось, что в стандартной библиотеке С++ есть много типов, владеющих ресурсами, например std::ifstream и std::unique_ptr, которые являются перемещаемыми, но не копируемыми, и один из нихstd::thread. Это означает, что владение потоком можно передавать от одного экземпляра std::thread другому, как показано в примере ниже. В нем создается два потока выполнения, владение которыми передается между тремя объектами std::thread: t1, t2 и t3.

void some_function();

void some_other_function();

std::thread t1(some_function);         (1)

std::thread t2 = std::move(t1);        (2)

t1 = std::thread(some_other_function); (3)

std::thread t3;     (4)

t3 = std::move(t2); (5)

t1 = std::move(t3); (6) Это присваивание приводит

;                       к аварийному завершению программы

Сначала создастся новый поток (1) и связывается с объектом t1. Затем владение явно передается объекту t2 в момент его конструирования путем вызова std::move() (2). В этот момент с t1 уже не связан никакой поток выполнения: поток, в котором исполняется функция some_function, теперь связан с t2.

Далее создается еще один поток, который связывается с временным объектом типа std::thread (3). Для последующей передачи владения объекту t1 уже не требуется явный вызов std::move(), так как владельцем является временный объект, а передача владения от временных объектов производится автоматически и неявно.

Объект t3 конструируется по умолчанию (4), а это означает, что в момент создания с ним не связывается никакой поток. Владение потоком, который в данный момент связан с t2, передастся объекту t3 (5), опять-таки путем явного обращения к std::move(), поскольку t2именованный объект. После всех этих перемещений t1 оказывается связан с потоком, исполняющим функцию some_other_function, t2 не связан ни с каким потоком, a t3 связан с потоком, исполняющим функцию some_function .

Последнее перемещение (6) передает владение потоком, исполняющим some_function, обратно объекту t1, в котором исполнение этой функции началось. Однако теперь с t1 уже связан поток (который исполнял функцию some_other_function), поэтому вызывается std::terminate(), и программа завершается. Так делается ради совместимости с поведением деструктора std::thread. В разделе 2.1.1 мы видели, что нужно либо явно ждать завершения потока, либо отсоединить его до момента уничтожения; то же самое относится и к присваиванию: нельзя просто «прихлопнуть» поток, присвоив новое значение объекту std::thread, который им управляет.

Поддержка операции перемещения в классе std::thread означает, что владение можно легко передать при возврате из функции, как показано в листинге 2.5.

Листинг 2.5. Возврат объекта std::thread из функции

std::thread f() {

 void some_function();

 return std::thread(some_function);

}

std::thread g() {

 void some_other_function(int);

 std::thread t(some_other_function, 42);

 return t;

}

Аналогично, если требуется передать владение внутрь функции, то достаточно, чтобы она принимала экземпляр std::thread по значению в качестве одного из параметров, например:

void f(std::thread t);

void g() {

 void some_function();

 f(std::thread(some_function));

 std::thread t(some_function);

 f(std::move(t));

}

Одно из преимуществ, которые даёт поддержка перемещения в классе std::thread, заключается в том, что мы можем модифицировать класс thread_guard из листинга 2.3, так чтобы он принимал владение потоком. Это позволит избежать неприятностей в случае, когда время жизни объекта thread_guard оказывает больше, чем время жизни потока, на который он ссылается, а, кроме того, это означает, что никто другой не сможет присоединиться к потоку или отсоединить его, так как владение было передано объекту thread_guard. Поскольку основное назначение этого класса гарантировать завершение потока до выхода из области видимости, я назвал его scoped_thread. Реализация и простой пример использования приведены в листинге 2.6.

Листинг 2.6. Класс scoped_thread и пример его использования

class scoped_thread {

 std::thread t;

public:

 explicit scoped_thread(std::thread t_) : (1)

 t(std::move(t_)) {

 if (!t.joinable()) (2)

  throw std::logic_error("No thread");

 }

 ~scoped_thread() {

  t.join();         (3)

 }

 scoped_thread(scoped_thread const&)=delete;

 scoped_thread& operator=(scoped_thread const&)=delete;

};

struct func; см. листинг 2.1

void f() {

 int some_local_state;

 scoped_thread t(std::thread(func(some_local_state))); (4)

 do_something_in_current_thread();

}                   (5)

Этот пример очень похож на приведенный в листинге 2.3, только новый поток теперь передается непосредственно конструктору scoped_thread (4), вместо того чтобы создавать для него отдельную именованную переменную. Когда новый поток достигает конца f (5), объект scoped_thread уничтожается, а затем поток соединяется (3) с потоком, переданным конструктору (1). Если в классе thread_guard из листинга 2.3 деструктор должен был проверить, верно ли, что поток все еще допускает соединение, то теперь мы можем сделать это в конструкторе (2) и возбудить исключение, если это не так.

Поддержка перемещения в классе std::thread позволяет также хранить объекты этого класса в контейнере при условии, что класс контейнера поддерживает перемещение (как, например, модифицированный класс std::vector<>). Это означает, что можно написать код, показанный в листинге 2.7, который запускает несколько потоков, а потом ждет их завершения.

Листинг 2.7. Запуск нескольких потоков и ожидание их завершения

void do_work(unsigned id);

void f() {

 std::vector<std::thread> threads;

 for (unsigned i = 0; i < 20; ++i) {           Запуск

  threads.push_back(std::thread(do_work(i))); потоков

 }                                            Поочередный

 std::for_each(threads.begin(), threads.end(),вызов join()

 std::mem_fn(&std::thread::join));           для каждого потока

}

Если потоки применяются для разбиения алгоритма на части, то зачастую такой подход именно то, что требуется: перед возвратом управления вызывающей программе все потоки должны завершиться. Разумеется, столь простая структура, как в листинге 2.7, предполагает, что каждый поток выполняет независимую работу, а единственным результатом является побочный эффект, заключающийся в изменении разделяемых данных. Если бы функция f() должна была вернуть вызывающей программе значение, зависящее от результатов операций, выполненных в потоках, то при такой организации получить это значение можно было бы только путем анализа разделяемых данных по завершении всех потоков. В главе 4 обсуждаются альтернативные схемы передачи результатов работы из одного потока в другой.

Хранение объектов std::thread в векторе std::vectorшаг к автоматизации управления потоками: вместо тот чтобы создавать отдельные переменные для потоков и выполнять соединение напрямую, мы можем рассматривать группу потоков. Можно пойти еще дальше и создавать не фиксированное число потоков, как в листинге 2.7, а определять нужное количество динамически, во время выполнения.

2.4. Задание количества потоков во время выполнения

В стандартной библиотеке С++ есть функция std::thread::hardware_concurrency(), которая поможет нам решить эту задачу. Она возвращает число потоков, которые могут работать по-настоящему параллельно. В многоядерной системе это может быть, например, количество процессорных ядер. Возвращаемое значение всего лишь оценка; более того, функция может возвращать 0, если получить требуемую информацию невозможно. Однако эту оценку можно с пользой применить для разбиения задачи на несколько потоков.

В листинге 2.8 приведена простая реализация параллельной версии std::accumulate. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор std::thread возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.

Листинг 2.8. Наивная реализация параллельной версии алгоритма std::accumulate

template<typename Iterator, typename T>

 struct accumulate_block {

 void operator()(Iterator first, Iterator last, T& result) {

  result = std::accumulate(first, last, result);

 }

};

template<typename Iterator, typename T>

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length) (1)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length+min_per_thread - 1) / min_per_thread; (2)

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads = (3)

  std::min(

   hardware.threads != 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads; (4)

 std::vector<T> results(num_threads);

 std::vector<std::thread> threads(num_threads - 1); (5)

 Iterator block_start = first;

 for(unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size); (6)

  threads[i] = std::thread( (7)

   accumulate_block<Iterator, T>(),

   block_start, block_end, std::ref(results(i)));

  block_start = block_end;  (8)

 }

 accumulate_block()(

  block_start, last, results[num_threads-1]); (9)

 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join)); (10)

 return

  std::accumulate(results.begin(), results.end(), init); (11)

}

Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение init. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).

Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.

Число запускаемых потоков равно минимуму из только что вычисленного максимума и количества аппаратных потоков (3): мы не хотим запускать больше потоков, чем может поддержать оборудование (это называется превышением лимита), так как из-за контекстных переключений при большем количестве потоков производительность снизится. Если функция std::thread::hardware_concurrency() вернула 0, то мы берем произвольно выбранное число, я решил остановиться на 2. Мы не хотим запускать слишком много потоков, потому что на одноядерной машине это только замедлило бы программу. Но и слишком мало потоков тоже плохо, так как это означало бы отказ от возможного параллелизма.

Каждый поток будет обрабатывать количество элементов, равное длине диапазона, поделенной на число потоков (4). Пусть вас не пугает случай, когда одно число нацело не делится на другое,ниже мы рассмотрим его.

Теперь, зная, сколько необходимо потоков, мы можем создать вектор std::vector<T> для хранения промежуточных результатов и вектор std::vector<std::thread> для хранения потоков (5). Отметим, что запускать нужно на один поток меньше, чем num_threads, потому что один поток у нас уже есть.

Запуск потоков производится в обычном цикле: мы сдвигаем итератор block_end в конец текущего блока (6) и запускаем новый поток для аккумулирования результатов по этому блоку (7). Начало нового блока совпадает с концом текущего (8).

После того как все потоки запущены, главный поток может обработать последний блок (9). Именно здесь обрабатывается случай деления с остатком: мы знаем, что конец последнего блокаlast, а сколько в нем элементов, не имеет значения.

Аккумулировав результаты но последнему блоку, мы можем дождаться завершения всех запущенных потоков с помощью алгоритма std::for_each (10), а затем сложить частичные результаты, обратившись к std::accumulate (11).

Прежде чем расстаться с этим примером, полезно отметить, что в случае, когда оператор сложения, определенный в типе T, не ассоциативен (например, если Tэто float или double), результаты, возвращаемые алгоритмами parallel_accumulate и std::accumulate, могут различаться из-за разбиения диапазона на блоки. Кроме того, к итераторам предъявляются более жесткие требования: они должны быть по меньшей мере однонаправленными, тогда как алгоритм std::accumulate может работать и с однопроходными итераторами ввода. Наконец, тип T должен допускать конструирование по умолчанию (удовлетворять требованиям концепции DefaultConstructible), чтобы можно было создать вектор results. Такого рода изменения требований довольно типичны для параллельных алгоритмов: но самой своей природе они отличаются от последовательных алгоритмов, и это приводит к определенным последствиям в части как результатов, так и требований. Более подробно параллельные алгоритмы рассматриваются в главе 8. Стоит также отметить, что из-за невозможности вернуть значение непосредственно из потока, мы должны передавать ссылку на соответствующий элемент вектора results. Другой способ возврата значений из потоков, с помощью будущих результатов, рассматривается в главе 4.

В данном случае вся необходимая потоку информация передавалась в момент его запуска  в том числе и адрес, но которому необходимо сохранить результат вычисления. Так бывает не всегда; иногда требуется каким-то образом идентифицировать потоки во время работы. Конечно, можно было бы передать какой-то идентификатор, например значение i в листинге 2.7, но если вызов функции, которой этот идентификатор нужен, находится несколькими уровнями стека глубже, и эта функция может вызываться из любого потока, то поступать так неудобно. Проектируя библиотеку С++ Thread Library, мы предвидели этот случай, поэтому снабдили каждый поток уникальным идентификатором.

2.5. Идентификация потоков

Идентификатор потока имеет тип std::thread::id, и получить его можно двумя способами. Во-первых, идентификатор потока, связанного с объектом std::thread, возвращает функция-член get_id() этого объекта. Если с объектом std::thread не связан никакой поток, то get_id() возвращает сконструированный по умолчанию объект типа std::thread::id, что следует интерпретировать как «не поток». Идентификатор текущего потока можно получить также, обратившись к функции std::this_thread::get_id(), которая также определена в заголовке <thread>.

Назад Дальше