/*
 * Promela model of a distributed breadth-first exploration in which every
 * node runs two processes: ParBFS (the exploration thread) and CommThread
 * (the communication thread). Nodes exchange states and a termination-probing
 * token over channels that stand in for MPI messages; a per-node "mutex" and
 * "sleep" flag stand in for a pthread mutex / condition variable pair.
 */

/* independent parameters */
#define NODES 2
#define STATES 3
#define MAX_SUCC_STATES 3
#define INITIAL_STATES 1
#define UNSIGNED_PREC 10

/* dependent parameters */
/* Must be kept equal to NODES*(NODES - 1): one channel per ordered pair of
 * distinct nodes (see chan_connect). */
#define NODES_COUPLES 2

/* One channel per ordered (sender, receiver) pair, indexed via chan_connect. */
chan mpi_states[NODES_COUPLES] = [STATES] of {int};         /* states sent to their owner */
chan mpi_term_prob[NODES_COUPLES] = [STATES] of {int, int}; /* token: (sent, received) counters */
chan mpi_term[NODES_COUPLES] = [STATES] of {int};           /* termination notification */

/* Per-node queue of states waiting to be expanded by ParBFS. */
chan consumption_queue[NODES] = [STATES] of {int};

bit Terminate[NODES];          /* set once the node has decided to stop */
bit sleep[NODES];              /* 1 while ParBFS is blocked in WaitCondSignal */
bit end_of_CommThread[NODES];  /* set by CommThread when it exits (join flag) */
bit TokenValid[NODES];         /* 1 while the node holds the termination token */
int Sent[NODES];               /* #MPI_Isend in the paper */
int Received[NODES];           /* #MPI_Recv in the paper */
bit result[NODES];             /* 0: ok; 1: error */

bit mutex_sleep[NODES];
/*bit mutex_terminate[NODES];*/

/* Acquire mutex[node]: block until it is free (0), then take it (1).
 * The test and the set are done in one atomic step. */
inline LockMutex(mutex, node)
{
    atomic {
        mutex[node] == 0;
        mutex[node] = 1;
    }
}

/* Release mutex[node]. */
inline UnLockMutex(mutex, node)
{
    mutex[node] = 0;
}

/* Release mutex[node], mark the node as sleeping and block until another
 * process resets sleep[node] to 0. The mutex is NOT re-acquired on wake-up.
 * The atomic sequence covers the release and the sleep flag; it is
 * interrupted as soon as the final condition blocks. */
inline WaitCondSignal(mutex, node)
{
    atomic {
        mutex[node] = 0; /* release the mutex that was passed in */
        sleep[node] = 1;
        sleep[node] == 0; /* sleep till woken up by the other thread */
    }
}

/* Map the ordered pair of distinct nodes (i -> j) to a unique channel index
 * res in 0 .. NODES*(NODES - 1) - 1. Each sender i owns a contiguous group of
 * NODES - 1 slots, one per possible receiver j != i (receivers above i are
 * shifted down by one to skip the diagonal).
 * For NODES == 2 this yields 0 for (0 -> 1) and 1 for (1 -> 0).
 * Precondition: i != j. */
inline chan_connect(i, j, res)
{
    res = (i)*(NODES - 1) + ((j) < (i) -> (j) : (j) - 1);
}

/* Non-deterministically choose node_tmp in 0 .. NODES - 1 with
 * node_tmp != node. The increment guard keeps node_tmp from running past the
 * last admissible value. */
inline GenNodes(node, node_tmp)
{
    node_tmp = 0;
    do
    :: (node != NODES - 1 && node_tmp < NODES - 1) ||
       (node == NODES - 1 && node_tmp < NODES - 2) -> node_tmp++;
    :: node_tmp != node -> break;
    od;
}

/* only called by CheckState (see comment to this function) */
/* Non-deterministically decide whether the current state is an error state.
 * On error: record it in result[node], set Terminate[node] and break out of
 * the loop enclosing the call. */
inline ErrorState(node)
{
    if
    :: /* error state */
        result[node] = 1;
        Terminate[node] = 1;
        break; /* it is always called inside a loop */
    :: /* state ok */
        skip;
    fi;
}

/* Append one (abstract) state to the node's consumption queue; the state is
 * silently dropped when the queue is full. */
inline Enqueue_state(node)
{
    atomic {
        if
        :: nfull(consumption_queue[node]) -> consumption_queue[node]!0;
        :: full(consumption_queue[node]) -> skip;
        fi;
    }
}

/* w.r.t. the paper pseudocode, it also sets Terminate and breaks the loop in
   which it is called */
/* Ownership and "already visited" are abstracted as non-deterministic
 * choices. */
