Основы многопоточного и распределенного программирования

Асинхронная передача сообщений


В этом разделе представлены две реализации асинхронной передачи сообщений. В первой из них к ядру для разделяемой памяти из главы 6 добавлены каналы и примитивы передачи сооб­щений. Эта реализация подходит для работы на одном процессоре или на мультипроцессоре с раз­деляемой памятью. Во второй реализации ядро с разделяемой памятью дополнено до распределен­ного ядра, которое может работать в многопроцессорной системе или в сети из отдельных машин.

10.1.1. Ядро для разделяемой памяти

Каждый канал программы представлен в ядре дескриптором канала. Дескриптор канала содержит заголовки списков сообщений и заблокированных процессов. В списке сообщений находятся сообщения, поставленные в очередь; в списке блокированных процессов — про­цессы, ожидающие получения сообщений. Хотя бы один из этих списков всегда пуст, по­скольку, если есть доступное сообщение, процесс не блокируется, а если есть заблокирован­ный процесс, то сообщения не ставятся в очередь.

Дескриптор создается с помощью примитива ядра createChan, который вызывается по одному разу для каждой декларации chan в программе до создания процессов. Массив кана­лов создается либо вызовом примитива createChan для каждого элемента, либо одним вы­зовом примитива createChan с параметром, указывающим размер массива. Примитив cre­ateChan возвращает имя (индекс или адрес) дескриптора.

376                                                                            Часть 2. Распределенное программирование

Оператор send реализован с помощью примитива sendChan. Сначала процесс-отправитель вычисляет выражения и собирает значения в единое сообщение, которое обычно записывает в стек выполнения процесса, передающего сообщение. Затем вызывается примитив sendChan; его аргументами являются имя канала (возвращенное из вызова createChan) и само сообще­ние. Примитив sendChan сначала находит дескриптор канала. Если в списке заблокированных процессов есть хотя бы один процесс, то оттуда удаляется самый старый процесс, а сообщение копируется в его адресное пространство.
После этого дескриптор процесса помещается в список готовых к работе. Если заблокированных процессов нет, сообщение необходимо сохранить в списке сообщений дескриптора, поскольку передача является неблокирующей операцией, и, следовательно, отправителю нужно позволить продолжать выполнение.

Пространство для сохраненного сообщения можно выделять динамически из единого буферного пула, или с каждым каналом может быть связан отдельный коммуникационный буфер. Однако асинхронная передача сообщений поднимает важный вопрос реализации: что, если пространство ядра исчерпано? У ядра есть два выхода: либо остановить выполне­ние программы из-за переполнения буфера, либо заблокировать передающий процесс, по­ка не появится достаточно места.

Остановка программы — это решительный шаг, поскольку свободное пространство может вскоре и появиться, но программист сразу получает сигнал о том, что сообщения производятся быстрее, чем потребляются (это обычно говорит об ошибке). С другой сто­роны, блокировка передающего процесса нарушает неблокирующую семантику оператора send и усложняет ядро, создавая дополнительный источник блокировок. И здесь автор па­раллельной программы не может ничего предполагать о скорости и порядке выполнения процессов. Ядра операционных систем блокируют отправителей сообщений и при необхо­димости выгружают заблокированные процессы из памяти в файл подкачки, поскольку должны избегать отказов системы. Однако для языков программирования высокого уровня приемлемым выбором является остановка программы.

Оператор receive реализуется с помощью примитива receiveChan. Его аргументами являются имя канала и адрес буфера сообщений. Действия примитива receiveChan дуаль­ны действиям примитива sendChan. Сначала ядро находит дескриптор, соответствующий выбранному каналу, затем проверяет его список сообщений. Если список не пуст, первое сооб­щение из него удаляется и копируется в буфер сообщений получателя. Если список сообщений пуст, процесс-получатель добавляется в список заблокированных процессов.




Получив сообще­ние, процесс- адресат распаковывает сообщение из буфера в соответствующие переменные.

Четвертый примитив, emptyChan, используется для реализации функции empty (ch). Он просто находит дескриптор и проверяет, не пуст ли список сообщений. В действительно­сти структуры данных ядра находятся не в защищенной области, и выполняемый процесс может сам проверять свой список сообщений. Критическая секция не нужна, поскольку про­цессу нужно просмотреть только заголовок списка сообщений.

В листинге 10.1 показаны схемы всех четырех примитивов. Эти примитивы добавлены к однопроцессорному ядру (см. листинг 6.1). Значением executing является адрес дескрип­тора процесса, выполняемого в данный момент, a dispatcher — это процедура, планирую­щая работу процессов на данном процессоре. Действия примитивов sendChan и re­ceiveChan очень похожи на действия примитивов Р и V в семафорном ядре (см. лис­тинг 6.4). Основное отличие состоит в том, что дескриптор канала содержит список сообщений, тогда как дескриптор семафора — только его значение.

