Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

RKObserverAdvocate.cc

Go to the documentation of this file.
00001 /*
00002  * RKObserverAdvocate.cc
00003  *
00004  * Copyright (c) 2004 The University of Utah and the Flux Group.
00005  * All rights reserved.
00006  *
00007  * This file is licensed under the terms of the GNU Public License.  
00008  * See the file "license.terms" for restrictions on redistribution 
00009  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
00010  */
00011 
00012 /**
00013  * @file RKObserverAdvocate.cc
00014  *
00015  * Implementation of the RKObserverAdvocate class.
00016  */
00017 
00018 #include "config.h"
00019 
00020 #include <assert_pp.h>
00021 #include <time_util.h>
00022 #include <instrumentation.h>
00023 
00024 #include <iostream>
00025 
00026 #include <rk.h>
00027 #include <rk_util.h>
00028 
00029 #include "RKObserverAdvocate.hh"
00030 
00031 using namespace std;
00032 
/*
 * Fallback helpers for manipulating 'struct timespec' values.
 *
 * Each macro is guarded by its own name so that a platform which supplies
 * only some of them still gets the remaining ones from this file.  The
 * arithmetic macros perform a single carry/borrow step, i.e. they expect
 * both operands to hold a tv_nsec in the range [0, 1000000000).
 */

#if !defined(timespecclear)
/**
 * Clear a timespec structure.
 *
 * @param tvp Pointer to the timespec object to clear.
 */
#define timespecclear(tvp)      ((tvp)->tv_sec = (tvp)->tv_nsec = 0)
#endif

#if !defined(timespecisset)
/**
 * Test if a timespec has non-zero values.
 *
 * @param tvp Pointer to the timespec object to check.
 */
#define timespecisset(tvp)      ((tvp)->tv_sec || (tvp)->tv_nsec)
#endif

#if !defined(timespeccmp)
/**
 * Compare two timespec structures.  The nanosecond fields are only
 * consulted when the second fields are equal.
 *
 * @param tvp Pointer to the 'left hand side' of the comparison.
 * @param uvp Pointer to the 'right hand side' of the comparison.
 * @param cmp The comparison operator.  (e.g. '<', '>', ...)
 */
#define timespeccmp(tvp, uvp, cmp)                                      \
        (((tvp)->tv_sec == (uvp)->tv_sec) ?                             \
            ((tvp)->tv_nsec cmp (uvp)->tv_nsec) :                       \
            ((tvp)->tv_sec cmp (uvp)->tv_sec))
#endif

#if !defined(timespecadd)
/**
 * Add one timespec to another, in place.
 *
 * @param vvp Pointer to the timespec that will be added to.
 * @param uvp Pointer to the timespec to add.
 */
#define timespecadd(vvp, uvp)                                           \
        do {                                                            \
                (vvp)->tv_sec += (uvp)->tv_sec;                         \
                (vvp)->tv_nsec += (uvp)->tv_nsec;                       \
                if ((vvp)->tv_nsec >= 1000000000) {                     \
                        (vvp)->tv_sec++;                                \
                        (vvp)->tv_nsec -= 1000000000;                   \
                }                                                       \
        } while (0)
#endif

#if !defined(timespecsub)
/**
 * Subtract one timespec from another, in place.
 *
 * @param vvp Pointer to the timespec that will be subtracted from.
 * @param uvp Pointer to the timespec to subtract.
 */
#define timespecsub(vvp, uvp)                                           \
        do {                                                            \
                (vvp)->tv_sec -= (uvp)->tv_sec;                         \
                (vvp)->tv_nsec -= (uvp)->tv_nsec;                       \
                if ((vvp)->tv_nsec < 0) {                               \
                        (vvp)->tv_sec--;                                \
                        (vvp)->tv_nsec += 1000000000;                   \
                }                                                       \
        } while (0)
