L3 - Wątki, mutexy i sygnały

Tutorial 3 - Wątki, mutexy i sygnały #

Uwagi wstępne:

  • Szybkie przejrzenie tutoriala prawdopodobnie nic nie pomoże, należy samodzielnie uruchomić programy, sprawdzić jak działają, poczytać materiały dodatkowe takie jak strony man. W trakcie czytania sugeruję wykonywać ćwiczenia a na koniec przykładowe zadanie.
  • Na żółtych polach podaję dodatkowe informacje, niebieskie zawierają pytania i ćwiczenia. Pod pytaniami znajdują się odpowiedzi, które staną się widoczne dopiero po kliknięciu. Proszę najpierw spróbować sobie odpowiedzieć na pytanie samemu a dopiero potem sprawdzać odpowiedź.
  • Pełne kody do zajęć znajdują się w załącznikach na dole strony. W tekście są tylko te linie kodu, które są konieczne do zrozumienia problemu.
  • Materiały i ćwiczenia są ułożone w pewną logiczną całość, czasem do wykonania ćwiczenia konieczny jest stan osiągnięty poprzednim ćwiczeniem dlatego zalecam wykonywanie ćwiczeń w miarę przyswajania materiału.
  • Większość ćwiczeń wymaga użycia konsoli poleceń, zazwyczaj zakładam, ze pracujemy w jednym i tym samym katalogu roboczym więc wszystkie potrzebne pliki są “pod ręką” tzn. nie ma potrzeby podawania ścieżek dostępu.
  • To co ćwiczymy wróci podczas kolejnych zajęć. Jeśli po zajęciach i teście coś nadal pozostaje niejasne proszę to poćwiczyć a jeśli trzeba dopytać się u prowadzących.
  • Ten tutorial opiera się na przykładach od naszego studenta, drobne ich słabości występują jedynie w częściach nie związanych z wątkami i zostawiłem je celowo. Są one omawiane w komentarzach i zadaniach. To co się powtarza to:
    • Funkcje main są za długie, można je łatwo podzielić
    • Nadużywany jest typ bool, wymaga to nawet specjalnego pliku nagłówkowego, prościej i bardziej klasycznie można użyć 0 i 1
    • Pojawiają się “magic numbers” - stałe liczbowe zamiast zdefiniowanych makr

Ten tutorial został przygotowany w oparciu o pomysły na zadania i ich rozwiązania przygotowane przez studenta Mariusza Kowalskiego.

Zadanie 1 - proste wątki typu joinable, synchronizacja, zwracanie wyników przez wątek #

Cel: Napisać program przybliżający wartość PI metodą Monte Carlo. Program przyjmuje dwa parametry:

  • k … liczbę wątków użytych do estymacji,
  • n … liczbę losowanych wartości przez każdy wątek.

Każdy wątek (za wyjątkiem głównego) ma przeprowadzać własną estymację. W momencie kiedy wszystkie wątki zakończą obliczenia, wątek główny wypisuje obliczoną wartość jako średnią z wszystkich symulacji.

Co student musi wiedzieć:

  • man 7 pthreads
  • man 3p pthread_create
  • man 3p pthread_join
  • man 3p rand (informacja o rand_r)
  • metoda Monte-Carlo, w paragrafie “Monte Carlo methods” na stronie.

rozwiązanie Makefile:

CC=gcc
CFLAGS=-std=gnu99 -Wall -fsanitize=address,undefined
LDFLAGS=-fsanitize=address,undefined
LDLIBS=-lpthread -lm

Flaga -lpthread będzie nam potrzebna w całym tutorialu, biblioteka nazywa się libpthread.so (po -l podajemy nazwę z odciętym początkowym lib)

Flaga -lm włącza bibliotekę math plik biblioteki to lm.so a nie libmath.so jak moglibyśmy zgadywać

rozwiązanie, cały prog17.c:

#include <math.h>
#include <pthread.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAXLINE 4096
#define DEFAULT_THREADCOUNT 10
#define DEFAULT_SAMPLESIZE 100

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef unsigned int UINT;
typedef struct argsEstimation
{
    pthread_t tid;
    UINT seed;
    int samplesCount;
} argsEstimation_t;

void ReadArguments(int argc, char **argv, int *threadCount, int *samplesCount);
void *pi_estimation(void *args);

