L4 - Synchronizacja

L4 - Synchronizacja #

Uwagi wstępne:

  • Szybkie przejrzenie tutoriala prawdopodobnie nic nie pomoże, należy samodzielnie uruchomić programy, sprawdzić jak działają, poczytać materiały dodatkowe takie jak strony man. W trakcie czytania sugeruję wykonywać ćwiczenia a na koniec przykładowe zadanie.
  • Na żółtych polach podaję dodatkowe informacje, niebieskie zawierają pytania i ćwiczenia. Pod pytaniami znajdują się odpowiedzi, które staną się widoczne dopiero po kliknięciu. Proszę najpierw spróbować sobie odpowiedzieć na pytanie samemu a dopiero potem sprawdzać odpowiedź.
  • Pełne kody do zajęć znajdują się w załącznikach na dole strony. W tekście są tylko te linie kodu, które są konieczne do zrozumienia problemu.
  • Materiały i ćwiczenia są ułożone w pewną logiczną całość, czasem do wykonania ćwiczenia konieczny jest stan osiągnięty poprzednim ćwiczeniem dlatego zalecam wykonywanie ćwiczeń w miarę przyswajania materiału.
  • Większość ćwiczeń wymaga użycia konsoli poleceń, zazwyczaj zakładam, ze pracujemy w jednym i tym samym katalogu roboczym więc wszystkie potrzebne pliki są “pod ręką” tzn. nie ma potrzeby podawania ścieżek dostępu.
  • Czasem podaję znak $ aby podkreślić, że chodzi o polecenie konsolowe, nie piszemy go jednak w konsoli np.: piszę “$make” w konsoli wpisujemy samo “make”.
  • To co ćwiczymy wróci podczas kolejnych zajęć. Jeśli po zajęciach i teście coś nadal pozostaje niejasne proszę to poćwiczyć a jeśli trzeba dopytać się u prowadzących.

The alarm - Semaphores #

Napisz wielowątkowy program zegara. Użytkownik wprowadza liczbę sekund, które program ma odmierzyć, a następnie oczekuje na odpowiedź. Program uruchamia osobny wątek dla każdego nowego żądania. Wątek usypia się na określony czas i wypisuje komunikat “Wake up”. Następnie wątek kończy działanie.

Program ma ograniczenie 5 równoczesnych wątków. Jeśli program otrzyma więcej żądań, natychmiastowo wyświetli komunikat “Only 5 alarms can be set at the time”.

Ograniczenie na współbieżne wątki może być narzucone za pomocą semafora POSIX.

solution prog21.c:

#define _GNU_SOURCE
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

#define FS_NUM 5
#define MAX_INPUT 120
volatile sig_atomic_t work = 1;

void sigint_handler(int sig) { work = 0; }

int set_handler(void (*f)(int), int sigNo)
{
    struct sigaction act;
    memset(&act, 0, sizeof(struct sigaction));
    act.sa_handler = f;
    if (-1 == sigaction(sigNo, &act, NULL))
        return -1;
    return 0;
}

struct arguments
{
    int32_t time;
    sem_t *semaphore;
};

void *thread_func(void *arg)
{
    struct arguments *args = (struct arguments *)arg;
    uint32_t tt;
    fprintf(stderr, "Will sleep for %d\n", args->time);
    for (tt = args->time; tt > 0; tt = sleep(tt))
        ;
    puts("Wake up");
    if (sem_post(args->semaphore) == -1)
        ERR("sem_post");
    free(args);
    return NULL;
}

void do_work()
{
    int32_t time;
    pthread_t thread;
    char input[MAX_INPUT];
    struct arguments *args;
    sem_t semaphore;
    if (sem_init(&semaphore, 0, FS_NUM) != 0)
        ERR("sem_init");
    while (work)
    {
        puts("Please enter the number of seconds for the alarm delay:");
        if(fgets(input, MAX_INPUT, stdin) == NULL) {
            if (errno == EINTR)
                continue;
            ERR("fgets:");
        }
        
        time = atoi(input);
        if(time <= 0) {
            fputs("Incorrect time specified", stderr);
            continue;
        }
        
        if (TEMP_FAILURE_RETRY(sem_trywait(&semaphore)) == -1)
        {
            switch (errno)
            {
                case EAGAIN:
                    fprintf(stderr, "Only %d alarms can be set at the time.", FS_NUM);
                case EINTR:
                    continue;
            }
            ERR("sem_trywait");
        }
        
        if ((args = (struct arguments *)malloc(sizeof(struct arguments))) == NULL)
            ERR("malloc:");
        args->time = time;
        args->semaphore = &semaphore;
        if (pthread_create(&thread, NULL, thread_func, (void *) args) != 0)
            ERR("pthread_create");
        if (pthread_detach(thread) != 0)
            ERR("pthread_detach");
    }
}