Ядро в листинге 10.1 можно изменить для работы на мультипроцессоре с разделяемой памятью, используя методику, описанную в разделе 6.2. Основное требование состоит в том, что структуры данных ядра нужно хранить в памяти, доступной всем процессорам, а для защиты критических секций кода ядра, дающих доступ к разделяемым данным, ис­пользовать блокировки.



10.1.2. Распределенное ядро

Покажем, как для поддержки распределенного выполнения расширить ядро с разделяе­мой памятью. Главная идея — дублировать ядро, помещая по одной его копии на каждую ма­шину, и обеспечить взаимодействие копий с помощью сетевых примитивов.

В распределенной программе каждый канал хранится на отдельной машине. Предполо­жим пока, что у канала может быть сколько угодно отправителей и только один получатель. Тогда дескриптор канала было бы логично поместить на ту же машину, на которой выполня­ется получатель. Процесс, выполняемый на этой машине, обращается к каналу так же, как



378                                                                            Часть 2. Распределенное программирование

и при использовании ядра с разделяемой памятью. Но процесс, выполняемый на другой ма­шине, не может обращаться к каналу напрямую; для этого должны взаимодействовать два яд­ра, выполняемых на этих машинах. Ниже будет описано, как изменить ядро с разделяемой памятью и как для реализации распределенной программы использовать сеть.

На рис. 10.1 показана структура распределенного ядра. Ядро, выполняемое на каждой машине, содержит дескрипторы каналов и процессы, расположенные на данной машине. Как и раньше, в каждом ядре есть обработчики локальных прерываний для вызовов супервизора (внутренние ловушки), таймеры и устройства ввода-вывода. Сеть связи является особым ви­дом устройства ввода-вывода. Таким образом, в каждом ядре есть обработчики прерывания сети и процедуры, которые читают из сети и записывают в нее.



В качестве конкретного примера рассмотрим типичный доступ в сети Ethernet. Контрол­лер Ethernet состоит из двух независимых частей (для записи и для чтения). С каждой из этих частей в ядре связан обработчик прерывания. Прерывание записи устанавливается, когда операция записи завершается; сам контроллер следит за доступом к сети. Прерывание чтения устанавливается на процессоре, получающем по сети сообщение.

Примитив ядра, выполняемый в результате вызова из прикладного процесса, при переда­че сообщения на другую машину вызывает процедуру ядра netWrite. Она имеет три аргу­мента: процессор назначения, вид сообщения (см. ниже) и само сообщение. Сначала проце­дура netWrite получает буфер, форматирует сообщение и записывает его в буфер. Затем, ес­ли записывающая часть сетевого контроллера свободна, инициируется запись; в противном случае буфер добавляется в очередь запросов на запись. В обоих случаях происходит выход из netWrite. Позже при возникновении прерывания записи связанный с ним обработчик ос­вобождает буфер сообщения, которое только что было записано.


Если очередь записи не пус­та, обработчик прерывания инициирует следующую сетевую запись.

Ввод из сети обычно обрабатывается в обратном порядке. Когда к ядру приходит сообще­ние, вызывается обработчик прерывания чтения из сети. Сначала он сохраняет состояние выполняющегося процесса, затем выделяет новый буфер для следующего входного сетевого сообщения. Наконец обработчик чтения распаковывает первое поле сообщения, чтобы опре­делить его вид, и вызывает соответствующий виду примитив ядра."

В листинге 10.2 схематически представлены процедуры сетевого интерфейса. К ним отно­сятся обработчики сетевых прерываний и процедура netWrite. Обработчик пе-

" При другом подходе к обработке сетевого ввода используется процесс-демон, выполняемый вне ядра. Обработчик прерывания просто передает сообщение в канал, из которого демон постоянно выби­рает сообщения. Использование демона уменьшает время выполнения собственно обработчика чтения, но увеличивает общее время, необходимое для обработки сетевого ввода. С другой стороны, упрощается ядро, освобождаясь от подробностей обработки сетевых сообщений.



;

Для простоты предполагается, что передача по сети происходит без ошибок, и, следова­тельно, не нужно подтверждать получение сообщений или передавать их заново. Также игно­рируется проблема исчерпания области буфера для входящих или исходящих сообщений. На практике для ограничения числа сообщений в буфере используется управление потоком. Ссылки на литературу, в которой описаны эти темы, даны в исторической справке.

