Bo2SS

Bo2SS

ファイルベースのプロセス間通信——100個のプロセスで合計を計算する

要求の説明#

  1. 同時実行度INSを設定し、起動するプロセスの数を示す
  2. このINS個のプロセスを使用して、startからendまでの数字の合計を計算する
  3. startendgetoptを使用してコマンドライン引数から取得する
./a.out -s 12 -e 24
  1. 整数結果を出力する:sum

[注意]

  • 主にファイルおよびプロセス関連の操作に関わる
  • ファイルを使用してデータを共有する場合、データ競合(data race)を考慮する必要がある
  • ファイルロックを使用してスレッド間のミューテックスを模倣する
  • ファイルロックを通じてクリティカルデータ(複数のプロセスまたはスレッドが競合して変更するデータ)への同期アクセスを実現する
  • flock を学ぶ必要がある:man 2 flock

最終結果#

  • 100 個のプロセスを使用して 1 から 1000 までの合計を計算し、結果の一部を以下に示す:
    • 画像
    • 画像
    • プロセスが同じデータ上で合計を計算することに成功した

実装プロセス#

思考フローチャート#

  • 画像
  • 親プロセスと子プロセスのタスクを把握する
  • 重要:複数プロセスが同じファイルにアクセスするためのロック操作により、データの読み書きを「原子操作」とする [不可分割の最小単位]
    • 原子操作として理解できるが、本質的にはデータの読み書きプロセスを完全にするだけである
    • プロセスはタイムスライスが終了することで中断される可能性があるが、ロックが存在するため、この時他のプロセスはこれらのデータにアクセスできない

コマンドライン引数の取得#

-s、-e オプションをキャッチし、このオプションには必ず引数を伴う必要がある

#include "head.h"
int main(int argc, char **argv) {
    int opt, start = 0, end = 0;
    while ((opt = getopt(argc, argv, "s:e:")) != -1) {
        switch (opt) {
            case 's':
                start = atoi(optarg);  // atoi: 文字列->整数
                break;
            case 'e':
                end = atoi(optarg);
                break;
            default:
                fprintf(stderr, "Usage : %s -s start_num -e end_num\n", argv[0]);
                exit(1);
        }
    }
    printf("start = %d\nend = %d\n", start, end);
    return 0;
}
  • ヘッダーファイル "head.h" は末尾に記載
  • atoi:文字列👉整数、optarg は文字配列
  • 結果は以下の通り:
    • 画像
    • 🆗

INS 個のプロセスを作成#

fork を使用して INS 個のプロセスを作成し、wait を使用してゾンビプロセスの発生を防ぐ

#define INS 100
pid_t pid;
int x = 0;        // x: 第何号プロセス
for (int i = 1; i <= INS; i++) {
    if ((pid = fork()) < 0) {
        perror("fork");
        exit(1);  // 便宜上のため、実際の作業ではこのようにしない
    }
    if (pid == 0) {
        x = i;    // 子プロセスに番号を付ける
        break;    // 重要、さもなければ無限に入れ子になる
    }
}
if (pid != 0) {
    // ゾンビプロセスの発生を防ぐ [すべての子プロセスが終了するのを待つ]
    for (int i = 1; i <= INS; i++) {
        wait(NULL);
    }
    // 親プロセス
    printf("I'm parent!\n");  
} else {
    printf("I'm %dth child!\n", x);
}
  • このコードは、コマンドライン引数を取得した後のメイン関数内に配置
  • INS はマクロとして定義
  • 子プロセスの作成に失敗した場合は直接 exit (1) するのは便宜上のため、実際の作業では避けるべき
  • 結果は以下の通り:
    • 画像
    • 100 個の子プロセスを成功裏に作成

ファイルベースのデータ読み書きインターフェース#

ファイルをプロセス間で共有データの媒体として使用

  • ファイルにデータをどのように保存するか?ASCII コード [文字]、int [低 16 ビット + 高 16 ビット]...
  • ここでは構造体を使用してデータを保存し、構造が明確である
    • 画像
    • 加数、合計を保存
char data_file[] = "./.data";
char lock_file[] = "./.lock";  // [オプション] 専用のロックを設定
// 伝達するデータ
struct Msg {
    int now;                   // 加数
    int sum;                   // 合計
};
struct Msg data;               // 構造体データ
// 構造体データを書き込む
size_t set_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "w");                         // 書き込み
    if (f == NULL) {
        perror("fopen");
        return -1;                                          // 小さな関数内でexitするのはあまりにも粗野
    }
    size_t nwrite = fwrite(msg, 1, sizeof(struct Msg), f);  // 毎回1バイト書き込む
    fclose(f);
    return nwrite;             // 成功したバイト数を返す、エラーがあれば上位に返す
}
// 構造体データを読み込む
size_t get_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "r");
    if (f == NULL) {
        perror("fopen");
        return -1;
    }
    size_t nread = fread(msg, 1, sizeof(struct Msg), f);    // 構造体データをmsgに読み込む
    fclose(f);
    return nread;
}
  • グローバル変数 data を作成し、プロセス内でデータを操作する
  • 標準ファイル操作を利用し、低レベルのファイル操作も可能
  • 戻り値は呼び出し元が読み書きの成功を確認するために使用できる