int main(int argc, char **argv)
{
    int threadCount, samplesCount;
    double *subresult;
    ReadArguments(argc, argv, &threadCount, &samplesCount);
    argsEstimation_t *estimations = (argsEstimation_t *)malloc(sizeof(argsEstimation_t) * threadCount);
    if (estimations == NULL)
        ERR("Malloc error for estimation arguments!");
    srand(time(NULL));
    for (int i = 0; i < threadCount; i++)
    {
        estimations[i].seed = rand();
        estimations[i].samplesCount = samplesCount;
    }
    for (int i = 0; i < threadCount; i++)
    {
        int err = pthread_create(&(estimations[i].tid), NULL, pi_estimation, &estimations[i]);
        if (err != 0)
            ERR("Couldn't create thread");
    }
    double cumulativeResult = 0.0;
    for (int i = 0; i < threadCount; i++)
    {
        int err = pthread_join(estimations[i].tid, (void *)&subresult);
        if (err != 0)
            ERR("Can't join with a thread");
        if (NULL != subresult)
        {
            cumulativeResult += *subresult;
            free(subresult);
        }
    }
    double result = cumulativeResult / threadCount;
    printf("PI ~= %f\n", result);
    free(estimations);
}

void ReadArguments(int argc, char **argv, int *threadCount, int *samplesCount)
{
    *threadCount = DEFAULT_THREADCOUNT;
    *samplesCount = DEFAULT_SAMPLESIZE;

    if (argc >= 2)
    {
        *threadCount = atoi(argv[1]);
        if (*threadCount <= 0)
        {
            printf("Invalid value for 'threadCount'");
            exit(EXIT_FAILURE);
        }
    }
    if (argc >= 3)
    {
        *samplesCount = atoi(argv[2]);
        if (*samplesCount <= 0)
        {
            printf("Invalid value for 'samplesCount'");
            exit(EXIT_FAILURE);
        }
    }
}

void *pi_estimation(void *voidPtr)
{
    argsEstimation_t *args = voidPtr;
    double *result;
    if (NULL == (result = malloc(sizeof(double))))
        ERR("malloc");

    int insideCount = 0;
    for (int i = 0; i < args->samplesCount; i++)
    {
        double x = ((double)rand_r(&args->seed) / (double)RAND_MAX);
        double y = ((double)rand_r(&args->seed) / (double)RAND_MAX);
        if (sqrt(x * x + y * y) <= 1.0)
            insideCount++;
    }
    *result = 4.0 * (double)insideCount / (double)args->samplesCount;
    return result;
}

Program ten (i następne) nie pokazuje usage, zamiast tego ma zdefiniowane wartości domyślne dla parametrów pracy, wywołaj go bez parametrów w celu sprawdzenia.

Deklaracje zapowiadające (nie definicje) funkcji mogą być bardzo użyteczne, czasem są wręcz niezbędne, jeśli nie wiesz o czym mowa poczytaj tutaj.

W programach wielowątkowych nie można poprawnie używać funkcji rand(), zamiast tego używamy rand_r, jednak ta funkcja wymaga indywidualnego ziarna dla każdego z wątków.

W tym programie wykorzystanie wątków jest bardzo proste, program główny czeka zaraz po utworzeniu na ich zakończenie, możliwe są dużo bardziej skomplikowane scenariusze.

Pamiętaj, że niemal każde wywołanie funkcji systemowej (i wielu funkcji bibliotecznych) wymaga sprawdzenia czy nie wystąpił błąd i odpowiedniej reakcji jeśli wystąpił.

Makro ERR nie wysyła sygnału jak w programie wieloprocesowym, czemu?

Odpowiedź
Funkcja exit zamyka cały proces tzn. wszystkie wątki w jego obrębie.

Jak dane są przekazywane do nowo tworzonych wątków?

Odpowiedź
Wyłącznie przez strukturę argsEstimation_t do której wskaźnik jest argumentem funkcji wątku, nie ma żadnego powodu aby użyć zmiennych globalnych.

Czy dane na których pracują wątki są współdzielone pomiędzy nimi?

Odpowiedź
W tym programie nie, zatem nie ma też konieczności niczego synchronizować, każdy wątek dostaje tylko swoje dane.

Skąd wątki biorą “ziarno” do swoich wywołań rand_r?

Odpowiedź
Dostają to ziarno jako jedno z pól struktury z danymi wątku.

Czemu w kodzie używamy srand/rand czy to nie przeczy uwadze podanej kilka punktów powyżej?

Odpowiedź
Tylko jeden wątek używa srand/rand do tego czyni to zanim wystartują inne wątki zatem na etapie gdy program jest jeszcze jednowątkowy. Problem z srand/rand polega na pojedynczej zmiennej globalnej przechowującej aktualne ziarno (zmienna ta jest ukryta w bibliotece).

Czy moglibyśmy mieć jedną strukturę z parametrami startowymi programu? Czemu?

Odpowiedź
Nie, ze względu na pole z ziarnem losowości, to musi być inne dla wszystkich wątków

Czy tablica z strukturami startowymi mogłaby być zmienna automatyczną a nie dynamiczną ?

Odpowiedź
Tyko jeśli dodalibyśmy istotne ograniczenie na ilość wątków (np.: do tysiąca), w przeciwnym wypadku tablica ta może zająć cały stos.

Czemu służy zwalnianie pamięci danych zwróconych przez wątek?