#endif
00092 
00093 /**
00094  * Trampoline function for new threads that calls the 'run' method of
00095  * RKObserverAdvocate.
00096  *
00097  * @param arg The RKObserverAdvocate object.
00098  * @return NULL.
00099  */
00100 static void *trampoline(void *arg)
00101 {
00102     RKObserverAdvocate *rkoa = (RKObserverAdvocate *)arg;
00103     void *retval = NULL;
00104 
00105     require(arg != NULL);
00106     
00107     rkoa->run();
00108 
00109     return( retval );
00110 }
00111 
00112 RKObserverAdvocate::RKObserverAdvocate(void)
00113     throw (CORBA::SystemException)
00114 {
00115     if( pthread_mutex_init(&this->rkoa_Mutex, NULL) != 0 )
00116     {
00117         ensure(0);
00118     }
00119     if( pthread_cond_init(&this->rkoa_Cond, NULL) != 0 )
00120     {
00121         ensure(0);
00122     }
00123     this->rkoa_ResourceSet = NULL_RESOURCE_SET;
00124     this->rkoa_Period = microsec_to_timespec(10000);
00125     this->rkoa_Thread = 0;
00126 }
00127 
00128 RKObserverAdvocate::~RKObserverAdvocate(void)
00129 {
00130     this->rkoa_Thread = 0;
00131     if( pthread_mutex_destroy(&this->rkoa_Mutex) != 0 )
00132     {
00133         ensure(0);
00134     }
00135     if( pthread_cond_destroy(&this->rkoa_Cond) != 0 )
00136     {
00137         ensure(0);
00138     }
00139 }
00140 
00141 void
00142 RKObserverAdvocate::BeginCPUScheduling(const Broker::ScheduleParameters &sp)
00143     throw (CORBA::SystemException,
00144            Broker::DuplicateScheduleParameter,
00145            Broker::InvalidScheduleParameter,
00146            Broker::MissingScheduleParameter)
00147 {
00148     unsigned long long period = ~0;
00149     unsigned int lpc;
00150     const char *str;
00151     pid_t pid = -1;
00152     
00153     if( CORBA::is_nil(this->dm_RemoteObject.in()) )
00154     {
00155         throw CORBA::BAD_INV_ORDER();
00156     }
00157 
00158     if( this->rkoa_ResourceSet != NULL_RESOURCE_SET )
00159     {
00160         throw CORBA::BAD_INV_ORDER();
00161     }
00162 
00163     /* Process the arguments */
00164     for( lpc = 0; lpc < sp.length(); lpc++ )
00165     {
00166         if( strcasecmp(sp[lpc].name.in(), "pid") == 0 )
00167         {
00168             CORBA::Long c_pid;
00169             const char *str;
00170 
00171             if( pid != -1 )
00172             {
00173                 throw Broker::DuplicateScheduleParameter("pid");
00174             }
00175             else if( sp[lpc].value >>= c_pid )
00176             {
00177                 pid = c_pid;
00178             }
00179             else if( (sp[lpc].value >>= str) &&
00180                      (sscanf(str, "%d", &pid) == 1) )
00181             {
00182             }
00183             else
00184             {
00185                 throw Broker::InvalidScheduleParameter("'pid' not a long",
00186                                                        sp[lpc]);
00187             }
00188             if( pid < 0 )
00189             {
00190                 pid = -1;
00191                 throw Broker::InvalidScheduleParameter("'pid' not a long",
00192                                                        sp[lpc]);
00193             }
00194         }
00195         else if( strcasecmp(sp[lpc].name.in(), "period") == 0 )
00196         {
00197             CORBA::ULong period_ul;
00198             
00199             if( period != ~0ULL )
00200             {
00201                 throw Broker::DuplicateScheduleParameter("period");
00202             }
00203             else if( sp[lpc].value >>= period_ul )
00204             {
00205                 period = period_ul;
00206             }
00207             else if( (sp[lpc].value >>= str) &&
00208                      string_to_microsec(&period, str) )
00209             {
00210             }
00211             else
00212             {
00213                 throw Broker::InvalidScheduleParameter(
00214                         "'period' is not a time",
00215                         sp[lpc]);
00216             }
00217         }
00218     }
00219 
00220     if( pid < -1 )
00221     {
00222         throw CORBA::BAD_PARAM();
00223     }
00224     
00225     this->dm_RemoteObject->BeginCPUScheduling(sp);
00226 
00227     this->rkoa_Period = microsec_to_timespec(period);
00228 
00229     this->rkoa_Advocate = Broker::RealTimeTask::_duplicate(this->_this());
00230     
00231     {
00232         this->rkoa_ResourceSet = rk_proc_get_rset(pid);
00233 
00234         ensure(this->rkoa_ResourceSet != NULL_RESOURCE_SET); // XXX
00235     }
00236     
00237     if( pthread_create(&this->rkoa_Thread, NULL, trampoline, this) != 0 )
00238     {
00239         this->dm_RemoteObject->EndCPUScheduling();
00240         throw CORBA::NO_MEMORY();
00241     }
00242 }
00243 
00244 void RKObserverAdvocate::EndCPUScheduling(void)
00245     throw (CORBA::SystemException)
00246 {
00247     pthread_t pt;
00248     void *rc;
00249     
00250     if( pthread_mutex_lock(&this->rkoa_Mutex) != 0 )
00251     {
00252         ensure(0);
00253     }
00254     {
00255         this->rkoa_ResourceSet = NULL_RESOURCE_SET;
00256         pt = this->rkoa_Thread;
00257         this->rkoa_Thread = 0;
00258         if( pthread_cond_signal(&this->rkoa_Cond) != 0 )
00259         {
00260             ensure(0);
00261         }
00262     }
00263     if( pthread_mutex_unlock(&this->rkoa_Mutex) != 0 )
00264     {
00265         ensure(0);
00266     }
00267     if( pthread_join(pt, &rc) != 0 )
00268     {
00269         ensure(0);
00270     }
00271 
00272     this->dm_RemoteObject->EndCPUScheduling();
00273 }
00274 
00275 void RKObserverAdvocate::run(void)
00276 {
00277     struct rk_resource_set_proc proc_elements[128];
00278     struct rk_resource_set_proc_cache pc;
00279     Broker::KeyedReportParameters krp;
00280     unsigned long long last_usage = 0;
00281     struct rk_resource_set_usage rsu;
00282     struct timespec abstime;
00283     pid_t my_pid;
00284     int rc;
00285 
00286     pc.data = proc_elements;
00287     pc.used = 0;
00288     pc.length = 128;
00289     memset(&rsu, 0, sizeof(rsu));
00290     my_pid = getpid();
00291     if( rk_resource_set_detach_process(rk_proc_get_rset(my_pid),
00292                                        my_pid) == -1 )
00293     {
00294         perror("rk_resource_set_detach_process");
00295     }
00296     if( rk_resource_set_attach_process(this->rkoa_ResourceSet,
00297                                        my_pid) == -1 )
00298     {
00299         perror("rk_resource_set_attach_process");
00300     }
00301     clock_gettime(CLOCK_REALTIME, &abstime);
00302     if( (rc = pthread_mutex_lock(&this->rkoa_Mutex)) != 0 )
00303     {
00304         cerr << "pthread_mutex_lock: " << rc << endl;
00305         ensure(0);
00306     }
00307     do {
00308         if( this->rkoa_ResourceSet == NULL_RESOURCE_SET )
00309         {
00310             /* ... */
00311         }
00312         else
00313         {
00314             int rc;
00315 
00316             if( (rc = rk_resource_set_get_usage(this->rkoa_ResourceSet,
00317                                                 &rsu,
00318                                                 &pc)) != 0 )
00319             {
00320                 cerr << "rk_resource_set_get_usage: "
00321                      << strerror(rc)
00322                      << endl;
00323             }
00324             else
00325             {
00326                 try
00327                 {
00328                     unsigned long long total_usage;
00329                     Broker::CPUReserve status;
00330 
00331                     total_usage = rsu.inactive_cpu_usage +
00332                         rsu.active_cpu_usage;
00333                     status.Period = timespec_to_microsec(&this->rkoa_Period);
00334                     status.Compute = (CORBA::ULong)
00335                         (total_usage - last_usage) + 50;
00336                     /*
00337                      * NOTE: We must use PassCPU with rkoa_Advocate here
00338                      * because using _this() will return an object reference in
00339                      * the default POA.
00340                      */
00341                     this->PassCPU(this->rkoa_Advocate, status, status, krp);
00342                     last_usage = total_usage;
00343                 }
00344                 catch(const CORBA::SystemException &e)
00345                 {
00346                     cerr << "System exception: " << e << endl;
00347                 }
00348                 catch(...)
00349                 {
00350                     cerr << "Caught unknown exception..." << endl;
00351                 }
00352             }
00353             timespecadd(&abstime, &this->rkoa_Period);
00354             rc = pthread_cond_timedwait(&this->rkoa_Cond,
00355                                         &this->rkoa_Mutex,
00356                                         &abstime);
00357             switch( rc )
00358             {
00359             case 0:
00360                 /* Time to exit. */
00361                 break;
00362             case ETIMEDOUT:
00363                 /* Expected. */
00364                 break;
00365             default:
00366                 cerr << "pthread_cond_timedwait: " << rc << endl;
00367                 ensure(0);
00368                 break;
00369             }
00370         }
00371     } while( this->rkoa_Thread != 0 );
00372     if( pthread_mutex_unlock(&this->rkoa_Mutex) != 0 )
00373     {
00374         ensure(0);
00375     }
00376     rk_resource_set_proc_cache_release(&pc);
00377 }

Generated on Tue Jun 22 14:50:10 2004 for CPU Broker by doxygen 1.3.6