QNX/UNIX: Анатомия параллелизма - Олег Цилюрик 23 стр.


Операции блокировки

Для семафора определены три модификации операции блокировки:

int sem_wait(sem_t* sem);

int sem_trywait(sem_t* sem);

#include <time.h>

int sem_timedwait(sem_t* sem, const struct timespec * abs_timeout);

Все эти функции опираются на функцию (native QNX API):

int SyncSemWait(sync_t* sync, int try);

Функция простого ожидания sem_wait() пытается выполнить декремент счетчика семафора. Если исходное значение счетчика было больше или равно 1, то функция после выполнения операции декремента возвращает управление в вызвавший код. Если же значение внутреннего счетчика семафора равнялось 0, выполнение вызвавшего функцию потока блокируется до тех пор, пока какой-либо другой поток не увеличит значение счетчика семафора. После того как значение счетчика будет увеличено, блокированный поток получает управление (в порядке очереди), выполняет (завершает) декремент счетчика семафора и возвращает управление. Блокированное состояние потока может быть также прервано получением направленного ему сигнала (см. главу 3).

Функция ожидания с проверкой семафора, sem_trywait(), до выполнения операции над семафором проверяет значение счетчика. Если это значение больше нуля, функция выполняет декремент счетчика, а если значение равно нулю - возвращает управление потоку, не блокируясь на ожидании доступности семафора (но захват семафора в этом случае не происходит, о чем извещает код возврата).

Функция ожидания с тайм-аутом, sem_timedwait(), ожидает возможности уменьшить на 1 счетчик семафора (блокируется на ожидании) до момента времени abs_timeout. Это ожидание реализовано с помощью функции TimerTimeout() (native QNX API) с параметром SIGEV_UNBLOCK. Благодаря этой функции может быть прерван ряд состояний блокировки потоков (в том числе блокировки на семафоре, мьютексе и условной переменной).

Операции освобождения

int sem_post(sem_t* sem);

Эта операция увеличивает на единицу (инкремент) внутренний счетчик семафора, и в этом она диаметрально противоположна операции блокировки. Если перед этим значение счетчика семафора было равно 0, то один из потоков, ожидающих разблокирования семафора, переходит в состояние готовности. Из списка ожидающих потоков система выбирает самый приоритетный, а если таких несколько, то поток, дольше всех ждавший из очереди наиболее приоритетных потоков.

Эта функция имеет свой оригинал в native API QNX:

int SyncSemPost(sync_t* sync);

Фактически разница между POSIX и QNX API в вариантах этой функции состоит в регистрируемых ею ошибках.

Функция sem_post() сообщает о следующих ошибках:

EINVAL - неверный дескриптор семафора sem;

ENOSYS - функция sem_post() не поддерживается системой.

В отличие от sem_post(), функция SyncSemPost() может указывать на ошибки:

EAGAIN - недостаточно памяти для создания внутреннего объекта синхронизации;

EFAULT - неверный указатель на семафор sync;

EINTR - выполнение функции прервано сигналом;

EINVAL - аргумент sync не указывает на инициированный семафор.

Как видим, функция QNX API несколько разнообразнее в плане контроля передаваемых аргументов и результата выполнения функции.

Получение статуса семафора

int sem_getvalue(sem_t* sem, int* value);

Эта функция используется преимущественно для отладки операций над семафорами. По адресу, указанному в value, устанавливается текущее значение счетчика семафора. Поскольку значение счетчика семафора может измениться в любой момент, то значение, которое возвращает эта функция, имеет смысл только непосредственно в точке ее вызова.

Возможные ошибки:

EINVAL - неправильный объект семафор sem.

Использование семафора

