Tutorial 6 - kolejki POSIX #
Ten tutorial zawiera wyjaśnienia działania funkcji wymaganych na laboratoriach oraz ich parametrów. Jest to jednak wciąż zbiór jedynie poglądowy najważniejszych informacji – należy koniecznie przeczytać wskazane strony manuala, aby dobrze poznać i zrozumieć wszystkie szczegóły.
Operacje na kolejkach POSIX #
Temat kolejek jest wyraźnie mniej obszerny niż temat łączy typu pipe/FIFO, a ten tutorial jest również krótszy. Kolejki POSIX nie mają problemów z równoczesnym zapisem i odczytem, gwarantują atomowość zapisu nawet dla dużych wiadomości, posiadają wyznaczone granice rekordów oraz obsługują priorytety. Wszystkie te cechy sprawiają, że temat ten jest stosunkowo łatwy, a ewentualne trudności ograniczają się głównie do obsługi notyfikacji o zdarzeniach w kolejce.
Głównym tematem tych zajęć są kolejki komunikatów POSIX, ale zadania będą również opierać się na elementach już wcześniej ćwiczonych, takich jak wątki, procesy i sygnały.
Manual man 7 mq_overview
zawiera opis działania kolejki. Omawiane funkcje są zdefiniowane w pliku nagłówkowym <mqueue.h>
, a interfejs został zaprojektowany tak, aby z kolejek można było korzystać na zasadzie podobnej do korzystania z plików, warto zauważać analogie w definicjach i użyciu kolejno omawianych funkcji.
Kolejki POSIX są kolejkami priorytetowymi, zatem kolejność komunikatów zależy najpierw od priorytetu wiadomości, a następnie od kolejności wysłania.
Tworzenie, otwieranie i usuwanie kolejek #
Kolejki w systemie są identyfikowane po nazwie (definiowanej przez programistę) /<name>
, np. /queue
. Do utworzenia i otwarcia kolejki służy funkcja mq_open
(man 3p mq_open
):
mqd_t mq_open(const char *name, int oflag, ...);
name
jest tutaj nazwą kolejki,oflag
jest maską bitową definiującą sposób dostępu do kolejki. Mamy do dyspozycji następujące opcje:O_RDONLY
,O_WRONLY
,O_RDWR
– otwarcie kolejki w trybie odczytu/zapisu/oba,O_CREAT
– tworzenie kolejki; ta opcja wymaga kolejnych dwóch argumentów:mode_t mode
, określający uprawnienia dostępu, orazmq_attr* attr
, strukturę zawierającą 4 pola:mq_flags
, będąca maską bitową (POSIX definiuje tylko obecność (lub nie) flagiO_NONBLOCK
, inne zależne od implementacji),mq_maxmsg
– maksymalna liczba wiadomości w kolejce,mq_msgsize
– maksymalny rozmiar kolejki,mq_curmsgs
– aktualna liczba wiadomości w kolejce (ignorowana przy tworzeniu kolejki).
Jako
attr
można także podaćNULL
, wtedy kolejka zostanie utworzona z domyślnymi ustawieniami.O_EXCL
– może być obecna tylko zO_CREAT
, powoduje błąd, gdy kolejka o podanej nazwie już istnieje,O_NONBLOCK
– ustawia odczyt/zapis nieblokujący.
Funkcja zwraca deskryptor kolejki (typu mqd_t
) lub (mqd_t)-1
w przypadku błędu.
Nieużywany deskryptor zamykamy funkcją mq_close
(man 3p mq_close
).
Kolejkę o podanej nazwie można natomiast usunąć funkcją mq_unlink
. Należy jednak uważać na sytuację, w której chcemy usunąć kolejkę niezamkniętą przez pewien inny proces. Jak można przeczytać w manualu (man 3p mq_unlink
), wywołanie usunięcia kolejki powoduje tylko zwolnienie jej nazwy (np. do ponownego utworzenia). Sama kolejka jest rzeczywiście usuwana dopiero, gdy wszystkie związane z nią deskryptory zostaną zamknięte. W zależności od implementacji, mq_unlink
może wstrzymać dalsze działanie programu, aż inne procesy zamkną odpowiednie deskryptory. W Linuksie jednak (man 3 mq_unlink
), funkcja ta zwraca natychmiastowo. Nawet w takiej sytuacji, stworzona kolejka o tej samej nazwie będzie inną kolejką od tej starszej.
Atrybuty kolejki #
Kolejka przechowuje swoje atrybuty w strukturze mq_attr
, omówionej w poprzedniej sekcji (patrz wyjaśnienie funkcji mq_open
, flaga O_CREAT
). Pobrać atrybuty kolejki możemy, używając funkcji mq_getattr
(man 3p mq_getattr
), której definicja wygląda następująco:
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);
mqdes
to deskryptor otwartej kolejki,mqstat
to wskaźnik do już zaalokowanej zmiennej typustruct mq_attr
, której pola wypełni omawiana funkcja.
Atrybuty kolejki ustawiamy za pomocą funkcji mq_setattr
(man 3p mq_setattr
):
int mq_setattr(mqd_t mqdes, const struct mq_attr *restrict mqstat, struct mq_attr *restrict omqstat);
mqdes
to deskryptor otwartej kolejki,mqstat
to wskaźnik do zmiennej typustruct mq_attr
z atrybutami, które chcemy ustawić,omqstat
to wskaźnik do już zaalokowanej zmiennej typustruct mq_attr
. Pola tej struktury zostaną wypełnione wartościami sprzed wywołaniamq_setattr
. Można też jako ten argument podaćNULL
.
WAŻNE: Funkcja mq_setattr
ustawia wartość tylko pola mq_flags
. Wartości pozostałych pól (mq_maxmsg
, mq_msgsize
i mq_curmsgs
) są niezmienne (można je ustawić tylko w momencie tworzenia kolejki) i są ignorowane przez mq_setattr
. W praktyce więc, zgodnie z POSIX-em, mamy możliwość zmiany tylko trybu zapisu/odczytu na blokujący lub nie.
Obie funkcje, standardowo, zwracają 0
w razie sukcesu i -1
w razie niepowodzenia.
Wysyłanie i odbieranie sygnałów #
Do tej pory, za czytanie i pisanie do różnych obiektów (pliki, łącza) odpowiadały funkcje read
i write
. Kolejki używają zaś własnych funkcji: mq_send
i mq_receive
(man 3p mq_send
, man 3p mq_receive
). Przyjrzyjmy się definicjom obu funkcji:
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
mqdes
to deskryptor otwartej kolejki,msg_ptr
to wskaźnik do bufora z wiadomością do wysłania (dlamq_send
) lub do wypełnienia odbieraną wiadomością (dlamq_receive
),msg_len
to rozmiar buforamsg_ptr
w bajtach (nie może być większy niż wartość atrybutumq_msgsize
w przypadkumq_send
, a dlamq_receive
musi być większy lub równymq_msgsize
),msg_prio
:- dla
mq_send
jest to priorytet wiadomości (musi być mniejszy niż wartość stałejMQ_PRIO_MAX
– POSIX zapewnia, żeMQ_PRIO_MAX >= 32
), - dla
mq_receive
, jest to wskażnik na zmienną, do której zostanie zapisany priorytet odbieranej wiadomości (można też podaćNULL
, jeśli priorytet nie jest nam potrzebny).
- dla
Jeśli próbujemy odczytać dane z pustej kolejki lub zapisać dane do pełnej kolejki i kolejka jest w trybie nieblokującym, funkcje zwrócą -1
, a errno
zostanie ustawione na EAGAIN
.
W trybie blokującym, funkcję czekają, aż możliwe będzie odebranie lub wysłanie komunikatu.
Jeśli wysłanie komunikatu funkcją mq_send
się powiedzie, funkcja zwróci 0
. Funkcja mq_receive
, w razie odebrania komunikatu, zwraca długość tego komunikatu w bajtach.
W trybie blokującym, możemy także ustawić maksymalny czas oczekiwania na wiadomość lub na możliwość jej wysłania. Służą do tego funkcje mq_timedsend
i mq_timedreceive
:
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abstime);
ssize_t mq_timedreceive(mqd_t mqdes, char *restrict msg_ptr, size_t msg_len, unsigned *restrict msg_prio, const struct timespec *restrict abstime);
które zawierają dodatkowy argument abstime
, będący wskaźnikiem na strukturę typu timespec
(z pliku nagłówkowego <time.h>
), wyznaczającą bezwzględny czas, po którym proces przestanie oczekiwać na wiadomość (mq_timedreceive
) lub na miejsce w kolejce (mq_timedsend
). Obie funkcje zachowują się tak jak ich odpowiedniki bez podawanego czasu, z wyjątkiem sytuacji, w której przekroczymy podany czas oczekiwania. Wtedy, obie funkcje zwrócą -1
, a errno
zostanie ustawione na ETIMEDOUT
.
WAŻNE: Jak zostało wspomniane, argument abstime
wyznacza bezwzględny czas, po którym oczekiwanie zakończy się. W celu ustawienia poprawnej wartości, musimy najpierw pobrać aktualną wartość czasu, np. funkcją clock_gettime
dla zegara o ID CLOCK_REALTIME
(man 3p clock_getres
), a następnie do pobranej wartości dodać względny czas oczekiwania (czyli np. jak chcemy czekać 1 sekundę, to po wywołaniu clock_gettime
musimy dodać 1
do pola tv_sec
używanej zmiennej typu struct timespec
).
Gdy kolejka jest w trybie nieblokującym, funkcje mq_timed(send|receive)
zachowują się dokładnie tak samo jak mq_(send|receive)
.
Powiadamianie #
POSIX przewiduje możliwość asynchronicznego powiadamiania procesów o nadchodzących wiadomościach do pustej kolejki. Zarejestrować proces w celu powiadomienia możemy używając funkcji mq_notify
(man 3p mq_notify
):
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
mqdes
to desktyptor otwartej kolejki,notification
to wskaźnik na strukturęsigevent
(plik nagłówkowy<signal.h>
), mówiącą, w jaki sposób powiadomienie powinno zostać zrealizowane.
Strukturę sigevent
powinniśmy wypełnić w następujący sposób, w zależności od typu powiadomienia (man 7 sigevent
):
Dla powiadomienia sygnałem (tylko sygnały realtime):
- ustawiamy
sigev_notify
naSIGEV_SIGNAL
, - ustawiamy
sigev_signo
na sygnał, który chcemy otrzymać (np.SIGRTMIN
), - ustawiamy
sigev_value.sival_int
lubsigev_value.sival_ptr
na wartość, którą chcemy otrzymać w argumencie handlera sygnału (odpowiednio: liczba całkowita lub adres w pamięci).
Ponadto:
- przy ustawianiu handlera sygnałów funkcją
sigaction
, w strukturzesigaction
podanej jako argument funkcji, należy ustawić polesa_flags
naSA_SIGINFO
(patrz funkcjasethandler
z rozwiązania poniższego zadania), - przekazane wartości (w
sigev_value
) są przekazywane do handlera w polusi_value
strukturysiginfo_t
(patrz funkcjamq_handler
z rozwiązania zadania).
- ustawiamy
Dla powiadomienia wątkiem:
- ustawiamy
sigev_notify
naSIGEV_THREAD
, - ustawiamy
sigev_notify_function
na wskaźnik do funkcji przyjmującejunion sigval
(typ polasigev_value
) i zwracającejvoid
, - ustawiamy
sigev_value.sival_int
lubsigev_value.sival_ptr
na wartość, którą chcemy otrzymać w argumencie podanej wyżej funkcji (odpowiednio: liczba całkowita lub adres w pamięci). - opcjonalnie ustawiamy parametry wątku w
sigev_notify_attributes
.
- ustawiamy
Jeśli chcemy wyrejestrować proces z powiadomienia, należy wywołać mq_notify
i jako notification
przekazać NULL
. Przekazanie NULL
, gdy proces nie jest zarejestrowany, spowoduje zwrócenie wartości -1
i ustawienie errno
na EINVAL
(czyli błąd).
WAŻNE:
- Ustawienia powiadomień na kolejce są jednokrotne, czyli po odebraniu notyfikacji nie spodziewamy się kolejnych. Jeśli chcemy otrzymać kolejne powiadomienie, musimy wywołać
mq_notify
ponownie. - Notyfikacja zadziała tylko wtedy, gdy kolejka przejdzie ze stanu pustego w niepusty.
- Tylko jeden proces może być zarejestrowany do otrzymywania notyfikacji (inaczej
mq_notify
zwróci-1
i ustawierrno
naEBUSY
).
Zwróć uwagę na realizację powiadamiania sygnałem w Zadaniu 1. Powiadamianie wątkiem jest natomiast zaimplementowane w rozwiązaniu Zadania 2.
Uwagi #
- Dołączenie biblioteki
librt
jest wymagane podczas linkowania programu używającego kolejek POSIX. - Jeśli otwieramy istniejącą kolejkę, może zawierać ona jakieś dane. Nie należy zakładać, że jest pusta. Aby zapewnić, że kolejka jest pusta, można ją usunąć przed ponownym utworzeniem.
Zadania z rozwiązaniami #
Zadanie 1: powiadomienie sygnałem #
Napisz program, który symuluje prostą wersję gry w bingo. Losującym liczby jest proces rodzic, a graczami – jego procesy potomne. Komunikacja między nimi odbywa się za pomocą kolejek komunikatów POSIX. Proces rodzic tworzy n
procesów potomnych (0 < n < 100
, gdzie n
to parametr programu) oraz dwie kolejki komunikatów. Pierwsza kolejka pout
służy do przekazywania co sekundę losowanych liczb z przedziału [0,9]
do procesów potomnych, druga pin
do odbierania od procesów potomnych informacji o wygranej lub zakończeniu gry.
Procesy potomne na początku losują swoją liczbę oczekiwaną (wygrywającą) E
(z przedziału [0,9]
) oraz liczbę N
liczb, jakie odczytają z kolejki (także z przedziału [0,9]
). Następnie cyklicznie konkurują o dostęp do danych w kolejce pout
– jedna wysłana liczba może być odebrana tylko przez jeden proces, a nie przez wszystkie naraz. Procesy potomne porównują odczytaną z pout
liczbę do swojej liczby E
i, jeśli jest to ta sama liczba, to poprzez drugą kolejkę pin
przekazują informację o jej wartości, a następnie kończą działanie. Po wykonaniu N
sprawdzeń proces potomny przed zakończeniem wysyła przez kolejkę pin
swój numer porządkowy (z przedziału [1,n]
).
Proces rodzica cały czas, asynchronicznie względem wysyłania liczb, ma odbierać komunikaty z pin
i wyświetlać odpowiednie treści na ekranie. Gdy wszystkie procesy potomne zakończą działanie, proces rodzic również kończy działanie i usuwa kolejki.
UWAGA: W tym zadaniu, rozmiar komunikatów w kolejce powinien być ograniczony do 1 bajta!
Rozwiązanie zadania #
Nowe strony z manuala:
man 7 mq_overview
man 3p mq_open
man 3p mq_close
man 3p mq_unlink
man 3p mq_getattr
man 3p mq_setattr
man 3p mq_send
man 3p mq_receive
man 3p mq_notify
man 7 sigevent
#define _GNU_SOURCE
#include <errno.h>
#include <mqueue.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#define LIFE_SPAN 10
#define MAX_NUM 10
#define ERR(source) \
(fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), perror(source), kill(0, SIGKILL), exit(EXIT_FAILURE))
volatile sig_atomic_t children_left = 0;
void sethandler(void (*f)(int, siginfo_t *, void *), int sigNo)
{
struct sigaction act;
memset(&act, 0, sizeof(struct sigaction));
act.sa_sigaction = f;
act.sa_flags = SA_SIGINFO;
if (-1 == sigaction(sigNo, &act, NULL))
ERR("sigaction");
}
void mq_handler(int sig, siginfo_t *info, void *p)
{
mqd_t *pin;
uint8_t ni;
unsigned msg_prio;
pin = (mqd_t *)info->si_value.sival_ptr;
static struct sigevent notif;
notif.sigev_notify = SIGEV_SIGNAL;
notif.sigev_signo = SIGRTMIN;
notif.sigev_value.sival_ptr = pin;
if (mq_notify(*pin, ¬if) < 0)
ERR("mq_notify");
for (;;)
{
if (mq_receive(*pin, (char *)&ni, 1, &msg_prio) < 1)
{
if (errno == EAGAIN)
break;
else
ERR("mq_receive");
}
if (0 == msg_prio)
printf("MQ: got timeout from %d.\n", ni);
else
printf("MQ: %d is a bingo number!\n", ni);
}
}
void sigchld_handler(int sig, siginfo_t *s, void *p)
{
pid_t pid;
for (;;)
{
pid = waitpid(0, NULL, WNOHANG);
if (pid == 0)
return;
if (pid <= 0)
{
if (errno == ECHILD)
return;
ERR("waitpid");
}
children_left--;
}
}
void child_work(int n, mqd_t pin, mqd_t pout)
{
srand(getpid());
uint8_t my_bingo = (uint8_t)(rand() % MAX_NUM);
uint8_t ni;
for (int life = rand() % LIFE_SPAN + 1; life > 0; life--)
{
if (TEMP_FAILURE_RETRY(mq_receive(pout, (char *)&ni, 1, NULL)) < 1)
ERR("mq_receive");
printf("[%d] Received %d\n", getpid(), ni);
if (my_bingo == ni)
{
if (TEMP_FAILURE_RETRY(mq_send(pin, (const char *)&my_bingo, 1, 1)))
ERR("mq_send");
return;
}
}
if (TEMP_FAILURE_RETRY(mq_send(pin, (const char *)&n, 1, 0)))
ERR("mq_send");
}
void parent_work(mqd_t pout)
{
srand(getpid());
while (children_left)
{
uint8_t ni = (uint8_t)(rand() % MAX_NUM);
if (TEMP_FAILURE_RETRY(mq_send(pout, (const char *)&ni, 1, 0)))
ERR("mq_send");
sleep(1);
}
printf("[PARENT] Terminates \n");
}
void create_children(int n, mqd_t pin, mqd_t pout)
{
while (n > 0)
{
switch (fork())
{
case 0:
child_work(n, pin, pout);
exit(EXIT_SUCCESS);
case -1:
ERR("fork");
}
children_left++;
n--;
}
}
void usage(char *name)
{
fprintf(stderr, "USAGE: %s n k p l\n", name);
fprintf(stderr, "100 > n > 0 - number of children\n");
exit(EXIT_FAILURE);
}
int main(int argc, char **argv)
{
int n;
if (argc != 2)
usage(argv[0]);
n = atoi(argv[1]);
if (n <= 0 || n >= 100)
usage(argv[0]);
mqd_t pin, pout;
struct mq_attr attr = {};
attr.mq_maxmsg = 10;
attr.mq_msgsize = 1;
if ((pin = TEMP_FAILURE_RETRY(mq_open("/bingo_in", O_RDWR | O_NONBLOCK | O_CREAT, 0600, &attr))) == (mqd_t)-1)
ERR("mq_open in");
if ((pout = TEMP_FAILURE_RETRY(mq_open("/bingo_out", O_RDWR | O_CREAT, 0600, &attr))) == (mqd_t)-1)
ERR("mq_open out");
sethandler(sigchld_handler, SIGCHLD);
sethandler(mq_handler, SIGRTMIN);
create_children(n, pin, pout);
static struct sigevent noti;
noti.sigev_notify = SIGEV_SIGNAL;
noti.sigev_signo = SIGRTMIN;
noti.sigev_value.sival_ptr = &pin;
if (mq_notify(pin, ¬i) < 0)
ERR("mq_notify");
parent_work(pout);
mq_close(pin);
mq_close(pout);
if (mq_unlink("/bingo_in"))
ERR("mq unlink");
if (mq_unlink("/bingo_out"))
ERR("mq unlink");
return EXIT_SUCCESS;
}
Uwagi i pytania #
Zwróć uwagę na użycie wskaźnika przesyłanego z sygnałem. Prototyp funkcji obsługi sygnału zawiera dodatkowy parametr
siginfo_t*
, a podczas instalowania obsługi użyto flagi SA_SIGINFO, aby przesłanie wskaźnika było możliwe. Warto zauważyć, że nie wysyłamy sami takiego sygnału. Dostajemy go jako notyfikację o zdarzeniu w kolejce. Funkcjąkill
nie można wysłać wskaźnika. Można to zrobić jedynie za pomocą funkcjisigqueue
.Do przesyłania liczb wykorzystano typ uint8_t (
stdint.h
), czyli jednobajtową liczbę bez znaku o zakresie od 0 do 255. Typy całkowite o precyzyjnie określonym rozmiarze (np.int32_t
) są bardziej przenośne niż typy bazowe, takie jak np. int, który może mieć różne zakresy poprawnych wartości na różnych platformach.W programie występuje obsługa sygnałów, więc konieczna jest ochrona przed przerwaniami sygnałem, np. za pomocą
TEMP_FAILURE_RETRY
. Makra te zostały dodane w całym kodzie, chociaż zagrożony przerwaniem jest tylko kod procesu rodzica, bo tylko on otrzymuje notyfikacje o stanie kolejki. Dodawanie zabezpieczeń przed przerwaniem funkcji przez obsługę sygnału nie spowalnia kodu, a czyni go bardziej przenośnym.W kodzie procesu rodzica brak jest ochrony przed przerwaniem w jednym z wywołań, w którym? Dlaczego tam ochrona taka nie jest konieczna?
Odpowiedź:
Chodzi o wywołaniemq_receive
w funkcji obsługi sygnału. Nie spodziewamy się przerwania funkcji obsługi sygnału obsługiwanym sygnałem, gdyż domyślnie na czas tej obsługi sygnał ten jest blokowany.Jak zrealizowane jest zliczanie procesów potomnych?
Odpowiedź:
Zliczane są skuteczne wywołaniawaitpid
w funkcji obsługiSIGCHLD
, nie sygnałySIGCHLD
, ponieważ te mogą się sklejać. Licznik aktywnych procesów potomnych jest zmienną globalną.Czemu podczas odbierania sygnału najpierw instalujemy/restartujemy notyfikację, a dopiero później czytamy z kolejki?
Odpowiedź:
Gdyby było odwrotnie, to po przeczytaniu wiadomości z kolejki (możliwe, że kilku) a tuż przed założeniem notyfikacji mogłoby się coś w kolejce pojawić. Ponieważ powiadomienia są wysyłane tylko, gdy w pustej kolejce pojawi się coś nowego, nie mielibyśmy szansy na odczyt, bo nasza kolejka w momencie instalowania notyfikacji już coś zawierałaby. Jeśli jednak zainstalujemy notyfikację najpierw, a potem przeczytamy zawartość kolejki, mamy pewność, że kolejka jest pusta z włączoną notyfikacją. Jeśli teraz coś się w niej znajdzie, to na pewno dostaniemy notyfikację.Czemu jedna z kolejek jest w trybie nieblokującym?
Odpowiedź:
Kolejkapin
jest w stanie nieblokującym ze względu na powyższy punkt. Gdyby mogła się blokować, to po odczytaniu ostatniej wiadomości program zablokowałby się wewnątrz asynchronicznej obsługi sygnału, co byłoby poważnym błędem.Jak są wykorzystane priorytety wiadomości i jak to się ma do limitu 1 bajta dla długości wiadomości?
Odpowiedź:
W tym programie priorytety nie służą do ustalania kolejności, ale jako wyznaczniki typu informacji. Zmusza nas do tego krótki, jednobajtowy rozmiar wiadomości. Trudno (choć się da) zakodować w nim więcej niż samą liczbę. Informacja, czy chodzi o wygraną czy zakończenie gry, jest zawarta właśnie w priorytecie.Spora część logiki programu “wylądowała” w funkcji obsługi sygnału, co było możliwe, ponieważ nie ma zależności między kodem wysyłającym liczby a tym odbierającym notyfikacje. Jednak zadanie łatwo można skomplikować, aby taka zależność istniała. Jako ćwiczenie przenieś całą logikę związaną z odbiorem wiadomości do kodu właściwego rodzica (czyli poza asynchroniczne wywołanie funkcji obsługi sygnału).
Zadanie 2: powiadamianie wątkiem #
Napisz program, który symuluje rozmowy na rzymskim forum.
Zdefiniuj stałą CHILD_COUNT
.
Proces rodzic tworzy CHILD_COUNT
kolejek, otwiera je w trybie nieblokującym i uruchamia CHILD_COUNT
dzieci.
Każdemu dziecku przekazuje jego wygenerowane w dowolny sposób imię.
Każde dziecko to inny obywatel na forum.
Obywatele wysyłają sobie losowo wiadomości z ich imionami, a otrzymując je wypisują treść poprzedzoną swoim imieniem.
Po każdym wysłaniu śpią losową ilość czasu, a po wysłaniu ROUNDS
(stała) wiadomości kończą pracę.
Każdy obywatel czeka na wiadomości poprzez powiadamianie wątkiem.
Rozwiązanie zadania #
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#define CHILD_COUNT 4
#define QUEUE_NAME_MAX_LEN 32
#define CHILD_NAME_MAX_LEN 32
#define MSG_SIZE 64
#define MAX_MSG_COUNT 4
#define ROUNDS 5
#define ERR(source) \
(fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), perror(source), kill(0, SIGKILL), exit(EXIT_FAILURE))
mqd_t create_queue(char* name)
{
struct mq_attr attr = {};
attr.mq_maxmsg = MAX_MSG_COUNT;
attr.mq_msgsize = MSG_SIZE;
mqd_t res = mq_open(name, O_RDWR | O_NONBLOCK | O_CREAT, 0600, &attr);
if (res == -1)
ERR("mq_open");
return res;
}
typedef struct
{
char* name;
mqd_t queue;
} ChildData;
void handle_messages(union sigval data);
void register_notification(ChildData* data)
{
struct sigevent notification = {};
notification.sigev_value.sival_ptr = data;
notification.sigev_notify = SIGEV_THREAD;
notification.sigev_notify_function = handle_messages;
int res = mq_notify(data->queue, ¬ification);
if (res == -1)
ERR("mq_notify");
}
void handle_messages(union sigval data)
{
ChildData* child_data = data.sival_ptr;
char message[MSG_SIZE];
register_notification(child_data);
for (;;)
{
int res = mq_receive(child_data->queue, message, MSG_SIZE, NULL);
if (res != -1)
printf("%s: Accepi \"%s\"\n", child_data->name, message);
else
{
if (errno == EAGAIN)
break;
else
ERR("mq_receive");
}
}
}
void close_queues(mqd_t* queues)
{
for (int i = 0; i < CHILD_COUNT; ++i)
{
mq_close(queues[i]);
}
}
void unlink_queues(char queue_names[CHILD_COUNT][CHILD_NAME_MAX_LEN])
{
for (int i = 0; i < CHILD_COUNT; ++i)
{
int result = mq_unlink(queue_names[i]);
// The case when the queue does not exist already
// is not treated as an error
if (result == -1 && errno != ENOENT)
ERR("mq_close");
}
}
void send_message(mqd_t queue, char* content)
{
int result = mq_send(queue, content, MSG_SIZE, 0);
if (result == -1)
ERR("mq_send");
}
void child_function(char* name, mqd_t* queues, int i)
{
printf("%s: A PID %d incipiens.\n", name, getpid());
srand(getpid());
ChildData child_data = {};
child_data.name = name;
child_data.queue = queues[i];
union sigval data;
data.sival_ptr = &child_data;
handle_messages(data);
for (int i = 0; i < ROUNDS; ++i)
{
int receiver = rand() % CHILD_COUNT;
char message[MSG_SIZE];
switch (rand() % 3)
{
case 0:
snprintf(message, MSG_SIZE, "%s: Salve %d!", name, receiver);
break;
case 1:
snprintf(message, MSG_SIZE, "%s: Visne garum emere, %d?", name, receiver);
break;
case 2:
snprintf(message, MSG_SIZE, "%s: Fuistine hodie in thermis, %d?", name, receiver);
break;
}
send_message(queues[receiver], message);
int sleep_time = rand() % 3 + 1;
sleep(sleep_time);
}
printf("%s: Disceo.\n", name);
exit(EXIT_SUCCESS);
}
void spawn_child(char* name, mqd_t* queues, int i)
{
switch (fork())
{
case 0:
child_function(name, queues, i);
case -1:
ERR("fork");
}
}
int main(void)
{
mqd_t queues[CHILD_COUNT];
char names[CHILD_COUNT][CHILD_NAME_MAX_LEN];
char queue_names[CHILD_COUNT][CHILD_NAME_MAX_LEN];
for (int i = 0; i < CHILD_COUNT; ++i)
{
snprintf(queue_names[i], QUEUE_NAME_MAX_LEN, "/child_%d", i);
queues[i] = create_queue(queue_names[i]);
snprintf(names[i], CHILD_NAME_MAX_LEN, "Persona %d", i);
}
for (int i = 0; i < CHILD_COUNT; ++i)
{
spawn_child(names[i], queues, i);
}
close_queues(queues);
while (wait(NULL) > 0)
;
printf("Parens: Disceo.");
unlink_queues(queue_names);
return EXIT_SUCCESS;
}
Uwagi i pytania #
Dlaczego wywołując
mq_receive
przerywamy pętlę przy błędzieEAGAIN
?Odpowiedź:
Błąd ten oznacza, że w kolejce nie ma już niczego więcej do przeczytania.Dlaczego w
child_function
wywołujemy funkcjęhandle_messages
, a nie tylkoregister_notification
?Odpowiedź:
Bo jeśli w kolejce już są jakieś wiadomości od innych dzieci, to powiadomienie nigdy by nie zostało wysłane. Musimy więc przeczytać wiadomości w głównym wątku.Proces rodzica zamyka wszystkie deskryptory kolejek zaraz po utworzeniu dzieci. Dlaczego nie stanowi to dla nich problemu?
Odpowiedź:
Ponieważ dziecko dziedziczy kopię danych w rodzicu, w tym kopię deskryptorów. Oznacza to, że jeśli rodzic zamknie swoje deskryptory, to deskryptory dzieci nadal pozostają otwarte.Dlaczego procesy dzieci nie zamykają kolejek na sam koniec?
Odpowiedź:
Kiedy zamykamy kolejki, to usuwane są powiadomienia, więc nie wystartują nowe wątki obsługi powiadomień. Co jednak, gdybyśmy zamknęli deskryptory kolejek w trakcie działania wątku powiadamiającego? Próbowałby on wtedy wywołaćmq_notify
lubmq_receive
na niepoprawnych deskryptorach. Kolejki muszą więc być otwarte do samego końca działania programu. Deskryptory zostaną zamknięte przez jądro systemu przy wyjściu z programu, więc nie jest to stricte błąd.Czy nie wystarczyłoby zaczekać, aż wątek obsługi sygnału zakończy pracę, zanim zamknęlibyśmy kolejki?
Odpowiedź:
Nie możemy czekać na ten wątek. Nie mamy jego TID. Nawet gdyby jakimś kanałem komunikacji wątek ten przekazywałby nam TID, to przy zamykaniu kolejek nie wiemy, czy nadal on istnieje. Poza tym, implementacja może ten wątek tworzyć w staniedetached
. Na taki wątek nie możemy już nigdy czekać.Czy w powyższej sytuacji nie możemy użyć jakiejś struktury synchronizacyjnej, aby uniknąć takiego impasu?
Odpowiedź:
Niestety nie możemy. Gdybyśmy tak zrobili, to problem byłby z samą strukturą. Należałoby ją na koniec działania programu zniszczyć, ale co jeśli w tym czasie działa jeszcze wątek obsługi powiadomień? Będzie wtedy próbował użyć zniszczonej struktury. W tym konkretnym przypadku nie ma idealnego rozwiązania. Zamykanie kolejki oznacza, że program już się kończy - gdyby więc na przykład wątki miały zapisywać jakieś dane do pliku, byłoby warto dodać synchronizację (np. zmienną warunkową zliczającą uruchomione wątki), żeby upewnić się, że wszystkie już rozpoczętę wątki skończą pracę i nie zostawią pliku w niepoprawnym stanie. Jednak w tym konkretnym przypadku nie ma to znaczenia.Czy wywołanie
exit(EXIT_SUCCESS)
w procesie dziecka możemy przenieść z funkcjichild_function
dospawn_child
zaraz za wywołaniechild_function
?Odpowiedź:
Nie, z tego samego powodu, dlaczego nie zamykamy kolejek. Gdybyśmy tak zrobili, to zmiennachild_data
straciłaby ważność po wyjściu z funkcjichild_function
. Ale używa jej wątek, który być może jeszcze działa. Jeśliexit(EXIT_SUCCESS)
pozostaje wchild_function
, to nie ma tego problemu.
Przykładowe zadania #
Wykonaj przykładowe zadania. Podczas laboratorium będziesz miał więcej czasu oraz dostępny startowy kod, jeśli jednak wykonasz poniższe zadania w przewidzianym czasie, to znaczy że jesteś dobrze przygotowany do zajęć.