今回はネットワークプログラミング、サーバプログラ ミングの事例としてpgpool-IIを取り上げます。具体的には、ソケットの使い方やpre-forkテクニックの解説を行います。
pgpool-IIはPostgreSQLのproxyサーバであるとも言えます。 つまりPostgreSQLの通信プロトコルを実装しているわけで、そういった面からもpgpool-IIを解説したいと思います。
前回はpgpoolのソースを使って、以下のようなネットワークプログラミングの基礎を解説しました。
ここまでで、一応サーバがネットワーク経由でクライアントからの接続を受け入れるところまで解説が終わっています。
今回はその続きということで、クライアントがサーバに接続するところから始めます。
クライアントがサーバに接続するには、connect()というシステムコールを使います。 pgpool-IIでは、pool_connection_pool.cにある connect_unix_domain_socket_by_port()とconnect_inet_domain_socket_by_port() という関数がconnect()を呼び出しています(リスト1)。
リスト1: connect_unix_domain_socket_by_port()とconnect_inet_domain_socket_by_port()
---------------------------------------------------------------------
int connect_unix_domain_socket_by_port(int port, char *socket_dir)
{
struct sockaddr_un addr;
int fd;
int len;
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1)
{
pool_error("connect_unix_domain_socket_by_port: setsockopt() failed: %s", strerror(errno));
return -1;
}
memset((char *) &addr, 0, sizeof(addr));
((struct sockaddr *)&addr)->sa_family = AF_UNIX;
snprintf(addr.sun_path, sizeof(addr.sun_path), "%s/.s.PGSQL.%d", socket_dir, port);
len = sizeof(struct sockaddr_un);
for (;;)
{
if (connect(fd, (struct sockaddr *)&addr, len) < 0)
{
if (errno == EINTR || errno == EAGAIN)
continue;
pool_error("connect_unix_domain_socket_by_port: connect() failed: %s", strerror(errno));
close(fd);
return -1;
}
break;
}
return fd;
}
int connect_inet_domain_socket_by_port(char *host, int port)
{
int fd;
int len;
int on = 1;
struct sockaddr_in addr;
struct hostent *hp;
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0)
{
pool_error("connect_inet_domain_socket_by_port: socket() failed: %s", strerror(errno));
return -1;
}
/* set nodelay */
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
(char *) &on,
sizeof(on)) < 0)
{
pool_error("connect_inet_domain_socket_by_port: setsockopt() failed: %s", strerror(errno));
close(fd);
return -1;
}
memset((char *) &addr, 0, sizeof(addr));
((struct sockaddr *)&addr)->sa_family = AF_INET;
addr.sin_port = htons(port);
len = sizeof(struct sockaddr_in);
hp = gethostbyname(host);
if ((hp == NULL) || (hp->h_addrtype != AF_INET))
{
pool_error("connect_inet_domain_socket: gethostbyname() failed: %s host: %s", strerror(errno), host);
close(fd);
return -1;
}
memmove((char *) &(addr.sin_addr),
(char *) hp->h_addr,
hp->h_length);
for (;;)
{
if (connect(fd, (struct sockaddr *)&addr, len) < 0)
{
if (errno == EINTR || errno == EAGAIN)
continue;
pool_error("connect_inet_domain_socket: connect() failed: %s",strerror(errno));
close(fd);
return -1;
}
break;
}
return fd;
}
---------------------------------------------------------------------
クライアントからサーバに接続する際もまずsocket()システムコールを使ってソケットを作ります。 socket()については前回説明しました。
作成したソケットにはsetsockopt()でオプションを与えることができます。ここでは、TCP_NODELAYというオプションを指定しています。これを指定しないと、カーネルは小さなTCPのパケットをできるだけまとめて送信しようとするため、PostgreSQLの通信プロトコルに支障を来たすことがあります。
次にconnect()を使って目的のサーバに接続します。connect()の第1引数は socket()の返すファイルディスクリプタです。第2引数は接続するサーバを指 定するための構造体sockaddrです。前回のべたように、TCP/IP接続の場合は実 際にはsockaddr_inという構造体に値をセットし、connect()への引数としては sockaddr *にキャストするということを行います。sockaddr_inには前回説明 した通り、アドレスファミリとアドレスを指定します。アドレスは 前回はINADDR_ANYを指定していましたが、クライアントがサーバに接続する際 には相手のサーバをポート番号とホスト名(またはIPアドレス)ではっきり指定 しなければなりません。
ポート番号はpostmasterの待ち受けポート番号をhtons()を使ってネットワークバイトオーダーに変換して設定します。
文字列のホスト名はそのままconnect()に渡すことはできないので、 gethostbyname()を使ってIPアドレスに変換します。
こうして作成した引数をconnect()に渡します。成功すればファイルディスクリプタが返却されます。このファイルにディスクリプタにread()やwrite()を使ってデータの読み書きを行えばそのままサーバとデータの送受信ができます。
こうしてread()やwrite()を使ってサーバと通信ができるようになったとは言え、普通のファイルの読み書きとはまた違った考慮がネットワーク通信では必要です。
ネットワーク上の通信では、回線が物理的に切断したり、あるいはサーバがダ ウンするなど、いつ何時通信路が閉鎖されるかわかりません。きちんとエラー チェックを行うことが必要です。なお、切断したソケットに書き込みを行うと、 SIGPIPEというシグナルが発生し、プロセスが強制終了させられますが、後始 末をなどを考えるとあまり都合の良い動作ではありません。SIGPIPEを無視す る設定をし、read()やwrite()でエラーを検知した際にきめ細かなエラー処理 をするようにした方が良いでしょう。
ソケットをread()すると、指定したバイト数が読み出せないことがありますが、 これはエラーではありません。このような場合は、取得できなかったバイト数 を再度読み出さなければなりません。
通常のファイルへの書き込みと違って、ソケットへのread()やwrite()は非常 に遅くなります。これはどちらかというと、データ量よりもシステムコールの 発行回数が問題になります。また、read()に関してはカーネルのバッファリン グがあるので、せいぜいシステムコールを呼び出すオーバヘッドが問題になる くらいですが、write()のオーバヘッドは深刻です。バッファリング機能を持 つ標準入出力ライブラリを使う手もありますが、read()の際に前もってデータ が届いているかどうかをチェックできないのがこまり者です。そこでpgpool-IIでは自前のread()やwrite()のラッパー関数を書いています。
ソケットインターフェイスのaccept(サーバの場合)やconnect(クライアントの
場合)が返すファイルディスクリプタは全二重通信、すなわち1本のストリーム
で読み出しも書き込みもできます。前述の理由で読み出しに関しては独自のバッ
ファリングを実装しますが、書き込みに関しては標準入出力関数を使わない理
由は特にないので、ソケットのファイルディスクリプタからfdopen()を使って
標準入出力関数が利用できる形式のファイルディスクリプタを作成します。図にpgpool-IIにおける入出力関数の構造を示します。
pgpool-IIのラッパー関数はすべてpool_stream.cにまとめてあります。提供しているインターフェイスは以下のものです。
ソケットインターフェイスの返したファイルディスクリプタから書き込み用の標準入出力インターフェイスのファイルディスクリプタを生成し、POOL_CONNECTION構造体にセットして返却します。
POOL_CONNECTION構造体はpool.hに以下のように定義されており、pgpoolのストリーム入出力関数はすべてこの構造体をインターフェイスとして使用します。
typedef struct {
int fd; /* fd for connection */
FILE *write_fd; /* stream write connection */
char *hp; /* pending data buffer head address */
int po; /* pending data offset */
int bufsz; /* pending data buffer size */
int len; /* pending data length */
char *sbuf; /* buffer for pool_read_string */
int sbufsz; /* its size in bytes */
char *buf2; /* buffer for pool_read2 */
int bufsz2; /* its size in bytes */
int isbackend; /* this connection is for backend if non 0 */
int issecondary_backend; /* this connection is for secondary backend if non 0 */
char tstate; /* transaction state (V3 only) */
/*
* following are used to remember when re-use the authenticated connection
*/
int auth_kind; /* 3: clear text password, 4: crypt password, 5: md5 password */
int pwd_size; /* password (sent back from frontend) size in host order */
char password[MAX_PASSWORD_SIZE]; /* password (sent back from frontend) */
char salt[4]; /* password salt */
/*
* following are used to remember current session paramter status.
* re-used connection will need them (V3 only)
*/
ParamStatus params;
int no_forward; /* if non 0, do not write to frontend */
} POOL_CONNECTION;
pool_open()で開いたストリームを閉じます。
指定したバイト数分ストリームから読み込み、bufに返します。 POOL_CONNECTION構造体の、hp, po, bufsz, lenを使って内部的にバッファリ ングをしており、なるべく読み込み回数を減らすようにしています。このとき 読み込みすぎたデータは「ペンディングデータ」として内部的に扱い、次回の pool_read()の呼び出しで利用されます。
指定したバイト数分ストリームから読み込み、内部的な読み込みバッファのア ドレスを返します。pool_read()との違いは、lenで指定したバイト数分だけし か読み込みを行わないことです。したがって、バイト数が少なく、しかも固定 長の読み込みにはpool_read()、そうでない場合にはpool_read2()を使うよう にします。
fwrite()を使ってストリームに書き込みます。
pool_write()は標準入出力ライブラリのバッファに書き込むだけです、 pool_flush()はfflush() を呼び出し、実際にネットワークにデータが送出されることを保証します。
pool_write()とpool_flush()のコンビネーションです。
改行またはNULLが来るまでストリームから文字列を読み込みます。V3プロトコルでは使用されません。
では実際にPostgreSQLがどのようにしてフロントエンドとバックエンドの間で通信を行っているかを簡単に見てみましょう。接続の開始から終了までの大まかな流れは図のようになります。
もっと詳しいことを知りたい方は、PostgreSQL付属マニュアルの "Frontend/Backend Protocol"、あるいは日本PostgreSQLユーザ会が配布する日本語版の「フロントエンド/バックエンドプロトコル」をご覧下さい。
このあたりの詳細な処理は、pgpoolではpool_process_query.cに書いてあるので、興味がある方はソースコードをご覧下さい。
まず最初にフロントエンドは図のような構造を持つ「スタートアップメッセージ」を送ります。
最初の4バイトは、自身を含むメッセージ全体のバイト数です。次の4バ イトはこの通信プロトコルのバージョンで、頭16ビットが「メジャーバージョ ン」で、V3なら3です。後の16ビットは「マイナーバージョン」で、今のとこ ろ0です。この後項目名文字列とその値文字列のペアが続きます。図ではuser の次にdatabaseが来ていますが、この順番は保証されていません。databaseや optionsは省略可能です。databaseが省略された場合はユーザ名と同じ名前の データベースを使うものと見なされます。
pgpoolではスタートアップメッセージの処理はchild.cに定義されたread_startup_packet()という関数に記述されています。
バックエンド側では、データベース名やpg_hba.confを参照してフロントエンドからの接続要求を受け入れるかどうかを決定します。もし問題なければ次のステップに移ります。
もしTRUST認証、すなわちパスワード設定がされていない場合は、図の「AuthenticationOk」メッセージがフロントエンドに送信されてきます。
頭1バイトは'R'という文字で、認証要求を表します。次の4バイトは自身を含 むメッセージのバイト数です。このように、頭1バイトがメッセージの種類で、 次の4バイトがメッセージの長さになっているのはほかのメッセージも共通で す(ただし、スタートアップメッセージを除く)。そして最後が4バイトの0です。
パスワードが必要なければ次のステップに進むことができますが、md5認証な
どではパスワードをやり取りしなければなりません。この場合図の「AuthenticationMD5Password」メッセージがフロントエンドに送られてきます。
AuthenticationMD5Password」メッセージが送られてきたら、返答として図の「PasswordMessage」をバックエンドに返却します。
MD5認証ではパスワードを生のままで送るのではなく、「AuthenticationMD5Password」メッセージに含まれるソルトを使ってMD5メッセージ化したものを送るようにします。こうすれば、ネットワーク上の盗聴にも対処できます。
もしパスワードがOKならば、AuthenticationOkメッセージがバックエンドから送られてきます。
次は「パラメータステータス」(ParameterStatus: 図)をバックエンドは送信してきます。
パラメータステータスとは、そのセッションの設定値を示す値です。 SETコマンドで設定するとこのメッセージが送られてきますが、セッションの開始にもセッションのデフォルト値がまとめて送られてきます。 今のところ、以下のようなパラメータステータスが送信されてきます。
| 名前 | 典型的な値 | 説明 |
|---|---|---|
| client_encoding | EUC_JP | クライアントのエンコーディング |
| DateStyle | ISO, MDY | 日付データ形式 |
| is_superuser | off | スーパユーザかどうか |
| server_version | 8.4.0 | PostgreSQLバックエンドのバージョン |
| session_authorization | t-ishii | セッションユーザ名 |
次はBackendKeyDataメッセージです(図)。
バックエンドから送られてくるこのメッセージには、バックエンドのプロセスIDと「秘密鍵」が含まれます。
フロントエンドは、後で実行中の問い合わせを中断したくなったらこのプロセスIDと秘密鍵を送信しなければなりません。
このようにして、勝手に問い合わせがキャンセルされることを防いでいます。
すべてがOKならば、最後の締めくくりにバックエンドからReadyForQueryメッセージが送られてきます(図)。
このメッセージを受け取るまでは、フロントエンドは問い合わせを送信してはいけません。
ReadyForQueryはスタートアップ時だけでなく、一つの問い合わせ処理が終わる度にバックエンドから送信されてきます。
ReadyForQueryメッセージの重要な役割は、現在のトランザクションの状態を報告することです。 状態は1バイトの文字で報告されます。
| 状態文字 | 説明 |
|---|---|
| I | アイドル状態 |
| T | トランザクションブロック内 |
| E | エラー中のトランザクション |
PostgreSQLが受け付ける問い合わせにいろいろな種類があります。
| 形式 | 説明 |
|---|---|
| 簡易問い合わせ | 問い合わせに対して直ちに結果が返る |
| 拡張問い合わせ | Parse/Bind/Executeによって問い合わせを実行 |
| COPY | COPYコマンドで使用 |
| 関数呼び出し | 指定関数の実行 |
ここでは「簡易問い合わせ」と呼ばれる問い合わせに対して直ちに結果が返るタイプのものを説明します。 例題として使う問い合わせは"SELECT 1"という極めて単純なものです。
問い合わせは、Queryメッセージをフロントエンドが発行するところから始まります(図)。
首尾よく問い合わせが実行されると、まず結果のデータの構造を表すRowDescriptionメッセージ(図)がバックエンドから送られてきます。
このメッセージは少々複雑で、検索結果のデータに関する詳細な情報を含んでいます。
メッセージタイプT、メッセージのバイト長の後にまず結果の列数があります。 後のフィールドはその数分だけの情報があります。
RowDescriptionの次は結果の行数分だけDataRowメッセージ(図)が返ります。
なお、データがNULLの場合は、データ長は-1、その後に続くはずの「データ」はありません。
問い合わせ結果が無事に返ると、次にCommandCompleteメッセージ(図)がバックエンドから送られてきます。
「コマンドタグ」はどのSQLコマンドが完了したかを表す文字列です。
たとえばSELECTなら"SELECT"、VACUUMなら"VACUUM"となります。
INSERTコマンドなどでは挿入された行数、行にアサインされたOIDなどの付加情報が付与されます。
今回は間違いが起きようもないSELECT文ですが、万が一エラーが起きたらどうなるのでしょう?
そういう場合はErrorResponseメッセージ(図)が返ります。
フィールド識別コードには以下のものがあります。
| フィールド識別コード | 意味 | フィールド値の例 |
|---|---|---|
| S | エラーの深刻度 | ERROR |
| C | エラーコード | 22021 |
| M | エラーメッセージ | could not create unique index |
| D | 詳細メッセージ | Table contains duplicated values. |
| H | ヒント | Please REINDEX it. |
| P | エラーの位置 | 92 |
| F | ソースファイル名 | scan.c |
| L | 行番号 | 110 |
| R | 関数名 | foo |
こうして一つの問い合わせが処理されると再びReadyForQueryメッセージをバッ クエンドは送信し、問い合わせを受け付けることができるようになります。
接続を終了したい場合は、Terminateメッセージ(図)をフロントエンドからバックエンドに送信します。
2回にわたってネットワークプログラミングについて解説しました。 ネットワークプログラミングは慣れないとなかなか分かりにくい面もありますが、プログラミングのテクニックを磨く上では最高の題材だと思います。 また、今の世の中ネットワークは避けて通れない存在です。 最初から難しく考える必要はありません。 実はpgpoolも最初は300行ほどの小さなプログラムから出発し、今ではそれなりの機能を持つサーバソフトへと成長しました。 皆さんもこれを機会に簡単なネットワークプログラムを作ってみてはどうでしょう。