Как уже говорилось выше, семафор является крайне гибким и эффективным средством синхронизации, особенно удобным для построения собственных средств планирования выполнения потоков. В этом смысле семафор представляет ценность не только как самостоятельное средство синхронизации выполнения потоков, но и как материал для построения специфических средств планирования и синхронизации для конкретных задач. Мы уже говорили, что семафор образует самодостаточный базис, позволяющий строить гораздо более сложные средства синхронизации без привлечения других средств синхронизации. В принципе это так, но нет ничего плохого и в смешанном использовании как семафоров, так и мьютексов для построения собственных средств синхронизации.

Проиллюстрируем все вышесказанное на двух примерах. Сначала мы построим "очередь сообщений", предназначенную для трансляции сообщений графической системы к "медленному" обработчику реакций. Это одно из решений весьма распространенной задачи о предотвращении "зависания" пользовательского интерфейса на период выполнения медленного обработчика. Для решения этой задачи обработчик события оконной системы (например, нажатия кнопки или выбора пункта меню) и функция, которая непосредственно производит требуемые действия (предусмотренные по наступлению указанного события - нажатия кнопки), должны располагаться в разных потоках.

Было бы удобно, если бы при поступлении новых данных от графической системы поток обработки автоматически (неявно) разблокировался и немедленно приступал к обработке, а в периоды отсутствия таких данных - простаивал в блокированном состоянии. Для реализации такой схемы мы построили синхронизирующую очередь сообщений, которая использует семафор для уведомления потока обработки о наличии новых данных. В принципе указанная задача сводится к уже упоминавшемуся ранее классу задач о синхронизации производителя и потребителя данных.

class event {

/* класс синхронизирующего события, доставляющего

уведомление о добавлении нового элемента в буфер */

public:

event() { sem_init(&_block, 0, 0); }

~event() { sem_destroy(&_block); }

void wait() { sem_wait(&_block); }

void reset() { sem_post(&_block); }

private:

sem_t _block;

};

/* шаблонный класс очереди данных */

template <class T> class CDataQueue {

public:

CDataQueue() {}

~CDataQueue() {}

void push(T _new_data) {

_data_queue.push(_new_data);

data_event.reset();

}

T pop() {

data_event.wait();

T res = _data_queue.front();

_data_queue.pop();

return res;

}

private:

std::queue<T> _data_queue;

event data_event;

};

Принцип работы CDataQueue заключается в том, что для хранения вновь поступающих данных используется очередь, что делает практически независимыми потоки производителя и потребителя. Независимыми во всех случаях, кроме пустой очереди. Потребитель должен быть блокирован до тех пор, пока нет данных от производителя. Как только производитель внесет данные в очередь, поток потребителя разблокируется и считает эти данные. Тонкость заключается в том, что поток потребителя блокируется сам при вызове функции pop(), а разблокируется из потока производителя при вызове им функции push().

Как видите, в построении специфических средств синхронизации нет ничего сложного, вопреки часто встречающемуся утверждению, что создание средств синхронизации со специфическим поведением неадекватно трудоемко, а простейший код позволяет адаптировать возможности тривиального семафора под конкретную задачу.

А теперь хотелось бы обратить ваше внимание на тот факт, что "безопасным" использованием описанной схемы будет только вариант двух потоков - одного производителя и одного потребителя. Если несколько (более двух) потоков одновременно попробуют выполнить функции pop() или push(), начнется путаница, и чем это закончится, сказать трудно. По своей логике код обеих функций в многопоточной системе требует эксклюзивного исполнения. Чтобы обеспечить исключительный доступ к этим участкам кода, мы могли бы использовать дополнительный семафор, но есть другой вариант - специальное средство синхронизации, разработанное именно для решения задачи взаимного исключения, - мьютекс.

Мьютекс

Мьютекс (от mutual exclusion - взаимное исключение) - это один из базовых примитивов синхронизации QNX Neutrino. Этот элемент реализуется на уровне микроядра системы и имеет широкий набор атрибутов и настроек. Назначение мьютекса - защита участка кода от совместного выполнения несколькими потоками. Такой участок кода иногда называют критической секцией, и обычно он является областью модификации общих переменных или обращения к разделяемому ресурсу.

