Tutorial 6 - POSIX Message Queues #
This tutorial contains explanations of the functions required during lab sessions, along with their parameters.
However, it is still just a reference guide with the most important information —
you must read the indicated man pages to fully understand all the details.
POSIX Queue Operations #
The topic of message queues is noticeably less extensive than that of pipes/FIFOs, and this tutorial is also shorter.
POSIX message queues don’t suffer from issues with simultaneous reading and writing, guarantee atomic writes even for large messages, provide distinct message boundaries, and support priorities. These features make the topic relatively simple, with potential difficulties mostly limited to handling queue event notifications.
The main focus of this session is POSIX message queues, but the tasks will also involve elements practiced earlier, such as threads, processes, and signals.
The manual man 7 mq_overview
contains a description of how queues work. The discussed functions are defined in the <mqueue.h>
header file, and the interface is designed in a way that allows queues to be used similarly to file operations — it’s worth noting the parallels in how each function is defined and used.
POSIX queues are priority-based, meaning the order of messages is determined first by priority and then by send order.
Creating, Opening, and Removing Queues #
Queues are identified by a name (defined by the programmer) /<name>
, e.g., /queue
.
To create and open a queue, use the mq_open
function (man 3p mq_open
):
mqd_t mq_open(const char *name, int oflag, ...);
name
is the name of the queue,oflag
is a bitmask defining the access mode. Available options include:O_RDONLY
,O_WRONLY
,O_RDWR
— open for reading/writing/both,O_CREAT
— create the queue; this requires two additional arguments:mode_t mode
, which specifies access permissions, andmq_attr* attr
, a structure with 4 fields:mq_flags
, a bitmask (POSIX only defines the presence/absence ofO_NONBLOCK
, others are implementation-dependent),mq_maxmsg
— maximum number of messages in the queue,mq_msgsize
— maximum message size,mq_curmsgs
— current number of messages in the queue (ignored when creating the queue).
You can also pass
NULL
asattr
, and the queue will be created with default settings.O_EXCL
— used only withO_CREAT
, it causes an error if a queue with the given name already exists,O_NONBLOCK
— sets the queue to non-blocking mode.
The function returns a message queue descriptor (mqd_t
) or (mqd_t)-1
on error.
Unused descriptors should be closed using mq_close
(man 3p mq_close
).
You can remove a queue by name using mq_unlink
. Be cautious when attempting to remove a queue that is still open in another process. As noted in man 3p mq_unlink
, unlinking only frees the name (e.g., so it can be reused). The queue is actually removed only after all associated descriptors are closed. Depending on the implementation, mq_unlink
may block until other processes close their descriptors. On Linux (man 3 mq_unlink
), however, it returns immediately.
Even in that case, a newly created queue with the same name will be a different queue.
Queue Attributes #
Queues store their attributes in a mq_attr
structure, discussed in the previous section (see mq_open
, O_CREAT
flag). You can retrieve queue attributes using mq_getattr
(man 3p mq_getattr
):
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat);
mqdes
is the descriptor of the open queue,mqstat
is a pointer to an already allocatedstruct mq_attr
variable, whose fields will be filled by the function.
To set queue attributes, use mq_setattr
(man 3p mq_setattr
):
int mq_setattr(mqd_t mqdes, const struct mq_attr *restrict mqstat, struct mq_attr *restrict omqstat);
mqdes
is the descriptor of the open queue,mqstat
points to themq_attr
structure with attributes you want to set,omqstat
points to an already allocatedmq_attr
variable; its fields will be filled with values before the call tomq_setattr
. You may also passNULL
.
IMPORTANT: The mq_setattr
function only changes the mq_flags
field. The other fields (mq_maxmsg
, mq_msgsize
, and mq_curmsgs
) are immutable (set only during queue creation) and ignored by mq_setattr
.
In practice, according to POSIX, the only changeable setting is the blocking/non-blocking I/O mode.
Both functions return 0
on success or -1
on failure.
Sending and Receiving Messages #
So far, we have used read
and write
to handle I/O with files and pipes. Message queues, however, use their own functions: mq_send
and mq_receive
(man 3p mq_send
, man 3p mq_receive
). Let’s look at their definitions:
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
mqdes
is the queue descriptor,msg_ptr
is a pointer to the message buffer (for sending) or the buffer to be filled (for receiving),msg_len
is the size of themsg_ptr
buffer in bytes (must not exceedmq_msgsize
when sending; must be ≥mq_msgsize
when receiving),msg_prio
:- for
mq_send
, this is the message’s priority (must be less thanMQ_PRIO_MAX
— POSIX guaranteesMQ_PRIO_MAX >= 32
), - for
mq_receive
, it is a pointer to a variable where the message priority will be stored (can beNULL
if the priority is not needed).
- for
If you try to read from an empty queue or write to a full queue in non-blocking mode, the functions return -1
and set errno
to EAGAIN
.
In blocking mode, the functions will wait until the operation becomes possible.
If mq_send
succeeds, it returns 0
.mq_receive
returns the size (in bytes) of the received message.
In blocking mode, we can also set a maximum wait time for a message or for sending it. For this, use mq_timedsend
and mq_timedreceive
:
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abstime);
ssize_t mq_timedreceive(mqd_t mqdes, char *restrict msg_ptr, size_t msg_len, unsigned *restrict msg_prio, const struct timespec *restrict abstime);
These functions add the abstime
argument, a pointer to a timespec
struct (from <time.h>
), which defines an absolute timeout after which the function will stop waiting (to receive a message or to send one).
They behave like their non-timed counterparts, except that if the timeout is exceeded, they return -1
and set errno
to ETIMEDOUT
.
IMPORTANT: As mentioned, abstime
is an absolute timeout. To use it correctly, you must first retrieve the current time using, for example, clock_gettime
with CLOCK_REALTIME
(man 3p clock_getres
), then add the desired wait time (e.g., add 1
to tv_sec
for a 1-second wait).
When the queue is in non-blocking mode, mq_timed(send|receive)
behaves exactly the same as mq_(send|receive)
.
Notifications #
POSIX provides the ability to asynchronously notify processes about incoming messages to an empty queue.
To register a process for notification, use the mq_notify
function (man 3p mq_notify
):
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
mqdes
is the descriptor of the open queue,notification
is a pointer to asigevent
structure (header<signal.h>
) that specifies how the notification should be delivered.
The sigevent
structure should be filled out depending on the notification type (man 7 sigevent
):
For signal-based notification (only realtime signals):
- set
sigev_notify
toSIGEV_SIGNAL
, - set
sigev_signo
to the signal you want to receive (e.g.,SIGRTMIN
), - set
sigev_value.sival_int
orsigev_value.sival_ptr
to the value you want to receive in the signal handler argument (an integer or a memory address, respectively).
Additionally:
- when setting the signal handler using the
sigaction
function, thesigaction
structure passed to the function must have thesa_flags
field set toSA_SIGINFO
(see thesethandler
function from the solution in Task 1), - the values passed in
sigev_value
are delivered to the handler in thesi_value
field of thesiginfo_t
structure (see themq_handler
function in the task solution).
- set
For thread-based notification:
- set
sigev_notify
toSIGEV_THREAD
, - set
sigev_notify_function
to a pointer to a function that takes aunion sigval
(the type ofsigev_value
) and returnsvoid
, - set
sigev_value.sival_int
orsigev_value.sival_ptr
to the value you want to pass to the above function (integer or memory address), - optionally set thread parameters in
sigev_notify_attributes
.
- set
To unregister a process from notifications, call mq_notify
with notification
set to NULL
.
Passing NULL
when no process is registered will return -1
and set errno
to EINVAL
.
IMPORTANT:
- Notifications are one-time only: after a notification is delivered, no further notifications are expected. To receive more, you must call
mq_notify
again. - Notifications are only triggered when the queue transitions from empty to non-empty.
- Only one process can be registered for notification. Otherwise,
mq_notify
returns-1
and setserrno
toEBUSY
.
Refer to the signal-based notification implementation in Task 1.
Thread-based notification is demonstrated in the solution for Task 2.
Notes #
- You must link the
librt
library when compiling programs that use POSIX message queues. - If you open an existing queue, it may already contain data. You should not assume it is empty. To ensure a clean state, delete the queue before recreating it.
Tasks with Solutions #
Task 1: Signal-Based Notification #
Write a program that simulates a simple version of bingo. The parent process randomly draws numbers, and the players are its child processes. Communication between them takes place via POSIX message queues.
The parent process creates n
child processes (0 < n < 100
, where n
is a program argument) and two message queues.
The first queue, pout
, is used to send randomly drawn numbers from the range [0,9]
to the children once per second.
The second queue, pin
, is used to receive messages from the children when they win or complete the game.
Each child process randomly selects its winning number E
(in [0,9]
) and a number N
representing how many numbers it will read from the queue (also from [0,9]
). Then, in a loop, the child tries to read numbers from the pout
queue — a sent number can only be received by one process, not all of them simultaneously.
Each child compares the received number with its E
. If they match, the child sends this number to the parent via the pin
queue and exits.
If the child reads N
numbers without a match, it sends its own process number (from [1, n]
) to the parent via pin
before exiting.
The parent process must asynchronously receive messages from the pin
queue and print the appropriate output, while still sending numbers every second.
When all child processes terminate, the parent also ends execution and removes both queues.
NOTE: For this task, message size in the queue should be limited to 1 byte!
Task Solution #
New manpages:
man 7 mq_overview
man 3p mq_open
man 3p mq_close
man 3p mq_unlink
man 3p mq_getattr
man 3p mq_setattr
man 3p mq_send
man 3p mq_receive
man 3p mq_notify
man 7 sigevent
#define _GNU_SOURCE
#include <errno.h>
#include <mqueue.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#define LIFE_SPAN 10
#define MAX_NUM 10
#define ERR(source) \
(fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), perror(source), kill(0, SIGKILL), exit(EXIT_FAILURE))
volatile sig_atomic_t children_left = 0;
void sethandler(void (*f)(int, siginfo_t *, void *), int sigNo)
{
struct sigaction act;
memset(&act, 0, sizeof(struct sigaction));
act.sa_sigaction = f;
act.sa_flags = SA_SIGINFO;
if (-1 == sigaction(sigNo, &act, NULL))
ERR("sigaction");
}
void mq_handler(int sig, siginfo_t *info, void *p)
{
mqd_t *pin;
uint8_t ni;
unsigned msg_prio;
pin = (mqd_t *)info->si_value.sival_ptr;
static struct sigevent notif;
notif.sigev_notify = SIGEV_SIGNAL;
notif.sigev_signo = SIGRTMIN;
notif.sigev_value.sival_ptr = pin;
if (mq_notify(*pin, ¬if) < 0)
ERR("mq_notify");
for (;;)
{
if (mq_receive(*pin, (char *)&ni, 1, &msg_prio) < 1)
{
if (errno == EAGAIN)
break;
else
ERR("mq_receive");
}
if (0 == msg_prio)
printf("MQ: got timeout from %d.\n", ni);
else
printf("MQ: %d is a bingo number!\n", ni);
}
}
void sigchld_handler(int sig, siginfo_t *s, void *p)
{
pid_t pid;
for (;;)
{
pid = waitpid(0, NULL, WNOHANG);
if (pid == 0)
return;
if (pid <= 0)
{
if (errno == ECHILD)
return;
ERR("waitpid");
}
children_left--;
}
}
void child_work(int n, mqd_t pin, mqd_t pout)
{
srand(getpid());
uint8_t my_bingo = (uint8_t)(rand() % MAX_NUM);
uint8_t ni;
for (int life = rand() % LIFE_SPAN + 1; life > 0; life--)
{
if (TEMP_FAILURE_RETRY(mq_receive(pout, (char *)&ni, 1, NULL)) < 1)
ERR("mq_receive");
printf("[%d] Received %d\n", getpid(), ni);
if (my_bingo == ni)
{
if (TEMP_FAILURE_RETRY(mq_send(pin, (const char *)&my_bingo, 1, 1)))
ERR("mq_send");
return;
}
}
if (TEMP_FAILURE_RETRY(mq_send(pin, (const char *)&n, 1, 0)))
ERR("mq_send");
}
void parent_work(mqd_t pout)
{
srand(getpid());
while (children_left)
{
uint8_t ni = (uint8_t)(rand() % MAX_NUM);
if (TEMP_FAILURE_RETRY(mq_send(pout, (const char *)&ni, 1, 0)))
ERR("mq_send");
sleep(1);
}
printf("[PARENT] Terminates \n");
}
void create_children(int n, mqd_t pin, mqd_t pout)
{
while (n > 0)
{
switch (fork())
{
case 0:
child_work(n, pin, pout);
exit(EXIT_SUCCESS);
case -1:
ERR("fork");
}
children_left++;
n--;
}
}
void usage(char *name)
{
fprintf(stderr, "USAGE: %s n k p l\n", name);
fprintf(stderr, "100 > n > 0 - number of children\n");
exit(EXIT_FAILURE);
}
int main(int argc, char **argv)
{
int n;
if (argc != 2)
usage(argv[0]);
n = atoi(argv[1]);
if (n <= 0 || n >= 100)
usage(argv[0]);
mqd_t pin, pout;
struct mq_attr attr = {};
attr.mq_maxmsg = 10;
attr.mq_msgsize = 1;
if ((pin = TEMP_FAILURE_RETRY(mq_open("/bingo_in", O_RDWR | O_NONBLOCK | O_CREAT, 0600, &attr))) == (mqd_t)-1)
ERR("mq_open in");
if ((pout = TEMP_FAILURE_RETRY(mq_open("/bingo_out", O_RDWR | O_CREAT, 0600, &attr))) == (mqd_t)-1)
ERR("mq_open out");
sethandler(sigchld_handler, SIGCHLD);
sethandler(mq_handler, SIGRTMIN);
create_children(n, pin, pout);
static struct sigevent noti;
noti.sigev_notify = SIGEV_SIGNAL;
noti.sigev_signo = SIGRTMIN;
noti.sigev_value.sival_ptr = &pin;
if (mq_notify(pin, ¬i) < 0)
ERR("mq_notify");
parent_work(pout);
mq_close(pin);
mq_close(pout);
if (mq_unlink("/bingo_in"))
ERR("mq unlink");
if (mq_unlink("/bingo_out"))
ERR("mq unlink");
return EXIT_SUCCESS;
}
Notes and Questions #
Pay attention to the use of the pointer sent with the signal. The signal handler function prototype includes an additional
siginfo_t*
parameter, and theSA_SIGINFO
flag is used when installing the handler to enable pointer passing. Note that we are not sending this signal manually — we receive it as a notification from the queue. Thekill
function cannot send a pointer; this can only be done usingsigqueue
.Numbers are passed using the
uint8_t
type (stdint.h
), which is a one-byte unsigned integer ranging from 0 to 255. Fixed-size integer types (e.g.,int32_t
) are more portable than base types likeint
, which can vary in size across platforms.Since the program involves signal handling, protection from signal interruptions is required, e.g., using
TEMP_FAILURE_RETRY
. These macros were added throughout the code, although only the parent process’s code is at risk of interruption, as it is the only one receiving notifications from the queue. Adding such safeguards does not degrade performance and makes the code more portable.One of the parent process’s calls lacks signal interruption protection — which one, and why is it not necessary there?
Answer:
Themq_receive
call inside the signal handler function. We don’t expect the handler itself to be interrupted by the signal it’s handling, because that signal is blocked during its own handling by default.How are child processes counted?
Answer:
By counting successfulwaitpid
calls in theSIGCHLD
handler. Signals likeSIGCHLD
can coalesce, so we do not count the signals themselves. The counter is stored in a global variable.Why do we install/restart the notification before reading from the queue?
Answer:
If we read the queue first and install the notification afterward, a message could arrive in the queue in between — and because notifications are triggered only when a message appears in an empty queue, we would miss it. Installing the notification first guarantees that the queue is empty and a new message will generate a notification.Why is one of the queues set to non-blocking mode?
Answer:
Thepin
queue is non-blocking due to the above reason. If it blocked, the program could hang inside the asynchronous signal handler after reading the last message, which would be a serious error.How are message priorities used, and how does that relate to the 1-byte message size limit?
Answer:
Priorities are used not for ordering but to indicate the type of message. Because we’re limited to 1-byte messages, it’s hard to encode anything beyond a single number. Whether it’s a winning message or a game-over notice is indicated via the priority.A significant part of the program logic is implemented inside the signal handler, which is acceptable because there’s no dependency between the number-drawing code and the notification-handling code. However, to make it more challenging, try moving all message-handling logic into the main parent process code (i.e., outside the asynchronous signal handler).
Task 2: Thread-based Notification #
Write a program simulating conversations on a Roman forum.
Define a constant CHILD_COUNT
.
The parent process creates CHILD_COUNT
queues, opens them in non-blocking mode, and spawns CHILD_COUNT
child processes.
Each child receives a randomly generated name.
Each child acts as a citizen on the forum.
Citizens randomly send each other messages containing their names, and upon receiving messages, they print them prefixed with their own name.
After each message is sent, they sleep for a random amount of time.
After sending ROUNDS
messages, they exit.
Each citizen waits for messages using thread-based notification.
Solution #
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#define CHILD_COUNT 4
#define QUEUE_NAME_MAX_LEN 32
#define CHILD_NAME_MAX_LEN 32
#define MSG_SIZE 64
#define MAX_MSG_COUNT 4
#define ROUNDS 5
#define ERR(source) \
(fprintf(stderr, "%s:%d\n", __FILE__, __LINE__), perror(source), kill(0, SIGKILL), exit(EXIT_FAILURE))
mqd_t create_queue(char* name)
{
struct mq_attr attr = {};
attr.mq_maxmsg = MAX_MSG_COUNT;
attr.mq_msgsize = MSG_SIZE;
mqd_t res = mq_open(name, O_RDWR | O_NONBLOCK | O_CREAT, 0600, &attr);
if (res == -1)
ERR("mq_open");
return res;
}
typedef struct
{
char* name;
mqd_t queue;
} ChildData;
void handle_messages(union sigval data);
void register_notification(ChildData* data)
{
struct sigevent notification = {};
notification.sigev_value.sival_ptr = data;
notification.sigev_notify = SIGEV_THREAD;
notification.sigev_notify_function = handle_messages;
int res = mq_notify(data->queue, ¬ification);
if (res == -1)
ERR("mq_notify");
}
void handle_messages(union sigval data)
{
ChildData* child_data = data.sival_ptr;
char message[MSG_SIZE];
register_notification(child_data);
for (;;)
{
int res = mq_receive(child_data->queue, message, MSG_SIZE, NULL);
if (res != -1)
printf("%s: Accepi \"%s\"\n", child_data->name, message);
else
{
if (errno == EAGAIN)
break;
else
ERR("mq_receive");
}
}
}
void close_queues(mqd_t* queues)
{
for (int i = 0; i < CHILD_COUNT; ++i)
{
mq_close(queues[i]);
}
}
void unlink_queues(char queue_names[CHILD_COUNT][CHILD_NAME_MAX_LEN])
{
for (int i = 0; i < CHILD_COUNT; ++i)
{
int result = mq_unlink(queue_names[i]);
// The case when the queue does not exist already
// is not treated as an error
if (result == -1 && errno != ENOENT)
ERR("mq_close");
}
}
void send_message(mqd_t queue, char* content)
{
int result = mq_send(queue, content, MSG_SIZE, 0);
if (result == -1)
ERR("mq_send");
}
void child_function(char* name, mqd_t* queues, int i)
{
printf("%s: A PID %d incipiens.\n", name, getpid());
srand(getpid());
ChildData child_data = {};
child_data.name = name;
child_data.queue = queues[i];
union sigval data;
data.sival_ptr = &child_data;
handle_messages(data);
for (int i = 0; i < ROUNDS; ++i)
{
int receiver = rand() % CHILD_COUNT;
char message[MSG_SIZE];
switch (rand() % 3)
{
case 0:
snprintf(message, MSG_SIZE, "%s: Salve %d!", name, receiver);
break;
case 1:
snprintf(message, MSG_SIZE, "%s: Visne garum emere, %d?", name, receiver);
break;
case 2:
snprintf(message, MSG_SIZE, "%s: Fuistine hodie in thermis, %d?", name, receiver);
break;
}
send_message(queues[receiver], message);
int sleep_time = rand() % 3 + 1;
sleep(sleep_time);
}
printf("%s: Disceo.\n", name);
exit(EXIT_SUCCESS);
}
void spawn_child(char* name, mqd_t* queues, int i)
{
switch (fork())
{
case 0:
child_function(name, queues, i);
case -1:
ERR("fork");
}
}
int main(void)
{
mqd_t queues[CHILD_COUNT];
char names[CHILD_COUNT][CHILD_NAME_MAX_LEN];
char queue_names[CHILD_COUNT][CHILD_NAME_MAX_LEN];
for (int i = 0; i < CHILD_COUNT; ++i)
{
snprintf(queue_names[i], QUEUE_NAME_MAX_LEN, "/child_%d", i);
queues[i] = create_queue(queue_names[i]);
snprintf(names[i], CHILD_NAME_MAX_LEN, "Persona %d", i);
}
for (int i = 0; i < CHILD_COUNT; ++i)
{
spawn_child(names[i], queues, i);
}
close_queues(queues);
while (wait(NULL) > 0)
;
printf("Parens: Disceo.");
unlink_queues(queue_names);
return EXIT_SUCCESS;
}
Notes and Questions #
Why do we break the
mq_receive
loop onEAGAIN
?Answer:
This error means the queue is empty and there’s nothing more to read.Why does
child_function
callhandle_messages
and not justregister_notification
?Answer:
Because if messages are already in the queue from other children, no notification will be triggered. We must read those messages in the main thread.The parent process closes all queue descriptors immediately after creating the children. Why doesn’t this cause problems for the children?
Answer:
Because children inherit a copy of the parent’s data, including file descriptors. Closing them in the parent does not affect the children’s copies.Why don’t the child processes close the queues at the end?
Answer:
Closing a queue removes its notification, meaning no new handler threads will start. However, if a thread is already running while the queue is closed, it may attemptmq_notify
ormq_receive
on an invalid descriptor. Therefore, queues must remain open until the program ends. The OS will close them when the program exits, so it’s not an error.Could we just wait for the notification handler thread to finish before closing the queues?
Answer:
No. We don’t have the thread’s TID. Even if it were shared, we wouldn’t know if the thread still exists when closing queues. Also, the thread might be indetached
mode — such threads can’t be joined.Could we use a synchronization structure to prevent this issue?
Answer:
Unfortunately, no. If we used one, we’d need to destroy it at the end of the program. But the thread might still be running and try to access the destroyed structure. In this case, there’s no perfect solution. Since closing the queue means the program is ending anyway, if threads were writing to a file, we could use a condition variable to ensure they finish — but in this case, it’s unnecessary.Can we move the
exit(EXIT_SUCCESS)
call fromchild_function
tospawn_child
, right after callingchild_function
?Answer:
No, for the same reason we don’t close queues. Thechild_data
would be out of scope afterchild_function
, but it’s still needed by the handler thread. Leavingexit(EXIT_SUCCESS)
insidechild_function
ensures it stays valid.
Example Exercises #
Complete the sample exercises. You will have more time and starter code during the lab session, but completing the tasks below on your own means you are well prepared.
- Exercise 1 ~90 minutes
- Exercise 2 ~120 minutes
- Exercise 3 ~120 minutes
- Exercise 4 ~150 minutes