-title: プロセス間の同期・通信
Java のスレッドを用いた通信と同期について調べる。
この実験では、Java 1.5 を用いる。
--共有資源と競争状態
資源を共有して、作業する二つのスレッドを定義しよう。
以下のSimpleWorker は、共有資源のintをcount 回分、incrementする。
Race Condition (競合状態) を明示的に起こすために yield(); を
挿入しているが、これは、必須ではない。
Class ProcessExample.SimpleWorker
package ProcessExample;
public class SimpleWorker extends Thread {
String name;
int count;
int myResource;
SimpleWorker sharedResource;
SimpleWorker() {
// empty
}
public SimpleWorker(String _name,int count_,SimpleWorker w) {
String msg;
name = _name; sharedResource = w;
count = count_;
msg = "Thread "+name+" created.";
myResource = 0;
System.out.println(msg);
}
public void run() {
while(count < 0 || count-- > 0) {
sharedResource.work();
}
}
public void work() {
int x;
yield();
x = myResource;
yield();
x = x + 1;
yield();
myResource = x;
yield();
}
public void setResource(int x) {
myResource = x;
}
public int getResource() {
return myResource;
}
}
あとで、このクラスを拡張するために、単純Constructorと、
資源へのアクセサ (accessor method) が用意されている。
Class ThreadRace では、SimpeWorker の一つ(t1)を受動的な資源
として用意し、t2, t3 から平行的にアクセスする。
Class ProcessExample.ThreadRace
package ProcessExample;
public class ThreadRace {
static public void main(String [] args) throws InterruptedException {
SimpleWorker t1 = new SimpleWorker("t1",-1,null); // does not run
SimpleWorker t2 = new SimpleWorker("t2",5,t1);
SimpleWorker t3 = new SimpleWorker("t3",5,t1);
t2.start();
t3.start();
t2.join(0);
t3.join(0);
String msg = "Thread t1 "+Integer.toString(t1.getResource());
System.out.println(msg);
}
}
join(0) では、対応するスレッドの終了を待っている。
このmain() メソッドを動かして、共有資源の値がどうなるかを調べよ。
yield() を取り去った場合の動作はどうか?
--相互排除
相互排除を実現するためには、そのためのハードウェア的なサポートが必要で
ある。そのための基本的な命令が機械語レベルで用意されていることもある。
Java 言語では、synchronized という構文が用意されており、メソッド単位、
ステートメント単位で、モニタを定義することが出来る。モニタを実現する
のは、Java VM の役割である。
synchronized で得られるモニタは、インスタンスに対応して一つ
であり、他のスレッドがモニタを使用している場合は、そのスレッドが
モニタを解放するまで待つことになる。
SimpleWorkerのmethod、あるいは、ステートメントに synchronized 構文を
付加して、競合状態を避け、正しい結果が得られるように変更せよ。
---モニタの及ぶ範囲
以下の例題は、何故か、うまく競合条件を避けるように動作しない。
ThreadRace の初期化を
SimpleWorker t1 = new BadWorker("t1",-1,null); // does not run
SimpleWorker t2 = new BadWorker("t2",5,t1);
SimpleWorker t3 = new BadWorker("t3",5,t1);
に変更して動作させよう。(BadWorker t1 である必要はない。何故か?)
Class ProcessExample.BadWorker
package ProcessExample;
public class BadWorker extends SimpleWorker {
public BadWorker(String _name,int count_,SimpleWorker w) {
String msg;
name = _name; sharedResource = w;
count = count_;
msg = "Thread "+name+" created.";
myResource = 0;
System.out.println(msg);
}
public void run() {
while(count < 0 || count-- > 0) {
work();
}
}
public synchronized void work() {
int x;
yield();
x = sharedResource.getResource();
yield();
x = x + 1;
yield();
sharedResource.setResource(x);
yield();
}
}
これは何故か? モニタとオブジェクトの対応を明確にする図を書いて
説明せよ。
---生産者消費者問題
生産者消費者問題では、生産者(OurProducer), 消費者(OurConsumer)
と、その間を調停するモニタ(OurThreadMonitor) の三つのスレッド
からなる。モニタは、ここでは受動的にしか動かないので、start()
する必然性はない。(が、一応、起動しておこう....)
Class ProcessExample.ThreadProdCons
package ProcessExample;
public class ThreadProdCons {
static public void main(String [] args) throws InterruptedException {
OurThreadMonitor monitor = new OurThreadMonitor("m1",5);
Thread p1 = new OurProducer("p1",10,monitor);
Thread c1 = new OurConsumer("c1",monitor);
monitor.start();
p1.start();
c1.start();
p1.join();
c1.stop();
monitor.stop();
}
}
ここで、5 は、モニタが持つバッファの大きさであり、Producer の10は、
生産するデータの個数である。生産者の終了を待って、すべてのスレッドを
停止させている。
Class ProcessExample.OurProducer
package ProcessExample;
public class OurProducer extends Thread {
OurThreadMonitor mon;
String name;
int counter;
OurProducer(String name_,int counter_, OurThreadMonitor mon_) {
mon = mon_;
name = name_;
counter = counter_;
}
public void run() { // run method contains the thread code
int item;
while (counter>=0) { // producer loop
item = produce_item();
mon.insert(item);
}
}
private int produce_item() { return counter--; } // actually produce
}
生産者は、モニタに対して、insert(item) でデータを引き渡す。
Class ProcessExample.OurConsumer
package ProcessExample;
public class OurConsumer extends Thread {
OurThreadMonitor mon;
String name;
OurConsumer(String name_, OurThreadMonitor mon_) {
mon = mon_;
name = name_;
}
public void run() { // run method contains the thread code
int item;
while (true) { // consumer loop
item = mon.remove();
consume_item (item);
}
}
private void consume_item(int item) {
String msg = "Consumer "+name+" eats "+Integer.toString(item);
System.out.println(msg);
} // actually consume
}
消費者は、モニタから remove() でデータを取得する。
モニタは、計数セマフォとして動作する。条件付変数の代わりに、
wait()/notify()を使っていることに注意しよう。
Class ProcessExample.OurThreadMonitor
package ProcessExample;
public class OurThreadMonitor extends Thread {
private int bufsize;
private int buffer[];
private int count, lo, hi; // counters and indices
private String name;
OurThreadMonitor(String _name,int _bufsize) {
name = _name;
bufsize = _bufsize;
buffer = new int[bufsize];
}
public synchronized void insert(int val) {
if (count == bufsize) go_to_sleep(); // if the buffer is full, go to sleep
buffer [hi] = val; // insert an item into the buffer
hi = (hi + 1) % bufsize; // slot to place next item in
count = count + 1; // one more item in the buffer now
if (count == 1) notify(); // if consumer was sleeping, wake it up
}
public synchronized int remove() {
int val;
if (count == 0) go_to_sleep( ); // if the buffer is empty, go to sleep
val = buffer [lo]; // fetch an item from the buffer
lo = (lo + 1) % bufsize; // slot to fetch next item from
count = count - 1 ; // one few items in the buffer
if (count == bufsize - 1) notify(); // if producer was sleeping, wake it up
return val;
}
private void go_to_sleep() { try{wait( );} catch(InterruptedException exc
) {};}
}
このsynchronizedでは、OurThreadMonitorのインスタンスに対応するモニタを
取得する。OurThreadMonitor がbuffer fullまたはbuffer emptyになると、
insert/remove ともにブロックされることになる。実際にそういうことが起きるか
どうかを調べよ。もし、起きるなら、これはバグであるので、fix せよ。
起きないならば、その理由を、Java Language API の記述を引用して説明せよ。
ThreadProdCons は、生産者スレッドが終了すると、無条件に消費者を殺しにいく。
これは、若干、拙速な行動である。消費者がデータを取り損なうことなく、
終了するためには、どうすれば良いか? 可能な変更を提案し、実装せよ。
---複数の生産者と複数の消費者
課題3では、生産者と消費者プロセスがそれぞれ1つづつしか生成されない。
これを変更して、複数の生産者プロセスと複数の消費者プロセスを
生成し、動作させなさい。
生産者プロセスと消費者プロセスの数は異なる場合が普通である。しかし、
生産量と消費量がバランスしないとデッドロックに陥る可能性がある。
デッドロックを検出するためには、どうすれば良いか? 可能な方法を
提案し、実装せよ。