inline CheckState(node)
{
    if
    :: /* state is mine */
        if
        :: /* not visited */
            ErrorState(node);
            Enqueue_state(node);
        :: /* visited */
            skip;
        fi;
    :: /* state is not mine */
        ErrorState(node);
        /* moved in CommThread */
        /*GenNodes(node, node_state);
        chan_connect(node, node_state, res);
        mpi_states[res]!0;
        Sent[node]++;*/
    fi;
}

/* Exploration thread of one node. Starts the node's CommThread, generates
 * the initial states on node 0, then repeatedly takes a state from the
 * consumption queue and expands it until Terminate[node] is set. Finally
 * waits for the CommThread to finish. */
proctype ParBFS(int node)
{
    int state = 0;

    run CommThread(node);

    if
    :: node == 0 -> /* root node */
        do
        :: state < INITIAL_STATES -> CheckState(node); state++;
        :: state > 0 -> break; /* at least one initial state */
        od;
    :: node != 0 -> skip;
    fi;

    do /* from here... */
    :: Terminate[node] -> break;
    :: !Terminate[node] ->
        /* NOTE(review): the emptiness test is made before the mutex is
         * taken, so CommThread can enqueue a state in between without
         * issuing a wake-up (sleep[node] is still 0 then); confirm this
         * interleaving is an intended part of the model. */
        if
        :: empty(consumption_queue[node]) ->
            LockMutex(mutex_sleep, node);
            if
            :: !Terminate[node] ->
                WaitCondSignal(mutex_sleep, node);
                /*UnLockMutex(mutex_sleep, node);*/ /* performed in the inline function above; this is consistent with what does pthread_cond_wait */
            :: Terminate[node] ->
                UnLockMutex(mutex_sleep, node);
                break;
            fi;
            if
            :: Terminate[node] -> break;
            :: !Terminate[node] -> skip;
            fi
        :: nempty(consumption_queue[node]) -> skip;
        fi;
        /* ...till here implements ParTerminate() */

        /* model checking main loop */
        assert(nempty(consumption_queue[node]));
        consumption_queue[node]?_;
        if
        :: skip; /* sent by another node and already visited (i.e., w.r.t. the paper pseudocode (!checked && s is in T) holds) */
        :: state = 0;
            do
            :: state < MAX_SUCC_STATES -> /* state expansion */
                CheckState(node); state++;
            :: break; /* fewer than MAX_SUCC_STATES successors */
            od;
        fi;
    od;

    Terminate[node] = 1;
    end_of_CommThread[node]; /* pthread_join */
}

/* Terminate the node: optionally notify a peer through mpi_term, set
 * Terminate[node] and wake the exploration thread if it is sleeping.
 * Uses node_tmp and res of the enclosing proctype.
 * NOTE(review): with broadcast set, only ONE peer (chosen by GenNodes) is
 * notified; that reaches every other node only when NODES == 2. */
inline End(node, broadcast)
{
    if
    :: broadcast ->
        GenNodes(node, node_tmp);
        chan_connect(node, node_tmp, res);
        mpi_term[res]!0;
    :: !broadcast -> skip;
    fi;
    Terminate[node] = 1;
    LockMutex(mutex_sleep, node);
    sleep[node] = 0;
    UnLockMutex(mutex_sleep, node);
}

/* cannot be done in this way, since calls and is called by ProcessMessages */
/*inline TknValidAndNthngToDo(node)
{
}*/

/* Receive one state from mpi_states[res], count it, enqueue it for ParBFS
 * and wake ParBFS if it is sleeping and now has work.
 * Uses res and smth_done of the enclosing proctype. */
inline ReceiveStates(node)
{
    mpi_states[res]?_;
    Received[node]++;
    smth_done = 1;
    Enqueue_state(node);
    LockMutex(mutex_sleep, node);
    if
    :: sleep[node] == 1 && nempty(consumption_queue[node]) -> sleep[node] = 0;
    :: sleep[node] != 1 || empty(consumption_queue[node]) -> skip;
    fi;
    UnLockMutex(mutex_sleep, node);
}

/* One polling round of the communication thread:
 *   1. probe one (non-deterministically chosen) peer for an incoming state
 *      or a termination message;
 *   2. probe the left neighbour for the termination token and, if the node
 *      is idle, either decide termination (node 0) or forward the token with
 *      this node's counters added.
 * Jumps to end_of_CommThread_lbl when termination is decided or received.
 * Uses smth_done, node_tmp, res, sent, received and ret_TknValidAndNthngToDo
 * of the enclosing proctype. */