Odpowiedź
Pamięć na te dane te została przydzielona dynamicznie, sterta jest wspólna dla wątków więc musimy zwolnić tą pamięć, zakończenie się wątku nie zrobi tego za nas.

Czy można zwrócić z wątku adres lokalnej (dla wątku) zmiennej automatycznej z wynikiem?

Odpowiedź
Nie, w momencie końca wątku jego stos jest zwalniany, więc jakikolwiek wskaźnik na tą pamięć przestaje być wiarygodny. Co gorsza, w wielu przypadkach to będzie wyglądać jakby działało, bo do "zepsucia" potrzeba aby program napisał sobie tą pamięć. Należy być wyjątkowo wyczulonym na tego rodzaju błędy bo są one potem bardzo trudne do wyszukania!

Czy można jakoś uniknąć dodatkowej alokacji w funkcji wątku?

Zadanie 2 - proste wątki typu detachable z wspólnymi zmiennymi i mutexem do synchronizacji #

Cel: Napisać program symulujący wizualizację rozkładu dwumianowego za pomocą deski Galtona (Galton Board) z 11 pojemnikami na kulki. Program ma przyjmować dwa parametry:

  • k … liczbę wątków zrzucających kulki,
  • n … całkowitą liczbę kulek do zrzucenia (w sumie).

Każdy wątek ma zrzucać kulki pojedynczo i po każdym rzucie aktualizuje licznik kulek dla odpowiedniego pojemnika. Główny wątek co sekundę sprawdza czy symulacja została zakończona (nie korzystamy z funkcji pthread_join). Po zrzuceniu wszystkich kulek główny wątek ma wypisać ilości kulek w pojemnikach, całkowitą liczbę kulek i otrzymaną “wartość oczekiwaną” przy dowolnym numerowaniu pojemników.

Co student musi wiedzieć:

  • man 3p pthread_mutex_destroy (cały opis)
  • man 3p pthread_mutex_lock
  • man 3p pthread_mutex_unlock
  • man 3p pthread_detach
  • man 3p pthread_attr_init
  • man 3p pthread_attr_destroy
  • man 3p pthread_attr_setdetachstate
  • man 3p pthread_attr_getdetachstate
  • deska Galtona na stronie.

rozwiązanie, cały prog18.c:

#include <pthread.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MAXLINE 4096
#define DEFAULT_N 1000
#define DEFAULT_K 10
#define BIN_COUNT 11
#define NEXT_DOUBLE(seedptr) ((double)rand_r(seedptr) / (double)RAND_MAX)
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef unsigned int UINT;
typedef struct argsThrower
{
    pthread_t tid;
    UINT seed;
    int *pBallsThrown;
    int *pBallsWaiting;
    int *bins;
    pthread_mutex_t *mxBins;
    pthread_mutex_t *pmxBallsThrown;
    pthread_mutex_t *pmxBallsWaiting;
} argsThrower_t;

void ReadArguments(int argc, char **argv, int *ballsCount, int *throwersCount);
void make_throwers(argsThrower_t *argsArray, int throwersCount);
void *throwing_func(void *args);
int throwBall(UINT *seedptr);

int main(int argc, char **argv)
{
    int ballsCount, throwersCount;
    ReadArguments(argc, argv, &ballsCount, &throwersCount);
    int ballsThrown = 0, bt = 0;
    int ballsWaiting = ballsCount;
    pthread_mutex_t mxBallsThrown = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mxBallsWaiting = PTHREAD_MUTEX_INITIALIZER;
    int bins[BIN_COUNT];
    pthread_mutex_t mxBins[BIN_COUNT];
    for (int i = 0; i < BIN_COUNT; i++)
    {
        bins[i] = 0;
        if (pthread_mutex_init(&mxBins[i], NULL))
            ERR("Couldn't initialize mutex!");
    }
    argsThrower_t *args = (argsThrower_t *)malloc(sizeof(argsThrower_t) * throwersCount);
    if (args == NULL)
        ERR("Malloc error for throwers arguments!");
    srand(time(NULL));
    for (int i = 0; i < throwersCount; i++)
    {
        args[i].seed = (UINT)rand();
        args[i].pBallsThrown = &ballsThrown;
        args[i].pBallsWaiting = &ballsWaiting;
        args[i].bins = bins;
        args[i].pmxBallsThrown = &mxBallsThrown;
        args[i].pmxBallsWaiting = &mxBallsWaiting;
        args[i].mxBins = mxBins;
    }
    make_throwers(args, throwersCount);
    while (bt < ballsCount)
    {
        sleep(1);
        pthread_mutex_lock(&mxBallsThrown);
        bt = ballsThrown;
        pthread_mutex_unlock(&mxBallsThrown);
    }
    int realBallsCount = 0;
    double meanValue = 0.0;
    for (int i = 0; i < BIN_COUNT; i++)
    {
        realBallsCount += bins[i];
        meanValue += bins[i] * i;
    }
    meanValue = meanValue / realBallsCount;
    printf("Bins count:\n");
    for (int i = 0; i < BIN_COUNT; i++)
        printf("%d\t", bins[i]);
    printf("\nTotal balls count : %d\nMean value: %f\n", realBallsCount, meanValue);
    // for (int i = 0; i < BIN_COUNT; i++) pthread_mutex_destroy(&mxBins[i]);
    // free(args);
    // The resources used by detached threads cannod be freed as we are not sure
    // if they are running yet.
    exit(EXIT_SUCCESS);
}