Канал может храниться локально или удаленно, поэтому его имя должно состоять из двух полей: номера машины и индекса или смещения. Номер машины указывает, где хранится де-

380                                                                            Часть 2 Распределенное программирование

скриптор; индекс определяет положение дескриптора в ядре указанной машины. Примитив createChan также нужно дополнить аргументом, указывающим, на какой машине нужно создать канал.


Выполняя примитив createChan, ядро сначала проверяет этот аргумент. Ес­ли канал находится на той же машине, ядро создает канал (как в листинге 10.1). В противном случае ядро блокирует выполняемый процесс и передает на удаленную машину сообщение create_chan. Это сообщение содержит идентификатор выполняемого процесса. В конце концов локальное ядро получит сообщение chan_done, которое говорит о том, что на уда­ленной машине канал создан. Сообщение содержит имя канала и указывает процесс, для ко­торого создан канал. Как показано в листинге 10.2, обработчик netRead_handler, получая это сообщение, вызывает еще один примитив ядра, chanDone, который снимает блокировку процесса, запросившего создание канала, и возвращает ему имя созданного канала.

Демон ядра на другой стороне сети, получив сообщение create_chan, вызывает примитив remoteCreate. Этот примитив создает канал и возвращает сообщение CHAN_DONE первому ядру. Таким образом, при создании канала на удаленной машине выполняются следующие шаги.

•    Прикладной процесс вызывает локальный примитив createChan.

•    Локальное ядро передает сообщение create_chan удаленному ядру.

•    Обработчик прерывания чтения в удаленном ядре получает это сообщение и вызывает примитив remoteCreate удаленного ядра.

•    Удаленное ядро создает канал и передает сообщение CHAN_DONE локальному ядру.

•    Обработчик прерывания чтения в локальном ядре получает это сообщение и вызывает примитив chanDone, запускающий прикладной процесс.

В распределенном ядре нужно также изменить примитив sendChan. Примитив send-Chan здесь будет намного проще, чем createChan, поскольку операция передачи send яв­ляется асинхронной. В частности, если канал находится на локальной машине, примитив sendChan должен выполнить такие же операции, как в листинге 10.1. Если канал находится на удаленной машине, примитив sendChan передает на эту машину сообщение SEND. В этот момент выполняемый процесс может продолжить работу. Получив сообщение SEND, удален­ное ядро вызывает примитив remoteSend, который, по существу, выполняет те же действия, что и (локальный) примитив sendChan.


Его отличие состоит лишь в том, что входящее со­общение уже записано в буфер, поэтому ядру не нужно выделять для него новый буфер.

В листинге 10.3 схематически представлены примитивы распределенного ядра. Примити­вы receiveChan и emptyChan по сравнению с листингом 10.1 не изменились, поскольку у каждого канала есть только один получатель, причем расположенный на той же машине, что и канал. Однако если это не так, то для взаимодействия машины, на которой был вызван примитив receiveChan или empty, и машины, на которой расположен канал, нужны до­полнительные сообщения. Это взаимодействие аналогично взаимодействию при создании канала — локальное ядро передает сообщение удаленному ядру, которое выполняет прими­тив и возвращает результаты локальному ядру.





10.2. Синхронная передача сообщений

Напомним, что при синхронной передаче сообщений примитивы send и receive явля­ются блокирующими: пытаясь взаимодействовать, процесс должен сначала подождать, пока

382                                                                        Часть 2. Распределенное программирование

к этому не будет готов второй процесс. Это делает ненужными потенциально неограничен­ные очереди буферизованных сообщений, но требует, чтобы для установления синхрониза­ции получатель и отправитель обменялись управляющими сигналами.

Ниже будет показано, как реализовать синхронную передачу сообщений с помощью асинхронной, а затем — как реализовать операторы ввода, вывода и защищенные операторы взаимодействия библиотеки CSP, используя специальный учетный процесс (clearinghouse process). Вторую реализацию можно адаптировать для реализации пространства кортежей Linda (см. раздел 7.7). В исторической справке в конце главы даны ссылки на децентрализо­ванные реализации; см. также упражнения.

10.2.1. Прямое взаимодействие с использованием асинхронных сообщений

Пусть дан набор из п процессов, которые взаимодействуют между собой, используя асин­хронную передачу сообщений. Передающая сторона называет нужный ей приемник сообще­ний, а принимающая сторона может получать сообщения от любого отправителя.


Например, исходный процесс S передает сообщение процессу назначения D, выполняя операцию

synch_send(D, expressions);

Процесс назначения ждет получения сообщения из любого источника при выполнении опе­ратора

synch_receive(source,   variables);

