L3 - Threads, mutexes and signals

Tutorial 3 - Threads, mutexes and signals #

Introduction notes:

  • A quick look at this material will not suffice. You should compile and run all the programs, check how they work, and read additional materials such as man pages. As you read, please do all the exercises and answer the questions. At the end you will find a sample task similar to the one you will do during the labs; please do it at home.
  • You will find additional information in yellow sections, and questions and tasks in blue ones. The answer is placed under each question; you have to click to see it. Please try to answer on your own before checking.
  • The full source code of the programs is attached at the bottom of this page. Only the vital parts of the code are displayed on the page itself.
  • Code, information and tasks are organized in a logical sequence; to fully understand them you should follow this sequence. Sometimes an earlier task provides the context for the next one, which is then harder to comprehend without studying the previous parts.
  • Most of the exercises require a command line. I usually assume that all the files are placed in the current working directory and that we do not need to add path parts to file names.
  • What you learn and practice in this tutorial will be required for the next ones. If you have a problem with this material after the graded lab you can still ask the teachers for help.
  • This tutorial is based on student’s examples. It has some minor flaws in parts not related to threads (I left them on purpose and explain them in comments). Some recurring flaws not mentioned in the tasks are:
    • Main functions are too long; it should be easy to split them
    • The bool data type requires an extra header and can easily be avoided by using integers (0 and 1)
    • So-called “magic numbers” are left in the code; those numerical constants should be replaced with C preprocessor macros

This tutorial is based on tasks and code examples provided by student Mariusz Kowalski.

Task 1 - simple joinable threads, synchronization and threads return values #

Goal: Write a program that approximates the value of PI with the Monte Carlo method. The program takes the following parameters:

  • k … number of threads used to approximate,
  • n … number of random tries of each thread.

Each thread (except for the main one) should conduct its own estimation. At the end the main thread gathers the results from the co-threads and calculates the average of all the sub-results.

What you need to know:

  • man 7 pthreads
  • man 3p pthread_create
  • man 3p pthread_join
  • man 3p rand (especially rand_r)
  • Monte-Carlo method, in paragraph “Monte Carlo methods” on this site.
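The estimate computed in the solution below can be summarised as follows. This is only a sketch of the reasoning, assuming points (x, y) drawn uniformly from the unit square; the symbol n_in (the number of points that fall inside the quarter of the unit circle) is introduced here purely for illustration:

```latex
\frac{n_{\text{in}}}{n} \approx
\frac{\text{area of the quarter circle}}{\text{area of the unit square}}
= \frac{\pi/4}{1}
\quad\Longrightarrow\quad
\pi \approx 4\,\frac{n_{\text{in}}}{n},
\qquad \text{where a point is counted as inside when } x^2 + y^2 \le 1 .
```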

Solution Makefile:

CC=gcc
CFLAGS=-std=gnu99 -Wall -fsanitize=address,undefined
LDFLAGS=-fsanitize=address,undefined
LDLIBS=-lpthread -lm

Flag -lpthread is mandatory for all compilations in this tutorial. The linked library is called libpthread.so (after -l we write the name of the library without the leading “lib” part).

Flag -lm adds the math library; this library is called libm.so (not libmath.so as you might have guessed).

Solution prog17.c:

#include <math.h>
#include <pthread.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define MAXLINE 4096
#define DEFAULT_THREADCOUNT 10
#define DEFAULT_SAMPLESIZE 100

#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef unsigned int UINT;
typedef struct argsEstimation
{
    pthread_t tid;
    UINT seed;
    int samplesCount;
} argsEstimation_t;

void ReadArguments(int argc, char **argv, int *threadCount, int *samplesCount);
void *pi_estimation(void *args);

int main(int argc, char **argv)
{
    int threadCount, samplesCount;
    double *subresult;
    ReadArguments(argc, argv, &threadCount, &samplesCount);
    argsEstimation_t *estimations = (argsEstimation_t *)malloc(sizeof(argsEstimation_t) * threadCount);
    if (estimations == NULL)
        ERR("Malloc error for estimation arguments!");
    srand(time(NULL));
    for (int i = 0; i < threadCount; i++)
    {
        estimations[i].seed = rand();
        estimations[i].samplesCount = samplesCount;
    }
    for (int i = 0; i < threadCount; i++)
    {
        int err = pthread_create(&(estimations[i].tid), NULL, pi_estimation, &estimations[i]);
        if (err != 0)
            ERR("Couldn't create thread");
    }
    double cumulativeResult = 0.0;
    for (int i = 0; i < threadCount; i++)
    {
        /* pthread_join expects a void **; it stores the pointer returned by the thread in subresult */
        int err = pthread_join(estimations[i].tid, (void **)&subresult);
        if (err != 0)
            ERR("Can't join with a thread");
        if (NULL != subresult)
        {
            cumulativeResult += *subresult;
            free(subresult);
        }
    }
    double result = cumulativeResult / threadCount;
    printf("PI ~= %f\n", result);
    free(estimations);
}

void ReadArguments(int argc, char **argv, int *threadCount, int *samplesCount)
{
    *threadCount = DEFAULT_THREADCOUNT;
    *samplesCount = DEFAULT_SAMPLESIZE;

    if (argc >= 2)
    {
        *threadCount = atoi(argv[1]);
        if (*threadCount <= 0)
        {
            printf("Invalid value for 'threadCount'");
            exit(EXIT_FAILURE);
        }
    }
    if (argc >= 3)
    {
        *samplesCount = atoi(argv[2]);
        if (*samplesCount <= 0)
        {
            printf("Invalid value for 'samplesCount'");
            exit(EXIT_FAILURE);
        }
    }
}

void *pi_estimation(void *voidPtr)
{
    argsEstimation_t *args = voidPtr;
    double *result;
    if (NULL == (result = malloc(sizeof(double))))
        ERR("malloc");

    int insideCount = 0;
    for (int i = 0; i < args->samplesCount; i++)
    {
        double x = ((double)rand_r(&args->seed) / (double)RAND_MAX);
        double y = ((double)rand_r(&args->seed) / (double)RAND_MAX);
        if (sqrt(x * x + y * y) <= 1.0)
            insideCount++;
    }
    *result = 4.0 * (double)insideCount / (double)args->samplesCount;
    return result;
}

This and the following programs do not print USAGE information; default parameter values are assumed if options are missing. Run it without parameters to see how it works.

Function declarations at the beginning of the code (as opposed to function definitions) are quite useful and sometimes mandatory. If you do not know the difference please read this.

In multi-threaded processes you cannot correctly use the rand() function; use rand_r() instead. The latter requires an individual seed for every thread.

This program uses the simplest schema of thread lifetime: it creates some threads and then immediately waits for them to finish. More complex scenarios are possible.

Please keep in mind that nearly every call to a system function (and most calls to library functions) should be followed by a test for errors and, if necessary, by a proper reaction to the error.

The ERR macro does not send a “kill” signal as it did in the multi-process programs. Why?

Answer
A call to exit terminates all the threads in the current process; there are no other processes to "kill".

How input data is passed to the new threads?

Answer
Exclusively through the pointer to an argsEstimation_t structure that is passed as the thread function argument. There is no need (nor excuse) to use global variables!

Is the thread input data shared between the threads?

Answer
Not in this program, so there is no need to synchronize access to this data. Each thread gets a pointer to its own private copy of the structure.

How is the random seed for rand_r() prepared for each thread?

Answer
It is randomized in the main thread and passed as a part of input data in argsEstimation_t .

In this multi thread program we see srand/rand calls, is this correct? It contradicts what was said a few lines above.

Answer
Only one thread uses srand/rand, and those functions are called before the working threads come to life. The problem with srand/rand and threads originates from the single global variable used in the library to hold the current seed. In this code only one thread accesses this seed, thus the code is correct.

Can we share one input data structure for all the threads instead of having a copy for every thread?

Answer
No, because of the random seed: it must be different for each thread. In addition, rand_r() modifies the seed on every call, so a shared seed would be written concurrently without any synchronization.

Can we make the array with the thread input data automatic variable (not allocated)?

Answer
Only if we add some limit on the number of working threads (e.g. up to 1000); otherwise this array may use up the whole stack of the main thread.

Why do we need to release the memory returned from the working thread?

Answer
This memory was allocated in the thread and has to be released somewhere in the same process. The heap is shared by all the threads; if you do not release the memory, you will leak it. It will not be released automatically on thread termination.

Why can’t we return the data as the address of local (to the thread) automatic variable?

Answer
The moment the thread terminates is the moment its stack memory is released. If you have a pointer to this released stack you should not use it, as the memory can be overwritten immediately. What is worse, in most cases this memory will still hold the same content and the faulty program will work in 90% of cases. If you make this kind of mistake it is later very hard to find out why your code sometimes fails. Please be careful and try to avoid this flaw.

Can we avoid memory allocation in the working thread?

Answer
Yes, if we add an extra variable to the input structure of the thread. The result can then be stored in this variable.

Task 2 - simple detachable threads with common variables and access mutex #

Goal: Write a binomial distribution visualization program. Use the Bean Machine (Galton board) method with 11 bins for beans. The program takes two parameters:

  • k … number of bean-throwing threads
  • n … total number of beans to throw

Each thread throws beans independently of the others; after each throw, the number of beans in the bins must be updated. The main thread checks every second whether the simulation has completed (without using pthread_join). At the end the main thread prints the content of the bins and the mean value of the beans.

What you need to know:

  • man 3p pthread_mutex_destroy (full description)
  • man 3p pthread_mutex_lock
  • man 3p pthread_mutex_unlock
  • man 3p pthread_detach
  • man 3p pthread_attr_init
  • man 3p pthread_attr_destroy
  • man 3p pthread_attr_setdetachstate
  • man 3p pthread_attr_getdetachstate
  • Bean machine on this site.

Solution prog18.c:

#include <pthread.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAXLINE 4096
#define DEFAULT_N 1000
#define DEFAULT_K 10
#define BIN_COUNT 11
#define NEXT_DOUBLE(seedptr) ((double)rand_r(seedptr) / (double)RAND_MAX)
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef unsigned int UINT;
typedef struct argsThrower
{
    pthread_t tid;
    UINT seed;
    int *pBallsThrown;
    int *pBallsWaiting;
    int *bins;
    pthread_mutex_t *mxBins;
    pthread_mutex_t *pmxBallsThrown;
    pthread_mutex_t *pmxBallsWaiting;
} argsThrower_t;

void ReadArguments(int argc, char **argv, int *ballsCount, int *throwersCount);
void make_throwers(argsThrower_t *argsArray, int throwersCount);
void *throwing_func(void *args);
int throwBall(UINT *seedptr);

int main(int argc, char **argv)
{
    int ballsCount, throwersCount;
    ReadArguments(argc, argv, &ballsCount, &throwersCount);
    int ballsThrown = 0, bt = 0;
    int ballsWaiting = ballsCount;
    pthread_mutex_t mxBallsThrown = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mxBallsWaiting = PTHREAD_MUTEX_INITIALIZER;
    int bins[BIN_COUNT];
    pthread_mutex_t mxBins[BIN_COUNT];
    for (int i = 0; i < BIN_COUNT; i++)
    {
        bins[i] = 0;
        if (pthread_mutex_init(&mxBins[i], NULL))
            ERR("Couldn't initialize mutex!");
    }
    argsThrower_t *args = (argsThrower_t *)malloc(sizeof(argsThrower_t) * throwersCount);
    if (args == NULL)
        ERR("Malloc error for throwers arguments!");
    srand(time(NULL));
    for (int i = 0; i < throwersCount; i++)
    {
        args[i].seed = (UINT)rand();
        args[i].pBallsThrown = &ballsThrown;
        args[i].pBallsWaiting = &ballsWaiting;
        args[i].bins = bins;
        args[i].pmxBallsThrown = &mxBallsThrown;
        args[i].pmxBallsWaiting = &mxBallsWaiting;
        args[i].mxBins = mxBins;
    }
    make_throwers(args, throwersCount);
    while (bt < ballsCount)
    {
        sleep(1);
        pthread_mutex_lock(&mxBallsThrown);
        bt = ballsThrown;
        pthread_mutex_unlock(&mxBallsThrown);
    }
    int realBallsCount = 0;
    double meanValue = 0.0;
    for (int i = 0; i < BIN_COUNT; i++)
    {
        realBallsCount += bins[i];
        meanValue += bins[i] * i;
    }
    meanValue = meanValue / realBallsCount;
    printf("Bins count:\n");
    for (int i = 0; i < BIN_COUNT; i++)
        printf("%d\t", bins[i]);
    printf("\nTotal balls count : %d\nMean value: %f\n", realBallsCount, meanValue);
    // for (int i = 0; i < BIN_COUNT; i++) pthread_mutex_destroy(&mxBins[i]);
    // free(args);
    // The resources used by detached threads cannot be freed as we are not sure
    // whether the threads are still running.
    exit(EXIT_SUCCESS);
}

void ReadArguments(int argc, char **argv, int *ballsCount, int *throwersCount)
{
    *ballsCount = DEFAULT_N;
    *throwersCount = DEFAULT_K;
    if (argc >= 2)
    {
        *ballsCount = atoi(argv[1]);
        if (*ballsCount <= 0)
        {
            printf("Invalid value for 'balls count'");
            exit(EXIT_FAILURE);
        }
    }
    if (argc >= 3)
    {
        *throwersCount = atoi(argv[2]);
        if (*throwersCount <= 0)
        {
            printf("Invalid value for 'throwers count'");
            exit(EXIT_FAILURE);
        }
    }
}

void make_throwers(argsThrower_t *argsArray, int throwersCount)
{
    pthread_attr_t threadAttr;
    if (pthread_attr_init(&threadAttr))
        ERR("Couldn't create pthread_attr_t");
    if (pthread_attr_setdetachstate(&threadAttr, PTHREAD_CREATE_DETACHED))
        ERR("Couldn't setdetachsatate on pthread_attr_t");
    for (int i = 0; i < throwersCount; i++)
    {
        if (pthread_create(&argsArray[i].tid, &threadAttr, throwing_func, &argsArray[i]))
            ERR("Couldn't create thread");
    }
    pthread_attr_destroy(&threadAttr);
}

void *throwing_func(void *voidArgs)
{
    argsThrower_t *args = voidArgs;
    while (1)
    {
        pthread_mutex_lock(args->pmxBallsWaiting);
        if (*args->pBallsWaiting > 0)
        {
            (*args->pBallsWaiting) -= 1;
            pthread_mutex_unlock(args->pmxBallsWaiting);
        }
        else
        {
            pthread_mutex_unlock(args->pmxBallsWaiting);
            break;
        }
        int binno = throwBall(&args->seed);
        pthread_mutex_lock(&args->mxBins[binno]);
        args->bins[binno] += 1;
        pthread_mutex_unlock(&args->mxBins[binno]);
        pthread_mutex_lock(args->pmxBallsThrown);
        (*args->pBallsThrown) += 1;
        pthread_mutex_unlock(args->pmxBallsThrown);
    }
    return NULL;
}

/* returns # of bin where ball has landed */
int throwBall(UINT *seedptr)
{
    int result = 0;
    for (int i = 0; i < BIN_COUNT - 1; i++)
        if (NEXT_DOUBLE(seedptr) > 0.5)
            result++;
    return result;
}

Once again all thread input data is passed as a pointer to a structure (argsThrower_t), and the threads' results modify the bins array (a pointer to it is stored in the same structure); no global variables are used.

In this code two mutexes protect two counters and an array of mutexes protects the bins’ array (one mutex for every cell in the array). In total we have BIN_COUNT+2 mutexes.

In this program we use detached threads. There is no need (nor option) to wait for the working threads to finish, hence the lack of pthread_join. As we do not join the threads, a different method must be deployed to test if the main program can exit.

In this example mutexes are initialized in two ways: statically, with PTHREAD_MUTEX_INITIALIZER, and dynamically, with pthread_mutex_init. The first method is simpler to code but you need to know exactly how many mutexes you need at coding time. Dynamic initialization requires more code (initialization and removal) but the number of mutexes can then also be dynamic.

Is data passed to threads in argsThrower_t structure shared between them?

Answer
Some of it, counters and bins are shared and thus protected with mutexes.

Is structure argsThrower_t optimal?

Answer
No, some fields point to the same data for every thread. The common parts can be moved to an additional structure, and argsThrower_t would then store one pointer to this structure instead of 6 pointers. Additionally, we store the tid in this structure although it is not used in the thread code.

Why do we mostly use pointers in the threads input data?

Answer
We share the data, without the pointers we would have independent copies of those variables in each thread.

Can we pass mutexes by value (as variables) rather than as pointers?

Answer
No, POSIX forbids it: a copy of a mutex does not have to be a working mutex! Even if it worked, it should be quite obvious that a copy would be a different mutex.

This program uses a lot of mutexes, can we reduce the number of them?

Answer
Yes, in the extreme case it can be reduced to only one mutex, but at the cost of concurrency. In a more reasonable approach you can have 2 mutexes for the counters and one for all the bins. Although the concurrency is lower in this case, the running time of the program can be a bit shorter, as operations on mutexes are quite time consuming for the OS.

To check if the working threads have terminated, the main thread periodically checks if the number of thrown beans is equal to the total number of beans. Is this an optimal solution?

Answer
No, it is so-called "soft busy waiting", but without a synchronization tool such as a condition variable it cannot be solved any better.

Do all the threads created in this program really work?

Answer
No, especially when there are a lot of threads. It is possible that some of the threads "starve". The work code of a thread is very fast and thread creation is rather slow, so it is possible that the last threads created will have no beans left to throw. To check it, please add per-thread counters of thrown beans and print them on stdout at thread termination. The problem can be avoided if we synchronize the start of the threads (make them start at the same time), but this again requires methods that will be introduced during OPS2 (barrier or condition variable).

Task 3 - threads and signals, waiting for a thread with sigwait function #

Goal: The program takes a single parameter ‘k’ and prints the list of numbers from 1 to k every second. It must handle two signals in a dedicated thread; the following actions must be taken upon signal arrival:

  • SIGINT (C-c) … removes a random number from the list (does nothing if the list is empty),
  • SIGQUIT (C-\) … sets the program ‘STOP’ flag (both threads end).

What you need to know:

  • man 3p pthread_sigmask
  • man 3p sigprocmask
  • man 3p sigwait

Solution prog19.c:

#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAXLINE 4096
#define DEFAULT_ARRAYSIZE 10
#define DELETED_ITEM -1
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef struct argsSignalHandler
{
    pthread_t tid;
    int *pArrayCount;
    int *array;
    pthread_mutex_t *pmxArray;
    sigset_t *pMask;
    bool *pQuitFlag;
    pthread_mutex_t *pmxQuitFlag;
} argsSignalHandler_t;

void ReadArguments(int argc, char **argv, int *arraySize);
void removeItem(int *array, int *arrayCount, int index);
void printArray(int *array, int arraySize);
void *signal_handling(void *);

int main(int argc, char **argv)
{
    int arraySize, *array;
    bool quitFlag = false;
    pthread_mutex_t mxQuitFlag = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t mxArray = PTHREAD_MUTEX_INITIALIZER;
    ReadArguments(argc, argv, &arraySize);
    int arrayCount = arraySize;
    if (NULL == (array = (int *)malloc(sizeof(int) * arraySize)))
        ERR("Malloc error for array!");
    for (int i = 0; i < arraySize; i++)
        array[i] = i + 1;
    sigset_t oldMask, newMask;
    sigemptyset(&newMask);
    sigaddset(&newMask, SIGINT);
    sigaddset(&newMask, SIGQUIT);
    if (pthread_sigmask(SIG_BLOCK, &newMask, &oldMask))
        ERR("SIG_BLOCK error");
    argsSignalHandler_t args;
    args.pArrayCount = &arrayCount;
    args.array = array;
    args.pmxArray = &mxArray;
    args.pMask = &newMask;
    args.pQuitFlag = &quitFlag;
    args.pmxQuitFlag = &mxQuitFlag;
    if (pthread_create(&args.tid, NULL, signal_handling, &args))
        ERR("Couldn't create signal handling thread!");
    while (true)
    {
        pthread_mutex_lock(&mxQuitFlag);
        if (quitFlag == true)
        {
            pthread_mutex_unlock(&mxQuitFlag);
            break;
        }
        else
        {
            pthread_mutex_unlock(&mxQuitFlag);
            pthread_mutex_lock(&mxArray);
            printArray(array, arraySize);
            pthread_mutex_unlock(&mxArray);
            sleep(1);
        }
    }
    if (pthread_join(args.tid, NULL))
        ERR("Can't join with 'signal handling' thread");
    free(array);
    if (pthread_sigmask(SIG_UNBLOCK, &newMask, &oldMask))
        ERR("SIG_UNBLOCK error");
    exit(EXIT_SUCCESS);
}

void ReadArguments(int argc, char **argv, int *arraySize)
{
    *arraySize = DEFAULT_ARRAYSIZE;

    if (argc >= 2)
    {
        *arraySize = atoi(argv[1]);
        if (*arraySize <= 0)
        {
            printf("Invalid value for 'array size'");
            exit(EXIT_FAILURE);
        }
    }
}

void removeItem(int *array, int *arrayCount, int index)
{
    int curIndex = -1;
    int i = -1;
    while (curIndex != index)
    {
        i++;
        if (array[i] != DELETED_ITEM)
            curIndex++;
    }
    array[i] = DELETED_ITEM;
    *arrayCount -= 1;
}

void printArray(int *array, int arraySize)
{
    printf("[");
    for (int i = 0; i < arraySize; i++)
        if (array[i] != DELETED_ITEM)
            printf(" %d", array[i]);
    printf(" ]\n");
}

void *signal_handling(void *voidArgs)
{
    argsSignalHandler_t *args = voidArgs;
    int signo;
    srand(time(NULL));
    for (;;)
    {
        if (sigwait(args->pMask, &signo))
            ERR("sigwait failed.");
        switch (signo)
        {
            case SIGINT:
                pthread_mutex_lock(args->pmxArray);
                if (*args->pArrayCount > 0)
                    removeItem(args->array, args->pArrayCount, rand() % (*args->pArrayCount));
                pthread_mutex_unlock(args->pmxArray);
                break;
            case SIGQUIT:
                pthread_mutex_lock(args->pmxQuitFlag);
                *args->pQuitFlag = true;
                pthread_mutex_unlock(args->pmxQuitFlag);
                return NULL;
            default:
                printf("unexpected signal %d\n", signo);
                exit(1);
        }
    }
    return NULL;
}

The thread input structure argsSignalHandler_t holds both the data shared between the threads (the array and the STOP flag, with their protective mutexes) and the data that is not shared (the signal mask and the tid of the thread designated to handle the signals).

In a threaded process (one that has more than one thread) you cannot use sigprocmask; use pthread_sigmask instead.

Having a separate thread to handle the signals (as in this example) is a very common way to deal with signals in multi-threaded code.

How many threads run in this program?

Answer
Two, main thread created by system (every process has one starting thread) and the thread created by the code.

Name the differences and similarities between sigwait and sigsuspend:

Answer
  • sigwait does not require a signal handling routine, as sigsuspend does,
  • both functions require the anticipated signal(s) to be blocked,
  • sigwait cannot be interrupted by a signal handling function (it is a POSIX requirement), sigsuspend can,
  • sigwait does not change the mask of blocked signals, so even if a signal handler is set it will not be triggered (in this example we do not have handlers), whereas it will be executed on a sigsuspend call.

After a successful call to sigwait only one type of pending signal is removed from the pending signals vector, so the problem we experienced with sigsuspend in the L2 example can be corrected by using sigwait instead of sigsuspend. Please correct the program from L2 as an exercise.

Does the method of waiting for the end of the working thread have the same flaw as the method in the previous example?

Answer
No. The periodic printout of the array is a part of the task, whereas busy waiting means looping solely in order to wait for a condition to become true. Besides, in this example we join the thread.

Can we use sigprocmask instead of pthread_sigmask in this program?

Answer
Yes, the signal blocking is set prior to thread creation, while the program is still in its single-threaded phase.

Why are the calls to functions operating on the mutex (lock, unlock) not tested for errors?

Answer
The basic mutex type (the default one, used in this program) neither checks for nor reports usage errors. Adding those checks would still not be a bad idea: they do not harm the code, and if you later decide to change the mutex type to an error-checking one, it will not require many changes in the code.

Task 4 - threads cancellation, cleanup handlers #

Goal: The program simulates the fate of MiNI students. It takes the following parameter:

  • n <= 100 … count of new students

The program stores the counters of students studying in years 1, 2, 3 and the final BSc year.

Main thread: Initiates the students, then for 4 seconds, at random intervals (100-300ms), strikes off one of the students (cancels a thread). After the 4s period it waits for the student threads and prints the counters.

Student threads: A student (thread) adds itself to the counter of the first year; after a second it removes itself from it and adds itself to the second year counter, and so on until it reaches the BSc counter. The thread must always be prepared for cancellation.

What you need to know:

  • man 3p pthread_cancel
  • man 3 pthread_cleanup_push
  • man 3 pthread_cleanup_pop
  • man 7 time
  • man 3p clock_getres

Solution prog20.c:

#include <errno.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#define MAXLINE 4096
#define DEFAULT_STUDENT_COUNT 100
#define ELAPSED(start, end) (((end).tv_sec - (start).tv_sec) + (((end).tv_nsec - (start).tv_nsec) * 1.0e-9))
#define ERR(source) (perror(source), fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), exit(EXIT_FAILURE))

typedef unsigned int UINT;
typedef struct timespec timespec_t;
typedef struct studentList
{
    bool *removed;
    pthread_t *thStudents;
    int count;
    int present;
} studentsList_t;
typedef struct yearCounters
{
    int values[4];
    pthread_mutex_t mxCounters[4];
} yearCounters_t;
typedef struct argsModify
{
    yearCounters_t *pYearCounters;
    int year;
} argsModify_t;
void ReadArguments(int argc, char **argv, int *studentsCount);
void *student_life(void *);
void increment_counter(argsModify_t *args);
void decrement_counter(void *_args);
void msleep(UINT milisec);
void kick_student(studentsList_t *studentsList);

int main(int argc, char **argv)
{
    int studentsCount;
    ReadArguments(argc, argv, &studentsCount);
    yearCounters_t counters = {.values = {0, 0, 0, 0},
                               .mxCounters = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
                                              PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER}};
    studentsList_t studentsList;
    studentsList.count = studentsCount;
    studentsList.present = studentsCount;
    studentsList.thStudents = (pthread_t *)malloc(sizeof(pthread_t) * studentsCount);
    studentsList.removed = (bool *)malloc(sizeof(bool) * studentsCount);
    if (studentsList.thStudents == NULL || studentsList.removed == NULL)
        ERR("Failed to allocate memory for 'students list'!");
    for (int i = 0; i < studentsCount; i++)
        studentsList.removed[i] = false;
    for (int i = 0; i < studentsCount; i++)
        if (pthread_create(&studentsList.thStudents[i], NULL, student_life, &counters))
            ERR("Failed to create student thread!");
    srand(time(NULL));
    timespec_t start, current;
    if (clock_gettime(CLOCK_REALTIME, &start))
        ERR("Failed to retrieve time!");
    do
    {
        msleep(rand() % 201 + 100);
        if (clock_gettime(CLOCK_REALTIME, &current))
            ERR("Failed to retrieve time!");
        kick_student(&studentsList);
    } while (ELAPSED(start, current) < 4.0);
    for (int i = 0; i < studentsCount; i++)
        if (pthread_join(studentsList.thStudents[i], NULL))
            ERR("Failed to join with a student thread!");
    printf(" First year: %d\n", counters.values[0]);
    printf("Second year: %d\n", counters.values[1]);
    printf(" Third year: %d\n", counters.values[2]);
    printf("  Engineers: %d\n", counters.values[3]);
    free(studentsList.removed);
    free(studentsList.thStudents);
    exit(EXIT_SUCCESS);
}

void ReadArguments(int argc, char **argv, int *studentsCount)
{
    *studentsCount = DEFAULT_STUDENT_COUNT;
    if (argc >= 2)
    {
        *studentsCount = atoi(argv[1]);
        if (*studentsCount <= 0)
        {
            printf("Invalid value for 'studentsCount'");
            exit(EXIT_FAILURE);
        }
    }
}

void *student_life(void *voidArgs)
{
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    argsModify_t args;
    args.pYearCounters = voidArgs;
    for (args.year = 0; args.year < 3; args.year++)
    {
        increment_counter(&args);
        pthread_cleanup_push(decrement_counter, &args);
        msleep(1000);
        pthread_cleanup_pop(1);
    }
    increment_counter(&args);
    return NULL;
}

void increment_counter(argsModify_t *args)
{
    pthread_mutex_lock(&(args->pYearCounters->mxCounters[args->year]));
    args->pYearCounters->values[args->year] += 1;
    pthread_mutex_unlock(&(args->pYearCounters->mxCounters[args->year]));
}

void decrement_counter(void *_args)
{
    argsModify_t *args = _args;
    pthread_mutex_lock(&(args->pYearCounters->mxCounters[args->year]));
    args->pYearCounters->values[args->year] -= 1;
    pthread_mutex_unlock(&(args->pYearCounters->mxCounters[args->year]));
}

void msleep(UINT milisec)
{
    time_t sec = (int)(milisec / 1000);
    milisec = milisec - (sec * 1000);
    timespec_t req = {0};
    req.tv_sec = sec;
    req.tv_nsec = milisec * 1000000L;
    if (nanosleep(&req, &req))
        ERR("nanosleep");
}

void kick_student(studentsList_t *studentsList)
{
    int idx;
    if (0 == studentsList->present)
        return;
    do
    {
        idx = rand() % studentsList->count;
    } while (studentsList->removed[idx] == true);
    pthread_cancel(studentsList->thStudents[idx]);
    studentsList->removed[idx] = true;
    studentsList->present--;
}

Each thread receives a pointer to the year counters and keeps it, together with its current year, in an argsModify_t structure. This structure does not have the same flaw as the one in task 2 of this tutorial, i.e. the program does not make too many unnecessary references to the same data.

Structure studentsList_t is only used in the main thread; it is not visible to the students’ threads.

The clever initialization of the yearCounters_t structure will not work in archaic C standards (c89/c90). It is worth knowing, but please use all the improvements of the newer standards in your code.

Cleanup handlers in the working thread are deployed to safely expel a student while its thread is asleep. Without the handlers the canceled student would occupy its last counter till the end of the program!

Please keep in mind that pthread_cleanup_push must be paired with pthread_cleanup_pop in the same lexical scope (the same braces {}).

How many mutexes does this program use and what do they protect?

Answer
Four exactly, one for each year/counter.

Must current year of a student be a part of argsModify_t structure?

Answer
No, it could have been an automatic variable in the thread function; the structure argsModify_t would then no longer be needed and you would pass a pointer to yearCounters instead.

What does it mean that the thread cancellation type is set to PTHREAD_CANCEL_DEFERRED?

Answer
Cancellation will only happen during certain function calls (so-called cancellation points); it will not disturb the rest of the code in the thread. In other words, the thread can finish a part of its work before it is canceled.

What functions used in the thread code are cancellation points?

Answer
Only nanosleep (called from msleep) is a cancellation point in this code.

How do we learn what functions are cancellation points?

Answer
From the manual page: man 7 pthreads

What does the argument 1 mean in the call "pthread_cleanup_pop(1);"?

Answer
It means that the handler is not only removed from the handlers stack but also executed.

When is the year counter decreased?

Answer
In two cases: during cancellation (the rare case) and when the cleanup handler is removed from the stack of handlers.

The algorithm selecting a thread for cancellation has an apparent flaw. Can you name it and tell what threat it creates?

Answer
This random selection can take a very long time if only a few "live" threads are left on a large list of threads. Try to run the program with 10 as the parameter to check it.

Improve random selection as an exercise.

Have a look at the method used to measure the 4-second lifetime of the program (clock_gettime, nanosleep). As an exercise, please change the solution to use the alarm function and a SIGALRM handler.

Do the example tasks. During the laboratory you will have more time and a starting code. If you do the following tasks in the allotted time, it means that you are well-prepared.

Source codes presented in this tutorial #