QNX/UNIX: Анатомия параллелизма - Олег Цилюрик 8 стр.


struct sched_param param;

pthread_getschedparam(pthread_self(), &policy, &param);

param sched_priority -= 2;

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setschedparam(&attr, &param);

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_create(NULL, &attr, &func, NULL);

Или даже так (хотя это немного грубее):

int policy;

struct sched_param param;

pthread_getschedparam(pthread_self(), &policy, &param);

pthread_attr_t attr;

pthread_attr_init(&attr);

attr.param.sched_priority = param.sched_priority - 2;

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_create(NULL, &attr, &func, NULL);

Примечание

Как и при установке политики диспетчеризации, параметры диспетчеризации потока (и приоритет в их составе) будут установлены, только если параметр типа наследования от родителя установлен в PTHREAD_EXPLICIT_SCHED посредством вызова pthread_attr_setinheritsched().

Заметим здесь вскользь (в дальнейшем нам представится возможность использовать эти знания), что помимо "продуктивных" потоков (компонент системы и пользовательских приложений) в системе всегда существует один "паразитный" поток, запущенный с приоритетом 0 (idle-поток). Он "выбирает" весь остаток процессорного времени в те периоды, когда все имеющиеся в системе продуктивные потоки перейдут в блокированные состояния (ожидания). Подобная практика хорошо известна и реализуется также в большинстве других операционных систем.

Отличия от POSIX

Если следовать POSIX-стандарту, то некоторые из атрибутов невозможно переопределить до фактического создания этого стандарта (их можно изменить позже в самом коде потока, но иногда это не совсем правильное решение). Все эти возможности относятся к асинхронному завершению потока; детали функционирования этого механизма рассматриваются позже. К подобного рода атрибутам относятся:

• запретить асинхронное завершение (отмену) потока;

• установить тип завершаемости потока;

• определить, что должно происходить при доставке потоку сигналов.

QNX расширяет возможности POSIX, позволяя по условию OR установить соответствующие биты-флаги в поле flags атрибутной записи, прежде чем будет произведен вызов, создающий поток. Не существует функций вида pthread_attr_set_*(), эквивалентных этим установкам. К этим флагам относятся:

PTHREAD_CANCEL_ENABLE - запрос на завершение будет обрабатываться в соответствии с типом завершаемости, установленным для потока (значение по умолчанию);

PTHREAD_CANCEL_DISABLE - запросы на завершение будут отложены;

PTHREAD_CANCEL_ASYNCHRONOUS - если завершение разрешено, отложенные или текущие запросы будут выполнены немедленно;

PTHREAD_CANCEL_DEFERRED - если завершение разрешено, запросы на завершение будут отложены до достижения точки завершаемости (значение по умолчанию);

PTHREAD_MULTISIG_ALLOW - завершать по сигналу все потоки в процессе (POSIX-умолчание);

PTHREAD_MULTISIG_DISALLOW - завершать по сигналу только тот поток, который принял сигнал.

После запуска потока все атрибуты, связанные с завершаемостью потока, могут быть изменены вызовами pthread_setcancelstate() и pthread_setcanceltype().

Передача параметров потоку

Зачастую каждый поток из группы последовательно создаваемых потоков, выполняющих одну и ту же функцию, нужно запускать со своим индивидуальным блоком данных (параметром потока). Для этого предназначен 4-й параметр вызова pthread_create() - указатель на блок данных типа void*. Характерно, что это может быть произвольная структура данных сколь угодно сложного типа, структуризацию которой вызывающий pthread_create() код и функция потока должны понимать единообразно; никакого контроля соответствия типов на этапе вызова не производится.

Достаточно часто встречающийся на практике образец многопоточного кода - это циклическая процедура ожидания наступления некоторого условия (события), после которого порождается новый экземпляр потока, призванный обслужить наступившее событие (типичная схема всего разнообразия многопоточных сетевых серверов). В таких случаях код, порождающий потоки, выглядит подобно следующему фрагменту:

// функция потока:

void* ThreadProc(void* data) {

// ... выполняется обработка, используя структуру *(DataParam*)data

return NULL;

}

// порождающий потоки код:

while (true) {

// инициализация области параметров

struct DataParam data(...);

if ( /* ожидаем нечто */ )

pthread_create(NULL, &attr, &ThreadProc, &data);

}

