Tutorial 3 - Wątki, mutexy i sygnały #
Uwagi wstępne:
- Szybkie przejrzenie tutoriala prawdopodobnie nic nie pomoże, należy samodzielnie uruchomić programy, sprawdzić jak działają, poczytać materiały dodatkowe takie jak strony man. W trakcie czytania sugeruję wykonywać ćwiczenia a na koniec przykładowe zadanie.
- Na żółtych polach podaję dodatkowe informacje, niebieskie zawierają pytania i ćwiczenia. Pod pytaniami znajdują się odpowiedzi, które staną się widoczne dopiero po kliknięciu. Proszę najpierw spróbować sobie odpowiedzieć na pytanie samemu a dopiero potem sprawdzać odpowiedź.
- Pełne kody do zajęć znajdują się w załącznikach na dole strony. W tekście są tylko te linie kodu, które są konieczne do zrozumienia problemu.
- Materiały i ćwiczenia są ułożone w pewną logiczną całość, czasem do wykonania ćwiczenia konieczny jest stan osiągnięty poprzednim ćwiczeniem dlatego zalecam wykonywanie ćwiczeń w miarę przyswajania materiału.
- Większość ćwiczeń wymaga użycia konsoli poleceń, zazwyczaj zakładam, ze pracujemy w jednym i tym samym katalogu roboczym więc wszystkie potrzebne pliki są “pod ręką” tzn. nie ma potrzeby podawania ścieżek dostępu.
- To co ćwiczymy wróci podczas kolejnych zajęć. Jeśli po zajęciach i teście coś nadal pozostaje niejasne proszę to poćwiczyć a jeśli trzeba dopytać się u prowadzących.
- Ten tutorial opiera się na przykładach od naszego studenta, drobne ich słabości występują jedynie w częściach nie związanych z wątkami i zostawiłem je celowo. Są one omawiane w komentarzach i zadaniach. To co się powtarza to:
- Funkcje main są za długie, można je łatwo podzielić
- Nadużywany jest typ bool, wymaga to nawet specjalnego pliku nagłówkowego, prościej i bardziej klasycznie można użyć 0 i 1
- Pojawiają się “magic numbers” - stałe liczbowe zamiast zdefiniowanych makr
Tworzenie wątków #
Wątek tworzymy używając funkcji pthread_create:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*),
void *restrict arg);
W przeciwieństwie do funkcji fork, wartość zwracana przez pthread_create niesie informacje jedynie o poprawnym wykonaniu funkcji lub błędzie. Identyfikator nowego wątku zostaje zapisany do pierwszego argumentu.
Drugi argument pozwala wyspecyfikować atrybuty wątku - zobacz man 3p pthread_attr_destroy it man -k pthread_attr_set. Przekazując NULL ustawimy domyślne wartości artybutów.
Trzecim argumentem pthread_create jest wskaźnik na funkcję o sygnaturze:
void *function(void *args);
która będzie wykonywana przez nowoutworzony wątek.
Ostatni argument służy do przekazania argumentów do funkcji. Jest typu void*, co w C oznacza wskaźnik o nieustalonym typie - na cokolwiek. Żeby go użyć musimy zrzutować go właściwy typ.
man 3p pthread_create
Czekanie na zakończenie wątków #
Podobnie jak przy procesach, najczęściej chcemy, żeby główny wątek poczekał na zakończeniu nowoutworzonych wątków. Służy do tego funkcja pthread_join:
int pthread_join(pthread_t thread, void **value_ptr);
działająca podobnie do wait przy procesach. Jej wywołanie blokuje wykonanie programu do czasu aż dany wątek się zakończy.
Podobnie jak przy pthread_create wartość zwracania służy tu tylko da sygnalizowania błędów. Wartość zwracana przez wątek (czyli przez funkcję, którą wykonuje) zostanie zapisana do drugie argumentu pthread_join.
W przeciwieństwie do funkcji wait nie mamy tu możliwości czekania na dowolny wątek - zawsze musimy podać konkretny identyfikator wątku, na który czekamy.
man 3p pthread_join
Zadanie #
Cel: Napisać program przybliżający wartość PI metodą Monte Carlo. Program przyjmuje dwa parametry:
- k … liczbę wątków użytych do estymacji,
- n … liczbę losowanych wartości przez każdy wątek.
Każdy wątek (za wyjątkiem głównego) ma przeprowadzać własną estymację. W momencie kiedy wszystkie wątki zakończą obliczenia, wątek główny wypisuje obliczoną wartość jako średnią z wszystkich symulacji.
Co student musi wiedzieć:
- man 7 pthreads
- man 3p pthread_create
- man 3p pthread_join
- man 3p rand (informacja o rand_r)
- metoda Monte-Carlo, w paragrafie “Monte Carlo methods” na stronie.
Rozwiązanie #
Makefile
CC=gcc
CFLAGS=-std=gnu99 -Wall -fsanitize=address,undefined
LDFLAGS=-fsanitize=address,undefined
LDLIBS=-lpthread
Flaga -lpthread będzie nam potrzebna w całym tutorialu, biblioteka nazywa się libpthread.so (po -l podajemy nazwę z
odciętym początkowym lib)
prog17.c:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
typedef struct targs
{
pthread_t tid;
unsigned int seed;
int samples_count;
} targs_t;
void* pi_estimation(void* args);
void usage(char* pname)
{
fprintf(stderr, "Usage: %s [num_threads > 0] [num_samples > 0]\n", pname);
exit(EXIT_FAILURE);
}
void create_threads(int thread_count, int samples_count, targs_t** estimations)
{
*estimations = malloc(sizeof(targs_t) * thread_count);
if (estimations == NULL)
ERR("malloc");
targs_t* targs = *estimations;
srand(time(NULL));
for (int i = 0; i < thread_count; i++)
{
targs[i].seed = rand();
targs[i].samples_count = samples_count;
if (pthread_create(&(targs[i].tid), NULL, pi_estimation, &targs[i]) != 0)
ERR("pthread_create");
}
}
int main(int argc, char** argv)
{
if (argc != 3)
usage(argv[0]);
int thread_count = atoi(argv[1]);
int samples_count = atoi(argv[2]);
if (thread_count <= 0 || samples_count <= 0)
usage(argv[0]);
targs_t* estimations;
create_threads(thread_count, samples_count, &estimations);
int* subresult = NULL;
int cumulative_result = 0;
for (int i = 0; i < thread_count; i++)
{
if (pthread_join(estimations[i].tid, (void**)&subresult) != 0)
ERR("pthread_join");
if (NULL != subresult)
{
cumulative_result += *subresult;
free(subresult);
}
}
const double sum_avg = (double)cumulative_result / (double)thread_count;
const double pi_estimate = (2.0 * (double)samples_count) / (sum_avg * sum_avg);
printf("Estimated value of PI is %f\n", pi_estimate);
free(estimations);
}
void* pi_estimation(void* void_ptr)
{
targs_t* args = void_ptr;
int* result = malloc(sizeof(int));
if (result == NULL)
ERR("malloc");
int inner_count = 0;
for (int i = 0; i < args->samples_count; i++)
{
inner_count += (rand_r(&args->seed) % 2) ? 1 : -1;
}
*result = abs(inner_count);
return result;
}
Uwagi i pytania #
Deklaracje zapowiadające (nie definicje) funkcji mogą być bardzo użyteczne, czasem są wręcz niezbędne, jeśli nie wiesz o czym mowa poczytaj tutaj.
W programach wielowątkowych nie można poprawnie używać funkcji rand, zamiast tego używamy rand_r, jednak ta funkcja
wymaga indywidualnego ziarna dla każdego z wątków.
W tym programie wykorzystanie wątków jest bardzo proste, program główny czeka zaraz po utworzeniu na ich zakończenie, możliwe są dużo bardziej skomplikowane scenariusze.
Pamiętaj, że niemal każde wywołanie funkcji systemowej (i wielu funkcji bibliotecznych) wymaga sprawdzenia czy nie wystąpił błąd i odpowiedniej reakcji jeśli wystąpił.
Makro ERR nie wysyła sygnału jak w programie wieloprocesowym, czemu?Odpowiedź
Jak dane są przekazywane do nowo tworzonych wątków?Odpowiedź
Czy dane na których pracują wątki są współdzielone pomiędzy nimi?Odpowiedź
Skąd wątki biorą “ziarno” do swoich wywołań rand_r?Odpowiedź
Czemu w kodzie używamy srand/rand czy to nie przeczy uwadze podanej kilka punktów powyżej?Odpowiedź
Czy moglibyśmy mieć jedną strukturę z parametrami startowymi programu? Czemu?Odpowiedź
Czy tablica z strukturami startowymi mogłaby być zmienna automatyczną a nie dynamiczną ?Odpowiedź
Czemu służy zwalnianie pamięci danych zwróconych przez wątek?Odpowiedź
Czy można zwrócić z wątku adres lokalnej (dla wątku) zmiennej automatycznej z wynikiem?Odpowiedź
Czy można jakoś uniknąć dodatkowej alokacji w funkcji wątku?Odpowiedź
Wątki typu detached i synchronizacja mutexem #
Co do zasady przy zwykłych wątkach musimy wywołać pthread_join. Wyjątek stanowią wątki typu “detached” (ang. detached thread) - czyli wątki “odłączone” od głownego. Taki typ wątków może być użyteczny, ma jednak dwie wady - skoro nie możemy na nim użyć pthread_join nie mamy jak poczekać na jego zakończenie oraz jak przekazać z niego dane do wątku głównego.
Wątek “detached” możemy uzyskać przez podanie odpowiednich atrybutów przy pthread_create albo poprzez “odłączenie” już uruchomionego wątku.
Pierwsza metoda polega na inicjalizacji obiektu typu pthread_attr_t używając pthread_attr_init. Następnie używając funkcji pthread_attr_setdetachstate ustawiamy PTHREAD_CREATE_DETACHED. Potem tworzymy wątek funkcją pthread_create przekazując utworzone atrybuty jak parametr. Należe pamiętać, żeby po użyciu usunąć stworzone atrybuty funkcją pthread_attr_destroy.
man 3p pthread_attr_destroy
man 3p pthread_attr_getdetachstate
man 3p pthread_create
Druga metoda polega na wywołaniu funkcji pthread_detach podając identyfikator wątku. Tym sposobe wątek może też sam się “odłączyć” - uzyskując swój identyfikator dzięki pthread_self. Próba “odłączenia” już “odłączonego” wątku to UB.
man 3p pthread_detach
man 3p pthread_self
Mutex #
Żeby z wątków tego typu dało się zwrócić dane potrzebujemy zmiennych współdzielonych między wątkami. Jednak sytuacja, w której kilka wątków na raz zmienia współdzieloną zmienną prowadzi do błędów, jeśli nie zapewnimy odpowiedniej synchronizacji.
Podstawowym sposobem na zapewninie tejże jest użycie mutexu, czyli obiektu pozwalającemu tylko jednemu wątkowi na raz wejść do danej sekcji krytycznej (więcej o tym w wykładzie).
Mutex inicjalizujemy funkcją pthread_mutex_init, a gdy skończymy go używać niszczymy pthread_mutex_destroy.
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
Podobnie jak przy pthread_create możemy tu podać dodatkowe atrybuty - dla uzyskania domyślnego zachowaniu po prostu przekazujemy NULL
Mając utworzony mutex wątek może go zająć używając funkcji pthread_mutex_lock. Ta funkcja blokuje wykonanie programu do czasu, aż mutex nie będzie wolny - po czym zajmuje go dla jednego z czekających wątków. Funkcja ta może zakończyć się błędem tylko w bardzo specyficznych sytuacjach (więcej o nich na SOP2), dlatego w programach niżej nie będziemy tego sprawdzać i nie jest to wymagane na laboratorium.
Po zajęciu mutexu i wykonaniu niezbędnych operacji w sekcji krytycznej wątek powinien zwolnić mutex funkcją pthread_mutex_unlock. Warto pamiętać o tym, żeby w programie sekcja krytyczna była możliwie krótka - jej wydłużenie sprawie, że pozostałe wątki muszą dłużej czekać.
Z mutexami trzeba bardzo uważać. Łatwo o zakleszczenie, np. gdy dany wątek spróbuje zająć dany mutex dwa razy pod rząd (bez zwolnienia go pomiędzy). Dodatkowo mutex może być odblokowany tylko przez ten sam wątek, który go zablokował - próba odblokowania z innego wątku to UB.
Uwaga
man 3p pthread_mutex_destroy
man 3p pthread_mutexattr_destroy
man -k pthread_mutexattr_set
man 3p pthread_mutex_lock
Zadanie #
Cel: Napisać program symulujący wizualizację rozkładu dwumianowego za pomocą deski Galtona (Galton Board) z 11 pojemnikami na kulki. Program ma przyjmować dwa parametry:
- k … liczbę wątków zrzucających kulki,
- n … całkowitą liczbę kulek do zrzucenia (w sumie).
Każdy wątek ma zrzucać kulki pojedynczo i po każdym rzucie aktualizuje licznik kulek dla odpowiedniego pojemnika. Główny wątek co sekundę sprawdza czy symulacja została zakończona (nie korzystamy z funkcji pthread_join). Po zrzuceniu wszystkich kulek główny wątek ma wypisać ilości kulek w pojemnikach, całkowitą liczbę kulek i otrzymaną “wartość oczekiwaną” przy dowolnym numerowaniu pojemników.
Co student musi wiedzieć:
- man 3p pthread_mutex_destroy (cały opis)
- man 3p pthread_mutex_lock
- man 3p pthread_mutex_unlock
- man 3p pthread_detach
- man 3p pthread_attr_init
- man 3p pthread_attr_destroy
- man 3p pthread_attr_setdetachstate
- man 3p pthread_attr_getdetachstate
- deska Galtona na stronie.
prog18.c:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#define BIN_COUNT 11
#define NEXT_DOUBLE(seedptr) ((double)rand_r(seedptr) / (double)RAND_MAX)
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
typedef struct shared_state
{
int balls_waiting;
int bins[BIN_COUNT];
int balls_thrown;
pthread_mutex_t bins_mtx[BIN_COUNT];
pthread_mutex_t balls_waiting_mtx;
pthread_mutex_t balls_thrown_mtx;
} shared_state_t;
typedef struct thrower_args
{
pthread_t tid;
unsigned int seed;
shared_state_t* shared;
} thrower_args_t;
void make_throwers(thrower_args_t** args_ptr, int num_throwers, shared_state_t* shared);
void* throwing_func(void* args);
int throw_ball(unsigned int* seedptr);
void usage(char* pname)
{
fprintf(stderr, "Usage: %s [num_balls>0] [num_throwers>0]", pname);
exit(EXIT_FAILURE);
}
int main(int argc, char** argv)
{
int balls_count, throwers_count;
if (argc != 3)
usage(argv[0]);
balls_count = atoi(argv[1]);
throwers_count = atoi(argv[2]);
if (balls_count <= 0 || throwers_count <= 0)
usage(argv[0]);
shared_state_t shared = {
.balls_waiting = balls_count,
.balls_thrown = 0,
.balls_waiting_mtx = PTHREAD_MUTEX_INITIALIZER,
.balls_thrown_mtx = PTHREAD_MUTEX_INITIALIZER,
.bins = {0},
};
int balls_thrown = shared.balls_thrown;
for (int i = 0; i < BIN_COUNT; i++)
{
if (pthread_mutex_init(&shared.bins_mtx[i], NULL))
ERR("pthread_mutex_init");
}
thrower_args_t* args;
make_throwers(&args, throwers_count, &shared);
while (balls_thrown < balls_count)
{
sleep(1);
pthread_mutex_lock(&shared.balls_thrown_mtx);
balls_thrown = shared.balls_thrown;
pthread_mutex_unlock(&shared.balls_thrown_mtx);
}
int final_balls_count = 0;
double mean_val = 0.0;
for (int i = 0; i < BIN_COUNT; i++)
{
final_balls_count += shared.bins[i];
mean_val += shared.bins[i] * i;
}
mean_val = mean_val / final_balls_count;
printf("Bins count:\n");
for (int i = 0; i < BIN_COUNT; i++)
printf("%d\t", shared.bins[i]);
printf("\nTotal balls count : %d\nMean value: %f\n", final_balls_count, mean_val);
// for (int i = 0; i < BIN_COUNT; i++) pthread_mutex_destroy(&mxBins[i]);
// free(args);
// The resources used by detached threads cannod be freed as we are not sure
// if they are running yet.
exit(EXIT_SUCCESS);
}
void make_throwers(thrower_args_t** args_ptr, int throwers_count, shared_state_t* shared)
{
*args_ptr = malloc(sizeof(thrower_args_t) * throwers_count);
thrower_args_t* args = *args_ptr;
if (args == NULL)
ERR("malloc");
pthread_attr_t attr;
if (pthread_attr_init(&attr))
ERR("pthread_attr_init");
if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))
ERR("pthread_attr_setdetachstate");
srand(time(NULL));
for (int i = 0; i < throwers_count; i++)
{
args[i].seed = rand();
args[i].shared = shared;
if (pthread_create(&args[i].tid, &attr, throwing_func, &args[i]))
ERR("pthread_create");
}
pthread_attr_destroy(&attr);
}
void* throwing_func(void* void_args)
{
thrower_args_t* args = void_args;
while (1)
{
pthread_mutex_lock(&args->shared->balls_waiting_mtx);
if (args->shared->balls_waiting > 0)
{
args->shared->balls_waiting--;
pthread_mutex_unlock(&args->shared->balls_waiting_mtx);
}
else
{
pthread_mutex_unlock(&args->shared->balls_waiting_mtx);
break;
}
int binno = throw_ball(&args->seed);
pthread_mutex_lock(&args->shared->bins_mtx[binno]);
args->shared->bins[binno]++;
pthread_mutex_unlock(&args->shared->bins_mtx[binno]);
pthread_mutex_lock(&args->shared->balls_thrown_mtx);
args->shared->balls_thrown++;
pthread_mutex_unlock(&args->shared->balls_thrown_mtx);
}
return NULL;
}
/* returns # of bin where ball has landed */
int throw_ball(unsigned int* seedptr)
{
int result = 0;
for (int i = 0; i < BIN_COUNT - 1; i++)
if (NEXT_DOUBLE(seedptr) > 0.5)
result++;
return result;
}
Uwagi i pytania #
Wszystkie dane konieczne do pracy wątku są przekazywane przez strukturę thrower_args_t, wynik jego pracy to modyfikacja w
tablicy bins, znowu nic nie jest przekazywane przez zmienne globalne.
W programie mamy dwa muteksy chroniące dostęp do liczników oraz całą tablice muteksów chroniących dostęp do komórek tablicy bins (po jednym na komórkę). Zatem muteksów jest BIN_COUNT+2.
W programie tworzymy wątki typu “detachable” czyli nie ma potrzeby (ani możliwości) czekać na ich zakończenie, stąd brak pthread_join, za to nie wiemy kiedy wątki robocze skończą działanie, musimy do tego stworzyć własny test.
W programie muteksy są inicjowane zarówno automatycznie jak i dynamicznie, jako zmienne automatyczne muteksy są prostsze w tworzeniu ale trzeba z góry znać ilość muteksów. Muteksy jako zmienne dynamicznie nie mają tego ograniczenia ale trzeba inicjować i usuwać mutex w kodzie.
Czy dane przekazane przez thrower_args_t są współdzielone pomiędzy wątki?Odpowiedź
Czemu używamy wskaźnika do przekazania shared_state_t?Odpowiedź
Ten program używa dużo muteksów, czy może być ich mniej?Odpowiedź
Metoda czekania na zakończenie się wątków liczących polega na okresowym sprawdzaniu czy licznik kul zrzuconych jest równy sumie kul w programie, czy jest to optymalne rozwiązanie?Odpowiedź
Czy wszystkie wątki w tym programie na prawdę pracują?Odpowiedź
Wątki i sygnały #
Zadanie 3 - wątki i sygnały z czekaniem na sygnał za pomocą funkcji sigwait #
Cel: Napisać program, który przyjmuje jeden parametr ‘k’ i co sekundę wyświetla listę liczb, początkowo od 1 do k. Program ma obsługiwać dwa sygnały za pomocą oddzielnego wątku. Wątek ten po otrzymaniu sygnału podejmuje następujące akcje:
- SIGINT (C-c) … usuwa losową liczbę z listy (jeżeli lista jest pusta to nic nie robi),
- SIGQUIT (C-\) … ustawia flagę ‘STOP’ i kończy swoje działanie.
Wątek główny ma za zadanie co sekundę wyświetlić listę lub, jeżeli ustawiona jest flaga ‘STOP’, poprawnie zakończyć swoje działanie.
Co student musi wiedzieć:
- man 3p pthread_sigmask
- man 3p sigwait
prog19.c:
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define MAXLINE 4096
#define DEFAULT_ARRAYSIZE 10
#define DELETED_ITEM -1
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
typedef struct argsSignalHandler
{
pthread_t tid;
int *pArrayCount;
int *array;
pthread_mutex_t *pmxArray;
sigset_t *pMask;
bool *pQuitFlag;
pthread_mutex_t *pmxQuitFlag;
} argsSignalHandler_t;
void ReadArguments(int argc, char **argv, int *arraySize);
void removeItem(int *array, int *arrayCount, int index);
void printArray(int *array, int arraySize);
void *signal_handling(void *);
int main(int argc, char **argv)
{
int arraySize, *array;
bool quitFlag = false;
pthread_mutex_t mxQuitFlag = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mxArray = PTHREAD_MUTEX_INITIALIZER;
ReadArguments(argc, argv, &arraySize);
int arrayCount = arraySize;
if (NULL == (array = (int *)malloc(sizeof(int) * arraySize)))
ERR("Malloc error for array!");
for (int i = 0; i < arraySize; i++)
array[i] = i + 1;
sigset_t oldMask, newMask;
sigemptyset(&newMask);
sigaddset(&newMask, SIGINT);
sigaddset(&newMask, SIGQUIT);
if (pthread_sigmask(SIG_BLOCK, &newMask, &oldMask))
ERR("SIG_BLOCK error");
argsSignalHandler_t args;
args.pArrayCount = &arrayCount;
args.array = array;
args.pmxArray = &mxArray;
args.pMask = &newMask;
args.pQuitFlag = &quitFlag;
args.pmxQuitFlag = &mxQuitFlag;
if (pthread_create(&args.tid, NULL, signal_handling, &args))
ERR("Couldn't create signal handling thread!");
while (true)
{
pthread_mutex_lock(&mxQuitFlag);
if (quitFlag == true)
{
pthread_mutex_unlock(&mxQuitFlag);
break;
}
else
{
pthread_mutex_unlock(&mxQuitFlag);
pthread_mutex_lock(&mxArray);
printArray(array, arraySize);
pthread_mutex_unlock(&mxArray);
sleep(1);
}
}
if (pthread_join(args.tid, NULL))
ERR("Can't join with 'signal handling' thread");
free(array);
if (pthread_sigmask(SIG_UNBLOCK, &newMask, &oldMask))
ERR("SIG_BLOCK error");
exit(EXIT_SUCCESS);
}
void ReadArguments(int argc, char **argv, int *arraySize)
{
*arraySize = DEFAULT_ARRAYSIZE;
if (argc >= 2)
{
*arraySize = atoi(argv[1]);
if (*arraySize <= 0)
{
printf("Invalid value for 'array size'");
exit(EXIT_FAILURE);
}
}
}
void removeItem(int *array, int *arrayCount, int index)
{
int curIndex = -1;
int i = -1;
while (curIndex != index)
{
i++;
if (array[i] != DELETED_ITEM)
curIndex++;
}
array[i] = DELETED_ITEM;
*arrayCount -= 1;
}
void printArray(int *array, int arraySize)
{
printf("[");
for (int i = 0; i < arraySize; i++)
if (array[i] != DELETED_ITEM)
printf(" %d", array[i]);
printf(" ]\n");
}
void *signal_handling(void *voidArgs)
{
argsSignalHandler_t *args = voidArgs;
int signo;
srand(time(NULL));
for (;;)
{
if (sigwait(args->pMask, &signo))
ERR("sigwait failed.");
switch (signo)
{
case SIGINT:
pthread_mutex_lock(args->pmxArray);
if (*args->pArrayCount > 0)
removeItem(args->array, args->pArrayCount, rand() % (*args->pArrayCount));
pthread_mutex_unlock(args->pmxArray);
break;
case SIGQUIT:
pthread_mutex_lock(args->pmxQuitFlag);
*args->pQuitFlag = true;
pthread_mutex_unlock(args->pmxQuitFlag);
return NULL;
default:
printf("unexpected signal %d\n", signo);
exit(1);
}
}
return NULL;
}
Uwagi i pytania #
W strukturze argumentów argsSignalHandler_t przekazujemy wskazania na dane współdzielone przez oba wątki czyli tablicę i flagę STOP oraz muteksy je chroniące. Dodatkowo maskę sygnałów i tid wątku obsługi sygnałów, które to dane nie są współdzielone.
W wątkach do ustawiania maski sygnałów (per wątek) używamy pthread_sigmask, nie powinno się, gdy istnieje więcej wątków niż jeden w programie używać f. sigprocmask.
Delegowanie oddzielnego wątku do obsługi sygnałów jest typowym i wygodnym sposobem radzenia sobie w programach wielowątkowych.
Ile wątków jest w tym programie?Odpowiedź
Wypunktuj różnicę i podobieństwa pomiędzy sigwait i sigsuspend:Odpowiedź
Po wywołaniu sigwait tylko jeden typ sygnału jest zdejmowany z wektora sygnałów oczekujących wiec problem jaki mieliśmy z obsługa wielu sygnałów w trakcie pojedynczego sigsuspend w przykładzie z L2 nie wystąpiłby jeśli zamienić użycie sigsuspend na sigwait (ćwiczenie do zrobienia)
Czy metoda czekania na potomka jest w tym programie równie słaba jak w poprzednim?Odpowiedź
Czy w tym programie można użyć sigprocmask zamiast pthread_sigmask?Odpowiedź
Czemu nie sprawdzamy błędów wywołań funkcji systemowych związanych pozyskaniem i zwolnieniem muteksówOdpowiedź
Anulowanie wątków #
Program może anulować wątek poprzez użycie funkcji pthread_cancel.
man 3p pthread_cancel
To jak wątek zareaguje na pthread_cancel zależy od jego ustawień, ustawianych funkcjami:
int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);
man 7 pthreads (specifically the "Cancelation Points" section)
W szczególności wątek może ignorować anulowanie poprzez ustawienie PTHREAD_CANCEL_DISABLE jak również z powrotem na nie reagować poprzez PTHREAD_CANCEL_ENABLE. Domyślnie anulowanie jest włączone.
Typ anulacji (cancelacion type) definiuje w jaki sposób wątek się zakończy po anulowaniu.
W przypadku PTHREAD_CANCEL_DEFERRED (domyślna wartość) wątek zakończy się na najbliższym “cancelation point”.
W przypadku PTHREAD_CANCEL_ASYNCHRONOUS wątek zakończy się tak szybko, jak to możliwe.
Ogólnie rzecz biorąc bezpieczniej jest używać PTHREAD_CANCEL_DEFERRED jako że anulowanie asynchroniczne może prowadzić do nieoczywistych błędów, wątek nie ma jak po sobie “posprzątać” (o czym niżej).
Cleanup handlers #
Zwykle zakończenie wątku w trakcie może prowadzić do problemów, takich jak zakleszczenia (wątek zablokował mutex i już go nie odblokuje), resource leak (nie zwolnił zasobów), itd.
Aby temu zapobiec możliwe jest zdefiniowane tzw. “funkcji czyszczących” (ang. cleanup handlers), które zostaną wywołane, gdy wątek zakończy się w niestandardowy sposób (przez anulowanie lub pthread_exit). Taka funkcja ma syngaturę:
void function(void* arg);
i może zostać przypisana do wątku poprzez wywołanie pthread_cleanup_push.
void pthread_cleanup_push(void (*routine)(void*), void *arg);
Jak widać, jest nieco podobna do pthread_create - poza samą funkcją przekazujemy też jej parametry jako void*.
Przy zakończeniu wątku wszystkie funkcje czyszczące zostaną wywołane w odwrotnej kolejności niż były dodawane.
Funkcje czyszczące mogą być też usunięte przez przez wywołanie:
void pthread_cleanup_pop(int execute);
Ciekawostką jest tutaj argument execute, który pozwala wywołać usuwaną funkcję przed usunięciem.
Na każde wywołanie pthread_cleanup_push musi przypadać odpowiadające mu pthread_cleanup_pop.
man 3p pthread_cleanup_pop
Czekanie na anulowane wątki #
Na anulowane wątki możemy czekać używając pthread_join. Jeśli wątek został anulowany wskaźnik przekazany jako value_ptr zostanie ustawiony na PTHREAD_CANCELED.
man 3p pthread_join
man 3p pthread_exit (sekcja RATIONALE)
Zadanie #
Cel: Napisać program symulujący losy studentów wydziału MiNI. Program ma następujące argumenty:
- n <= 100 … liczba nowych studentów,
Program przechowuje liczniki studentów na 1, 2, 3 roku oraz inżynierów.
Wątek główny: Inicjuje studentów, a przez następne 4 sekundy, w losowych odstępach czasu (100-300ms) wyrzuca jednego studenta (anuluje wątek). Po czterech sekundach, czeka na zakończenie działania studentów i wypisuje liczbę inżynierów oraz wyrzuconych osób.
Wątek studenta: Każdy student jest nowo-utworzonym wątkiem. Student dodaje się do licznika 1. roku, po sekundzie odejmuje się z tego licznika i dodaję do 2. roku. Analogicznie po sekundzie przechodzi do 3. roku i po jeszcze jednej sekundzie do inżynierów. Student jest zawsze przygotowany na opuszczenie uczelni.
Co student musi wiedzieć:
- man 3p pthread_cancel
- man 3 pthread_cleanup_push
- man 3 pthread_cleanup_pop
- man 7 time
- man 3p clock_getres
prog20.c
#include <errno.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#define MAXLINE 4096
#define DEFAULT_STUDENT_COUNT 100
#define ELAPSED(start, end) ((end).tv_sec - (start).tv_sec) + (((end).tv_nsec - (start).tv_nsec) * 1.0e-9)
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
typedef unsigned int UINT;
typedef struct timespec timespec_t;
typedef struct studentList
{
bool *removed;
pthread_t *thStudents;
int count;
int present;
} studentsList_t;
typedef struct yearCounters
{
int values[4];
pthread_mutex_t mxCounters[4];
} yearCounters_t;
typedef struct argsModify
{
yearCounters_t *pYearCounters;
int year;
} argsModify_t;
void ReadArguments(int argc, char **argv, int *studentsCount);
void *student_life(void *);
void increment_counter(argsModify_t *args);
void decrement_counter(void *_args);
void msleep(UINT milisec);
void kick_student(studentsList_t *studentsList);
int main(int argc, char **argv)
{
int studentsCount;
ReadArguments(argc, argv, &studentsCount);
yearCounters_t counters = {.values = {0, 0, 0, 0},
.mxCounters = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}};
studentsList_t studentsList;
studentsList.count = studentsCount;
studentsList.present = studentsCount;
studentsList.thStudents = (pthread_t *)malloc(sizeof(pthread_t) * studentsCount);
studentsList.removed = (bool *)malloc(sizeof(bool) * studentsCount);
if (studentsList.thStudents == NULL || studentsList.removed == NULL)
ERR("Failed to allocate memory for 'students list'!");
for (int i = 0; i < studentsCount; i++)
studentsList.removed[i] = false;
for (int i = 0; i < studentsCount; i++)
if (pthread_create(&studentsList.thStudents[i], NULL, student_life, &counters))
ERR("Failed to create student thread!");
srand(time(NULL));
timespec_t start, current;
if (clock_gettime(CLOCK_REALTIME, &start))
ERR("Failed to retrieve time!");
do
{
msleep(rand() % 201 + 100);
if (clock_gettime(CLOCK_REALTIME, ¤t))
ERR("Failed to retrieve time!");
kick_student(&studentsList);
} while (ELAPSED(start, current) < 4.0);
for (int i = 0; i < studentsCount; i++)
if (pthread_join(studentsList.thStudents[i], NULL))
ERR("Failed to join with a student thread!");
printf(" First year: %d\n", counters.values[0]);
printf("Second year: %d\n", counters.values[1]);
printf(" Third year: %d\n", counters.values[2]);
printf(" Engineers: %d\n", counters.values[3]);
free(studentsList.removed);
free(studentsList.thStudents);
exit(EXIT_SUCCESS);
}
void ReadArguments(int argc, char **argv, int *studentsCount)
{
*studentsCount = DEFAULT_STUDENT_COUNT;
if (argc >= 2)
{
*studentsCount = atoi(argv[1]);
if (*studentsCount <= 0)
{
printf("Invalid value for 'studentsCount'");
exit(EXIT_FAILURE);
}
}
}
void *student_life(void *voidArgs)
{
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
argsModify_t args;
args.pYearCounters = voidArgs;
for (args.year = 0; args.year < 3; args.year++)
{
increment_counter(&args);
pthread_cleanup_push(decrement_counter, &args);
msleep(1000);
pthread_cleanup_pop(1);
}
increment_counter(&args);
return NULL;
}
void increment_counter(argsModify_t *args)
{
pthread_mutex_lock(&(args->pYearCounters->mxCounters[args->year]));
args->pYearCounters->values[args->year] += 1;
pthread_mutex_unlock(&(args->pYearCounters->mxCounters[args->year]));
}
void decrement_counter(void *_args)
{
argsModify_t *args = _args;
pthread_mutex_lock(&(args->pYearCounters->mxCounters[args->year]));
args->pYearCounters->values[args->year] -= 1;
pthread_mutex_unlock(&(args->pYearCounters->mxCounters[args->year]));
}
void msleep(UINT milisec)
{
time_t sec = (int)(milisec / 1000);
milisec = milisec - (sec * 1000);
timespec_t req = {0};
req.tv_sec = sec;
req.tv_nsec = milisec * 1000000L;
if (nanosleep(&req, &req))
ERR("nanosleep");
}
void kick_student(studentsList_t *studentsList)
{
int idx;
if (0 == studentsList->present)
return;
do
{
idx = rand() % studentsList->count;
} while (studentsList->removed[idx] == true);
pthread_cancel(studentsList->thStudents[idx]);
studentsList->removed[idx] = true;
studentsList->present--;
}
Uwagi i pytania #
Wątkom przekazuje się strukturę z aktualnym rokiem oraz z wskazaniem na 4 liczniki z muteksami, struktura argsModify_t nie powiela błędu z zadania 2 czyli nie przekazuje indywidualnych kopii zbyt wielu wskaźników.
Struktura studentsList_t jest używana jedynie przez wątek główny, nie jest “widziana” przez wątki studentów.
“Sprytna” inicjalizacja struktury yearCounters_t nie zadziała z archaicznymi standardami języka C (c89/c90). Warto wiedzieć, oczywiście używamy wszelkich dobrodziejstw nowszych standardów.
Cleanup handlery w funkcji wątku służą do bezpiecznego usunięcia studenta z licznika roku w razie anulowania w czasie spania, bez nich takie anulowanie pozostawiłoby studenta na roku do końca działania programu!
Pamiętaj, że pthread_cleanup_push musi być sparowany z pthread_cleanup_pop w obrębie tego samego bloku {}
Ile w programie występuje mutexów i co chronią?Odpowiedź
Czy aktualny rok studenta musi być częścią struktury argsModify_t?Odpowiedź
Co to znaczy, że wątek ma stan anulowania PTHREAD_CANCEL_DEFERRED ?Odpowiedź
Które z funkcji użytych w funkcji wątku są punktami anulowania wątku?Odpowiedź
Skąd wiemy, które funkcje są takimi punktami?Odpowiedź
Co oznacza jedynka w wywołaniu: pthread_cleanup_pop(1); ?Odpowiedź
Kiedy jest zmniejszany liczniku roku?Odpowiedź
W algorytmie losowania wątku do anulowania jest spory błąd logiczny, gdzie i jakie niesie zagrożenie?Odpowiedź
Jako ćwiczenie popraw sposób losowania “żyjącego” wątku do anulowania.
Zwróć uwagę na sposób odliczenia 4 sekund w losowych interwałach za pomocą clock_gettime i nanosleep, to można zmienić, dodając wywołanie f.alarm i oddając obsługę sygnału SIGALRM. Rozwiązanie takie można wykonać jako ćwiczenie.
Wykonaj przykładowe zadania. Podczas laboratorium będziesz miał więcej czasu oraz dostępny startowy kod, jeśli jednak wykonasz poniższe zadania w przewidzianym czasie, to znaczy, że jesteś dobrze przygotowany do zajęć.