void ReadArguments(int argc, char **argv, int *ballsCount, int *throwersCount)
{
    *ballsCount = DEFAULT_N;
    *throwersCount = DEFAULT_K;
    if (argc >= 2)
    {
        *ballsCount = atoi(argv[1]);
        if (*ballsCount <= 0)
        {
            printf("Invalid value for 'balls count'");
            exit(EXIT_FAILURE);
        }
    }
    if (argc >= 3)
    {
        *throwersCount = atoi(argv[2]);
        if (*throwersCount <= 0)
        {
            printf("Invalid value for 'throwers count'");
            exit(EXIT_FAILURE);
        }
    }
}

void make_throwers(argsThrower_t *argsArray, int throwersCount)
{
    pthread_attr_t threadAttr;
    if (pthread_attr_init(&threadAttr))
        ERR("Couldn't create pthread_attr_t");
    if (pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED))
        ERR("Couldn't setdetachsatate on pthread_attr_t");
    for (int i = 0; i < throwersCount; i++)
    {
        if (pthread_create(&argsArray[i].tid, &threadAttr, throwing_func, &argsArray[i]))
            ERR("Couldn't create thread");
    }
    pthread_attr_destroy(&threadAttr);
}

void *throwing_func(void *voidArgs)
{
    argsThrower_t *args = voidArgs;
    while (1)
    {
        pthread_mutex_lock(args->pmxBallsWaiting);
        if (*args->pBallsWaiting > 0)
        {
            (*args->pBallsWaiting) -= 1;
            pthread_mutex_unlock(args->pmxBallsWaiting);
        }
        else
        {
            pthread_mutex_unlock(args->pmxBallsWaiting);
            break;
        }
        int binno = throwBall(&args->seed);
        pthread_mutex_lock(&args->mxBins[binno]);
        args->bins[binno] += 1;
        pthread_mutex_unlock(&args->mxBins[binno]);
        pthread_mutex_lock(args->pmxBallsThrown);
        (*args->pBallsThrown) += 1;
        pthread_mutex_unlock(args->pmxBallsThrown);
    }
    return NULL;
}

/* returns # of bin where ball has landed */
int throwBall(UINT *seedptr)
{
    int result = 0;
    for (int i = 0; i < BIN_COUNT - 1; i++)
        if (NEXT_DOUBLE(seedptr) > 0.5)
            result++;
    return result;
}

Wszystkie dane konieczne do pracy wątku są przekazywane przez strukturę argsThrower_t, wynik jego pracy to modyfikacja w tablicy bins, znowu nic nie jest przekazywane przez zmienne globalne.

W programie mamy dwa muteksy chroniące dostęp do liczników oraz całą tablice muteksów chroniących dostęp do komórek tablicy bins (po jednym na komórkę). Zatem muteksów jest BIN_COUNT+2.

W programie tworzymy wątki typu “detachable” czyli nie ma potrzeby (ani możliwości) czekać na ich zakończenie, stąd brak pthread_join, za to nie wiemy kiedy wątki robocze skończą działanie, musimy do tego stworzyć własny test.

W programie muteksy są inicjowane zarówno automatycznie jak i dynamicznie, jako zmienne automatyczne muteksy są prostsze w tworzeniu ale trzeba z góry znać ilość muteksów. Muteksy jako zmienne dynamicznie nie mają tego ograniczenia ale trzeba inicjować i usuwać mutex w kodzie.

Czy dane przekazane przez argsThrower_t są współdzielone pomiędzy wątki?

Odpowiedź
Częściowo tak, wspólne dane muszą być chronione przy wielodostępie, stąd muteksy dla liczników i koszy.

Czy struktura argsThrower_t jest optymalna?

Odpowiedź
Nie wiele pól kopiujemy dla każdego wątku, chociaż te wskaźniki są takie same. Wspólne elementy można przenieść do drugiej struktury i tylko podawać jeden wskaźnik każdemu wątkowi zamiast 6 jak w kodzie. Dodatkowo przechowujmy w tej strukturze tid'y wątków, ale z tego nigdzie nie korzystamy.

Czemu używamy głównie wskaźników do przekazania danych do wątków?

Odpowiedź
Współdzielimy te dane, nie chcemy mieć lokalnych kopi zupełnie niezależnych od innych wątków, chcemy mieć jedną kopię tych danych dla wszystkich wątków.

