同期機構/排他処理

マルチスレッドで処理する場合、複数スレッドから同時に同じ変数を読み書きしないようにする必要があります。 これを適切に管理・実現するため、様々な同期処理が必要になる場合があります。
手法については様々な物がありますが、ここではランタイムが持つ同期機構として以下について整理していきます。
  • 単独の synchronized
  • Mutex
  • ReadWriteMutex
  • Barrier
  • Condition
  • Semaphore
  • Event
Examples: synchronized 文を用いて単独のクリティカルセクションを構成する例です。
主に std.parallelismparallel など、複数スレッドから1つの処理が同時に呼び出されるときに利用します。
See_Also: https://dlang.org/spec/statement.html#synchronized-statement
import std.parallelism : parallel;
import std.range : iota;

size_t points;
foreach (i; iota(10).parallel())
{
    // synchronized文はそれだけでクリティカルセクションを構成します
    synchronized
    {
        // ここはsynchronized文の中なので、複数スレッドで同時に実行されることはありません
        points += 10;
    }
}
Examples: Mutex クラスを用いて複数のクリティカルセクションをグループ化し、同期させる例です。
See_Also: https://dlang.org/phobos/core_sync_mutex.html
import core.thread : Thread;
import core.sync.mutex : Mutex;

size_t points;
auto mutex = new Mutex;

// 複数のスレッドから points 変数を操作するため、同時実行を防ぐ必要があります。
// synchronized 文に同じ Mutex オブジェクトを指定することで、排他される範囲をグループ化することができます。
auto t1 = new Thread({
    synchronized (mutex)
    {
        points += 10;
    }
});
auto t2 = new Thread({
    synchronized (mutex)
    {
        points += 20;
    }
});
t1.start();
t2.start();
t1.join();
t2.join();

assert(points == 30);
Examples: ReadWriteMutex クラスを使い、書き込みと読み取りの排他を分けることで効率化する例です。
読み取り同士では排他せず、読み取りと書き込み、書き込み同士のときに排他することで、 単純な Mutex による排他よりもスループットが向上することが期待できます。
読み取り書き込み
読み取り排他なし排他あり
書き込み排他あり排他あり
See_Also: https://dlang.org/phobos/core_sync_rwmutex.html
import core.thread : ThreadGroup;
import core.sync.rwmutex : ReadWriteMutex;

size_t points;
auto rwMutex = new ReadWriteMutex;

auto tg = new ThreadGroup;
tg.create({
    synchronized (rwMutex.reader) // 排他時に読み取り処理であることを明示します
    {
        auto temp = points; // 値を読み取るのみです
    }
});
tg.create({
    synchronized (rwMutex.reader) // 排他時に読み取り処理であることを明示します
    {
        auto temp = points; // 値を読み取るのみです
    }
});
tg.create({
    synchronized (rwMutex.writer) // 排他時に書き込み処理であることを明示します
    {
        points += 10; // 値を読み書きします
    }
});
tg.create({
    synchronized (rwMutex.writer) // 排他時に書き込み処理であることを明示します
    {
        points += 20; // 値を読み書きします
    }
});
tg.joinAll();
Examples: Barrierで、指定した数のスレッドが所定のポイントに達するまで待機する例です。
See_Also:
import core.thread: ThreadGroup;
import core.sync.barrier: Barrier;

int test;

// 2つのスレッドを同期
auto b = new Barrier(2);

auto tg = new ThreadGroup;
// スレッド1
tg.create({
    test = 1;
    b.wait();
});
// スレッド2
tg.create({
    // スレッド1がtest変数を1にするまで待つ
    b.wait();
    assert(test == 1);
});
tg.joinAll(true);
Examples: Conditionで、通知による同期を行う例です。
Conditionのコンストラクタに渡したMutexをロックしているときにwaitすると、そのMutexのロックを解除して待機状態に入ります。 その後別スレッドがMutexをロックすると、notifyされても、Mutexがロック解除されるまでは待機を維持します。
逆に、Conditionのコンストラクタに渡したMutexのロック解除中にwaitした場合、単にほかのスレッドからnotifyされるまで待機します。
使用例として、Producer Consumerパターン内で、Guarded Suspensionパターンでの同期処理を行う場合に利用できます。 See_Also:
import core.thread: ThreadGroup;
import core.sync.mutex: Mutex;
import core.sync.condition: Condition;
import core.sync.barrier: Barrier;