int main(int argc, char **argv)
{
    if (set_handler(sigint_handler, SIGINT))
        ERR("Seting SIGINT:");
    do_work();
    fprintf(stderr, "Program has terminated.\n");
    return EXIT_SUCCESS;
}

Warto samemu dokonać dokładnej analizy powyższego kodu pod kątem elementów nie omawianych tutaj, proszę to zrobić jako ćwiczenie.

Czemu przed uruchomieniem wątku w serwerze alokujemy pamięć? Czy można się tej alokacji pozbyć z kodu?

Answer
Nie możemy użyć jednej wspólnej struktury bo na raz może być więcej niż jeden wątek roboczy. Można zrobić tablice FS_NUM struktur ale trzeba by wtedy zarządzać wolnymi polami w tej tablicy a to oznacza trochę więcej kodu. Najłatwiej jest zaalokować pamięć dla tej struktury przed uruchomieniem wątku. Pamiętamy o zwolnieniu tej pamięci w wątku zanim się skończy.

Gdzie jest odliczany czas?

Answer
W wątku roboczym, w tym celu go tworzymy.

Czemu służy semafor?

Answer
Semafor odlicza ile jeszcze wątków można utworzyć aby na raz nie działało ich więcej niż FS_NUM (10). Na początek jego wartość to 10, przed utworzeniem kolejnego wątku zmniejszamy wartość semafora. Zanim wątek skończy zwiększa tą wartość. Jeśli wartość semafora jest zerowa to nie da się już utworzyć kolejnego wątku.

Czemu przed utworzeniem kolejnego wątku roboczego w programie używamy sem_trywait? Co się stanie gdy zamienimy to na sem_wait?

Answer
Robimy tak, aby od razu zorientować się, że wyczerpaliśmy limit wątków. Gdyby zastąpić trywait wait'em to program poczekałby aż się zwolni jakiś wątek i dopiero wtedy obsłużyłby zadanie, ale to oznacza, że czas odliczony dla tego zadania byłby dłuższy niż użytkownik żądał!

Co powoduje limitowanie ustawionych alarmów do 5 naraz?

Answer
Semafor, zerknij na 2 powyższe pytania i odpowiedzi.

Threads pool - conditional variables #

Original author: Jerzy Bartuszek

Napisz prosty program, który czyta z “/dev/urandom” i zapisuje jego zawartość do plików. Za każdym razem, gdy użytkownik naciśnie Enter, program odczytuje losowe bajty i zapisuje je do pliku. Program jest wielowątkową aplikacją - każde żądanie obsługiwane jest w osobnym wątku. Każdy wątek zapisuje losowe bajty do swojego własnego pliku. Program tworzy wątki z góry określoną liczbę THREADS_NUM i przechowuje nieużywane wątki w puli wątków. Po otrzymaniu sygnału SIGINT serwer przestaje akceptować wejście od użytkownika i kończy swoje wykonanie po obsłużeniu wszystkich bieżących żądań.

solution prog22.c:

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

#define BUFFERSIZE 256
#define READCHUNKS 4
#define THREAD_NUM 3
volatile sig_atomic_t work = 1;

typedef struct
{
    int id;
    int *idlethreads;
    int *condition;
    pthread_cond_t *cond;
    pthread_mutex_t *mutex;
} thread_arg;

void sigint_handler(int sig) { work = 0; }

void set_handler(void (*f)(int), int sigNo)
{
    struct sigaction act;
    memset(&act, 0x00, sizeof(struct sigaction));
    act.sa_handler = f;
    
    if (-1 == sigaction(sigNo, &act, NULL))
        ERR("sigaction");
}