Czy mutex’y można przekazać nie przez wskaźnik?

Odpowiedź
NIE, TO ZABRONIONE PRZEZ POSIX, kopia muteksu nie musi być muteksem, a już zupełnie oczywiste, że nie byłaby tym samym muteksem

Ten program używa dużo muteksów, czy może być ich mniej?

Odpowiedź
Tak, w skrajnym wypadku wystarczyłby 1 ale kosztem równoległości, możliwe są też rozwiązania pośrednie np. po jednym na liczniki i jeden wspólny na kosze. To ostatnie rozwiązanie, pomimo zmniejszenia równoległości, może okazać się szybsze bo liczne operacje na muteksach są dodatkowych obciążeniem dla systemu.

Metoda czekania na zakończenie się wątków liczących polega na okresowym sprawdzaniu czy licznik kul zrzuconych jest równy sumie kul w programie, czy jest to optymalne rozwiązanie?

Odpowiedź
Nie jest. To jest tzw. soft busy waiting, ale bez użycia zmiennych warunkowych nie możemy tego zrobić lepiej.

Czy wszystkie wątki w tym programie na prawdę pracują?

Odpowiedź
Nie, zwłaszcza gdy będzie ich dużo. Możliwe jest tzw. zagłodzenie wątku, czyli nie danie mu szansy na pracę. W tym programie działania wątku są na prawdę szybkie a tworzenie nowych wątków dość czasochłonne, może się tak stać, że wątki utworzone jako ostatnie nie będą miały co robić bo te wcześniejsze przerzucą wszystkie kulki wcześniej. Jako sprawdzenie dodaj do wątków liczniki ile kulek przerzucił dany wątek i wypisz je na koniec na ekran. Można zapobiec temu zjawisku synchronizując moment rozpoczęcia pracy przez wątki, ale tu znowu nie wystarczy muteks, najlepiej taką synchronizację zrobić na barierze lub zmiennej warunkowej, a te mechanizmy poznacie w przyszłym semestrze.

Zadanie 3 - wątki i sygnały z czekaniem na sygnał za pomocą funkcji sigwait #

Cel: Napisać program, który przyjmuje jeden parametr ‘k’ i co sekundę wyświetla listę liczb, początkowo od 1 do k. Program ma obsługiwać dwa sygnały za pomocą oddzielnego wątku. Wątek ten po otrzymaniu sygnału podejmuje następujące akcje:

  • SIGINT (C-c) … usuwa losową liczbę z listy (jeżeli lista jest pusta to nic nie robi),
  • SIGQUIT (C-) … ustawia flagę ‘STOP’ i kończy swoje działanie.

Wątek główny ma za zadanie co sekundę wyświetlić listę lub, jeżeli ustawiona jest flaga ‘STOP’, poprawnie zakończyć swoje działanie.

Co student musi wiedzieć:

  • man 3p pthread_sigmask
  • man 3p sigprocmask
  • man 3p sigwait

rozwiązanie, cały prog19.c:

#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MAXLINE 4096
#define DEFAULT_ARRAYSIZE 10
#define DELETED_ITEM -1
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef struct argsSignalHandler
{
    pthread_t tid;
    int *pArrayCount;
    int *array;
    pthread_mutex_t *pmxArray;
    sigset_t *pMask;
    bool *pQuitFlag;
    pthread_mutex_t *pmxQuitFlag;
} argsSignalHandler_t;

void ReadArguments(int argc, char **argv, int *arraySize);
void removeItem(int *array, int *arrayCount, int index);
void printArray(int *array, int arraySize);
void *signal_handling(void *);

int main(int argc, char **argv)
{
    int arraySize, *array;
    bool quitFlag = false;
    pthread_mutex_t mxQuitFlag = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mxArray = PTHREAD_MUTEX_INITIALIZER;
    ReadArguments(argc, argv, &arraySize);
    int arrayCount = arraySize;
    if (NULL == (array = (int *)malloc(sizeof(int) * arraySize)))
        ERR("Malloc error for array!");
    for (int i = 0; i < arraySize; i++)
        array[i] = i + 1;
    sigset_t oldMask, newMask;
    sigemptyset(&newMask);
    sigaddset(&newMask, SIGINT);
    sigaddset(&newMask, SIGQUIT);
    if (pthread_sigmask(SIG_BLOCK, &newMask, &oldMask))
        ERR("SIG_BLOCK error");
    argsSignalHandler_t args;
    args.pArrayCount = &arrayCount;
    args.array = array;
    args.pmxArray = &mxArray;
    args.pMask = &newMask;
    args.pQuitFlag = &quitFlag;
    args.pmxQuitFlag = &mxQuitFlag;
    if (pthread_create(&args.tid, NULL, signal_handling, &args))
        ERR("Couldn't create signal handling thread!");
    while (true)
    {
        pthread_mutex_lock(&mxQuitFlag);
        if (quitFlag == true)
        {
            pthread_mutex_unlock(&mxQuitFlag);
            break;
        }
        else
        {
            pthread_mutex_unlock(&mxQuitFlag);
            pthread_mutex_lock(&mxArray);
            printArray(array, arraySize);
            pthread_mutex_unlock(&mxArray);
            sleep(1);
        }
    }
    if (pthread_join(args.tid, NULL))
        ERR("Can't join with 'signal handling' thread");
    free(array);
    if (pthread_sigmask(SIG_UNBLOCK, &newMask, &oldMask))
        ERR("SIG_BLOCK error");
    exit(EXIT_SUCCESS);
}

