epollでI/O多重化したTCPサーバーを作る
2023年12月15日

Socket API

TCP/IP等のネットワーク上でコンピュータ同士が通信するために用意されている低レベルなインターフェースとしてSocket APIがある。ネットワーク上の通信を伴うアプリケーションはsocket(), accept()などのシステムコールを呼び出してOSのTCP/IPスタック(TCP/IP通信のプロトコルに関するカーネル内のプログラム群)を利用することができる。
また、アプリケーション側から見た通信の出入り口はソケットと呼ばれ、IPアドレスやポート番号と紐づいたソケットに対して読み書きすることでデータの送受信ができるようになっている。

ソケット通信の流れ

socket timeline

ソケットを用いたクライアント・サーバー間の通信の流れは以下のようになる。

サーバー側

  1. socket()でソケットを作成し、そのファイルディスクリプタを取得
  2. bind()でソケットと自身のIPアドレスやポートを紐づける
  3. listen()でソケットが接続を待ち受ける状態にする
  4. accept()を実行するとプログラムが止まり、クライアントのconnect()による接続要求を受け取ったら、そのクライアントとの通信用のソケット(ファイルディスクリプタ)を新しく作成する
  5. send(), recv() でデータの送受信
  6. close() でソケット(ファイルディスクリプタ)を閉じる

クライアント側

  1. socket() でソケット (とそのファイルディスクリプタ) を作成
  2. connect() でサーバ側のソケットに接続
  3. send(), recv() でデータの送受信
  4. close() でソケット(ファイルディスクリプタ)を閉じる

Cによるソケット通信のサンプル

Cでソケット通信を行うTCPサーバーを実装すると、例えば以下のようになる。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main(void)
{
  char *server_ip = "<サーバーのIPアドレス>";
  unsigned short server_port = <サーバーのポート番号>;

  int sockfd;                     // 接続を待ち受けるためのソケットのファイルディスクリプタ
  int connfd;                     // クライアントと通信するためのソケットのファイルディスクリプタ
  char buffer[1024];              // データ送受信のためのソケットのバッファサイズ
  struct sockaddr_in server_addr; // サーバーのアドレス情報
  struct sockaddr_in client_addr; // クライアントのアドレス情報
  socklen_t addr_size;            // クライアントのアドレス情報の長さ
  char client_ip[32];             // クライアントのIPアドレス

  // 接続を待ち受けるためのソケットを生成
  if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
  {
    perror("Socket Error");
    exit(EXIT_FAILURE);
  }
  printf("[Socket] Socket successfully created. fd: %d\n", sockfd);

  // サーバーのアドレス情報をソケットに割り当てる
  int one = 1;
  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
  memset(&server_addr, 0x00, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = inet_addr(server_ip);
  server_addr.sin_port = htons(server_port);
  if (bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
  {
    perror("Bind Error");
    exit(EXIT_FAILURE);
  }
  printf("[Bind] Socket successfully bound.\n");

  // 接続を待ち受ける状態にする
  if (listen(sockfd, 5) != 0)
  {
    perror("Listen Error");
    exit(EXIT_FAILURE);
  }
  printf("[Listen] Listening...\n");

  // 接続があれば受け入れ、新たなソケットを生成する
  addr_size = sizeof(client_addr);
  if ((connfd = accept(sockfd, (struct sockaddr *)&client_addr, &addr_size)) < 0)
  {
    perror("Accept Error");
    exit(EXIT_FAILURE);
  }
  memset(client_ip, '\0', sizeof(client_ip));
  inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
  printf("[Accept] Client connected. fd: %d, Client IP: %s, Client Port: %u\n", connfd, client_ip, client_addr.sin_port);

  // 生成されたソケットを使いメッセージを受け取る
  bzero(buffer, 1024);
  if (recv(connfd, buffer, sizeof(buffer), 0) < 0)
  {
    perror("Recv Error");
    exit(EXIT_FAILURE);
  }
  printf("[Recv] Client: %s\n", buffer);

  // クライアントにメッセージを送り返す
  bzero(buffer, 1024);
  strcpy(buffer, "HELLO, THIS IS SERVER.");
  if (send(connfd, buffer, strlen(buffer), 0) < 0)
  {
    perror("Send Error");
    exit(EXIT_FAILURE);
  }
  printf("[Send] Message successfly sent.\n");

  // 切断
  close(connfd);
  printf("[Close] Client disconnected.\n");

  close(sockfd);
  printf("[Close] Server ended.\n");
}

ソケットのブロッキングとI/O多重化

デフォルトのソケットの場合、サーバー側のaccept()recv()はその処理が終了するまで他の処理をブロックすることになるため、一つのクライアントと通信している際は他のクライアントからの接続は受け付けられないことになる。
複数クライアントと並行で通信を行うためにマルチプロセス/マルチスレッドを用いることも可能な一方で、シングルプロセス・シングルスレッドでもselect(), poll(), epoll(), kqueue()といったシステムコールを使って複数のソケットを同時に扱うことが可能になる。これをI/Oの多重化と言い、例えばNginxは多重化されたI/Oを採用することでシングルスレッドでも同時リクエストを処理しているらしい。

epollで複数クライアントからの接続を処理する

epoll()を使って前述のプログラムを複数クライアントとの通信に並行でハンドリングできるようにしてみる。 epoll()の使い方としては以下のようになる。ただし、ここではエッジトリガーと呼ばれる方式でepollを動作させる。 (参考: https://ymmt.hatenablog.com/entry/2013/09/05/150116)

  1. クライアントからの接続を待ち受けるためのソケットをノンブロッキングにする

    fcntl(sockfd, F_SETFL, O_NONBLOCK);
    
  2. epoll_create1でepollのインスタンスを作成

    // epollインスタンスのファイルディスクリプタが帰ってくる
    int epollfd = epoll_create1(0);
    
  3. epoll_ctlでクライアントからの接続を待ち受けるためのソケットをepollで監視させる

    // epoll_event構造体を使い、監視対象のイベントの種類を指定する
    struct epoll_event ev
    // 今回はsockfdからの読み取りが可能になるというイベントを監視する
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = sockfd;
    // epollでsockfdを監視するよう設定
    epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev)
    
  4. epoll_waitでイベントを監視し、発生したイベントの一覧を取得、順に処理していく

    // MAX_EVENTSの数までイベントを監視
    struct epoll_event events[MAX_EVENTS];
    // イベントが発生した数を返す
    int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    
    // イベントがあったソケットでループ
    for (int i = 0; i < nfds; i++)
    {
      // eventsに受け取ったイベントの情報が入ってくる
      int fd = events[i].data.fd;
    
      if (fd == sockfd)
      {
        // listenしているソケットに新しい接続要求があった場合、通信用のソケットを新しく生成する
        // int connfd = accept(sockfd, ...
    
        // ノンブロッキングなソケットにする
        fcntl(connfd, F_SETFL, O_NONBLOCK);
        // 新しい通信用ソケットをepollによる監視対象に加える
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = connfd;
        epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev)
      }
      else if ((events[i].events & EPOLLERR) ||
               (events[i].events & EPOLLHUP) ||
               (!(events[i].events & EPOLLIN)))
      {
        // 接続を切断する
        close(fd);
      }
      else
      {
        // クライアントとの通信用ソケットが読み込み可能になっている場合、受信したデータを処理する
        // int ret_data = recv(fd, ...
      }
    }
    
  5. 4のepoll_waitによるイベント一覧取得とその後のイベント処理をループする