ssize_t bulk_read(int fd, char *buf, size_t count)
{
    int c;
    size_t len = 0;
    do
    {
        c = TEMP_FAILURE_RETRY(read(fd, buf, count));
        if (c < 0)
            return c;
        if (c == 0)
            return len;
        buf += c;
        len += c;
        count -= c;
    } while (count > 0);
    return len;
}

ssize_t bulk_write(int fd, char *buf, size_t count)
{
    int c;
    size_t len = 0;
    do
    {
        c = TEMP_FAILURE_RETRY(write(fd, buf, count));
        if (c < 0)
            return c;
        buf += c;
        len += c;
        count -= c;
    } while (count > 0);
    return len;
}

void cleanup(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); }

void read_random(int thread_id)
{
    char file_name[20];
    char buffer[BUFFERSIZE];
    snprintf(file_name, sizeof(file_name), "random%d.bin", thread_id);
    printf("Writing to a file %s\n", file_name);
    int i, in, out;
    ssize_t count;
    if ((out = TEMP_FAILURE_RETRY(open(file_name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0777))) < 0)
        ERR("open");
    if ((in = TEMP_FAILURE_RETRY(open("/dev/urandom", O_RDONLY))) < 0)
        ERR("open");
    for (i = 0; i < READCHUNKS; i++)
    {
        if ((count = bulk_read(in, buffer, BUFFERSIZE)) < 0)
            ERR("bulk_read");
        if ((count = bulk_write(out, buffer, count)) < 0)
            ERR("bulk_write");
        sleep(1);
    }
    if (TEMP_FAILURE_RETRY(close(in)))
        ERR("close");
    if (TEMP_FAILURE_RETRY(close(out)))
        ERR("close");
}

void *thread_func(void *arg)
{
    thread_arg targ;
    memcpy(&targ, arg, sizeof(targ));
    while (1)
    {
        pthread_cleanup_push(cleanup, (void *)targ.mutex);
        if (pthread_mutex_lock(targ.mutex) != 0)
            ERR("pthread_mutex_lock");
        (*targ.idlethreads)++;
        while (!*targ.condition && work)
            if (pthread_cond_wait(targ.cond, targ.mutex) != 0)
                ERR("pthread_cond_wait");
        *targ.condition = 0;
        if (!work)
            pthread_exit(NULL);
        (*targ.idlethreads)--;
        pthread_cleanup_pop(1);
        read_random(targ.id);
    }
    return NULL;
}

void init(pthread_t *thread, thread_arg *targ, pthread_cond_t *cond, pthread_mutex_t *mutex, int *idlethreads,
          int *condition)
{
    int i;
    for (i = 0; i < THREAD_NUM; i++)
    {
        targ[i].id = i + 1;
        targ[i].cond = cond;
        targ[i].mutex = mutex;
        targ[i].idlethreads = idlethreads;
        targ[i].condition = condition;
        if (pthread_create(&thread[i], NULL, thread_func, (void *) &targ[i]) != 0)
            ERR("pthread_create");
    }
}

void do_work(pthread_cond_t *cond, pthread_mutex_t *mutex, const int *idlethreads, int *condition)
{
    char buffer[BUFFERSIZE];
    while (work)
    {
        if (fgets(buffer, BUFFERSIZE, stdin) != NULL)
        {
            if (pthread_mutex_lock(mutex) != 0)
                ERR("pthread_mutex_lock");
            if (*idlethreads == 0)
            {
                if (pthread_mutex_unlock(mutex) != 0)
                    ERR("pthread_mutex_unlock");
                fputs("No threads available\n", stderr);
            }
            else
            {
                if (pthread_mutex_unlock(mutex) != 0)
                    ERR("pthread_mutex_unlock");
                *condition = 1;
                if (pthread_cond_signal(cond) != 0)
                    ERR("pthread_cond_signal");
            }
        }
        else
        {
            if (EINTR == errno)
                continue;
            ERR("fgets");
        }
    }
}