auto m = new Mutex;
auto c = new Condition(m);
auto b = new Barrier(2);

int test;

auto tg = new ThreadGroup;
// スレッド1
tg.create({
    // ➀mをロック
    synchronized (m)
    {
        // スレッド2が開始されるまで待つ
        b.wait();
        // ➂cをwaitと同時にmをロック解除
        c.wait();
        // mをロックしたことをスレッド2に知らせる
        b.wait();

        assert(test == 1);
        test = 2;
    }
});
// スレッド2
tg.create({
    // mがロックされるまで待つ
    b.wait();
    // ➁cがwaitしてロック解除するまで待機
    synchronized (m)
    {
        // ➃注意:ここで即時➂でのwaitが解除されるわけではありません
        c.notify();
        assert(test == 0);
        test = 1;
    }
    // synchronized を抜けた(mのロック解除した)ここで➂のwaitが解除されます
    // mがロックされるまで待つ
    b.wait();
    synchronized (m)
        assert(test == 2);
});
tg.joinAll(true);
Examples: Semaphoreで、通知による同期を行う例です。
Conditionよりも単純で、Mutexとの関係を気にする必要はありません。 Semaphoreは内部にカウント値を持っており、notifyでカウント値は+1されます。 waitではカウント値が0より大きくなるまで制御をブロックし、カウント値が0より大きくなったら制御を戻しつつカウント値を-1します。
共有資源がある場合、通知する側はnotify以降の共有資源へのアクセス、通知される側はwait以前の共通資源へのアクセスについて、互いに競合しないよう排他する必要があります。 Semaphoreでは通知する側はnotifyより前に共有資源の編集を終え、通知される側はwait以降に共有資源を使うようにすればよいでしょう。
See_Also:
import core.thread: ThreadGroup;
import core.sync.semaphore: Semaphore;

auto s = new Semaphore(0);
int test;

auto tg = new ThreadGroup;
// スレッド1
tg.create({
    // 長い処理のあと終わったことを通知
    test = 1;
    s.notify();
});
// スレッド2
tg.create({
    // スレッド1の処理が終わるまで待つ
    s.wait();
    assert(test == 1);
});
tg.joinAll(true);
Examples: Eventで、通知による同期を行う例です。
ConditionやSemaphoreよりも単純です。ただのフラグと考えればよいです。 Eventは内部にフラグを持っており、setでtrue(シグナル状態)に、resetでfalse(非シグナル状態)になります。 waitでは、フラグを見てfalseならtrueになるまで制御をブロックします。ここでEventのコンストラクタで指定する manualReset がfalseの場合、waitで制御を返した後、フラグをfalseに自動的に戻します。 Eventのコンストラクタの initialState は、このフラグの初期状態を表します。
共有資源がある場合、通知する側はset以降の共有資源へのアクセス、通知される側はwait以前の共通資源へのアクセスについて、互いに競合しないよう排他する必要があります。 Eventでは通知する側はsetより前に共有資源の編集を終え、通知される側はwait以降に共有資源を使うようにすればよいでしょう。
See_Also:
import core.thread: ThreadGroup;
import core.sync.event: Event;
import std.random: uniform;

// マニュアルリセットなので、勝手に非シグナル状態に戻らない。
// 初期状態は非シグナル状態なので、setするまでwaitは制御をブロックする。
auto ev = Event(true, false);
int testA;
int testB;
int testC;

// ここではスレッドを3つ立ち上げる
// Semaphoreだとスレッド1で2回notifyが必要だが、Eventなら1回でOK
auto tg = new ThreadGroup;
// スレッド1
tg.create({
    // 長い処理のあと終わったことを通知
    testA = uniform(1, 10);
    ev.set();
});
// スレッド2
tg.create({
    // スレッド1の処理が終わるまで待つ
    ev.wait();
    assert(testA > 0);
    testB = testA + 1;
});
// スレッド3
tg.create({
    // スレッド1の処理が終わるまで待つ
    ev.wait();
    assert(testA > 0);
    testC = testA + 2;
});
tg.joinAll(true);
assert(testA + 1 == testB);
assert(testA + 2 == testC);