Этот простейший код крайне опасен: при быстрых циклах и, что намного важнее, непредсказуемых моментах повторных созданий экземпляров потоков из вызывающего цикла необходимо обеспечить, чтобы используемое в функции потока ThreadProc() значение данных было адекватным. Оно может быть изменено в вызывающем коде или даже, более того, просто разрушено при выходе из локальной области видимости, как в следующем коде:

// порождающий потоки код:

while(true) {

if ( /* ожидаем нечто */ ) {

struct DataParam data(...);

pthread_create(NULL, &attr, &ThreadProc, &data);

}

// здесь может идти достаточно длительная обработка

}

Здесь блок данных, выделяемый в стеке порождающего потока, к началу его использования в дочернем потоке может быть просто уничтожен.

Единственно надежный способ обеспечить требование актуальности передаваемых данных - это создание копии блока параметров непосредственно при входе в функцию потока, например так (если определена операция копирования):

// функция потока:

void* ThreadProc(void* data) {

DataParam copy = *(DataParam*)data;

// выполняется обработка, используя структуру copy

return NULL;

}

или так (если определен инициализирующий конструктор структуры данных):

// функция потока:

void* ThreadProc(void* data) {

DataParam copy(*(DataParam*)data);

// ... выполняется обработка, используя структуру copy

return NULL;

}

Но и этот код оказывается некорректен. При порождении потока нам нужно, чтобы инициализация копии переданных данных в теле функции потока произошла до того, как на очередном цикле оригинал этих данных будет разрушен или изменен. Но дисциплины диспетчеризации равнозначных потоков (в данном случае родительского и порожденного) в операционной системе никак не регламентируют (и не имеют права этого делать!) порядок их выполнения после точки ветвления - pthread_create().

Обеспечить актуальность копии переданных данных можно несколькими искусственными способами:

1. Передачей в качестве аргумента pthread_create() специально сделанной ранее временной копии экземпляра данных, например:

if ( /* нечто */ ) {

// static обеспечивает неразрушаемость

static struct DataParam copy;

copy = data;

pthread_create(NULL, &attr, &ThreadProc, &copy);

}

Этот способ иногда хорошо "срабатывает" для данных типа символьных строк, представленных в стандарте языка С (однако используется он не часто):

void* ThreadProc(void *data) {

...

// можно даже не делать копию - это уже копия:

printf("%s", (char*)data);

}

...

while (true) {

char *data = ... /* инициализация данных */;

if ( /* нечто */ )

pthread_create(NULL, &attr, &ThreadProc, strdup(data));

}

2. Для передачи параметра скалярного типа (char, short, int), не превышающего размер указателя, очень часто в самых разнообразных источниках [1, 3] можно увидеть такой трюк, когда указателю присваивается непосредственное значение скалярной величины:

// функция потока:

void* ThreadProc(void* data) {

// ... выполняется обработка, используя значение параметра (char)data

return NULL;

}

// порождающий потоки код:

while (true) {

char data = /* инициализация параметра */;

if ( /* ожидаем нечто */ )

pthread_create(NULL, &attr, &ThreadProc, (void*)data);

}

Или даже так:

pthread_create(NULL, &attr, &ThreadProc, (void*)5);

pthread_create(NULL, &attr, &ThreadProc, (void*)(x + y));

Положительной стороной этого решения (которое тем не менее остается трюкачеством) является то, что параметр в ThreadProc() передается по значению, то есть неявным копированием, и любые последующие манипуляции с ним не приведут к порче переданного значения. Таким образом, в ThreadProc() нет необходимости создавать локальную копию полученного параметра.

3. Создание экземпляра данных в родительском потоке для каждого нового экземпляра создаваемого потока с гарантированным уничтожением экземпляра данных при завершении порожденного потока:

void* ThreadProc(void *data) {

// используем экземпляр data без копирования ...

...

delete data;

return NULL;

}

...

if ( /* нечто */ ) {

// создание экземпляра вместе с инициализацией

// (предполагаем, что для DataParam ранее определен

// копирующий конструктор):

pthread_create(NULL, &attr, &ThreadProc, new DataParam(data));

}

Это один из самых безошибочно срабатывающих способов, и единственным его недостатком является то, что объекты создаются в одной структурной единице (родителе), а уничтожаться должны в другой (потомке), которые иногда даже размещаются в различных файлах программного кода, а ошибки с парностью операций над динамической памятью обходятся очень дорого.