int main(int argc, char **argv)
{
    int i, condition = 0, idlethreads = 0;
    pthread_t thread[THREAD_NUM];
    thread_arg targ[THREAD_NUM];
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    set_handler(sigint_handler, SIGINT);
    init(thread, targ, &cond, &mutex, &idlethreads, &condition);
    do_work(&cond, &mutex, &idlethreads, &condition);
    if (pthread_cond_broadcast(&cond) != 0)
        ERR("pthread_cond_broadcast");
    for (i = 0; i < THREAD_NUM; i++)
        if (pthread_join(thread[i], NULL) != 0)
            ERR("pthread_join");
    return EXIT_SUCCESS;
}

Warto samemu dokonać dokładnej analizy powyższego kodu pod kątem elementów nie omawianych tutaj, proszę to zrobić jako ćwiczenie.

Czy warunek dla zmiennej warunkowej może być oparty o jedną zwykłą zmienna?

Answer
Tak.

Czy warunek dla zmiennej warunkowej może być oparty na wartościach kilku zmiennych w programie?

Answer
Tak.

Czy warunek dla zmiennej warunkowej może być oparty na zawartości pliku?

Answer
Tak.

Czy warunek dla zmiennej warunkowej może być oparty na istnieniu pliku? {

Answer
Tak.

Jak można ogólnie określić na czym może być oparty warunek zmiennej warunkowej

Answer
Warunek może być absolutnie dowolny, to kod napisany przez programistę o tym decyduję więc granicą tego czym może być warunek jest tylko wyobraźnia kodującego.

Czy zmienna warunkowa może być użyta bez żadnego warunku?

Answer
Tak, jest wtedy zwykłą pulą czekających wątków które możemy wybudzać pojedynczo lub wszystkie naraz.

Co chroni muteks związany z zmienną warunkowa?

Answer
Muteks ma chronić dostęp do tych elementów programu (zmiennych, plików itd.) które są testowane w warunku zmiennej warunkowej. Chodzi o to, aby w czasie sprawdzania warunku zmiennej (zablokowania muteksu) mieć pewność, że warunek ten nie zostanie zmodyfikowany.

Czy jeden muteks może chronić więcej niż jedną zmienna warunkową?

Answer
Może, ale proszę pamiętaj o wydajności programu, takie rozwiązanie ograniczy też równoległość.

Co wchodzi w skład zmiennej warunkowej w powyższym programie?

Answer
Warunek jest oparty wyłącznie o zmienną "condition", do której wątki mają dostęp poprzez wskaźniki.

Jak działa warunek zmiennej w tym programie?

Answer
Gdy główny przyjmie nowe żądanie ustawia zamienną "condition" na 1 i budzi jeden z wątków czekających. Obudzony wątek jeśli widzi, że "condition==1" i obsługuje żądanie.

Który wątek powinien sprawdzić, że warunek zmiennej jest spełniony? Budzący czy obudzony?

Answer
Warunek musi zawsze być sprawdzony przez wątek budzony, nie ma gwarancji że będzie on spełniony nawet jeśli wątek budzący go sprawdzał ponieważ muteks który blokuje zmianę warunku musi być na chwilę zwolniony pomiędzy wysłaniem prośby o obudzenie a samym obudzeniem. Jeśli w tym czasie muteks zostałby przejęty przez inny wątek to warunek może ulec zmianie! Dobrze jeśli wątek budzący sprawdzi warunek ale jeśli nie może tego zrobić bo nie ma dostępu do wszystkich składowych warunku to można ten krok pominąć.

Czemu służy cleanup handler w wątku roboczym?

Answer
Bardzo ważne jest, aby wątek nie zakończył się nie zwolniwszy muteksu chroniącego zmienna warunkową, to zablokowałoby całą aplikację. Ten handler w razie awaryjnego wyjścia zwolni muteks.

Dice game - barrier #

Zasymuluj następującą grę w kości: Każdy uczestnik jednocześnie rzuca kością sześcienną przez 10 rund. Po każdym rzucie gracza, jeden z graczy posumowuje rundę i przypisuje punkty. Gracz z najwyższym wynikiem w danej rundzie otrzymuje jeden punkt. W przypadku remisu, wszyscy zremisowani gracze otrzymują punkt. Gra kończy się po 10 rundach, a zwycięzcą zostaje gracz z najwyższą sumą punktów. Reprezentuj każdego gracza za pomocą wątku i użyj bariery do synchronizacji gry.

solution prog23.c:

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

#define PLAYER_COUNT 4
#define ROUNDS 10

struct arguments
{
    int id;
    unsigned int seed;
    int* scores;
    int* rolls;
    pthread_barrier_t *barrier;
};

void* thread_func(void *arg) {
    struct arguments *args = (struct arguments *)arg;
    for (int round = 0; round < ROUNDS; ++round) {
        args->rolls[args->id] = 1 + rand_r(&args->seed) % 6;
        printf("player %d: Rolled %d.\n", args->id, args->rolls[args->id]);
        
        int result = pthread_barrier_wait(args->barrier);
        
        if(result == PTHREAD_BARRIER_SERIAL_THREAD) {
            printf("player %d: Assigning scores.\n", args->id);
            int max = -1;
            for (int i = 0; i < PLAYER_COUNT; ++i) {
                int roll = args->rolls[i];
                if(roll > max) {
                    max = roll;
                }
            }
            for (int i = 0; i < PLAYER_COUNT; ++i) {
                int roll = args->rolls[i];
                if(roll == max) {
                    args->scores[i] = args->scores[i] + 1;
                    printf("player %d: Player %d got a point.\n", args->id, i);
                }
            }
        }
        pthread_barrier_wait(args->barrier);
    }
    
    return NULL;
}

void create_threads(pthread_t *thread, struct arguments *targ, pthread_barrier_t *barrier, int *scores, int* rolls)
{
    srand(time(NULL));
    int i;
    for (i = 0; i < PLAYER_COUNT; i++)
    {
        targ[i].id = i;
        targ[i].seed = rand();
        targ[i].scores = scores;
        targ[i].rolls = rolls;
        targ[i].barrier = barrier;
        if (pthread_create(&thread[i], NULL, thread_func, (void *)&targ[i]) != 0)
            ERR("pthread_create");
    }
}

int main() {
    pthread_t threads[PLAYER_COUNT];
    struct arguments targ[PLAYER_COUNT];
    int scores[PLAYER_COUNT] = {0};
    int rolls[PLAYER_COUNT];
    pthread_barrier_t barrier;
    
    pthread_barrier_init(&barrier, NULL, PLAYER_COUNT);
    
    create_threads(threads, targ, &barrier, scores, rolls);
    
    for (int i = 0; i < PLAYER_COUNT; i++) {
        pthread_join(threads[i], NULL);
    }
    
    puts("Scores: ");
    for (int i = 0; i < PLAYER_COUNT; ++i) {
        printf("ID %d: %i\n", i, scores[i]);
    }
    
    pthread_barrier_destroy(&barrier);
    
    return 0;
}

Jak działa bariera w tym programie?

Answer
Jest używana do synchronizacji wątków w dwóch kluczowych punktach w funkcji wątku. Bariera zapewnia, że wszystkie uczestniczące wątki osiągną określony punkt swojego wykonania, zanim którykolwiek z nich wznowi działanie. Bariera jest inicjowana z liczbą PLAYER_COUNT, co oznacza, że zablokuje się, dopóki PLAYER_COUNT wątków nie wywoła pthread_barrier_wait.

Które części funkcji wątku są wywoływane współbieżnie?

Answer
Każdy wątek niezależnie rzuca sześcienną kością, a wyniki są przechowywane w tablicy args->rolls. Ten fragment kodu jest wykonywany równocześnie przez wszystkie wątki.

Jak jest wybierany wątek jednego gracza do posumowania rundy?

Answer
pthread_barrier_wait zwraca PTHREAD_BARRIER_SERIAL_THREAD tylko dla jednego wątku (standard nie określa, który to wątek), a dla innych wątków zwraca 0. Ten mechanizm zapewnia, że akcja jest wykonywana tylko przez jeden wątek w każdej rundzie, uniemożliwiając wielokrotne równoczesne wykonanie tego samego kodu, który powinien być wykonany tylko raz.

Wykonaj przykładowe zadania. Podczas laboratorium będziesz miał więcej czasu oraz dostępny startowy kod, jeśli jednak wykonasz poniższe zadania w przewidzianym czasie, to znaczy, że jesteś dobrze przygotowany do zajęć.

Kody źródłowe z treści tutoriala #