Когда процессы доходят до выполнения этих операторов, идентификатор отправителя и зна­чения выражений передаются в виде сообщения от процесса S процессу D. Затем эти данные записываются в переменные source и variables соответственно. Получатель, таким обра­зом, узнает идентификатор отправителя сообщения.

Описанные примитивы можно реализовать с помощью асинхронной передачи сообще­ний, используя три массива каналов: sourceReady, destReady и transmit. Первые два массива используются для обмена управляющими сигналами, а третий — для передачи дан­ных. Каналы используются, как показано в листинге 10.4. Процесс-получатель ждет сообще­ния из своего элемента массива sourceReady; сообщение идентифицирует отправителя. За­тем получатель разрешает отправителю продолжить передачу, и передается само сообщение.

Код в листинге 10.4 обрабатывает отправку в указанное место назначения и прием сооб­щения из любого источника. Если обе стороны должны всегда называть друг друга, то в лис­тинге 10.4 не нужны каналы sourceReady, а получатель может просто передавать отправи­телю сигнал о готовности к получению сообщения. Оставшихся операций передачи и приема вполне достаточно для синхронизации двух процессов. С другой стороны, если процесс-получатель может называть источник или принимать сообщения из любого источника, ситуа­ция становится намного сложнее. (Такая возможность есть в библиотеке MPI.) Тогда либо нужно иметь отдельный канал для каждого пути взаимодействия и опрашивать каналы, либо получающий процесс должен проверять каждое сообщение и сохранять те из них, которые он еще не готов принять. Читателю предоставляется задача изменить реализацию, чтобы она об­рабатывала описанную ситуацию (см. упражнения в конце главы).



Листинг 10.4. Синхронное взаимодействие с использованием асинхронных сообщений

разделяемые переменные:

chan sourceReady[n](int);              # готовность отправителя

chan destReady[n]();                        # готовность получателя

chan transmit[n](byte msg[*]);   # передача данных



10.2.2. Реализация защищенного взаимодействия с помощью учетного процесса

Вновь предположим, что есть n процессов, но они взаимодействуют и синхронизируются с помощью операторов ввода и вывода языка CSP (см. раздел 7.6). Напомним, что они имеют такой вид.

Source?port (переменные) ;                 # оператор ввода

Destination 'port (выражения) ;         #  оператор вывода

Эти операторы согласуются, когда процесс Destination выполняет оператор ввода, а про­цесс Source — оператор вывода, имена портов одинаковы, переменных и выражений поров­ну, и их типы совпадают.

В языке CSP также представлено защищенное взаимодействие с недетерминированным порядком. Напомним, что операторы защищенного взаимодействия имеют следующий вид. В;   С  ->  S;

Здесь В — необязательное логическое выражение (защита), С — оператор ввода или вывода, as— список операторов. Операторы защищенного взаимодействия используются внутри операторов i f или do для выбора из нескольких возможных взаимодействий.

Основное в реализации операторов ввода, вывода и защищенных операторов — объеди­нить в пары процессы, желающие выполнить согласованные операторы взаимодействия. Для подбора пар используется специальный "учетный процесс" СН ("clearinghouse"). Пусть обычный процесс Рг собирается выполнить оператор вывода, в котором про­цессом назначения является Р:, а процесс Р3 — операцию ввода с pj. в качестве источника. Предположим, что имя порта и типы сообщений совпадают. Эти процессы взаи­модействуют с учетным процессом и между собой, как показано на рис. 10.2. Каждый из процессов Рх и Р-, пере­дает учетному процессу СН сообщение, описывающее же­лаемое взаимодействие.


Процесс сн сначала сохраняет ;

первое из этих сообщений. Получив второе, он возвраща­ется к первому и определяет, согласуются ли операторы двух процессов. Затем СН передает обоим процессам ответ. Получив ответ, процесс рг отсы­лает выражения своего оператора вывода процессу р.,, получающему их в переменные своего оператора ввода. В этот момент каждый процесс начинает выполнять код, следующий за опе­ратором взаимодействия.



384                                                                            Часть 2. Распределенное программирование

Чтобы уточнить программную структуру на рис. 10.2, нужен канал для каждого пути взаи­модействия. Один канал используется для сообщений от обычных процессов к учетному. Эти сообщения содержат шаблоны, описывающие возможные варианты согласованных операто­ров. Каждому обычному процессу для возвращения сообщений от учетного процесса нужен канал ответа. Наконец, нужен один канал данных для каждого обычного процесса, содержа­щего операторы ввода; такие каналы используются другими обычными процессами.