void ReadArguments(int argc, char **argv, int *arraySize)
{
    *arraySize = DEFAULT_ARRAYSIZE;

    if (argc >= 2)
    {
        *arraySize = atoi(argv[1]);
        if (*arraySize <= 0)
        {
            printf("Invalid value for 'array size'");
            exit(EXIT_FAILURE);
        }
    }
}

void removeItem(int *array, int *arrayCount, int index)
{
    int curIndex = -1;
    int i = -1;
    while (curIndex != index)
    {
        i++;
        if (array[i] != DELETED_ITEM)
            curIndex++;
    }
    array[i] = DELETED_ITEM;
    *arrayCount -= 1;
}

void printArray(int *array, int arraySize)
{
    printf("[");
    for (int i = 0; i < arraySize; i++)
        if (array[i] != DELETED_ITEM)
            printf(" %d", array[i]);
    printf(" ]\n");
}

void *signal_handling(void *voidArgs)
{
    argsSignalHandler_t *args = voidArgs;
    int signo;
    srand(time(NULL));
    for (;;)
    {
        if (sigwait(args->pMask, &signo))
            ERR("sigwait failed.");
        switch (signo)
        {
            case SIGINT:
                pthread_mutex_lock(args->pmxArray);
                if (*args->pArrayCount > 0)
                    removeItem(args->array, args->pArrayCount, rand() % (*args->pArrayCount));
                pthread_mutex_unlock(args->pmxArray);
                break;
            case SIGQUIT:
                pthread_mutex_lock(args->pmxQuitFlag);
                *args->pQuitFlag = true;
                pthread_mutex_unlock(args->pmxQuitFlag);
                return NULL;
            default:
                printf("unexpected signal %d\n", signo);
                exit(1);
        }
    }
    return NULL;
}

W strukturze argumentów argsSignalHandler_t przekazujemy wskazania na dane współdzielone przez oba wątki czyli tablicę i flagę STOP oraz muteksy je chroniące. Dodatkowo maskę sygnałów i tid wątku obsługi sygnałów, które to dane nie są współdzielone.

W wątkach do ustawiania maski sygnałów (per wątek) używamy pthread_sigmask, nie powinno się, gdy istnieje więcej wątków niż jeden w programie używać f. sigprocmask.

Delegowanie oddzielnego wątku do obsługi sygnałów jest typowym i wygodnym sposobem radzenia sobie w programach wielowątkowych.

Ile wątków jest w tym programie?

Odpowiedź
Dwa, wątek główny utworzony przez system (każdy proces ma ten jeden wątek) oraz watek utworzony przez nas w kodzie.

Wypunktuj różnicę i podobieństwa pomiędzy sigwait i sigsuspend:

Odpowiedź
- sigwait nie wymaga funkcji obsługi sygnału tak jak sigsuspend - obie metody wymagają blokowania sygnałów oczekiwanych - sigwait nie jest przerywany przez funkcję obsługi sygnałów - nie może to wynika z POSIX - sigwait nie zmienia maski blokowanych sygnałów więc nie ma mowy o uruchomieniu obsługi sygnału blokowanego (jeśli by takowa była, w przykładzie nie ma) jak przy sigsuspend

Po wywołaniu sigwait tylko jeden typ sygnału jest zdejmowany z wektora sygnałów oczekujących wiec problem jaki mieliśmy z obsługa wielu sygnałów w trakcie pojedynczego sigsuspend w przykładzie z L2 nie wystąpiłby jeśli zamienić użycie sigsuspend na sigwait (ćwiczenie do zrobienia)

Czy metoda czekania na potomka jest w tym programie równie słaba jak w poprzednim?

Odpowiedź
Nie, tutaj sprawdzanie co sekundę jest częścią zadania, gdyby jednak nie było to i tak unikniemy busy waitingu bo możemy się synchronizować na pthread_join.

Czy w tym programie można użyć sigprocmask zamiast pthread_sigmask?

Odpowiedź
Tak, wywołanie blokujące sygnały pojawia się zanim powstaną dodatkowe wątki.

Czemu nie sprawdzamy błędów wywołań funkcji systemowych związanych pozyskaniem i zwolnieniem muteksów

