Posix Threads
В этой лекции будем о многопоточную модель программирования, когда запускаем несоклько потоков с разделением памяти.
Будем говорить о библиотеки posix-threads.
Код 1
Разбираем код, кажется ничего сложного, информацию про функции ествественно см. man:
#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
_Atomic int counter;
void* threadfun(void *arg) {
int somevar;
for (int i = 0; i < 10; ++i) {
counter++;
printf("in thread %lu, stack near %p, pid=%d, tid=%d, counter=%d\n",
pthread_self(), &somevar, getpid(), gettid(), counter);
sleep(100);
}
return NULL;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
fprintf(stderr, "Usage: threads THREAD_COUNT\n");
return 1;
}
int N = atoi(argv[1]);
pthread_t threads[N];
int created = 0;
for (; created < N; ++created) {
int result = pthread_create(&threads[created], NULL, threadfun, NULL);
if (result != 0) {
perror("pthread_create");
break;
}
}
for (int i = 0; i < created; ++i) {
pthread_join(threads[i], NULL);
}
}
Если мы хотим использовать функции стандартной библиотеки из нескольких потоков, то надо убедиться, что они потокобезопасны (проверить можно в man, например, man 3 printf)
Немного обсудили, что будет, если процесс завершится до завершения поток - их убьют.
Лимит по количеству создаваемых потоков зависит от такого, какого размера стек будет у каждого потока. Можно сделать с помощью функции pthread_attr_setstacksize()
fork()
Использовать форк можно в многопоточном коде, но очень сложно и лучше все-таки это избежать. Для этого есть отдельные спциальные функции.
#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
_Atomic int counter;
void* threadfun(void *arg) {
sleep(1);
pthread_exit(NULL);
return NULL;
}
int main(int argc, char* argv[]) {
if (argc != 3) {
fprintf(stderr, "Usage: %s THREAD_COUNT STACK_PAGES\n", argv[0]);
return 1;
}
int N = atoi(argv[1]);
int stack_pages = atoi(argv[2]);
pthread_t threads[N];
int created = 0;
for (; created < N; ++created) {
pthread_attr_t attr;
pthread_attr_init(&attr);
int sss_result = pthread_attr_setstacksize(&attr, stack_pages * 4096);
if (sss_result == EINVAL) {
fprintf(stderr, "pthread_attr_setstacksize returned EINVAL\n");
break;
}
int result = pthread_create(&threads[created], &attr, threadfun, NULL);
if (result != 0) {
perror("pthread_create");
break;
}
printf("%d threads\n", created);
}
for (int i = 0; i < created; ++i) {
printf("pthread_join returned %d\n", pthread_join(threads[i], NULL));
}
}
Сигналы
Обрабатывать будет рандомный поток, поэтому обычно пользуются масками (которая есть у каждого потока), чтобы выбрать конкретный поток для обработки сигналов.
Exec
Проблема, что он как и exit, предварительно все завершает. Для правильное работы с fd использовать функции с постфиксом CLOEXEC. (Честно говоря, этот момент сам не до конца понял). Но нужно это, чтобы при exec таблица файловых дескрипторов не сломалась.
Примитивы синхронизации
mutex
Библиотека предоставляет pthread_mutex_t. Чтобы заблокировать мьютекс в общей памяти pthread_mutex_lock.
Есть try_lock, timed_lock и т.п. сомнительные вещи.
Код 2
Дальше пытаемся реализовать очередь - один поток кладет задачи, другой снимает, по фатку обсуждаем cond_var’ы:
#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;
struct node {
struct node *next;
int data;
};
struct node* list = NULL;
int pop() {
pthread_mutex_lock(&mutex);
while (list == NULL) {
pthread_cond_wait(&nonempty, &mutex); // atomically: mutex_unlock(); sleep(); mutex_lock();
}
int data = list->data;
struct node* next = list->next;
free(list);
list = next;
pthread_mutex_unlock(&mutex);
return data;
}
void push(int data) {
struct node *new = calloc(sizeof(*new), 1);
new->data = data;
pthread_mutex_lock(&mutex);
new->next = list;
list = new;
if (!list->next) {
pthread_cond_signal(&nonempty);
}
pthread_mutex_unlock(&mutex);
}
void* threadfun(void *arg) {
while (1) {
int i = pop();
printf("in thread %d: popped value %d\n", gettid(), i);
}
return NULL;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
fprintf(stderr, "Usage: %s THREAD_COUNT\n", argv[0]);
return 1;
}
int N = atoi(argv[1]);
pthread_t threads[N];
int created = 0;
for (; created < N; ++created) {
int result = pthread_create(&threads[created], NULL, threadfun, NULL);
if (result != 0) {
perror("pthread_create");
break;
}
printf("%d threads\n", created);
}
for (int i = 0; i < 100; ++i) {
push(i);
}
for (int i = 0; i < created; ++i) {
pthread_join(threads[i], NULL);
}
}
Код 3
Дальше идет какая-то queue-stop, но я уже не стал разбираться:
#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;
struct node {
struct node *next;
int data;
};
struct node* list = NULL;
_Bool done = 0;
int pop() {
// unlocked
pthread_mutex_lock(&mutex);
// locked
while (!list && !done) {
printf("in thread %d: waiting\n", gettid());
pthread_cond_wait(&nonempty, &mutex);
printf("in thread %d: awake\n", gettid());
}
int data;
if (list) {
data = list->data;
struct node* next = list->next;
free(list);
list = next;
if (list == NULL) {
pthread_cond_signal(&empty);
}
pthread_mutex_unlock(&mutex);
return data;
} else {
printf("in thread %d: stopping\n", gettid());
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
}
void push(int data) {
struct node *new = calloc(sizeof(*new), 1);
new->data = data;
pthread_mutex_lock(&mutex);
if (done) {
fprintf(stderr, "queue stopped\n");
free(new);
pthread_mutex_unlock(&mutex);
return;
}
new->next = list;
list = new;
if (!list->next) {
printf("signaling nonempty\n");
pthread_cond_signal(&nonempty);
}
pthread_mutex_unlock(&mutex);
}
void stop() {
pthread_mutex_lock(&mutex);
done = 1;
printf("broadcasting nonempty\n");
pthread_cond_broadcast(&nonempty);
while (list) {
pthread_cond_wait(&empty, &mutex);
}
pthread_mutex_unlock(&mutex);
}
void* threadfun(void *arg) {
while (1) {
int i = pop();
printf("in thread %d: popped value %d\n", gettid(), i);
}
return NULL;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
fprintf(stderr, "Usage: %s THREAD_COUNT\n", argv[0]);
return 1;
}
int N = atoi(argv[1]);
pthread_t threads[N];
int created = 0;
for (; created < N; ++created) {
int result = pthread_create(&threads[created], NULL, threadfun, NULL);
if (result != 0) {
perror("pthread_create");
break;
}
printf("%d threads\n", created);
}
for (int i = 0; i < 100; ++i) {
push(i);
}
stop();
for (int i = 0; i < created; ++i) {
pthread_join(threads[i], NULL);
}
}