ロックを追加⭐#

プロセスが共有データを維持するために奪い合い、データファイルが同時に操作されないように保護する

【2 つのアプローチ】1 つのファイルを使用する;2 つのファイルを使用する

  • アプローチ 1:データファイルに直接ロックをかける
char data_file[] = "./.data";
// 加算 [原子操作:読み込み + 書き込み];end:加算の停止条件;id:子の番号 [神の視点で監視可能]
void do_add(int end, int id) {
    // 子はずっと加算を行う
    while (1) {
        /*
         * アプローチ1:1つのファイル、データファイルに直接ロックをかける
         */
        // data_fileを開いてロックをかける
        FILE *f = fopen(data_file, "r");
        // ミューテックスロックをかける
        flock(f->_fileno, LOCK_EX);
        // ファイルからデータを読み込む [get_data関数内でdata_fileファイルを再度開くため、新しいfdに対応し、ロックは共有されない]
        if (get_data(&data) < 0) continue;
        // 加数+1し、加数が範囲を超えないか確認する
        if (++data.now > end) {
            fclose(f);
            break;
        }
        // 加算を行う
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // データをファイルに書き込む
        if (set_data(&data) < 0) continue;
        // ロックを解除する [後で閉じても自動的にロックは解除される]
        flock(fileno(f), LOCK_UN);
        fclose(f);
    }
}
  • 関数の引数:end は加算の停止条件の参照、id は各加算がどの子によって行われたかを観察するために使用できる
  • ロック 👉 ロック解除の過程は原子操作[不可分割の最小単位] である
    • データの読み込み、計算、データの書き込みをカプセル化し、その過程でデータが奪われることはない
  • ファイルポインタ FILE* f からファイル記述子 fd を取得する
    • ① f->_fileno
    • ② fileno(f)
  • [PS]
    • 同じファイルを再度開くと異なるファイル記述子が得られ、ロックも相互に独立している
    • ファイルを閉じると自動的にロックが解除される
    • 各読み書きインターフェースの呼び出し後に、戻り値を利用して操作の成功を確認する
  • アプローチ 2:専用のファイルをロックとして設定
char data_file[] = "./.data";
char lock_file[] = "./.lock";  // 専用のロックを設定
void do_add(int end, int id) {
    while (1) {
        /*
         * アプローチ2:2つのファイルを使用し、専用のファイルをロックとして使用する [理解しやすい]
         */
        // ロックファイルを開くまたは作成する;ファイルがロックされている場合、使用者がロックを解除するのを待つ
        FILE *lock = fopen(lock_file, "w");  // "w":ファイルが存在しない場合、作成される
        if (lock == NULL) {
            perror("fopen");
            exit(1);
        }
        // ロックをかける
        flock(lock->_fileno, LOCK_EX);
        // ファイルからデータを読み込む
        if (get_data(&data) < 0) {
            fclose(lock);                    // ロックファイルを閉じてロックを解除
            continue;
        }
        // 加数+1し、加算停止条件を満たすか確認する
        if (++data.now > end) {
            fclose(lock);
            break;
        }
        // 加算を行う
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // データをファイルに書き込む
        if (set_data(&data) < 0) continue;
        // ロックを解除
        flock(lock->_fileno, LOCK_UN);
        fclose(lock);
    }
}
  • lock_file は単にロックを行うためのものである
  • 結果は以下の通り:【単核、5 つのプロセス、1~100 を計算】
    • 画像
    • 画像
    • 単核の効果は多核よりも整然としている
      • 単核は一度に 1 つのプロセスしか実行できない
      • usleep () を使用してプロセスを事前に一時停止させ、1 つのプロセスがそれほど長く計算しないようにし、順序をより乱すことができる
    • 出力を more に渡すと、出力がプロセスごとに区別され、再配置されて表示される
  • 【注意】
    • メイン関数内でデータの初期値をファイルに書き込む必要がある、さもなければファイルは空である [完全なコードを参照]
    • メイン関数の子プロセスロジック内で do_add () 関数を呼び出すだけで、親プロセスロジックではすべての子プロセスが終了するのを待ってからデータファイルから最終結果を取得して出力する
  • ❗ ロックをかけなくても、結果は依然として正しい
    • 加数と合計は一緒にカプセル化されており、加算は間違いを起こさない
    • しかし、各プロセスは結果を完全に計算するため、バッファのせいか?そうではない
      • すべての書き込み操作の後に fflush を追加しても、いくつかの計算が続く場合があるが、各プロセスは最後の正しい結果に到達する
      • つまり、あるプロセスが計算を終え、データをファイルに書き込んでも、別のプロセスがまだ最新のデータを読み込んでいないため、再度合計を計算することになる
    • 説明
      • 複数のプロセスが同じファイルを開くと、各プロセスは独自のファイルテーブルエントリ(file オブジェクト)を持ち、それぞれ独自のファイルオフセットを含む
      • したがって、複数のプロセスが同じファイルを読み取ることは正しく機能するが、同じファイルに書き込むことは予期しない結果を生じる可能性がある。pread、pwrite の使用を参照することができる
      • また、Linux での複数プロセスによるファイル操作を参照することもできる ——cnblogs