Odpowiedź
Podstawowy typ muteksu (a taki używamy w programie) nie sprawdza i nie raportuje błędów. Dodanie sprawdzania nie byłoby wcale złe, łatwiej można potem zmienić muteks na typ raportujący błędy.

Zadanie 4 - anulowanie wątków, cleanup handlers #

Cel: Napisać program symulujący losy studentów wydziału MiNI. Program ma następujące argumenty:

  • n <= 100 … liczba nowych studentów,

Program przechowuje liczniki studentów na 1, 2, 3 roku oraz inżynierów.

Wątek główny: Inicjuje studentów, a przez następne 4 sekundy, w losowych odstępach czasu (100-300ms) wyrzuca jednego studenta (anuluje wątek). Po czterech sekundach, czeka na zakończenie działania studentów i wypisuje liczbę inżynierów oraz wyrzuconych osób.

Wątek studenta: Każdy student jest nowo-utworzonym wątkiem. Student dodaje się do licznika 1. roku, po sekundzie odejmuje się z tego licznika i dodaję do 2. roku. Analogicznie po sekundzie przechodzi do 3. roku i po jeszcze jednej sekundzie do inżynierów. Student jest zawsze przygotowany na opuszczenie uczelni.

Co student musi wiedzieć:

  • man 3p pthread_cancel
  • man 3 pthread_cleanup_push
  • man 3 pthread_cleanup_pop
  • man 7 time
  • man 3p clock_getres

rozwiązanie, cały prog20.c:

#include <errno.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAXLINE 4096
#define DEFAULT_STUDENT_COUNT 100
#define ELAPSED(start, end) ((end).tv_sec - (start).tv_sec) + (((end).tv_nsec - (start).tv_nsec) * 1.0e-9)
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef unsigned int UINT;
typedef struct timespec timespec_t;
typedef struct studentList
{
    bool *removed;
    pthread_t *thStudents;
    int count;
    int present;
} studentsList_t;
typedef struct yearCounters
{
    int values[4];
    pthread_mutex_t mxCounters[4];
} yearCounters_t;
typedef struct argsModify
{
    yearCounters_t *pYearCounters;
    int year;
} argsModify_t;
void ReadArguments(int argc, char **argv, int *studentsCount);
void *student_life(void *);
void increment_counter(argsModify_t *args);
void decrement_counter(void *_args);
void msleep(UINT milisec);
void kick_student(studentsList_t *studentsList);

int main(int argc, char **argv)
{
    int studentsCount;
    ReadArguments(argc, argv, &studentsCount);
    yearCounters_t counters = {.values = {0, 0, 0, 0},
                               .mxCounters = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
                                              PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}};
    studentsList_t studentsList;
    studentsList.count = studentsCount;
    studentsList.present = studentsCount;
    studentsList.thStudents = (pthread_t *)malloc(sizeof(pthread_t) * studentsCount);
    studentsList.removed = (bool *)malloc(sizeof(bool) * studentsCount);
    if (studentsList.thStudents == NULL || studentsList.removed == NULL)
        ERR("Failed to allocate memory for 'students list'!");
    for (int i = 0; i < studentsCount; i++)
        studentsList.removed[i] = false;
    for (int i = 0; i < studentsCount; i++)
        if (pthread_create(&studentsList.thStudents[i], NULL, student_life, &counters))
            ERR("Failed to create student thread!");
    srand(time(NULL));
    timespec_t start, current;
    if (clock_gettime(CLOCK_REALTIME, &start))
        ERR("Failed to retrieve time!");
    do
    {
        msleep(rand() % 201 + 100);
        if (clock_gettime(CLOCK_REALTIME, &current))
            ERR("Failed to retrieve time!");
        kick_student(&studentsList);
    } while (ELAPSED(start, current) < 4.0);
    for (int i = 0; i < studentsCount; i++)
        if (pthread_join(studentsList.thStudents[i], NULL))
            ERR("Failed to join with a student thread!");
    printf(" First year: %d\n", counters.values[0]);
    printf("Second year: %d\n", counters.values[1]);
    printf(" Third year: %d\n", counters.values[2]);
    printf("  Engineers: %d\n", counters.values[3]);
    free(studentsList.removed);
    free(studentsList.thStudents);
    exit(EXIT_SUCCESS);
}

void ReadArguments(int argc, char **argv, int *studentsCount)
{
    *studentsCount = DEFAULT_STUDENT_COUNT;
    if (argc >= 2)
    {
        *studentsCount = atoi(argv[1]);
        if (*studentsCount <= 0)
        {
            printf("Invalid value for 'studentsCount'");
            exit(EXIT_FAILURE);
        }
    }
}

