Tutorial 4 - Synchronization
Introduction notes:
- A quick look at this material will not suffice. You should compile and run all the programs, check how they work, and read additional materials such as man pages. As you read, please do all the exercises and answer all the questions. At the end you will find a sample task similar to the one you will do during the lab; please do it at home.
- You will find additional information in yellow sections, and questions and tasks in blue ones. Under each question you will find the answer; click to reveal it. Please try to answer on your own before checking.
- The full program code is attached at the bottom of this page. Only the vital parts of the code are displayed on the page itself.
- The code, information and tasks are organized in a logical sequence; to fully understand them you should follow this sequence. Sometimes an earlier task provides context for the next one, which is then harder to comprehend without studying the previous parts.
- Most exercises require the command line. I usually assume that all the files are placed in the current working directory, so no path needs to be added to the file names.
- Quite often you will find a $ sign placed before commands you should run in the shell. Do not type this sign on the command line; it is there only to remind you that what follows is a command to execute.
- What you learn and practice in this tutorial will be required for the next ones. If you have a problem with this material after the graded lab, you can still ask the teachers for help.
The alarm - Semaphores
Write a multi-threaded timer program. The user enters the number of seconds to count down and waits for the response. The program starts a separate thread for each new request. The thread sleeps for the given time, outputs the “Wake up” response and then exits.
The program has a limit of 5 concurrent threads. If it receives more requests, it immediately responds with “Only 5 alarms can be set at the time”.
The limit on concurrent threads can be imposed with a POSIX semaphore.
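To illustrate how a counting semaphore can enforce such a limit, here is a minimal sketch. The helper `count_granted_slots` and its parameters are hypothetical and are not part of prog21.c; the sketch assumes an unnamed, process-private semaphore used from a single thread.

```c
#include <errno.h>
#include <semaphore.h>

/* Hypothetical helper: a semaphore starts with `limit` free slots and we try
 * to take a slot `attempts` times without blocking.
 * Returns the number of successful attempts, or -1 on an unexpected error. */
int count_granted_slots(unsigned int limit, int attempts)
{
    sem_t slots;
    int granted = 0;

    if (sem_init(&slots, 0, limit) != 0)
        return -1;

    for (int i = 0; i < attempts; i++)
    {
        if (sem_trywait(&slots) == 0)
        {
            granted++; /* a free slot was taken */
        }
        else if (errno != EAGAIN) /* EAGAIN only means "no free slot" */
        {
            sem_destroy(&slots);
            return -1;
        }
    }

    sem_destroy(&slots);
    return granted;
}
```

Calling `count_granted_slots(5, 7)` grants only 5 of the 7 requests. In the alarm program the slots are returned by `sem_post` when a thread finishes, which this sketch leaves out.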
solution prog21.c:
#define _GNU_SOURCE
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

#define FS_NUM 5
#define MAX_INPUT 120

volatile sig_atomic_t work = 1;

void sigint_handler(int sig) { work = 0; }

int set_handler(void (*f)(int), int sigNo)
{
    struct sigaction act;
    memset(&act, 0, sizeof(struct sigaction));
    act.sa_handler = f;
    if (-1 == sigaction(sigNo, &act, NULL))
        return -1;
    return 0;
}

/* Per-thread arguments; allocated on the heap and freed by the thread. */
struct arguments
{
    int32_t time;
    sem_t *semaphore;
};
void *thread_func(void *arg)
{
    struct arguments *args = (struct arguments *)arg;
    uint32_t tt;
    fprintf(stderr, "Will sleep for %d\n", args->time);
    /* sleep() returns the remaining time if it was interrupted by a signal. */
    for (tt = args->time; tt > 0; tt = sleep(tt))
        ;
    puts("Wake up");
    /* Release the slot taken by the main thread. */
    if (sem_post(args->semaphore) == -1)
        ERR("sem_post");
    free(args);
    return NULL;
}
void do_work()
{
    int32_t time;
    pthread_t thread;
    char input[MAX_INPUT];
    struct arguments *args;
    sem_t semaphore;
    /* The semaphore counts free alarm slots; it starts at FS_NUM. */
    if (sem_init(&semaphore, 0, FS_NUM) != 0)
        ERR("sem_init");
    while (work)
    {
        puts("Please enter the number of seconds for the alarm delay:");
        if (fgets(input, MAX_INPUT, stdin) == NULL)
        {
            if (errno == EINTR)
                continue;
            ERR("fgets");
        }
        time = atoi(input);
        if (time <= 0)
        {
            fputs("Incorrect time specified\n", stderr);
            continue;
        }
        /* Take a slot without blocking; EAGAIN means that all slots are in use. */
        if (TEMP_FAILURE_RETRY(sem_trywait(&semaphore)) == -1)
        {
            switch (errno)
            {
                case EAGAIN:
                    fprintf(stderr, "Only %d alarms can be set at the time.\n", FS_NUM);
                    /* fall through */
                case EINTR:
                    continue;
            }
            ERR("sem_trywait");
        }
        if ((args = (struct arguments *)malloc(sizeof(struct arguments))) == NULL)
            ERR("malloc");
        args->time = time;
        args->semaphore = &semaphore;
        if (pthread_create(&thread, NULL, thread_func, (void *)args) != 0)
            ERR("pthread_create");
        if (pthread_detach(thread) != 0)
            ERR("pthread_detach");
    }
}
int main(int argc, char **argv)
{
    if (set_handler(sigint_handler, SIGINT))
        ERR("Setting SIGINT");
    do_work();
    fprintf(stderr, "Program has terminated.\n");
    return EXIT_SUCCESS;
}
It is worth your time to analyze the above code in aspects not covered here (threads, mutexes); please do it as an exercise.
Why is it necessary to allocate memory before we start the thread? Can this be avoided?
What is the semaphore used for?
Why is sem_trywait used instead of sem_wait before the new thread is created? What would happen if we used sem_wait instead?
What limits the number of concurrent alarms to 5?
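When thinking about the memory allocation question, it may help to look at the pattern in isolation. The following is a minimal sketch, not taken from prog21.c: `struct job`, `worker`, `run_jobs` and `MAX_JOBS` are hypothetical names. Each thread receives its own heap-allocated argument block and frees it itself, so the creating loop can reuse its local variables safely.

```c
#include <pthread.h>
#include <stdlib.h>

#define MAX_JOBS 8 /* hypothetical upper bound for this sketch */

struct job
{
    int value; /* input for the thread */
    int *out;  /* where the thread stores its result */
};

static void *worker(void *arg)
{
    struct job *job = arg;
    *job->out = job->value * 2;
    free(job); /* the thread owns its argument block */
    return NULL;
}

/* Starts one thread per value; returns 0 on success, -1 on any failure. */
int run_jobs(const int *values, int *results, int n)
{
    pthread_t threads[MAX_JOBS];
    int started = 0, status = 0;

    if (n < 0 || n > MAX_JOBS)
        return -1;

    for (int i = 0; i < n; i++)
    {
        /* A fresh block per thread: a single shared local struct would be
         * overwritten by the next iteration while a thread still reads it. */
        struct job *job = malloc(sizeof(*job));
        if (job == NULL)
        {
            status = -1;
            break;
        }
        job->value = values[i];
        job->out = &results[i];
        if (pthread_create(&threads[started], NULL, worker, job) != 0)
        {
            free(job);
            status = -1;
            break;
        }
        started++;
    }

    for (int i = 0; i < started; i++)
        if (pthread_join(threads[i], NULL) != 0)
            status = -1;
    return status;
}
```

Unlike the alarm program, this sketch joins its threads instead of detaching them, which keeps `results` valid until every thread has finished.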
Thread pool - condition variables
Original author: Jerzy Bartuszek
Write a simple program that reads from “/dev/urandom” and writes the data to files. Every time the user presses enter, the program reads random bytes and saves them in a file. The program is a multi-threaded application: each request is handled in a separate thread, and each thread saves random bytes to its own file. The program creates THREAD_NUM threads in advance and keeps idle threads in a thread pool. After receiving SIGINT, the program stops accepting user input and terminates after handling all current requests.
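Before reading the full solution, it may help to see the basic condition variable handshake on its own. This is a minimal sketch with hypothetical names (`struct mailbox`, `consumer`, `deliver`); it is not part of prog22.c and handles a single consumer only.

```c
#include <pthread.h>

struct mailbox
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int ready;    /* the condition; protected by mutex */
    int payload;  /* data handed over; protected by mutex */
    int received; /* what the consumer saw */
};

static void *consumer(void *arg)
{
    struct mailbox *box = arg;
    pthread_mutex_lock(&box->mutex);
    /* Re-check the condition in a loop: a wake-up alone proves nothing. */
    while (!box->ready)
        pthread_cond_wait(&box->cond, &box->mutex);
    box->received = box->payload;
    box->ready = 0;
    pthread_mutex_unlock(&box->mutex);
    return NULL;
}

/* Hands `value` to a consumer thread; returns what the consumer received,
 * or -1 if a thread or synchronization object could not be set up. */
int deliver(int value)
{
    struct mailbox box;
    pthread_t thread;

    box.ready = 0;
    box.payload = 0;
    box.received = -1;
    if (pthread_mutex_init(&box.mutex, NULL) != 0)
        return -1;
    if (pthread_cond_init(&box.cond, NULL) != 0)
        return -1;
    if (pthread_create(&thread, NULL, consumer, &box) != 0)
        return -1;

    pthread_mutex_lock(&box.mutex);
    box.payload = value;
    box.ready = 1; /* change the condition only while holding the mutex */
    pthread_mutex_unlock(&box.mutex);
    pthread_cond_signal(&box.cond);

    if (pthread_join(thread, NULL) != 0)
        return -1;
    pthread_cond_destroy(&box.cond);
    pthread_mutex_destroy(&box.mutex);
    return box.received;
}
```

The handshake works whether the consumer starts waiting before or after the signal, because the consumer decides based on `ready`, not on the signal itself.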
solution prog22.c:
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

#define BUFFERSIZE 256
#define READCHUNKS 4
#define THREAD_NUM 3

volatile sig_atomic_t work = 1;

typedef struct
{
    int id;
    int *idlethreads;
    int *condition;
    pthread_cond_t *cond;
    pthread_mutex_t *mutex;
} thread_arg;

void sigint_handler(int sig) { work = 0; }

void set_handler(void (*f)(int), int sigNo)
{
    struct sigaction act;
    memset(&act, 0x00, sizeof(struct sigaction));
    act.sa_handler = f;
    if (-1 == sigaction(sigNo, &act, NULL))
        ERR("sigaction");
}
/* Reads up to count bytes, retrying after partial reads; stops early on EOF. */
ssize_t bulk_read(int fd, char *buf, size_t count)
{
    ssize_t c;
    size_t len = 0;
    do
    {
        c = TEMP_FAILURE_RETRY(read(fd, buf, count));
        if (c < 0)
            return c;
        if (c == 0)
            return len;
        buf += c;
        len += c;
        count -= c;
    } while (count > 0);
    return len;
}

/* Writes count bytes, retrying after partial writes. */
ssize_t bulk_write(int fd, char *buf, size_t count)
{
    ssize_t c;
    size_t len = 0;
    do
    {
        c = TEMP_FAILURE_RETRY(write(fd, buf, count));
        if (c < 0)
            return c;
        buf += c;
        len += c;
        count -= c;
    } while (count > 0);
    return len;
}
void cleanup(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); }

void read_random(int thread_id)
{
    char file_name[20];
    char buffer[BUFFERSIZE];
    snprintf(file_name, sizeof(file_name), "random%d.bin", thread_id);
    printf("Writing to a file %s\n", file_name);
    int i, in, out;
    ssize_t count;
    if ((out = TEMP_FAILURE_RETRY(open(file_name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0777))) < 0)
        ERR("open");
    if ((in = TEMP_FAILURE_RETRY(open("/dev/urandom", O_RDONLY))) < 0)
        ERR("open");
    for (i = 0; i < READCHUNKS; i++)
    {
        if ((count = bulk_read(in, buffer, BUFFERSIZE)) < 0)
            ERR("bulk_read");
        if ((count = bulk_write(out, buffer, count)) < 0)
            ERR("bulk_write");
        sleep(1);
    }
    if (TEMP_FAILURE_RETRY(close(in)))
        ERR("close");
    if (TEMP_FAILURE_RETRY(close(out)))
        ERR("close");
}
void *thread_func(void *arg)
{
    thread_arg targ;
    memcpy(&targ, arg, sizeof(targ));
    while (1)
    {
        pthread_cleanup_push(cleanup, (void *)targ.mutex);
        if (pthread_mutex_lock(targ.mutex) != 0)
            ERR("pthread_mutex_lock");
        (*targ.idlethreads)++;
        /* Wait in a loop: the condition is re-checked after every wake-up. */
        while (!*targ.condition && work)
            if (pthread_cond_wait(targ.cond, targ.mutex) != 0)
                ERR("pthread_cond_wait");
        *targ.condition = 0;
        if (!work)
            pthread_exit(NULL); /* the cleanup handler unlocks the mutex */
        (*targ.idlethreads)--;
        pthread_cleanup_pop(1); /* pops and runs the handler: unlocks the mutex */
        read_random(targ.id);
    }
    return NULL;
}
void init(pthread_t *thread, thread_arg *targ, pthread_cond_t *cond, pthread_mutex_t *mutex, int *idlethreads,
          int *condition)
{
    int i;
    for (i = 0; i < THREAD_NUM; i++)
    {
        targ[i].id = i + 1;
        targ[i].cond = cond;
        targ[i].mutex = mutex;
        targ[i].idlethreads = idlethreads;
        targ[i].condition = condition;
        if (pthread_create(&thread[i], NULL, thread_func, (void *)&targ[i]) != 0)
            ERR("pthread_create");
    }
}
void do_work(pthread_cond_t *cond, pthread_mutex_t *mutex, const int *idlethreads, int *condition)
{
    char buffer[BUFFERSIZE];
    while (work)
    {
        if (fgets(buffer, BUFFERSIZE, stdin) != NULL)
        {
            if (pthread_mutex_lock(mutex) != 0)
                ERR("pthread_mutex_lock");
            if (*idlethreads == 0)
            {
                if (pthread_mutex_unlock(mutex) != 0)
                    ERR("pthread_mutex_unlock");
                fputs("No threads available\n", stderr);
            }
            else
            {
                /* Set the condition while still holding the mutex that protects it. */
                *condition = 1;
                if (pthread_mutex_unlock(mutex) != 0)
                    ERR("pthread_mutex_unlock");
                if (pthread_cond_signal(cond) != 0)
                    ERR("pthread_cond_signal");
            }
        }
        else
        {
            if (EINTR == errno)
                continue;
            ERR("fgets");
        }
    }
}
int main(int argc, char **argv)
{
    int i, condition = 0, idlethreads = 0;
    pthread_t thread[THREAD_NUM];
    thread_arg targ[THREAD_NUM];
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    set_handler(sigint_handler, SIGINT);
    init(thread, targ, &cond, &mutex, &idlethreads, &condition);
    do_work(&cond, &mutex, &idlethreads, &condition);
    /* Broadcast under the mutex so that a thread which has just checked `work`
       cannot miss the wake-up before it enters pthread_cond_wait. */
    if (pthread_mutex_lock(&mutex) != 0)
        ERR("pthread_mutex_lock");
    if (pthread_cond_broadcast(&cond) != 0)
        ERR("pthread_cond_broadcast");
    if (pthread_mutex_unlock(&mutex) != 0)
        ERR("pthread_mutex_unlock");
    for (i = 0; i < THREAD_NUM; i++)
        if (pthread_join(thread[i], NULL) != 0)
            ERR("pthread_join");
    return EXIT_SUCCESS;
}
It is worth your time to analyze the above code in aspects not covered here (threads, mutexes); please do it as an exercise.
Can the condition of a condition variable be based on the value of a regular variable?
Can the condition of a condition variable be based on a combination of the values of several regular variables?
Can the condition of a condition variable be based on file content?
Can the condition of a condition variable be based on file existence?
What are the limitations on the condition of a condition variable?
Can we use a condition variable without any condition at all?
A condition variable must have a mutex. What does the mutex protect?
Can one mutex protect multiple condition variables?
What are the parts of the condition for the condition variable in the above code?
How does the condition variable work in this program?
Who should check whether the condition is true: the thread that wakes another one, or the thread that is being woken?
What is the cleanup handler in the worker thread used for?
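The effect of a cleanup handler can be observed in isolation. The following is a minimal sketch with hypothetical names (`unlock_mutex`, `exits_while_locked`, `mutex_is_free_after_exit`), not code from prog22.c: a thread calls `pthread_exit` while holding a mutex, and the handler registered with `pthread_cleanup_push` releases the mutex on its behalf.

```c
#include <pthread.h>

static void unlock_mutex(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); }

static void *exits_while_locked(void *arg)
{
    pthread_mutex_t *mutex = arg;
    pthread_mutex_lock(mutex);
    pthread_cleanup_push(unlock_mutex, mutex);
    pthread_exit(NULL);     /* registered handlers run now: the mutex is unlocked */
    pthread_cleanup_pop(1); /* never reached, but push and pop must be paired */
    return NULL;
}

/* Returns 1 if the mutex is free after the thread has exited, 0 if it is
 * still locked, -1 if the thread or the mutex could not be set up. */
int mutex_is_free_after_exit(void)
{
    pthread_mutex_t mutex;
    pthread_t thread;
    int free_again;

    if (pthread_mutex_init(&mutex, NULL) != 0)
        return -1;
    if (pthread_create(&thread, NULL, exits_while_locked, &mutex) != 0)
        return -1;
    if (pthread_join(thread, NULL) != 0)
        return -1;

    /* trylock succeeds only if the exiting thread released the mutex. */
    free_again = (pthread_mutex_trylock(&mutex) == 0);
    if (free_again)
    {
        pthread_mutex_unlock(&mutex);
        pthread_mutex_destroy(&mutex);
    }
    return free_again;
}
```

Without the handler, the mutex would stay locked after the thread exits and any other thread waiting for it would block forever.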
Dice game - barrier
Simulate the following dice game. Each participant rolls a standard six-sided die simultaneously in each of 10 rounds. After every player has rolled, one of the players concludes the round and assigns scores. The player with the highest roll in a given round is awarded one point; in the event of a tie for the highest roll, all tied players receive a point. The game ends after 10 rounds, and the winner is the player with the most accumulated points. Represent each player by a thread and use a barrier to synchronize the game.
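The scoring rule can be checked separately from any threading. Here is a minimal sketch of it; the function name `award_round` and its signature are hypothetical, and it assumes all rolls are in the range 1 to 6.

```c
#include <stddef.h>

/* Hypothetical helper: gives one point to every player whose roll equals
 * the highest roll of the round. Returns the highest roll (0 if there are
 * no players). */
int award_round(const int *rolls, int *scores, size_t players)
{
    int max = 0; /* lower than any valid roll */

    /* First pass: find the highest roll. */
    for (size_t i = 0; i < players; i++)
        if (rolls[i] > max)
            max = rolls[i];

    /* Second pass: every player who matched it gets a point (ties included). */
    for (size_t i = 0; i < players; i++)
        if (rolls[i] == max)
            scores[i]++;

    return max;
}
```

For the rolls {3, 6, 2, 6}, the second and fourth players each receive a point. In the threaded game only one thread should run this step per round, and only after all players have rolled.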
solution prog23.c:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

#define PLAYER_COUNT 4
#define ROUNDS 10

struct arguments
{
    int id;
    unsigned int seed;
    int *scores;
    int *rolls;
    pthread_barrier_t *barrier;
};
void *thread_func(void *arg) {
    struct arguments *args = (struct arguments *)arg;
    for (int round = 0; round < ROUNDS; ++round) {
        args->rolls[args->id] = 1 + rand_r(&args->seed) % 6;
        printf("player %d: Rolled %d.\n", args->id, args->rolls[args->id]);

        /* First barrier: wait until every player has rolled. */
        int result = pthread_barrier_wait(args->barrier);

        /* Exactly one thread gets PTHREAD_BARRIER_SERIAL_THREAD and scores the round. */
        if (result == PTHREAD_BARRIER_SERIAL_THREAD) {
            printf("player %d: Assigning scores.\n", args->id);
            int max = -1;
            for (int i = 0; i < PLAYER_COUNT; ++i) {
                int roll = args->rolls[i];
                if (roll > max) {
                    max = roll;
                }
            }
            for (int i = 0; i < PLAYER_COUNT; ++i) {
                int roll = args->rolls[i];
                if (roll == max) {
                    args->scores[i] = args->scores[i] + 1;
                    printf("player %d: Player %d got a point.\n", args->id, i);
                }
            }
        }

        /* Second barrier: nobody rolls again until the scores are assigned. */
        pthread_barrier_wait(args->barrier);
    }
    return NULL;
}
void create_threads(pthread_t *thread, struct arguments *targ, pthread_barrier_t *barrier, int *scores, int *rolls)
{
    srand(time(NULL));
    int i;
    for (i = 0; i < PLAYER_COUNT; i++)
    {
        targ[i].id = i;
        targ[i].seed = rand(); /* a separate seed for each thread's rand_r() */
        targ[i].scores = scores;
        targ[i].rolls = rolls;
        targ[i].barrier = barrier;
        if (pthread_create(&thread[i], NULL, thread_func, (void *)&targ[i]) != 0)
            ERR("pthread_create");
    }
}
int main(void) {
    pthread_t threads[PLAYER_COUNT];
    struct arguments targ[PLAYER_COUNT];
    int scores[PLAYER_COUNT] = {0};
    int rolls[PLAYER_COUNT];
    pthread_barrier_t barrier;

    if (pthread_barrier_init(&barrier, NULL, PLAYER_COUNT) != 0)
        ERR("pthread_barrier_init");
    create_threads(threads, targ, &barrier, scores, rolls);
    for (int i = 0; i < PLAYER_COUNT; i++) {
        if (pthread_join(threads[i], NULL) != 0)
            ERR("pthread_join");
    }
    puts("Scores: ");
    for (int i = 0; i < PLAYER_COUNT; ++i) {
        printf("ID %d: %i\n", i, scores[i]);
    }
    if (pthread_barrier_destroy(&barrier) != 0)
        ERR("pthread_barrier_destroy");
    return 0;
}
How does the barrier work in this program?
Which parts of the thread function are called concurrently?
How is the one player thread selected to conclude the round?
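The selection of a single thread by the barrier can be observed with a small experiment. This is a minimal sketch, not part of prog23.c; `WORKERS`, `struct worker_arg`, `worker` and `count_serial_threads` are hypothetical names. Every thread records whether `pthread_barrier_wait` returned `PTHREAD_BARRIER_SERIAL_THREAD`, and the caller counts the flags after joining.

```c
#define _GNU_SOURCE
#include <pthread.h>
#include <stdlib.h>

#define WORKERS 4 /* hypothetical thread count for this sketch */

struct worker_arg
{
    pthread_barrier_t *barrier;
    int was_serial; /* written only by the owning thread */
};

static void *worker(void *arg)
{
    struct worker_arg *w = arg;
    int result = pthread_barrier_wait(w->barrier);
    w->was_serial = (result == PTHREAD_BARRIER_SERIAL_THREAD);
    return NULL;
}

/* Returns how many of the WORKERS threads were chosen as the serial thread,
 * or -1 if the barrier could not be set up or a join failed. */
int count_serial_threads(void)
{
    pthread_barrier_t barrier;
    pthread_t threads[WORKERS];
    struct worker_arg args[WORKERS];
    int count = 0;

    if (pthread_barrier_init(&barrier, NULL, WORKERS) != 0)
        return -1;

    for (int i = 0; i < WORKERS; i++)
    {
        args[i].barrier = &barrier;
        args[i].was_serial = 0;
        /* If a thread cannot be created, the others would wait at the barrier
         * forever, so this sketch simply gives up. */
        if (pthread_create(&threads[i], NULL, worker, &args[i]) != 0)
            exit(EXIT_FAILURE);
    }

    for (int i = 0; i < WORKERS; i++)
    {
        if (pthread_join(threads[i], NULL) != 0)
            return -1;
        count += args[i].was_serial;
    }

    pthread_barrier_destroy(&barrier);
    return count;
}
```

Which thread is chosen is unspecified, so a program should not depend on a particular player concluding the round, only on there being exactly one.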
Do the example tasks. During the lab you will have more time and starter code. If you complete the following tasks in the allotted time, you are well prepared.