QNX/UNIX: Анатомия параллелизма - Олег Цилюрик 15 стр.


• Здесь мы обеспечиваем только одну синхронизированную последовательность вызовов функции func(). А если бы нам потребовалось несколько (много) синхросерий, в каждой из которых выполняется своя функция, а периоды серий не кратны друг другу?

• Наконец, время выполнения целевой функции func() включается в период одного "кругового пробега" цикла, то есть период T отсчитывается от конца предыдущего выполнения функции до начала текущего, а это не совсем то, что мы подразумевали при использовании термина "синхронное".

• Более того, если время выполнения функции func() достаточно флуктуирует от одного вызова до другого (например, из-за изменений данных, с которыми работает функция), то периоды вызовов начинают "гулять", а дисперсия периода результирующей последовательности вызовов func() становится просто непомерно большой.

Ниже показано решение, свободное от многих из этих недостатков (файл t3.cc). Приложение представляет собой тестовую программу, осуществляющую 3 цепочки выполнения различных целевых функций (mon1, mon2, mon3) с разными периодами для каждой цепочки (массив period[]):

Синхронизация выполнения участка кода

#include <stdlib.h>

#include <stdio.h>

#include <unistd.h>

#include <inttypes.h>

#include <errno.h>

#include <iostream.h>

#include <sys/neutrino.h>

#include <sys/syspage.h>

#include <sys/netmgr.h>

#include <pthread.h>

#include <signal.h>

#include <algorithm>

static void out(char s) {

int policy;

sched_param param;

pthread_getschedparam(pthread_self(), &policy, &param);

cout << s << param.sched_curpriority << flush;

}

// целевые функции каждой из последовательностей только

// выводят свой символ-идентификатор и следующий за ним

// приоритет, на котором выполняется целевая функция

static void mon1(void) { out('.'); }

static void mon2(void) { out('*'); }

static void mon3(void) { out('+'); }

// это всего лишь перерасчет временных интервалов,

// измеренных в тактах процессора (в наносекундах)

inline uint64_t cycles2nsec(uint64_t с) {

const static uint64_t cps =

// частота процессора

SYSPAGE_ENTRY(qtime)->cycles_per_sec;

return (с * 1000000000) / cps;

}

// структура, необходимая только для накопления статистики параметров

// ряда временных отметок: среднего, среднеквадратичного отклонения,

// минимального и максимального значений

struct timestat {

private:

uint64_t prev;

public:

uint64_t num;

double mean, disp, tmin, tmax;

timestat(void) {

mean = disp = tmin = tmax = 0.0;

num = 0;

}

// новая временная отметка в ряду:

void operator++(void) {

uint64_t next = ClockCycles(), delta;

if (num i= 0) {

double delta = cycles2nsec(next - prev);

if (num == 1) tmin = tmax = delta;

else tmin = min(tmin, delta), tmax = max(tmax, delta);

mean += delta;

disp += delta * delta;

}

prev = next;

num++;

}

// подвести итог ряда;

void operator !(void) {

mean /= (num - 1);

disp = sqrt(disp / (num - 1) - mean * mean);

}

}

// предварительное описание функции потока объекта

void* syncthread(void*);