void *student_life(void *voidArgs)
{
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    argsModify_t args;
    args.pYearCounters = voidArgs;
    for (args.year = 0; args.year < 3; args.year++)
    {
        increment_counter(&args);
        pthread_cleanup_push(decrement_counter, &args);
        msleep(1000);
        pthread_cleanup_pop(1);
    }
    increment_counter(&args);
    return NULL;
}

void increment_counter(argsModify_t *args)
{
    pthread_mutex_lock(&(args->pYearCounters->mxCounters[args->year]));
    args->pYearCounters->values[args->year] += 1;
    pthread_mutex_unlock(&(args->pYearCounters->mxCounters[args->year]));
}

void decrement_counter(void *_args)
{
    argsModify_t *args = _args;
    pthread_mutex_lock(&(args->pYearCounters->mxCounters[args->year]));
    args->pYearCounters->values[args->year] -= 1;
    pthread_mutex_unlock(&(args->pYearCounters->mxCounters[args->year]));
}

void msleep(UINT milisec)
{
    time_t sec = (int)(milisec / 1000);
    milisec = milisec - (sec * 1000);
    timespec_t req = {0};
    req.tv_sec = sec;
    req.tv_nsec = milisec * 1000000L;
    if (nanosleep(&req, &req))
        ERR("nanosleep");
}

void kick_student(studentsList_t *studentsList)
{
    int idx;
    if (0 == studentsList->present)
        return;
    do
    {
        idx = rand() % studentsList->count;
    } while (studentsList->removed[idx] == true);
    pthread_cancel(studentsList->thStudents[idx]);
    studentsList->removed[idx] = true;
    studentsList->present--;
}

Wątkom przekazuje się strukturę z aktualnym rokiem oraz z wskazaniem na 4 liczniki z muteksami, struktura argsModify_t nie powiela błędu z zadania 2 czyli nie przekazuje indywidualnych kopii zbyt wielu wskaźników.

Struktura studentsList_t jest używana jedynie przez wątek główny, nie jest “widziana” przez wątki studentów.

“Sprytna” inicjalizacja struktury yearCounters_t nie zadziała z archaicznymi standardami języka C (c89/c90). Warto wiedzieć, oczywiście używamy wszelkich dobrodziejstw nowszych standardów.

Cleanup handlery w funkcji wątku służą do bezpiecznego usunięcia studenta z licznika roku w razie anulowania w czasie spania, bez nich takie anulowanie pozostawiłoby studenta na roku do końca działania programu!

Pamiętaj, że pthread_cleanup_push musi być sparowany z pthread_cleanup_pop w obrębie tego samego bloku {}

Ile w programie występuje mutexów i co chronią?

Odpowiedź
Dokładnie 4, każdy chroni inny współdzielony licznik.

Czy aktualny rok studenta musi być częścią struktury argsModify_t?

Odpowiedź
Nie, równie dobrze mógłby być automatyczną zmienną utworzoną w wątku, nie byłaby wtedy potrzebna struktura argsModify_t , wątkom przekazywalibyśmy wskaźnik na strukturę yearCounters.

Co to znaczy, że wątek ma stan anulowania PTHREAD_CANCEL_DEFERRED ?

Odpowiedź
Anulowanie nastąpi dopiero podczas wywołania funkcji z listy funkcji "cancelation points"

Które z funkcji użytych w funkcji wątku są punktami anulowania wątku?

Odpowiedź
W tym kodzie tylko nanosleep (wywołany z msleep).

Skąd wiemy, które funkcje są takimi punktami?

Odpowiedź
$man 7 pthreads

Co oznacza jedynka w wywołaniu: pthread_cleanup_pop(1); ?

Odpowiedź
Oznacza, że poza zdjęciem handlera ze stosu jest on też wykonywany.

Kiedy jest zmniejszany liczniku roku?

Odpowiedź
W dwu przypadkach, podczas anulowania (rzadki przypadek) oraz podczas zdjęcia handlera decrement_counter ze stosu cleanup handlerów.

W algorytmie losowania wątku do anulowania jest spory błąd logiczny, gdzie i jakie niesie zagrożenie?

Odpowiedź
Losowy wybór wątku do anulowania może trwać bardzo długo gdy na dużej liście wątków zostanie np. tylko 1 nie anulowany. Można to zaobserwować, jeśli wywołać program z parametrem 10.

Jako ćwiczenie popraw sposób losowania “żyjącego” wątku do anulowania.

Zwróć uwagę na sposób odliczenia 4 sekund w losowych interwałach za pomocą clock_gettime i nanosleep, to można zmienić, dodając wywołanie f.alarm i oddając obsługę sygnału SIGALRM. Rozwiązanie takie można wykonać jako ćwiczenie.

Wykonaj przykładowe zadania. Podczas laboratorium będziesz miał więcej czasu oraz dostępny startowy kod, jeśli jednak wykonasz poniższe zadania w przewidzianym czasie, to znaczy, że jesteś dobrze przygotowany do zajęć.

Kody źródłowe z treści tutoriala #