完全なコード#

sum.c#

#include "head.h"
#define INS 100
char data_file[] = "./.data";
char lock_file[] = "./.lock";  // [オプション] 専用のロックを設定
// 伝達するデータ
struct Msg {
    int now;                   // 加数
    int sum;                   // 合計
};
struct Msg data;               // 構造体データ
// 構造体データを書き込む
size_t set_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "w");                         // 書き込み
    if (f == NULL) {
        perror("fopen");
        return -1;                                          // 小さな関数内でexitするのはあまりにも粗野
    }
    size_t nwrite = fwrite(msg, 1, sizeof(struct Msg), f);  // 毎回1バイト書き込む
    fclose(f);
    return nwrite;             // 成功したバイト数を返す、エラーがあれば上位に返す
}
// 構造体データを読み込む
size_t get_data(struct Msg *msg) {
    FILE *f = fopen(data_file, "r");
    if (f == NULL) {
        perror("fopen");
        return -1;
    }
    size_t nread = fread(msg, 1, sizeof(struct Msg), f);    // 構造体データをmsgに読み込む
    return nread;
}
// 加算 [原子操作:読み込み + 書き込み];end:加算停止条件;id:子の番号 [神の視点で監視可能]
void do_add(int end, int id) {
    // 子はずっと加算を行う
    while (1) {
        /*
         * アプローチ2:2つのファイルを使用し、専用のファイルをロックとして使用する [理解しやすい]
         */
        // ロックファイルを開くまたは作成する;ファイルがロックされている場合、使用者がロックを解除するのを待つ
        FILE *lock = fopen(lock_file, "w");  // "w":ファイルが存在しない場合、作成される
        if (lock == NULL) {
            perror("fopen");
            exit(1);
        }
        // ロックをかける
        flock(lock->_fileno, LOCK_EX);
        // ファイルからデータを読み込む
        if (get_data(&data) < 0) {
            fclose(lock);                    // ロックファイルを閉じてロックを解除
            continue;
        }
        // 加数+1し、加算停止条件を満たすか確認する
        if (++data.now > end) {
            fclose(lock);
            break;
        }
        // 加算を行う
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // データをファイルに書き込む
        if (set_data(&data) < 0) continue;
        // ロックを解除
        flock(lock->_fileno, LOCK_UN);
        fclose(lock);
        /*
         * アプローチ1:1つのファイル、データファイルに直接ロックをかける
         */
        /*
        // data_fileを開いてロックをかける
        FILE *f = fopen(data_file, "r");
        // ミューテックスロックをかける
        flock(f->_fileno, LOCK_EX);
        // ファイルからデータを読み込む [get_data関数内でdata_fileファイルを再度開くため、新しいfdに対応し、ロックは共有されない]
        if (get_data(&data) < 0) continue;
        // 加数+1し、加数が範囲を超えないか確認する
        if (++data.now > end) {
            fclose(f);
            break;
        }
        // 加算を行う
        data.sum += data.now;
        printf("The <%d>th Child : now = %d, sum = %d\n", id, data.now, data.sum);
        // データをファイルに書き込む
        if (set_data(&data) < 0) continue;
        // ロック [後で閉じても自動的にロックは解除される]
        flock(fileno(f), LOCK_UN);
        fclose(f);
        */
    }
}
int main(int argc, char **argv) {
    int opt, start = 0, end = 0;
    while ((opt = getopt(argc, argv, "s:e:")) != -1) {
        switch (opt) {
            case 's':
                start = atoi(optarg);       // atoi: 文字列->整数
                break;
            case 'e':
                end = atoi(optarg);
                break;
            default:
                fprintf(stderr, "Usage : %s -s start_num -e end_num\n", argv[0]);
                exit(1);
        }
    }
    printf("start = %d\nend = %d\n", start, end);
    // 先に初期データをファイルに書き込む
    if (set_data(&data) < 0) return -1;     // dataはグローバル変数で、メンバーはデフォルトで0
    pid_t pid;
    int x = 0;                              // x: 第何号プロセス
    for (int i = 1; i <= INS; i++) {
        if ((pid = fork()) < 0) {
            perror("fork");
            exit(1);                        // 便宜上のため、実際の作業ではこのようにしない
        }
        if (pid == 0) {
            x = i;                          // 子プロセスに番号を付ける
            break;                          // 重要、さもなければ無限に入れ子になる
        }
    }
    if (pid != 0) {
        // ゾンビプロセスの発生を防ぐ [すべての子プロセスが終了するのを待つ]
        for (int i = 1; i <= INS; i++) {
            wait(NULL);
        }
        if (get_data(&data) < 0) return -1; // 最終結果を取得
        printf("sum = %d\n", data.sum);
    } else {
        do_add(end, x);                     // 子プロセスの唯一の仕事
    }
    return 0;
}

head.h#

#ifndef _HEAD_H
#define _HEAD_H
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/file.h>
#endif
  • 余分なヘッダーファイルがあるかもしれないが、重要ではない

参考#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。