Nikita Shirikov Ed blog

Posix Threads

В этой лекции будем о многопоточную модель программирования, когда запускаем несоклько потоков с разделением памяти.

Будем говорить о библиотеки posix-threads.

Код 1

Разбираем код, кажется ничего сложного, информацию про функции ествественно см. man:

#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>

#include <stdio.h>
#include <stdlib.h>


_Atomic int counter;


void* threadfun(void *arg) {
    int somevar;
    for (int i = 0; i < 10; ++i) {
        counter++;
        printf("in thread %lu, stack near %p, pid=%d, tid=%d, counter=%d\n",
               pthread_self(), &somevar, getpid(), gettid(), counter);
        sleep(100);
    }
    return NULL;
}


int main(int argc, char* argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: threads THREAD_COUNT\n");
        return 1;
    }
    int N = atoi(argv[1]);
    pthread_t threads[N];
    int created = 0;
    for (; created < N; ++created) {
        int result = pthread_create(&threads[created], NULL, threadfun, NULL);
        if (result != 0) {
            perror("pthread_create");
            break;
        }
    }
    for (int i = 0; i < created; ++i) {
        pthread_join(threads[i], NULL);
    }
}

Если мы хотим использовать функции стандартной библиотеки из нескольких потоков, то надо убедиться, что они потокобезопасны (проверить можно в man, например, man 3 printf)

Немного обсудили, что будет, если процесс завершится до завершения поток - их убьют.

Лимит по количеству создаваемых потоков зависит от такого, какого размера стек будет у каждого потока. Можно сделать с помощью функции pthread_attr_setstacksize()

fork()

Использовать форк можно в многопоточном коде, но очень сложно и лучше все-таки это избежать. Для этого есть отдельные спциальные функции.

#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>


_Atomic int counter;


void* threadfun(void *arg) {
    sleep(1);
    pthread_exit(NULL);
    return NULL;
}


int main(int argc, char* argv[]) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s THREAD_COUNT STACK_PAGES\n", argv[0]);
        return 1;
    }
    int N = atoi(argv[1]);
    int stack_pages = atoi(argv[2]);

    pthread_t threads[N];
    int created = 0;
    for (; created < N; ++created) {
        pthread_attr_t attr;
        pthread_attr_init(&attr);
        int sss_result = pthread_attr_setstacksize(&attr, stack_pages * 4096);
        if (sss_result == EINVAL) {
            fprintf(stderr, "pthread_attr_setstacksize returned EINVAL\n");
            break;
        }
        int result = pthread_create(&threads[created], &attr, threadfun, NULL);
        if (result != 0) {
            perror("pthread_create");
            break;
        }
        printf("%d threads\n", created);
    }
    for (int i = 0; i < created; ++i) {
        printf("pthread_join returned %d\n", pthread_join(threads[i], NULL));
    }
}

Сигналы

Обрабатывать будет рандомный поток, поэтому обычно пользуются масками (которая есть у каждого потока), чтобы выбрать конкретный поток для обработки сигналов.

Exec

Проблема, что он как и exit, предварительно все завершает. Для правильное работы с fd использовать функции с постфиксом CLOEXEC. (Честно говоря, этот момент сам не до конца понял). Но нужно это, чтобы при exec таблица файловых дескрипторов не сломалась.

Примитивы синхронизации

mutex

Библиотека предоставляет pthread_mutex_t. Чтобы заблокировать мьютекс в общей памяти pthread_mutex_lock.

Есть try_lock, timed_lock и т.п. сомнительные вещи.

Код 2

Дальше пытаемся реализовать очередь - один поток кладет задачи, другой снимает, по фатку обсуждаем cond_var’ы:

#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>


pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;


struct node {
    struct node *next;
    int data;
};

struct node* list = NULL;