inline ProcessMessages(node)
{
    smth_done = 0;
    GenNodes(node, node_tmp);
    chan_connect(node_tmp, node, res);
    if /* MPI_Iprobe for states and termination */
    :: mpi_states[res]?[0] -> ReceiveStates(node);
    :: mpi_term[res]?[0] ->
        mpi_term[res]?_;
        End(node, 0) -> goto end_of_CommThread_lbl;
    :: !(mpi_states[res]?[0]) && !(mpi_term[res]?[0]) -> skip;
    fi;

    int neighbour_left = (node >= 1 -> node - 1 : NODES - 1);
    chan_connect(neighbour_left, node, res);
    if /* MPI_Iprobe for the termination probing */
    :: mpi_term_prob[res]?[sent, received] -> /* ReceiveTermProb */
        mpi_term_prob[res]? sent, received;
        TokenValid[node] = 1;
        LockMutex(mutex_sleep, node);
        if
        :: TokenValid[node] && sleep[node] == 1 ->
            UnLockMutex(mutex_sleep, node);
            /*ProcessMessages(node);*//* cannot be done, so expansion... */
            smth_done = 0;
            GenNodes(node, node_tmp);
            chan_connect(node_tmp, node, res);
            if
            :: mpi_states[res]?[0] -> ReceiveStates(node);
            :: mpi_term[res]?[0] ->
                mpi_term[res]?_;
                End(node, 0) -> goto end_of_CommThread_lbl;
            :: !(mpi_states[res]?[0]) && !(mpi_term[res]?[0]) -> skip;
            fi;
            assert(!(mpi_term_prob[res]?[_, _])); /* we are already processing it... */
            ret_TknValidAndNthngToDo = !smth_done;
        :: !(TokenValid[node] && sleep[node] == 1) ->
            UnLockMutex(mutex_sleep, node);
            ret_TknValidAndNthngToDo = 0;
        fi;
        if
        :: ret_TknValidAndNthngToDo ->
            if
            :: node == 0 -> /* master node */
                if
                :: sent + Sent[node] == received + Received[node] ->
                    End(node, 1) -> goto end_of_CommThread_lbl;
                :: sent + Sent[node] != received + Received[node] -> skip;
                fi
            :: !(node == 0) -> /* non-master node */
                chan_connect(node, (node + 1)%NODES, res);
                mpi_term_prob[res]! sent + Sent[node], received + Received[node];
            fi;
            TokenValid[node] = 0;
        :: !ret_TknValidAndNthngToDo -> skip;
        fi
    :: !(mpi_term_prob[res]?[_, _]) -> skip;
    fi
}

/* Communication thread of one node. Loops over: processing incoming
 * messages, optionally sending one state to a peer, and starting/forwarding
 * the termination token when the node holds it and its exploration thread
 * sleeps. On exit it raises end_of_CommThread[node] so that ParBFS can
 * "join" it. */
proctype CommThread(int node)
{
    bit ret_TknValidAndNthngToDo;
    byte node_tmp;
    bit smth_done = 0;
    int res;
    int node_state;
    int sent, received; /* token.snt and token.rcvd, respectively */

    do
    :: /* cycle in CommThread */
        ProcessMessages(node);
        if
        :: Terminate[node] -> End(node, 1) -> goto end_of_CommThread_lbl;
        :: !Terminate[node] -> skip;
        fi;

        /* DoSends */
        if
        :: /* something present in communication queue, send it */
            atomic {
                GenNodes(node, node_state);
                chan_connect(node, node_state, res);
                if
                :: nfull(mpi_states[res]) -> mpi_states[res]!0; Sent[node]++;
                :: full(mpi_states[res]) -> skip;
                fi
            }
        :: skip;
        fi;

        /* StableCondTokenProc */
        LockMutex(mutex_sleep, node);
        if
        :: TokenValid[node] && sleep[node] == 1 ->
            UnLockMutex(mutex_sleep, node);
            ProcessMessages(node);
            ret_TknValidAndNthngToDo = !smth_done;
        :: !(TokenValid[node] && sleep[node] == 1) ->
            UnLockMutex(mutex_sleep, node);
            ret_TknValidAndNthngToDo = 0;
        fi;
        if
        :: ret_TknValidAndNthngToDo ->
            chan_connect(node, (node + 1)%NODES, res);
            if
            :: node == 0 -> /* master node */
                mpi_term_prob[res]! 0, 0;
            :: !(node == 0) -> /* non-master node */
                mpi_term_prob[res]! sent + Sent[node], received + Received[node];
            fi;
            TokenValid[node] = 0;
        :: !ret_TknValidAndNthngToDo -> skip;
        fi;
    od;

end_of_CommThread_lbl:
    end_of_CommThread[node] = 1;
}

/* Give the token to node 0 and start one ParBFS per node (each ParBFS starts
 * its own CommThread). */
init
{
    byte node = 0;

    atomic {
        do
        :: node < NODES ->
            if
            :: node == 0 -> TokenValid[node] = 1;
            :: !(node == 0) -> TokenValid[node] = 0;
            fi;
            run ParBFS(node);
            node++;
        :: node == NODES -> break;
        od
    }
}