class thrblock {

private:

static int code;

bool ok, st;

public:

pthread_t tid;

struct sigevent event;

timer_t timer;

int chid;

void* (*func)(void*);

sched_param param;

// структура только для статистики:

timestat sync;

// конструктор класса - он не только инициализирует структуру данных

// создаваемого объекта, но и запускает отдельный поток для его исполнения

thrblock(

// параметры конструктора

// - целевая функция последовательности

void (*dofunc)(void);

// - период ее синхронизации

unsigned long millisec;

// - приоритет возбуждения синхросерии

unsigned short priority;

// - копить ли статистику временных интервалов?

bool statist = false

) {

// создание канала для получения уведомлений от таймера

if (!(ok = ((chid = ChannelCreate(0)) >= 0))) return;

// создать соединение по каналу, которое будет использовать таймер

event.sigev_coid =

ConnectAttach(ND_LOCAL_NODE, 0, chid, NTO_SIDE_CHANNEL, 0);

if (!(ok = (event.sigev_coid >= 0))) return;

// занести целевую функцию, заодно выполнив

// трюк преобразования над ее типом

func = (void*(*)(void*))dofunc;

int policy;

// запомнить приоритет вызывающей программы

// под этим приоритетом и вызывать целевую функцию

pthread_getschedparam(pthread_self(), &policy, &param);

st = statist;

event.sigev_code = code++;

event.sigev_notify = SIGEV_PULSE;

// а вот это приоритет, с которым нужно будет пробуждаться от таймера!

event.sigev_priority = priority;

// создание таймера

if (!(ok = (timer_create(CLOCK_REALTIME, &event, &timer) == 0))) return;

// запуск отдельного потока, который по сигналу

// таймера будет выполнять целевую функцию

if (!(ok = (pthread_create(&tid, NULL, &syncthread, (void*)this) == EOK)))

return;

// и только после этого можно установить период срабатывания

// таймера, после чего он фактически и запускается

struct itimerspec itime;

nsec2timespec(&itime.it_value, millisec * 1000000ull);

itime it_interval = itime.it_value;

if (!(ok = (timer_settime(timer, 0, &itime, NULL) == 0))) return;

}

// признак того, что объект создан успешно и его поток запущен:

bool OK(void) { return ok; }

bool statistic(void) { return st; }

};

int thrblock.code = _PULSE_CODE_MINAVAIL;

// функция потока объекта

void* syncthread(void *block) {

thrblock *p = (thrblock*)block;

struct _pulse buf;

pthread_attr_t attr;

while(true) {

// ожидание пульса от периодического таймера объекта

MsgReceivePulse(p->chid, &buf, sizeof(struct _pulse), NULL);

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

// восстановить приоритет целевой функции до уровня того,

// кто ее устанавливал, вызывая конструктор

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_attr_setschedparam(&attr, &p->param);

// запуск целевой функции в отдельном "отсоединенном" потоке

pthread_create(NULL, &attr, p->func, NULL);

if (p->statistic()) ++p->sync;

}

}

// 'пустой' обработчик сигнала SIGINT (реакция на ^С)

inline static void empty(int signo) {}

int main(int argc, char **argv) {

// с этой точки стандартная реакция на ^С отменяется...

signal(SIGINT, empty);

// массив целевых функций

void(*funcs[])(void) = { &mon1, &mon2, &mon3 };

// периоды их синхросерий запуска

int period[] = { 317, 171, 77 };

// приоритеты, на которых отрабатывается реакция

// синхросерий на каждый из таймеров синхросерий

int priority[] = { 15, 5, 25 };

int num = sizeof(funcs) / sizeof(*funcs);

// запуск 3-х синхронизированных последовательностей

// выполнения (созданием объектов)

thrblock** tb = new (thrblock*)[num];

for (int i = 0; i < num; i++) {

tb[i] = new thrblock(funcs[i], period[i],

priority[i], true);

if (!tb[i]->OK())

perror("synchro thread create"), exit(EXIT_FAILURE);

}

// ... а теперь ожидаем ^С.

pause();

// подсчет статистики и завершение программы

cout << endl << "Monitoring finalisation!" << endl;

// вывод временных интервалов будем делать в миллисекундах:

const double n2m = 1000000.;

for (int i = 0; i < num, i++) {

timestat *p = &tb[i]->sync;

!(*p); // подсчет статистики по объекту

cout << i << '\t' << p->num << "\t=> " << p->mean / n2m << " [" <<

p->tmin / n2m << "..." << p->tmax / n2m << "]\t~" << p->disp / n2m <<

" (" << p->disp / p->mean * 100 << "%)" << endl;

}

return EXIT_SUCCESS;

}

Вся функциональность программы сосредоточена в одном классе - thrblock, который может в неизменном виде использоваться для разных приложений. Необычной особенностью объекта этого класса является то, что он выполнен в технике "активных объектов", навеянной поверхностным знакомством с языками программирования школы Н. Вирта - ActiveOberon и Zormon. В ней говорится, что конструктор такого объекта не только создает объект данных, но и запускает (как вариант) отдельный поток выполнения для каждого создаваемого объекта. В нашем случае задача потоковой функции состоит в вызове целевой функции, адрес которой был передан конструктору объекта в качестве одного из параметров.