Принцип работы мьютекса заключается в следующем: при обращении потока к функции блокировки (захвата) pthread_mutex_lock() проверяется, захвачен ли уже мьютекс, и если да, то вызвавший поток блокируется до освобождения критической секции. Если же нет, то объект мьютекс запоминает, какой поток его захватил (то есть владельца) и устанавливает признак, что он захвачен.

Когда действия, которые нельзя производить совместно, закончены, поток должен вызвать функцию разблокировки (освобождения) pthread_mutex_unlock(), которая проверяет, действительно ли вызвавший ее поток является тем, который в данный момент владеет мьютексом, и если да, то она разблокирует мьютекс, после чего ОС проводит редиспетчеризацию потоков. Если есть потоки, ожидающие освобождения мьютекса, то один из таких потоков, имеющий наивысший приоритет, переводится из состояния блокирования в состояние готовности и захватывает мьютекс.

В QNX Neutrino 6.2.1 мьютекс имеет наибольшие возможности по тонкой настройке своих параметров среди всех иных элементов синхронизации. В связи с этим поведение мьютекса очень сильно зависит от того, какие значения вы присвоите его атрибутам.

Как видите, главное отличие мьютекса от семафора заключается в том, что он хранит информацию о потоке, исполняющем код критической секции. Отсюда и важнейшие свойства мьютекса. Мьютекс нельзя разблокировать из другого потока. Если поток захватил мьютекс, то только он может его "отпустить". Используя информацию о владельце (tid потока), система может изменять в нужное время приоритет владельца для разрешения проблемы инверсии приоритетов. Наконец, зная идентификатор потока, мьютекс может выделить ситуацию, когда поток, уже захвативший мьютекс, пытается захватить его повторно (одна из разновидностей deadlock - мертвой блокировки, когда не существует ни одного потока, способного отпустить мьютекс и разблокировать потоки, ожидающие освобождения этого мьютекса).

В ОС QNX возможен вариант работы мьютекса, не предусмотренный стандартом POSIX, - рекурсивный мьютекс. В этом режиме поток, владеющий мьютексом при повторном его захвате, не блокируется. Мьютекс только отмечает в своем внутреннем счетчике, сколько раз он был захвачен, и разблокируется только после равного количества освобождений (естественно, тем же потоком).

Все объявления относительно мьютексов находятся в заголовочном файле <pthread.h>, и программный код, их использующий, должен включать директиву:

#include <pthread.h>

Параметры мьютекса

Параметры мьютекса хранятся в структуре pthread_mutexattr_t, которая определена типом sync_attr_t. Эта структура должна быть, создана и определена до инициализации мьютекса, после чего может быть переопределена и использована для других объектов типа мьютекс.

Инициализация параметров

int pthread_mutexattr_init(const pthread_mutexattr_t* attr);

Функция инициализирует структуру атрибутов мьютекса, на которую указывает параметр attr. Тип данных pthread_mutexattr_t определен в файле <pthread.h> (производный от типа sync_attr_t, который в свою очередь определен в файле <target_nto.h>) следующим образом:

struct _sync_attr_t {

int protocol;

int flags;

int prioceiling;

int clockid; /* только для condvar */

int reserved[4];

};

После инициализации значения по умолчанию могут быть прочитаны или изменены группой функций, приведенной ниже.

Установка граничного приоритета

int pthread_mutexattr_setprioceiling(

pthread_mutexattr_t* attr, int prioceiling);

int pthread_mutexattr_getprioceiling(

const pthread_mutexattr_t* attr, int* prioceiling);

Эти функции записывают/читают значение приоритета, которое будет присваиваться потоку, захватившему мьютекс, если поле protocol структуры pthread_mutexattr_t установлено в значение PTHREAD_PRIO_PROTECT.