4. "Ручной" вызов диспетчеризации в порождающем потоке, по крайней мере при дисциплине по умолчанию для QNX - round-robin:

if ( /* нечто */ ) {

pthread_create(NULL, &attr, &ThreadProc, &data);

sched_yield();

}

Мы не можем произвольно изменять последовательность выполнения потоков (чем нарушили бы принципы диспетчеризации) и не можем утверждать, что при наличии многих потоков именно только что порожденный поток получит управление. Но после выполнения sched_yield() мы можем гарантировать, что родительский поток будет помещен именно в хвост очереди потоков равных приоритетов, готовых к исполнению, и его активизация произойдет позже всех наличных в системе потоков, в том числе и последнего порожденного.

Примечание

В этом месте внимательный читатель вправе оживиться: "Обманывают, обвешивают…". Да, описываемое здесь экзотическое решение не совсем корректно с позиции уже упоминавшегося определения Э. Дейкстры "слабосвязанных процессов" и независимости результата от относительных скоростей: в SMP-системе при количестве процессоров, большем, чем количество параллельных потоков, это решение не будет работать так, как мы ему предписываем. Но к настоящему времени такое "стечение обстоятельств" может быть либо чисто теоретически умозрительным, либо возникать на экспериментальных единичных образцах SMP, содержащих десятки и сотни процессоров…, но где QNX, насколько нам известно, не используется.

В этом варианте и в порожденном потоке можно симметрично сделать так:

void* ThreadProc(void *data) {

struct DataParam copy(*data);

sched_yield();

...

}

Примечание

Иногда для выражения этой техники используется и такая, в общем несколько небрежная, форма записи:

pthread_create(NULL, &attr, &ThreadProc, &data);

delay(1); // вместо sched_yield()

Фокус здесь состоит не в том, что 1 миллисекунда - это время, заведомо достаточное для копирования экземпляра данных, а в том, что POSIX определяет, что операция delay() (а также все родственные ей функции: sleep(), nanosleep() и другие функции пассивной задержки) является операцией пассивного ожидания и должна сопровождаться принудительной диспетчеризацией.

5. Создание потока с приоритетом выше, чем родительский, с последующим возвратом его приоритета на прежний уровень после выполнения требуемой инициализации копии:

void* ThreadProc(void* data) {

struct sched_param param;

int policy;

pthread_getschedparam(pthread_self(), &policy, &param);

param.sched_priority -= 2;

// инициализация копии блока данных

...

pthread_setschedparam(pthread_self(), policy, &param);

...

return NULL;

}

...

if ( /* нечто */ ) {

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_attr_setschedpolicy(&attr, SCHED_RR);

int policy;

struct sched_param param;

pthread_getschedparam(pthread_self(), &policy, &param);

attr.param.sched_priority = param.sched_priority + 2;

pthread_create(NULL, &attr, &ThreadProc, &data);

}

Здесь в точке создания порожденный поток сразу же вытесняет своего родителя и выполняет инициализацию копии области параметров, после чего возвращается к нормальной (с равными приоритетами) диспетчеризации. Этот вариант может показаться искусственно усложненным, но отлично вписывается своими побочными достоинствами в создание многопоточных GUI-приложений для графической подсистемы Photon.

Данные потока

В реальном коде часто возникает ситуация, когда одновременно исполняются несколько экземпляров потоков, использующих один и тот же код (при создании потоков указывается одна и та же функция потока). При этом некоторые данные (например, статические объекты, глобальные объекты программного файла, объявленные вне функции потока) будут представлены для различных экземпляров потока в виде единого экземпляра данных, а другие (блок параметров функции потока, локальные данные функции потока) будут представлять собой индивидуальные экземпляры для каждого потока:

class DataBlock {

DataBlock(void);

DataBlock(DataBlock&);

}

DataBlock A;

void* ThreadProc(void *data) {

static DataBlock B;

DataBlock C, D(*(DataBlock*)data);

...

delete data;

return NULL;

}

...

for(int i = 0; i < N; i++ ) {

DataBlock E;

// ... обработка и заполнение E ...

pthread_create(NULL, NULL, &ThreadProc, new DataBlock(E));

}

Назад Дальше