Ниже представлены отличия нашей реализации от простого цикла с задержкой, обсуждавшейся выше (помимо исправлений очевидных недостатков):

• Для каждого синхронизирующего таймера установлен свой приоритет "пробуждения", и он может быть достаточно высоким, для того чтобы предотвратить вытеснение этого синхронизирующего потока.

• После "пробуждения" по таймеру запускается целевая функция, но выполняется это отдельным потоком, причем потоком "отсоединенным". Другими словами, процесс выполнения целевой функции никак не влияет на общую схему синхронизации.

• Перед запуском целевой функции выполняющему ее потоку восстанавливается приоритет породившего потока (но не потока обслуживания таймера!), ведь нам не нужно, чтобы целевая функция, тем более, возможно и не очень значимая, как в нашем примере, могла влиять вытеснением на процессы синхронизации.

Запустим наше тестовое приложение:

# t3

+10+10*10+10+10.10*10+10+10*10+10+10.10*10+10+10+10*10+10.10+10*10+10+10*10+10.10+10*10+10+10*10+10.10+10+10*10+10+10+10.10+10+10*10+10+10.10*10+10+10+10*10+10.10+10*10+10+10*10+10+10.10*10+10+10*10+10+10.10+10*10+10+10*10+10.10+10*10+10+10*10+10.10+10+10*10+10+10*10^C

Monitoring finalisation!

0 32 => 316.919 [316.867...317.895] ~0.178511 (0.056327%)

1 59 => 170.955 [168.583...173.296] ~0.92472 (0.540914%)

2 132 => 76.9796 [76.942...77.9524] ~0.085977 (0.111688%)

Первое, что мы должны отметить, - это очень приличную точность выдержки периода синхронизации (последняя колонка вывода). Для того чтобы убедиться в том, что целевая функция при этом выполняется под приоритетом породившего ее потока, закомментируем строки, выделенные жирным шрифтом в коде программы:

# t3

+25+25*5+25+25.15*5+25+25*5+25+25.15*5+25+25+25*5+25.15+25*5+25+25*5+25.15+25*5+25+25*5*5+25.15+25+25*5+25+25*5.15+25+25*5+25+25.15*5+25+25+25*5+25.15+25*5+25+25*5+25+25.15*5+25+25*5+25+25^C

Monitoring finalisation!

0 32 => 316.919 [316.797...317.915] ~0.185331 (0.0584792%)

1 60 => 170.955 [168.964...173.925] ~0.47915 (0.280279%)

2 34 => 76.9796 [76.8895...77.9694] ~0.0937379 (0.12177%)

В этом варианте (и диагностический вывод это подтверждает) мы искусственно ликвидировали наследование приоритета по цепочке порождения: сработавший таймер - функция потока - целевая функция объекта. Это не совсем соответствует цели, намеченной в начале этого раздела, но все же этот вариант иллюстрирует, что именно наш предыдущий вариант удовлетворял всем поставленным целям.

3. Сигналы

Сигналы инициируются некоторыми событиями в системе и посылаются процессу для его уведомления о том, что произошло нечто неординарное, требующее определенной реакции. Порождающее сигнал событие может быть действием пользователя или может быть вызвано другим процессом или ядром операционной системы. Сигналы являются одним из самых старых и традиционных механизмов UNIX.

Уже из этого краткого описания можно заключить, что:

• действия, вызываемые для обработки сигнала, являются принципиально асинхронными;

• сигналы могут быть использованы как простейшее, но мощное средство межпроцессного взаимодействия.

Все сигналы определяются целочисленными константами, но для программиста это в принципе не так важно, поскольку каждому сигналу приписано символическое наименование вида SIG*. Все относящиеся к сигналам определения находятся в заголовочном файле <signal.h>, который должен быть включен в любой код, работающий с сигналами. В последующих примерах мы не будем показывать эту включающую директиву, чтобы не загромождать текст.

Назад Дальше