int pop() {
    pthread_mutex_lock(&mutex);

    while (list == NULL) {
        pthread_cond_wait(&nonempty, &mutex);   // atomically: mutex_unlock(); sleep(); mutex_lock(); 
    }

    int data = list->data;
    struct node* next = list->next;
    free(list);
    list = next;

    pthread_mutex_unlock(&mutex);

    return data;
}

void push(int data) {
    struct node *new = calloc(sizeof(*new), 1);
    new->data = data;

    pthread_mutex_lock(&mutex);
    new->next = list;
    list = new;
    if (!list->next) {
        pthread_cond_signal(&nonempty);
    }
    pthread_mutex_unlock(&mutex);
}

void* threadfun(void *arg) {
    while (1) {
        int i = pop();
        printf("in thread %d: popped value %d\n", gettid(), i);
    }
    return NULL;
}


int main(int argc, char* argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: %s THREAD_COUNT\n", argv[0]);
        return 1;
    }
    int N = atoi(argv[1]);

    pthread_t threads[N];
    int created = 0;
    for (; created < N; ++created) {
        int result = pthread_create(&threads[created], NULL, threadfun, NULL);
        if (result != 0) {
            perror("pthread_create");
            break;
        }
        printf("%d threads\n", created);
    }

    for (int i = 0; i < 100; ++i) {
        push(i);
    }

    for (int i = 0; i < created; ++i) {
        pthread_join(threads[i], NULL);
    }
}

Код 3

Дальше идет какая-то queue-stop, но я уже не стал разбираться:

#define _GNU_SOURCE
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>


pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER;
pthread_cond_t empty = PTHREAD_COND_INITIALIZER;


struct node {
    struct node *next;
    int data;
};

struct node* list = NULL;
_Bool done = 0;

int pop() {
    // unlocked
    pthread_mutex_lock(&mutex);
    // locked
    while (!list && !done) {
        printf("in thread %d: waiting\n", gettid());
        pthread_cond_wait(&nonempty, &mutex);
        printf("in thread %d: awake\n", gettid());
    }
    int data;
    if (list) {
        data = list->data;
        struct node* next = list->next;
        free(list);
        list = next;

        if (list == NULL) {
            pthread_cond_signal(&empty);
        }
        pthread_mutex_unlock(&mutex);
        return data;
    } else {
        printf("in thread %d: stopping\n", gettid());
        pthread_mutex_unlock(&mutex);
        pthread_exit(NULL);
    }
}

void push(int data) {
    struct node *new = calloc(sizeof(*new), 1);
    new->data = data;

    pthread_mutex_lock(&mutex);
    if (done) {
        fprintf(stderr, "queue stopped\n");
        free(new);
        pthread_mutex_unlock(&mutex);
        return;
    }
    new->next = list;
    list = new;
    if (!list->next) {
        printf("signaling nonempty\n");
        pthread_cond_signal(&nonempty);
    }
    pthread_mutex_unlock(&mutex);
}

void stop() {
    pthread_mutex_lock(&mutex);
    done = 1;
    printf("broadcasting nonempty\n");
    pthread_cond_broadcast(&nonempty);
    while (list) {
        pthread_cond_wait(&empty, &mutex);
    }
    pthread_mutex_unlock(&mutex);
}

void* threadfun(void *arg) {
    while (1) {
        int i = pop();
        printf("in thread %d: popped value %d\n", gettid(), i);
    }
    return NULL;
}


int main(int argc, char* argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: %s THREAD_COUNT\n", argv[0]);
        return 1;
    }
    int N = atoi(argv[1]);

    pthread_t threads[N];
    int created = 0;
    for (; created < N; ++created) {
        int result = pthread_create(&threads[created], NULL, threadfun, NULL);
        if (result != 0) {
            perror("pthread_create");
            break;
        }
        printf("%d threads\n", created);
    }

    for (int i = 0; i < 100; ++i) {
        push(i);
    }
    stop();

    for (int i = 0; i < created; ++i) {
        pthread_join(threads[i], NULL);
    }
}

#Os