/* Condition Variables Using DSS Demo Program CS509 Winter 1995 G. Lindstrom */ import sim.*; import queues.*; import random.*; /* globals */ class cond_demo { static int stop_time; // simulation stop time static urand interleave; // simulates "truer" concurrency static buffer buf; // one globally declared buffer public static void main(String av[]) { buf = new buffer(2, true); stop_time = 75; interleave = new urand(0, 2); scheduler.activate(new writer("writer 1", 100)); scheduler.activate(new reader("reader A")); scheduler.activate(new writer("writer 2", 200)); scheduler.activate(new reader("reader B")); scheduler.activate(new writer("writer 3", 300)); scheduler.activate(new reader("reader C")); scheduler.activate(new writer("writer 4", 400)); scheduler.activate(new reader("reader D")); scheduler.run_simulation(); } } /* class declarations */ /* buff_elem not needed */ /* buffer member functions */ class buffer extends Queue { // buffer class; hide inherited get/put int size; // maximum number of elements boolean verbose; // show trace? resource mutex; // mutex guarding this buffer condition_var not_empty; // writers signal readers condition_var not_full; // readers signal writers buffer(int s, boolean v) { size = s; verbose = v; mutex = new resource("buffer mutex"); not_empty = new condition_var("buffer not empty", verbose); not_full = new condition_var("buffer not full", verbose); } public void Enqueue(Object o) // redefined for "interleaving" { scheduler.hold(cond_demo.interleave.draw()); mutex.request(); while (Length() == size) { // current process holds mutex scheduler.hold(cond_demo.interleave.draw()); not_full.wait(mutex); // mutex temporarily released within wait // current process again holds mutex } // buffer not full now; current process has lock scheduler.hold(cond_demo.interleave.draw()); super.Enqueue(o); // safely do put scheduler.hold(cond_demo.interleave.draw()); not_empty.signal(); // signal a waiting reader (if any) scheduler.hold(cond_demo.interleave.draw()); mutex.release(); } public Object Dequeue() // redefined for "interleaving" { Object result; scheduler.hold(cond_demo.interleave.draw()); mutex.request(); while (Length() == 0) { // current process holds mutex scheduler.hold(cond_demo.interleave.draw()); not_empty.wait(mutex); // mutex temporarily released within wait // current process again holds mutex } // buffer not empty now; current process has lock scheduler.hold(cond_demo.interleave.draw()); result = super.Dequeue(); // safely do get scheduler.hold(cond_demo.interleave.draw()); not_full.signal(); // signal a waiting writer (if any) scheduler.hold(cond_demo.interleave.draw()); mutex.release(); return result; } }; /* writer member functions */ class writer extends process { int base; writer(String n, int b) { super(n); base = b; } void body() { while ( scheduler.clock < cond_demo.stop_time ) { scheduler.hold(cond_demo.interleave.draw()); base++; System.out.println("Time " + scheduler.clock + ": " + name + " initiates write of " + base); // all concurrency control hidden cond_demo.buf.Enqueue(new Integer(base)); System.out.println("Time " + scheduler.clock + ": " + name + " completes write of " + base); } } } /* reader member functions */ class reader extends process { reader(String n) { super(n); } void body() { Integer be; while ( scheduler.clock < cond_demo.stop_time ) { scheduler.hold(cond_demo.interleave.draw()); System.out.println("Time " + scheduler.clock + ": " + name + " initiates read"); // all concurrency control hidden be = (Integer)cond_demo.buf.Dequeue(); System.out.println("Time " + scheduler.clock + ": " + name + " completes read of " + be); } } }