Main Page | Namespace List | Class Hierarchy | Class List | File List | Namespace Members | Class Members | File Members

proc_advocate.cc

Go to the documentation of this file.
00001 /*
00002  * proc_advocate.cc
00003  *
00004  * Copyright (c) 2003 The University of Utah and the Flux Group.
00005  * All rights reserved.
00006  *
00007  * This file is licensed under the terms of the GNU Public License.  
00008  * See the file "license.terms" for restrictions on redistribution 
00009  * of this file, and for a DISCLAIMER OF ALL WARRANTIES.
00010  */
00011 
00012 /**
00013  * @file proc_advocate.cc
00014  *
00015  * Wrapper used to attach unmodified programs to the Broker.
00016  */
00017 
00018 #include "config.h"
00019 
00020 #include <stdlib.h>
00021 #include <string.h>
00022 
00023 #include <sys/types.h>
00024 #include <sys/stat.h>
00025 #include <unistd.h>
00026 
00027 #include <assert_pp.h>
00028 #include <time_util.h>
00029 
00030 #include <iostream>
00031 
00032 #include <rk/rk.h>
00033 #include "rk_util.h"
00034 
00035 #include "BrokerC.h"
00036 #include "RKTask.hh"
00037 #include "MaxDecayTaskAdvocate.hh"
00038 
/*
 * Bit numbers for the pa_data.pa_Flags field.
 *
 * PAB_DONE - Bit index of the PAF_DONE flag.
 */
enum {
    PAB_DONE = 0
};

/*
 * Flags for the pa_data.pa_Flags field, built from the PAB_ bit numbers.
 *
 * PAF_DONE - Stop monitoring the resource set/child process.
 */