Этот параметр используется для реализации протокола граничного приоритета (Priority Ceiling Protocol), предлагающего альтернативный вариант защиты от инверсии приоритетов там, где наследование приоритетов может быть неэффективно или нежелательно.

Примечание

Как известно, классический случай инверсии приоритетов может возникать, когда более двух потоков разного приоритета конкурируют и разделяют один общий ресурс. В этой ситуации возможно такое стечение обстоятельств, когда низкоприоритетный поток захватывает ресурс, но позже вытесняется потоком среднего приоритета, а поток с высоким приоритетом блокируется в ожидании освобождения ресурса, захваченного низкоприоритетным потоком. Эта ситуация детально описана в соответствующей главе [4], где приводится сравнительный анализ поведения различных ОС в этой ситуации. Классическим решением этой проблемы является протокол наследования приоритетов, когда ОС распознает ситуацию подобного блокирования и автоматически повышает приоритет вытесненного потока (низкого приоритета) до уровня наивысшего приоритета из числа потоков, ожидающих освобождения ресурса. В результате поток с низким приоритетом не вытесняется, как надлежало бы в соответствии с его изначальным приоритетом, а быстро проходит критическую секцию (освобождая ее), после чего его приоритет возвращается на исходный уровень.

В ряде случаев наследование приоритетов может оказаться не самым оптимальным решением. Примером здесь может служить ситуация, когда один высокоприоритетный поток разделяет много ресурсов с низкоприоритетными потоками - по одному ресурсу на каждый поток. В такой ситуации может возникнуть положение, когда много низкоприоритетных потоков (вытесненных) выстроятся перед высокоприоритетным. Но тогда длинный ряд последовательных операций вытеснения ("поштучно") и наследования приоритетов блокирующих потоков может привести к тому, что не хватит времени до окончания критического срока выполнения потока высокого приоритета, пока ОС будет анализировать ситуацию и последовательно проводить протокол наследования приоритетов.

Именно для таких ситуаций и предназначен протокол граничного приоритета. В соответствии с этим протоколом примитив синхронизации (в нашем рассмотрении это мьютекс) наделяется собственным фиксированным приоритетом, а приоритет любого потока, захватившего этот мьютекс, поднимается до предустановленного граничного уровня приоритета мьютекса.

Определение протокола защиты от инверсии приоритетов

int pthread_mutexattr_setprotocol(

pthread_mutexattr_t* attr, int protocol);

int pthread_mutexattr_getprotocol(

pthread_mutexattr_t* attr, int* protocol);

Эти функции устанавливают/считывают протокол, который реализуется мьютексом для защиты от инверсии приоритетов. Переменная protocol может принимать следующие значения:

PTHREAD_PRIO_INHERIT (значение по умолчанию) - определяет, что для воспрепятствования возникновению инверсии приоритетов будет использоваться протокол наследования приоритетов.

PTHREAD_PRIO_PROTECT - любой поток, захвативший мьютекс и созданный с таким параметром, будет устанавливать фиксированный уровень приоритета в соответствии со значением поля prioceiling, возвращаемого функцией pthread_mutexattr_getprioceiling(). Таким образом, установка этого значения в качестве протокола мьютекса приводит к реализации протокола граничного приоритета для защиты от инверсии приоритетов.

Внешний доступ

int pthread_mutexattr_setpshared(

pthread_mutexattr_t* attr, int pshared);

int pthread_mutexattr_getpshared(

const pthread_mutexattr_t* attr, int* pshared);

Эти функции устанавливают/считывают внутреннее поле атрибутной записи мьютекса, определяющее, возможен ли доступ к мьютексу из потоков, запущенных вне процесса, в котором был создан и инициализирован мьютекс. Параметр pshared может принимать следующие значения:

PTHREAD_PROCESS_SHARED - любой поток из любого процесса в системе, который может получить доступ к синхронизирующему объекту (для этого придется использовать какой-либо из методов IPC, возможно shared memory), может использовать его по назначению.

Назад Дальше