Пусть у каждого обычного процесса есть уникальный идентификатор (целое число от 1 до п). Эти идентификаторы используются для индексирования каналов данных и каналов отве­та. Сообщения-ответы от учетного процесса определяют направление взаимодействия и идентификатор другого процесса. Сообщения по каналу данных передаются в виде массива байтов. Предполагается, что сообщения описывают сами себя, т.е. содержат метки, позво­ляющие получателю определить типы данных в сообщении.

Доходя до выполнения операторов ввода, вывода или операторов защищенного взаимо­действия, обычные процессы передают учетному шаблоны. Эти шаблоны используются для подбора соответствующих пар операторов. Каждый шаблон имеет четыре поля. direction,   source,   destination,   port

Для операторов вывода поле направления (direction) имеет значение OUT, для операторов ввода— IN. Источник (source) и приемник (destination) — это идентификаторы отпра­вителя и желаемого получателя (для вывода) или желаемого отправителя и получателя (для ввода).


Поле порт (port) содержит целое число, которое однозначно определяет порт и, сле­довательно, типы данных операторов ввода и вывода. Каждому типу порта в исходном тексте программы должен соответствовать определенный номер. Это значит, что каждому явному имени порта должно быть назначено уникальное целочисленное значение, как и для каждого безымянного порта. (Напомним, что имена портов используются в исходной программе, по­этому номера портов можно присвоить статически во время компиляции программы.)

Листинг 10.5 содержит объявления разделяемых типов данных и каналов взаимодействия, а также код, выполняемый обычными процессами при достижении операторов ввода и выво­да. Выполняя незащищенный оператор взаимодействия, процесс передает учетному процессу один шаблон и ждет ответа. Найдя согласующийся вызов (как описано ниже), учетный про­цесс передает ответ. Получив его, процесс-отправитель передает выражения оператора выво­да в процесс назначения, который записывает их в переменные своего оператора ввода.





Используя защищенный оператор взаимодействия, процесс сначала должен проверить каждую защиту. Для каждого истинного выражения защиты процесс создает шаблон и добав­ляет его в множество t. После вычисления всех выражений защиты процесс передает множе­ство t учетному процессу и ждет ответа. (Если t пусто, процесс просто продолжает работу.) Полученный ответ указывает процесс, выбранный для взаимодействия, и направление этого взаимодействия. Если направление OUT, процесс отсылает сообщение другому процессу, иначе ждет получения данных. После этого процесс выбирает соответствующий защищенный оператор и выполняет его. (Предполагается, что полей direction и who достаточно, чтобы определить, какой из операторов защищенного взаимодействия был выбран учетным процес­сом в качестве согласованного. В общем случае для этого нужны также порт и типы данных.)

В листинге 10.6 представлен учетный процесс СН. Массив pending содержит по одному набору шаблонов для каждого обычного процесса.


Если pending[i] не пусто, обычный процесс i блокируется в ожидании согласованного оператора взаимодействия. Получая но­вое множество t, процесс СН сначала просматривает один из шаблонов, чтобы определить, какой из процессов s передал его. (Если в шаблоне указано направление OUT, то источником является процесс s; если указано направление IN, то s —приемник.) Затем учетный процесс сравнивает элементы множества t с шаблонами в массиве pending, чтобы увидеть, есть ли согласование. По способу своего создания два шаблона являются согласованными, если их направления противоположны, а порты и источник с приемником одинаковы. Если СН нахо­дит соответствие с некоторым процессом i, он отсылает ответы процессам s и i (в ответах каждому процессу сообщаются идентификатор другого процесса и направление взаимодейст­вия). В этом случае процесс СН очищает элемент pending [ i ], поскольку процесс i больше не заблокирован. Не найдя соответствия ни для одного шаблона во множестве t, процесс СН сохраняет t в элемент pending [s], где s — передающий процесс.

Листинг 10.6. Централизованный учетный процесс

# декларации глобальных типов и канале, как в листинге 10.5 process СН {



Глава 10 Реализация языковых механизмов                                                                             387

ловии, что в программе не может быть взаимных блокировок. Пусть элемент, с которого начинает­ся поиск, указывается значением целочисленной переменной start. Получая новый набор шаб­лонов, процесс СН сначала просматривает элемент pending [ s tart ], затем pending [ s tart+1 ] и т.д. Как только процесс start получает шанс взаимодействия, учетный процесс СН увеличивает значение переменной start до индекса следующего процесса с непустым множеством ожидания. Тогда значение переменной s tar t будет циклически проходить по индексам процессов (при усло­вии, что процесс start не блокируется навсегда). Таким образом, каждый процесс периодически будет получать шанс быть проверенным первым.


Содержание раздела