enum {
    PAF_DONE = 1L << PAB_DONE
};
00051 
00052 /*
00053  * Global data for the tool.
00054  *
00055  * pa_Flags - Holds the PAF_ flags.
00056  * pa_ManagerIOR - The IOR of the Broker::Manager object to use.
00057  * pa_TaskIOR - The IOR of the Broker::RealTimeTask object to send reports to.
00058  * pa_TaskName - The name of the task to create.
00059  * pa_ChildPID - The PID of the wrapped process.
00060  * pa_ChildPeriod - The period to use when making reports.
00061  * pa_Advocate - The Broker::RealTimeTask object to send reports to.
00062  * pa_ResourceSet - The resource set for this process and the child.
00063  * pa_LastUsage - The last recorded CPU usage, in microseconds.
00064  */
00065 static struct {
00066     unsigned long pa_Flags;
00067     const char *pa_ManagerIOR;
00068     const char *pa_TaskIOR;
00069     const char *pa_TaskName;
00070     pid_t pa_ChildPID;
00071     CORBA::ULong pa_ChildPeriod;
00072     Broker::RealTimeTask_ptr pa_Advocate;
00073     rk_resource_set_t pa_ResourceSet;
00074     unsigned long long pa_LastUsage;
00075 } pa_data;
00076 
00077 /**
00078  * Handle a SIGCHLD signal.  This function will be called when the child exits.
00079  *
00080  * @param The actual signal number received.
00081  */
00082 static void sigchld(int sig)
00083 {
00084     require(sig == SIGCHLD);
00085     
00086     pa_data.pa_Flags |= PAF_DONE;
00087 }
00088 
00089 /**
00090  * A signal handler that passes the received signal on to the child process.
00091  *
00092  * @param sig The actual signal number received.
00093  */
00094 static void sigpass(int sig)
00095 {
00096     require(pa_data.pa_ChildPID > 0);
00097     require((sig == SIGINT) || (sig == SIGTERM));
00098     
00099     kill(pa_data.pa_ChildPID, sig);
00100 }
00101 
00102 /**
00103  * The SIGALRM signal handler.  This function will be called at the end of
00104  * every period to report CPU usage to the advocate.
00105  *
00106  * @param sig The actual signal number received.
00107  */
00108 static void sigalrm(int sig)
00109 {
00110     struct rk_resource_set_usage rsu;
00111     int rc;
00112 
00113     require(pa_data.pa_ResourceSet != NULL_RESOURCE_SET);
00114     require(!CORBA::is_nil(pa_data.pa_Advocate));
00115     require(sig == SIGALRM);
00116 
00117     if( (rc = rk_resource_set_get_usage(pa_data.pa_ResourceSet, &rsu)) == 0 )
00118     {
00119         CORBA::ULong status;
00120         
00121         try
00122         {
00123             status = (CORBA::ULong)(rsu.cpu_usage - pa_data.pa_LastUsage);
00124             pa_data.pa_LastUsage = rsu.cpu_usage;
00125             pa_data.pa_Advocate->ReportCPU(pa_data.pa_Advocate,
00126                                            status,
00127                                            status);
00128         }
00129         catch(const CORBA::SystemException &e)
00130         {
00131             cerr << "System exception: " << e << endl;
00132             pa_data.pa_Flags |= PAF_DONE;
00133         }
00134         catch(...)
00135         {
00136             cerr << "Caught unknown exception..." << endl;
00137             pa_data.pa_Flags |= PAF_DONE;
00138         }
00139     }
00140     else
00141     {
00142         cerr << "rk_resource_set_get_usage: "
00143              << strerror(rc)
00144              << endl;
00145         pa_data.pa_Flags |= PAF_DONE;
00146     }
00147 }
00148 
00149 /**
00150  * Print out the usage statement to standard error.
00151  *
00152  * @param prog_name The program name.
00153  */
00154 static void paUsage(const char *prog_name)
00155 {
00156     require(pa_data.pa_ManagerIOR != NULL);
00157     require(prog_name != NULL);
00158 
00159     cerr << "CPU Broker wrapper for managing unmodified programs.\n"
00160          << "Usage: "
00161          << prog_name
00162          << " [options] -- <command> [argument ...]\n"
00163          << endl
00164          << "Options:\n"
00165          << "\t-h\t\tThis help message.\n"
00166          << "\t-V\t\tShow the version number.\n"
00167 
00168          << "\t-m <ior>\tThe Broker::Manager IOR."
00169          << " (Default: " << pa_data.pa_ManagerIOR << ")\n"
00170 
00171          << "\t-t <ior>\tThe Broker::RealTimeTask IOR.\n"
00172 
00173          << "\t-n <name>\tThe name for the task to create.\n"
00174 
00175          << "\t-P <period>\tThe period as a time value."
00176          << " (Default: " << pa_data.pa_ChildPeriod << "us)\n"
00177 
00178          << endl
00179         
00180          << "Package: " << PACKAGE_STRING << endl
00181          << "Contact: " << PACKAGE_BUGREPORT << endl
00182 
00183         ;
00184 }
00185 
00186 /**
00187  * Process the command line options.
00188  *
00189  * @param argc_inout Reference to main's argc variable.  On return, the
00190  * variable will contain the number of arguments remaining after option
00191  * processing.
00192  * @param argv_inout Reference to main's argv variable.  On return, the
00193  * variable will contain the remaining argument values.
00194  * @return True if the options were processed correctly, false otherwise.
00195  */
00196 static int paProcessOptions(int &argc_inout, char **&argv_inout)
00197 {
00198     int ch, retval = 0;
00199     char *prog_name;
00200     char **argv;
00201     int argc;
00202 
00203     argc = argc_inout;
00204     argv = argv_inout;
00205     prog_name = argv[0];
00206     while( ((ch = getopt(argc, argv, "hVn:m:t:P:")) != -1) && (retval == 0) )
00207     {
00208         switch( ch )
00209         {
00210         case 'm':
00211             if( strlen(optarg) == 0 )
00212             {
00213                 cerr << prog_name << ": Manager IOR is empty" << endl;
00214                 retval = 1;
00215             }
00216             else
00217             {
00218                 pa_data.pa_ManagerIOR = optarg;
00219             }
00220             break;
00221         case 't':
00222             if( strlen(optarg) == 0 )
00223             {
00224                 cerr << prog_name << ": Task IOR is empty" << endl;
00225                 retval = 1;
00226             }
00227             else if( pa_data.pa_TaskName != NULL )
00228             {
00229                 cerr << prog_name
00230                      << ": The -t and -n options are mutually exclusive"
00231                      << endl;
00232                 retval = 1;
00233             }
00234             else
00235             {
00236                 pa_data.pa_TaskIOR = optarg;
00237             }
00238             break;
00239         case 'n':
00240             if( strlen(optarg) == 0 )
00241             {
00242                 cerr << prog_name << ": Task name is empty" << endl;
00243                 retval = 1;
00244             }
00245             else if( pa_data.pa_TaskIOR != NULL )
00246             {
00247                 cerr << prog_name
00248                      << ": The -t and -n options are mutually exclusive"
00249                      << endl;
00250                 retval = 1;
00251             }
00252             else
00253             {
00254                 pa_data.pa_TaskName = optarg;
00255             }
00256             break;
00257         case 'P':
00258             {
00259                 unsigned long long usecs;
00260                 
00261                 /* Period for the CPU reservation. */
00262                 if( string_to_microsec(&usecs, optarg) )
00263                 {
00264                     pa_data.pa_ChildPeriod = usecs;
00265                 }
00266                 else
00267                 {
00268                     cerr << prog_name
00269                          << ": -P options requires a time value\n";
00270                     retval = 1;
00271                 }
00272             }
00273             break;
00274         case 'V':
00275             cerr << PACKAGE_VERSION << endl;
00276             retval = -1;
00277             break;
00278         case 'h':
00279         case '?':
00280         default:
00281             retval = 1;
00282             break;
00283         }
00284     }
00285     argc_inout -= optind;
00286     argv_inout += optind;
00287     /* Optionally skip the '--' that is used to terminate the option list. */
00288     if( (argc_inout > 0) && (strcmp(argv_inout[0], "--") == 0) )
00289     {
00290         argc_inout -= 1;
00291         argv_inout += 1;
00292     }
00293     return( retval );
00294 }
00295 
00296 /**
00297  * Create the advocate and begin CPU scheduling based on the command line
00298  * parameters.
00299  *
00300  * @param orb Pointer to the ORB instance.
00301  * @param manager The manager for this machine.
00302  * @param task_ior The IOR of the Broker::RealTimeTask to use or NULL if a
00303  * task should be created.
00304  * @param task_name The name of the Broker::RealTimeTask to create or NULL if
00305  * an existing task should be used.
00306  * @return A pointer to the allocated advocate.
00307  */
00308 static Broker::RealTimeTask_ptr paGetAdvocate(CORBA::ORB_ptr orb,
00309                                               Broker::Manager_ptr manager,
00310                                               const char *task_ior,
00311                                               const char *task_name)
00312 {
00313     Broker::RealTimeTask_var retval;
00314     CORBA::Object_var obj;
00315 
00316     require(!CORBA::is_nil(orb));
00317     require(!CORBA::is_nil(manager));
00318     require((task_ior != NULL) || (task_name != NULL));
00319 
00320     if( task_ior != NULL )
00321     {
00322         obj = orb->string_to_object(task_ior);
00323         retval = Broker::RealTimeTask::_narrow(obj.in());
00324         if( CORBA::is_nil(retval.in()) )
00325         {
00326             cerr << "Invalid task factory IOR: "
00327                  << task_ior
00328                  << endl;
00329             throw CORBA::BAD_PARAM();
00330         }
00331     }
00332     else if( task_name != NULL )
00333     {
00334         try
00335         {
00336             Broker::RealTimeTask_var rtt;
00337             MaxDecayTaskAdvocate *mdta;
00338             Broker::TaskParameters tp;
00339             CORBA::Any value;
00340             RKTask *rkt;
00341             
00342             tp.length(1);
00343             tp[0].name = "name";
00344             tp[0].value <<= task_name;
00345             rkt = new RKTask(tp);
00346             rtt = rkt->_this();
00347             mdta = new MaxDecayTaskAdvocate();
00348             value <<= rtt;
00349             mdta->SetDelegateAttribute("remote-object", value);
00350             retval = mdta->_this();
00351         }
00352         catch(const Broker::DuplicateTaskParameter &e)
00353         {
00354             ensure(0);
00355         }
00356         catch(const Broker::InvalidTaskParameter &e)
00357         {
00358             ensure(0);
00359         }
00360         catch(const Broker::MissingTaskParameter &e)
00361         {
00362             ensure(0);
00363         }
00364     }
00365     else
00366     {
00367         ensure(0);
00368     }
00369     
00370     try
00371     {
00372         struct rk_resource_set_usage rsu;
00373         Broker::ScheduleParameters sp;
00374         
00375         sp.length(2);
00376         sp[0].name = "period";
00377         sp[0].value <<= pa_data.pa_ChildPeriod;
00378         sp[1].name = "pid";
00379         sp[1].value <<= (CORBA::Long)getpid();
00380         manager->AddTask(retval.in(), sp);
00381         pa_data.pa_ResourceSet = rk_proc_get_rset(getpid());
00382         if( rk_resource_set_get_usage(pa_data.pa_ResourceSet, &rsu) == 0 )
00383         {
00384             pa_data.pa_LastUsage = rsu.cpu_usage;
00385         }
00386     }
00387     catch(const Broker::DuplicateScheduleParameter &e)
00388     {
00389         cerr << e << endl;
00390         cerr << e.name << endl;
00391         throw CORBA::UNKNOWN();
00392     }
00393     catch(const Broker::InvalidScheduleParameter &e)
00394     {
00395         cerr << e << endl;
00396         cerr << e.message << endl;
00397         throw CORBA::UNKNOWN();
00398     }
00399     catch(const Broker::MissingScheduleParameter &e)
00400     {
00401         cerr << e << endl;
00402         cerr << e.name << endl;
00403         throw CORBA::UNKNOWN();
00404     }
00405     
00406     return( retval._retn() );
00407 }
00408 
00409 /**
00410  * The parent portion of the fork(2) between pa and the monitored utility.
00411  * This function will attach itself to a resource set to ensure that it has
00412  * some CPU time to work and then wait for gkrellm connections or the child's
00413  * death.
00414  *
00415  * @param rtt The Broker::RealTimeTask that is managing this process.
00416  * @return The return code for main().
00417  *
00418  * @sa paChildPart
00419  */
00420 static int paParentPart(Broker::RealTimeTask_ptr rtt)
00421 {
00422     int retval = EXIT_FAILURE;
00423     struct itimerval itv;
00424     struct sigaction sa;
00425     sigset_t sigmask;
00426 
00427     require(pa_data.pa_ChildPID > 0);
00428     require(!CORBA::is_nil(rtt));
00429     
00430     /*
00431      * We use signals as a primitive event system, so any that we use and are
00432      * not passed on to the child need to be blocked before setting up the
00433      * handlers.  Once everything has been setup, we will use sigsuspend(2) to
00434      * atomically unblock and wait for the signals to arrive.
00435      */
00436     sigemptyset(&sigmask);
00437     sigaddset(&sigmask, SIGALRM);
00438     sigaddset(&sigmask, SIGCHLD);
00439     sigaddset(&sigmask, SIGINT);
00440     sigaddset(&sigmask, SIGTERM);
00441     if( sigprocmask(SIG_BLOCK, &sigmask, NULL) < 0 )
00442     {
00443         perror("sigprocmask");
00444 
00445         ensure(0);
00446     }
00447 
00448     /* Setup the signal handlers. */
00449     sa.sa_mask = sigmask;
00450     sa.sa_flags = 0;
00451 #if defined(SA_RESTART)
00452     sa.sa_flags |= SA_RESTART;
00453 #endif
00454 
00455     sa.sa_handler = sigalrm;
00456     if( sigaction(SIGALRM, &sa, NULL) < 0 )
00457     {
00458         perror("sigprocmask");
00459 
00460         ensure(0);
00461     }
00462 
00463     /* Catch SIGCHLD so we exit with the child. */
00464     sa.sa_handler = sigchld;
00465     if( sigaction(SIGCHLD, &sa, NULL) < 0 )
00466     {
00467         perror("sigprocmask");
00468 
00469         ensure(0);
00470     }
00471     
00472     /*
00473      * Pass SIGINT/SIGTERM onto the child so they can handle them as they
00474      * choose.  If they exit, we will get the SIGCHLD and exit shortly
00475      * thereafter.
00476      */
00477     sa.sa_handler = sigpass;
00478     if( (sigaction(SIGINT, &sa, NULL) < 0) ||
00479         (sigaction(SIGTERM, &sa, NULL) < 0) )
00480     {
00481         perror("sigprocmask");
00482 
00483         ensure(0);
00484     }
00485     
00486     itv.it_interval.tv_sec = 0;
00487     itv.it_interval.tv_usec = pa_data.pa_ChildPeriod;
00488     itv.it_value.tv_sec = 0;
00489     itv.it_value.tv_usec = pa_data.pa_ChildPeriod;
00490     /* XXX Use rk periodic stuff here. */
00491     if( setitimer(ITIMER_REAL, &itv, NULL) == 0 )
00492     {
00493         sigset_t empty_sigmask;
00494         int status;
00495 
00496         sigemptyset(&empty_sigmask);
00497 
00498         pa_data.pa_Advocate = rtt;
00499         
00500         /* Keep handling signals until the child has died. */
00501         while( !(pa_data.pa_Flags & PAF_DONE) )
00502         {
00503             sigsuspend(&empty_sigmask);
00504         }
00505 
00506         pa_data.pa_Advocate = NULL;
00507         
00508         /* Cleanup the child's status. */
00509         if( wait(&status) >= 0 )
00510         {
00511             if( WIFEXITED(status) )
00512             {
00513                 retval = WEXITSTATUS(status);
00514             }
00515             else if( WIFSIGNALED(status) )
00516             {
00517                 retval = EXIT_SUCCESS;
00518             }
00519             else
00520             {
00521                 retval = EXIT_FAILURE;
00522             }
00523         }
00524         else
00525         {
00526             perror("wait");
00527             retval = EXIT_FAILURE;
00528         }
00529         
00530         /* Clear the timer. */
00531         memset(&itv, 0, sizeof(itv));
00532         if( setitimer(ITIMER_REAL, &itv, NULL) < 0 )
00533         {
00534             perror("setitimer");
00535         }
00536     }
00537     else
00538     {
00539         perror("setitimer");
00540         retval = EXIT_FAILURE;
00541     }
00542 
00543     /* We will be dying soon, ignore any signals and */
00544     signal(SIGALRM, SIG_IGN);
00545     signal(SIGCHLD, SIG_IGN);
00546     signal(SIGINT, SIG_IGN);
00547     signal(SIGTERM, SIG_IGN);
00548 
00549     /* ... restore the old signal mask. */
00550     if( sigprocmask(SIG_UNBLOCK, &sigmask, NULL) < 0 )
00551     {
00552         perror("sigprocmask");
00553         
00554         ensure(0);
00555     }
00556     
00557     return( retval );
00558 }
00559 
00560 /**
00561  * The child portion of the fork(2) between the proc_advocate and the monitored
00562  * utility.  This function will execvp(3) the utility with the given arguments.
00563  *
00564  * @param argv The utility to startup and its arguments.
00565  * @return A failure exit code, otherwise this function will not return because
00566  * of the execvp(3).
00567  *
00568  * @sa paParentPart
00569  */
00570 static int paChildPart(char *argv[])
00571 {
00572     int retval = EXIT_SUCCESS;
00573     sigset_t sigmask;
00574 
00575     /* Reset the signal mask for the child. */
00576     sigfillset(&sigmask);
00577     if( sigprocmask(SIG_UNBLOCK, &sigmask, NULL) < 0 )
00578     {
00579         perror("sigprocmask");
00580         
00581         ensure(0);
00582     }
00583 
00584     execvp(argv[0], argv);
00585     
00586     /* FALLTHROUGH, normal operation will not reach this point. */
00587     perror(argv[0]);
00588     switch( errno )
00589     {
00590     case ENOENT:
00591     case EPERM:
00592         retval = 127;
00593         break;
00594     default:
00595         retval = EXIT_FAILURE;
00596         break;
00597     }
00598     return( retval );
00599 }
00600 
00601 int main(int argc, char *argv[])
00602 {
00603     int lpc, old_argc = argc, retval = EXIT_FAILURE;
00604     const char *prog_name = argv[0];
00605 
00606     pa_data.pa_ManagerIOR = "file://manager.ior";
00607     pa_data.pa_ChildPeriod = 10000;
00608 
00609     /*
00610      * Change the argc to not include the wrappee's arguments, otherwise,
00611      * our call to ORB_init could consume the wrappee's ORB arguments.
00612      */
00613     for( lpc = 0; (lpc < argc) && (strcmp(argv[lpc], "--") != 0); lpc++ );
00614 
00615     if( lpc < argc )
00616     {
00617         argc = lpc;
00618     }
00619     
00620     try
00621     {
00622         CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
00623         int rc;
00624 
00625         CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
00626         
00627         PortableServer::POA_var root_poa =
00628             PortableServer::POA::_narrow(obj.in());
00629         PortableServer::POAManager_var mgr = root_poa->the_POAManager();
00630         mgr->activate();
00631         
00632         PortableServer::ThreadPolicy_var thread = root_poa->
00633             create_thread_policy(PortableServer::SINGLE_THREAD_MODEL);
00634         
00635         CORBA::PolicyList policy_list;
00636         policy_list.length(1);
00637         policy_list[0] = PortableServer::ThreadPolicy::_duplicate(thread.in());
00638         
00639         PortableServer::POA_var st_poa = root_poa->
00640             create_POA("SingleThread",
00641                        PortableServer::POAManager::_nil(),
00642                        policy_list);
00643         
00644         thread->destroy();
00645 
00646         argc += (old_argc - argc);
00647         rc = paProcessOptions(argc, argv);
00648         if( (rc == 0) && (argc > 0) )
00649         {
00650             Broker::RealTimeTask_var rtt;
00651             Broker::Manager_var manager;
00652             CORBA::Object_var obj;
00653 
00654             obj = orb->string_to_object(pa_data.pa_ManagerIOR);
00655             manager = Broker::Manager::_narrow(obj.in());
00656             if( CORBA::is_nil(manager.in()) )
00657             {
00658                 cerr << "Invalid manager IOR: "
00659                      << pa_data.pa_ManagerIOR
00660                      << endl;
00661                 throw CORBA::BAD_PARAM();
00662             }
00663 
00664             /*
00665              * Block signals so we do not die without removing ourself from the
00666              * manager.
00667              */
00668             {
00669                 sigset_t sigmask;
00670                 
00671                 sigaddset(&sigmask, SIGINT);
00672                 sigaddset(&sigmask, SIGTERM);
00673                 if( sigprocmask(SIG_BLOCK, &sigmask, NULL) < 0 )
00674                 {
00675                     perror("sigprocmask");
00676                     
00677                     ensure(0);
00678                 }
00679             }
00680             
00681             rtt = paGetAdvocate(orb.in(),
00682                                 manager.in(),
00683                                 pa_data.pa_TaskIOR,
00684                                 pa_data.pa_TaskName);
00685             if( (pa_data.pa_ChildPID = fork()) > 0 )
00686             {
00687                 retval = paParentPart(rtt.in());
00688             }
00689             else if( pa_data.pa_ChildPID == 0 )
00690             {
00691                 return( paChildPart(argv) );
00692             }
00693             else
00694             {
00695                 /* The fork failed. */
00696                 retval = EXIT_FAILURE;
00697             }
00698             manager->RemoveTask(rtt.in());
00699         }
00700         else
00701         {
00702             if( rc == 0 )
00703             {
00704                 cerr << "No utility to execute\n";
00705             }
00706             if( rc >= 0 )
00707             {
00708                 paUsage(prog_name);
00709             }
00710         }
00711     }
00712     catch(const CORBA::UNKNOWN &e)
00713     {
00714         // Ignore...
00715         paUsage(prog_name);
00716     }
00717     catch(const CORBA::SystemException &e)
00718     {
00719         cerr << "Caught Exception: " << e << endl;
00720     }
00721     catch(...)
00722     {
00723         cerr << "Caught an unhandled exception" << endl;
00724     }
00725     
00726     return( retval );
00727 }

Generated on Mon Dec 1 16:29:06 2003 for CPU Broker by doxygen 1.3.4