Nikita Shirikov Ed blog

Multiplexing

Продолжаем заниматься сетевым программированием.

Мы хотим обрабатывать несколько соединений. В прошлый раз для этого мы использовали fork. Но это не очень удобно: нагружает систему, нам тяжело шарить общее состояние и т.д.

Мы посмотрим на сервисы для мультипликсирования, которые предоставляет нам OS.

Poll

Будем смотреть на интерфейс poll. (Есть еще select)

Как он устроен: мы заводим массив структур pollfd и отдаем его ядру.

struct pollfd {
    int fd;
    short events;
    short revents;
};

Мы заполняем fd и events. Ядро возвращает нам информацию в поле revents.

Может быть не очень понятно (на лекции было также), поэтому разберем пример:

// надо найти код

Почему все не так радужно?

С последним нам может помочь вызов ppoll()

Если несколько потоков, то тоже проблемы.

epoll

Линукс-специфичный интерфейс, где массив pollfds лежит в ядре и мы взаимодействуем с ним с помощью системных вызовов.

Вызовы:

Как это все используется в коде:

// listener.h
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

static int create_listener(char* service) {
    struct addrinfo *res = NULL;
    int gai_err;
    struct addrinfo hint = {
        .ai_family = AF_INET6,
        .ai_socktype = SOCK_STREAM,
        .ai_flags = AI_PASSIVE,
    };
    if ((gai_err = getaddrinfo(NULL, service, &hint, &res))) {
        fprintf(stderr, "gai error: %s\n", gai_strerror(gai_err));
        return -1;
    }
    int sock = -1;
    for (struct addrinfo *ai = res; ai; ai = ai->ai_next) {
        sock = socket(ai->ai_family, ai->ai_socktype, 0);
        if (sock < 0) {
            perror("socket");
            continue;
        }
        int one = 1;
        setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
        if (bind(sock, ai->ai_addr, ai->ai_addrlen) < 0) {
            perror("bind");
            close(sock);
            sock = -1;
            continue;
        }
        if (listen(sock, SOMAXCONN) < 0) {
            perror("listen");
            close(sock);
            sock = -1;
            continue;
        }
        break;
    }
    freeaddrinfo(res);
    return sock;
}
// main.c
#define _GNU_SOURCE

#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>

#include "listener.h"


int main(int argc, char* argv[]) {
    if (argc != 2) {
        fprintf(stderr, "Usage: %s SERVICE\n", argv[0]);
        return 1;
    }
    int sock = create_listener(argv[1]);
    if (sock < 0) {
        return 1;
    }

    int epollfd = epoll_create1(0);
    if (epollfd < 0) {
        perror("epoll");
        return 1;
    }

	struct epoll_event evt = {.events = EPOLLIN, .data.fd = sock};
    // ET mode: struct epoll_event evt = {.events = EPOLLIN | EPOLLET, .data.fd = sock};
	epoll_ctl(epollfd, EPOLL_CTL_ADD, sock, &evt);

	while (1) {
		int timeout = -1; // block indefinitely
		errno = 0;
		if (epoll_wait(epollfd, &evt, 1, timeout) < 1) {
			if (errno == EINTR) {
				continue;
			}
			perror("epoll_wait");
			return 1;
		}

		if (evt.data.fd == sock) {
            int connection = accept(sock, NULL, NULL);
            // Non-blocking fds: int connection = accept4(sock, NULL, NULL, SOCK_NONBLOCK);

            struct epoll_event evt = {.events = EPOLLIN,
                                      .data.fd = connection};
            epoll_ctl(epollfd, EPOLL_CTL_ADD, connection, &evt);
		} else {
			char buf[50] = {0};
            ssize_t res = read(evt.data.fd, buf, sizeof(buf) - 1);

            // ET mode
            ssize_t res;
            errno = 0;
            while (!(res == -1 && errno == EAGAIN)) {
                res = read(evt.data.fd, buf, sizeof(buf) - 1);
            }
            // End of ET mode

            if (res == 0) {
                close(evt.data.fd);
            } else {
                write(evt.data.fd, buf, res);
                printf("fd %d: %s", evt.data.fd, buf);
            }
		}
	}
}

Получаем намного более удобный и эффективный код.

Какие проблемы тут? Write все еще блокирующий.

Для этого нам потребуются:

Неблокирующие файловые дискрипторы

Тогда операция read будет возвращать ошибку, а не ждать данных, если таковых нет.

Можем установить для fd при создании, возпользоваться fcntl(), либо сразу создать неблокирующий сокет (для нашего случая). Можно пользоваться accept4() вместо accept().

Мы можем работать в более эффективном ET (edge triggered) mode, но мы обязаны делать read() до получения EAGAIN.

все это не работает с файлами с диска

#Os