L4 - Synchronizacja #
Uwagi wstępne:
- Szybkie przejrzenie tutoriala prawdopodobnie nic nie pomoże, należy samodzielnie uruchomić programy, sprawdzić jak działają, poczytać materiały dodatkowe takie jak strony man. W trakcie czytania sugeruję wykonywać ćwiczenia a na koniec przykładowe zadanie.
- Na żółtych polach podaję dodatkowe informacje, niebieskie zawierają pytania i ćwiczenia. Pod pytaniami znajdują się odpowiedzi, które staną się widoczne dopiero po kliknięciu. Proszę najpierw spróbować sobie odpowiedzieć na pytanie samemu a dopiero potem sprawdzać odpowiedź.
- Pełne kody do zajęć znajdują się w załącznikach na dole strony. W tekście są tylko te linie kodu, które są konieczne do zrozumienia problemu.
- Materiały i ćwiczenia są ułożone w pewną logiczną całość, czasem do wykonania ćwiczenia konieczny jest stan osiągnięty poprzednim ćwiczeniem dlatego zalecam wykonywanie ćwiczeń w miarę przyswajania materiału.
- Większość ćwiczeń wymaga użycia konsoli poleceń, zazwyczaj zakładam, ze pracujemy w jednym i tym samym katalogu roboczym więc wszystkie potrzebne pliki są “pod ręką” tzn. nie ma potrzeby podawania ścieżek dostępu.
- Czasem podaję znak $ aby podkreślić, że chodzi o polecenie konsolowe, nie piszemy go jednak w konsoli np.: piszę “$make” w konsoli wpisujemy samo “make”.
- To co ćwiczymy wróci podczas kolejnych zajęć. Jeśli po zajęciach i teście coś nadal pozostaje niejasne proszę to poćwiczyć a jeśli trzeba dopytać się u prowadzących.
The alarm - Semaphores #
Napisz wielowątkowy program zegara. Użytkownik wprowadza liczbę sekund, które program ma odmierzyć, a następnie oczekuje na odpowiedź. Program uruchamia osobny wątek dla każdego nowego żądania. Wątek usypia się na określony czas i wypisuje komunikat “Wake up”. Następnie wątek kończy działanie.
Program ma ograniczenie 5 równoczesnych wątków. Jeśli program otrzyma więcej żądań, natychmiastowo wyświetli komunikat “Only 5 alarms can be set at the time”.
Ograniczenie na współbieżne wątki może być narzucone za pomocą semafora POSIX.
solution prog21.c:
#define _GNU_SOURCE
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
#define FS_NUM 5
#define MAX_INPUT 120
volatile sig_atomic_t work = 1;
void sigint_handler(int sig) { work = 0; }
int set_handler(void (*f)(int), int sigNo)
{
struct sigaction act;
memset(&act, 0, sizeof(struct sigaction));
act.sa_handler = f;
if (-1 == sigaction(sigNo, &act, NULL))
return -1;
return 0;
}
struct arguments
{
int32_t time;
sem_t *semaphore;
};
void *thread_func(void *arg)
{
struct arguments *args = (struct arguments *)arg;
uint32_t tt;
fprintf(stderr, "Will sleep for %d\n", args->time);
for (tt = args->time; tt > 0; tt = sleep(tt))
;
puts("Wake up");
if (sem_post(args->semaphore) == -1)
ERR("sem_post");
free(args);
return NULL;
}
void do_work()
{
int32_t time;
pthread_t thread;
char input[MAX_INPUT];
struct arguments *args;
sem_t semaphore;
if (sem_init(&semaphore, 0, FS_NUM) != 0)
ERR("sem_init");
while (work)
{
puts("Please enter the number of seconds for the alarm delay:");
if(fgets(input, MAX_INPUT, stdin) == NULL) {
if (errno == EINTR)
continue;
ERR("fgets:");
}
time = atoi(input);
if(time <= 0) {
fputs("Incorrect time specified", stderr);
continue;
}
if (TEMP_FAILURE_RETRY(sem_trywait(&semaphore)) == -1)
{
switch (errno)
{
case EAGAIN:
fprintf(stderr, "Only %d alarms can be set at the time.", FS_NUM);
case EINTR:
continue;
}
ERR("sem_trywait");
}
if ((args = (struct arguments *)malloc(sizeof(struct arguments))) == NULL)
ERR("malloc:");
args->time = time;
args->semaphore = &semaphore;
if (pthread_create(&thread, NULL, thread_func, (void *) args) != 0)
ERR("pthread_create");
if (pthread_detach(thread) != 0)
ERR("pthread_detach");
}
}
int main(int argc, char **argv)
{
if (set_handler(sigint_handler, SIGINT))
ERR("Seting SIGINT:");
do_work();
fprintf(stderr, "Program has terminated.\n");
return EXIT_SUCCESS;
}
Warto samemu dokonać dokładnej analizy powyższego kodu pod kątem elementów nie omawianych tutaj, proszę to zrobić jako ćwiczenie.
Czemu przed uruchomieniem wątku w serwerze alokujemy pamięć? Czy można się tej alokacji pozbyć z kodu?Answer
Gdzie jest odliczany czas?Answer
Czemu służy semafor?Answer
Czemu przed utworzeniem kolejnego wątku roboczego w programie używamy sem_trywait? Co się stanie gdy zamienimy to na sem_wait?Answer
Co powoduje limitowanie ustawionych alarmów do 5 naraz?Answer
Threads pool - conditional variables #
Original author: Jerzy Bartuszek
Napisz prosty program, który czyta z “/dev/urandom” i zapisuje jego zawartość do plików. Za każdym razem, gdy użytkownik naciśnie Enter, program odczytuje losowe bajty i zapisuje je do pliku. Program jest wielowątkową aplikacją - każde żądanie obsługiwane jest w osobnym wątku. Każdy wątek zapisuje losowe bajty do swojego własnego pliku. Program tworzy wątki z góry określoną liczbę THREADS_NUM i przechowuje nieużywane wątki w puli wątków. Po otrzymaniu sygnału SIGINT serwer przestaje akceptować wejście od użytkownika i kończy swoje wykonanie po obsłużeniu wszystkich bieżących żądań.
solution prog22.c:
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
#define BUFFERSIZE 256
#define READCHUNKS 4
#define THREAD_NUM 3
volatile sig_atomic_t work = 1;
typedef struct
{
int id;
int *idlethreads;
int *condition;
pthread_cond_t *cond;
pthread_mutex_t *mutex;
} thread_arg;
void sigint_handler(int sig) { work = 0; }
void set_handler(void (*f)(int), int sigNo)
{
struct sigaction act;
memset(&act, 0x00, sizeof(struct sigaction));
act.sa_handler = f;
if (-1 == sigaction(sigNo, &act, NULL))
ERR("sigaction");
}
ssize_t bulk_read(int fd, char *buf, size_t count)
{
int c;
size_t len = 0;
do
{
c = TEMP_FAILURE_RETRY(read(fd, buf, count));
if (c < 0)
return c;
if (c == 0)
return len;
buf += c;
len += c;
count -= c;
} while (count > 0);
return len;
}
ssize_t bulk_write(int fd, char *buf, size_t count)
{
int c;
size_t len = 0;
do
{
c = TEMP_FAILURE_RETRY(write(fd, buf, count));
if (c < 0)
return c;
buf += c;
len += c;
count -= c;
} while (count > 0);
return len;
}
void cleanup(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); }
void read_random(int thread_id)
{
char file_name[20];
char buffer[BUFFERSIZE];
snprintf(file_name, sizeof(file_name), "random%d.bin", thread_id);
printf("Writing to a file %s\n", file_name);
int i, in, out;
ssize_t count;
if ((out = TEMP_FAILURE_RETRY(open(file_name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0777))) < 0)
ERR("open");
if ((in = TEMP_FAILURE_RETRY(open("/dev/urandom", O_RDONLY))) < 0)
ERR("open");
for (i = 0; i < READCHUNKS; i++)
{
if ((count = bulk_read(in, buffer, BUFFERSIZE)) < 0)
ERR("bulk_read");
if ((count = bulk_write(out, buffer, count)) < 0)
ERR("bulk_write");
sleep(1);
}
if (TEMP_FAILURE_RETRY(close(in)))
ERR("close");
if (TEMP_FAILURE_RETRY(close(out)))
ERR("close");
}
void *thread_func(void *arg)
{
thread_arg targ;
memcpy(&targ, arg, sizeof(targ));
while (1)
{
pthread_cleanup_push(cleanup, (void *)targ.mutex);
if (pthread_mutex_lock(targ.mutex) != 0)
ERR("pthread_mutex_lock");
(*targ.idlethreads)++;
while (!*targ.condition && work)
if (pthread_cond_wait(targ.cond, targ.mutex) != 0)
ERR("pthread_cond_wait");
*targ.condition = 0;
if (!work)
pthread_exit(NULL);
(*targ.idlethreads)--;
pthread_cleanup_pop(1);
read_random(targ.id);
}
return NULL;
}
void init(pthread_t *thread, thread_arg *targ, pthread_cond_t *cond, pthread_mutex_t *mutex, int *idlethreads,
int *condition)
{
int i;
for (i = 0; i < THREAD_NUM; i++)
{
targ[i].id = i + 1;
targ[i].cond = cond;
targ[i].mutex = mutex;
targ[i].idlethreads = idlethreads;
targ[i].condition = condition;
if (pthread_create(&thread[i], NULL, thread_func, (void *) &targ[i]) != 0)
ERR("pthread_create");
}
}
void do_work(pthread_cond_t *cond, pthread_mutex_t *mutex, const int *idlethreads, int *condition)
{
char buffer[BUFFERSIZE];
while (work)
{
if (fgets(buffer, BUFFERSIZE, stdin) != NULL)
{
if (pthread_mutex_lock(mutex) != 0)
ERR("pthread_mutex_lock");
if (*idlethreads == 0)
{
if (pthread_mutex_unlock(mutex) != 0)
ERR("pthread_mutex_unlock");
fputs("No threads available\n", stderr);
}
else
{
if (pthread_mutex_unlock(mutex) != 0)
ERR("pthread_mutex_unlock");
*condition = 1;
if (pthread_cond_signal(cond) != 0)
ERR("pthread_cond_signal");
}
}
else
{
if (EINTR == errno)
continue;
ERR("fgets");
}
}
}
int main(int argc, char **argv)
{
int i, condition = 0, idlethreads = 0;
pthread_t thread[THREAD_NUM];
thread_arg targ[THREAD_NUM];
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
set_handler(sigint_handler, SIGINT);
init(thread, targ, &cond, &mutex, &idlethreads, &condition);
do_work(&cond, &mutex, &idlethreads, &condition);
if (pthread_cond_broadcast(&cond) != 0)
ERR("pthread_cond_broadcast");
for (i = 0; i < THREAD_NUM; i++)
if (pthread_join(thread[i], NULL) != 0)
ERR("pthread_join");
return EXIT_SUCCESS;
}
Warto samemu dokonać dokładnej analizy powyższego kodu pod kątem elementów nie omawianych tutaj, proszę to zrobić jako ćwiczenie.
Czy warunek dla zmiennej warunkowej może być oparty o jedną zwykłą zmienna?Answer
Czy warunek dla zmiennej warunkowej może być oparty na wartościach kilku zmiennych w programie?Answer
Czy warunek dla zmiennej warunkowej może być oparty na zawartości pliku?Answer
Czy warunek dla zmiennej warunkowej może być oparty na istnieniu pliku?
{Answer
Jak można ogólnie określić na czym może być oparty warunek zmiennej warunkowejAnswer
Czy zmienna warunkowa może być użyta bez żadnego warunku?Answer
Co chroni muteks związany z zmienną warunkowa?Answer
Czy jeden muteks może chronić więcej niż jedną zmienna warunkową?Answer
Co wchodzi w skład zmiennej warunkowej w powyższym programie?Answer
Jak działa warunek zmiennej w tym programie?Answer
Który wątek powinien sprawdzić, że warunek zmiennej jest spełniony? Budzący czy obudzony?Answer
Czemu służy cleanup handler w wątku roboczym?Answer
Dice game - barrier #
Zasymuluj następującą grę w kości: Każdy uczestnik jednocześnie rzuca kością sześcienną przez 10 rund. Po każdym rzucie gracza, jeden z graczy posumowuje rundę i przypisuje punkty. Gracz z najwyższym wynikiem w danej rundzie otrzymuje jeden punkt. W przypadku remisu, wszyscy zremisowani gracze otrzymują punkt. Gra kończy się po 10 rundach, a zwycięzcą zostaje gracz z najwyższą sumą punktów. Reprezentuj każdego gracza za pomocą wątku i użyj bariery do synchronizacji gry.
solution prog23.c:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))
#define PLAYER_COUNT 4
#define ROUNDS 10
struct arguments
{
int id;
unsigned int seed;
int* scores;
int* rolls;
pthread_barrier_t *barrier;
};
void* thread_func(void *arg) {
struct arguments *args = (struct arguments *)arg;
for (int round = 0; round < ROUNDS; ++round) {
args->rolls[args->id] = 1 + rand_r(&args->seed) % 6;
printf("player %d: Rolled %d.\n", args->id, args->rolls[args->id]);
int result = pthread_barrier_wait(args->barrier);
if(result == PTHREAD_BARRIER_SERIAL_THREAD) {
printf("player %d: Assigning scores.\n", args->id);
int max = -1;
for (int i = 0; i < PLAYER_COUNT; ++i) {
int roll = args->rolls[i];
if(roll > max) {
max = roll;
}
}
for (int i = 0; i < PLAYER_COUNT; ++i) {
int roll = args->rolls[i];
if(roll == max) {
args->scores[i] = args->scores[i] + 1;
printf("player %d: Player %d got a point.\n", args->id, i);
}
}
}
pthread_barrier_wait(args->barrier);
}
return NULL;
}
void create_threads(pthread_t *thread, struct arguments *targ, pthread_barrier_t *barrier, int *scores, int* rolls)
{
srand(time(NULL));
int i;
for (i = 0; i < PLAYER_COUNT; i++)
{
targ[i].id = i;
targ[i].seed = rand();
targ[i].scores = scores;
targ[i].rolls = rolls;
targ[i].barrier = barrier;
if (pthread_create(&thread[i], NULL, thread_func, (void *)&targ[i]) != 0)
ERR("pthread_create");
}
}
int main() {
pthread_t threads[PLAYER_COUNT];
struct arguments targ[PLAYER_COUNT];
int scores[PLAYER_COUNT] = {0};
int rolls[PLAYER_COUNT];
pthread_barrier_t barrier;
pthread_barrier_init(&barrier, NULL, PLAYER_COUNT);
create_threads(threads, targ, &barrier, scores, rolls);
for (int i = 0; i < PLAYER_COUNT; i++) {
pthread_join(threads[i], NULL);
}
puts("Scores: ");
for (int i = 0; i < PLAYER_COUNT; ++i) {
printf("ID %d: %i\n", i, scores[i]);
}
pthread_barrier_destroy(&barrier);
return 0;
}
Jak działa bariera w tym programie?Answer
Które części funkcji wątku są wywoływane współbieżnie?Answer
Jak jest wybierany wątek jednego gracza do posumowania rundy?Answer
Wykonaj przykładowe zadania. Podczas laboratorium będziesz miał więcej czasu oraz dostępny startowy kod, jeśli jednak wykonasz poniższe zadania w przewidzianym czasie, to znaczy, że jesteś dobrze przygotowany do zajęć.