/** * @File : batchServ.cxx, Fichiers * @Authors : A. B. Dragut * @Date : 2/12/2012 * @Synopsis : mini-lanceur-surveilleur de programmes **/ #include #include #include #include #include #include #include #include #include #include #include "CExc.h" #include "nsSysteme.h" // using namespace nsSysteme; // using namespace std; namespace { #define WAITBETWEENTERMANDKILL 4 #define WAITFORINSPECT 10 #define WAITAFTERKILL 4 volatile sig_atomic_t cmdCompleted; volatile sig_atomic_t inspCompleted; pid_t cmdPid, inspectorPid; enum Something {DO_RUN,DO_STOP,DO_CONT,DO_INSPECT,DO_KILL,DO_DUMP, IS_RUNNING,WAS_ALREADY_RUNNING,ALARM_WENT_OFF, NOT_UNDERSTOOD,HICKUP,SHOULD_NEVER_GET_HERE,FORK_ERROR, RUN_COMPLETE,INSPECT_COMPLETE,KILL_COMPLETE,BYE}; enum Completed { NOT_YET, YES_EXIT, YES_SIG }; const char *dict[BYE+1] = {"run","stop","cont","inspect","kill","dump", "is_running","was_already_running", "alarm","not_understood","hickup","ouch", "fork_error", "run_complete","inspect_complete", "kill_complete","bye"}; const char *complDict[3] = {"still_running","exited","signaled"}; Completed howCmdCompl, howInspCompl; int cmdComplCode, inspComplCode; Completed analyzeEndOfExec(int status, int *pCode) { if(WIFSIGNALED(status)) { *pCode = WTERMSIG(status); return YES_SIG; } if(WIFEXITED(status)) { *pCode = WEXITSTATUS(status); return YES_EXIT; } return NOT_YET; } void DeroutSig(int sig) { if(sig == SIGCHLD) { for(bool qHarvest (true); qHarvest;) { try { int status; pid_t whoFinished (Waitpid(-1,&status,WNOHANG)); if(whoFinished == cmdPid) { howCmdCompl = analyzeEndOfExec(status,&cmdComplCode); cmdCompleted = (howCmdCompl != NOT_YET); } else if(whoFinished == inspectorPid) { howInspCompl = analyzeEndOfExec(status,&inspComplCode); inspCompleted = (howInspCompl != NOT_YET); } else if(whoFinished == 0) { qHarvest = false; } else { throw CExc("Waitpid()","DeroutSig()"); } } catch(const CExc &e) { switch(e . GetCodErr()) { case EINTR:continue; case ECHILD:qHarvest = false;break; default: cerr << e << "\n"; cerr << "ERROR Strange waitpid() error " << errno << "\n"; qHarvest = false; break; } } catch(...) { cerr << "ERROR waitpid() issue\n"; qHarvest = false; } } } } class Escort { // public: const string commandAndArgs; int traceTimeout, traceCount, termTimeout; // pipe Dispatch -> Escort: pipe(pFd1); int escortPipeFromDispatch; // pFd1[0] int dispatchPipeToEscort; // pFd1[1] // pipe Escort -> Dispatch: pipe(pFd2); int dispatchPipeFromEscort; // pFd2[0] int escortPipeToDispatch; // pFd2[1] const string logPath; const int jobNum; int inspectCount; bool qWasAlreadyRunning; bool qIAmOnTheDispatchSide; sigset_t masque,emptyMasque; pid_t escortPid; void initComm() { int pFd1[2],pFd2[2]; Pipe(pFd1);Pipe(pFd2); escortPipeFromDispatch = pFd1[0]; dispatchPipeToEscort = pFd1[1]; dispatchPipeFromEscort = pFd2[0]; escortPipeToDispatch = pFd2[1]; } void endComm() { if(qIAmOnTheDispatchSide) { Close(dispatchPipeToEscort); Close(dispatchPipeFromEscort); } else { Close(escortPipeFromDispatch); Close(escortPipeToDispatch); } } Something waitForEvent() { Something whatToDo; try { Sigprocmask(SIG_SETMASK,&emptyMasque,0); readFromDispatch(whatToDo); Sigprocmask(SIG_SETMASK,&masque,0); alarm(0); return(whatToDo); } catch (const CExc &e) { Sigprocmask(SIG_SETMASK,&masque,0); if(e . GetCodErr() == EINTR) { if(cmdCompleted) { alarm(0); return(RUN_COMPLETE); } return(ALARM_WENT_OFF); } cerr << e << "\n"; return(HICKUP); } catch(...) { Sigprocmask(SIG_SETMASK,&masque,0); return(HICKUP); } Sigprocmask(SIG_SETMASK,&masque,0); return(SHOULD_NEVER_GET_HERE); } void initSigChld() { cmdCompleted = 0; inspCompleted = 0; sigemptyset(&masque); sigemptyset(&emptyMasque); sigaddset(&masque,SIGCHLD);sigaddset(&masque,SIGALRM); Sigprocmask(SIG_SETMASK,&masque,0); struct sigaction sigAct; sigemptyset(&(sigAct . sa_mask)); sigAct . sa_handler = &DeroutSig; sigAct . sa_flags = 0; Sigaction(SIGCHLD,&sigAct,0); Sigaction(SIGALRM,&sigAct,0); } void forkEscort() { if((escortPid = Fork())) { // still inside the dispatch process Close(escortPipeFromDispatch); Close(escortPipeToDispatch); qIAmOnTheDispatchSide = true; return; } // now we are inside the true escort (child) process Close(dispatchPipeFromEscort); Close(dispatchPipeToEscort); qIAmOnTheDispatchSide = false; initSigChld(); while(1) { doSomething(waitForEvent()); } } public: Escort(const string &cmdNargs, const int jbNm, int trcTmt = 4, int trcCnt = 1, int trmTmt = 3, const string &lgPth = "/tmp/c2") : commandAndArgs(cmdNargs), traceTimeout(trcTmt),traceCount(trcCnt), termTimeout(trmTmt), logPath(lgPth), jobNum(jbNm), inspectCount(0), qWasAlreadyRunning(false) { initComm(); forkEscort(); } ~Escort() { if(qIAmOnTheDispatchSide) { tellEscort(DO_KILL); sleep(WAITAFTERKILL); kill(escortPid,SIGTERM); sleep(WAITAFTERKILL); kill(escortPid,SIGKILL); } endComm(); } private: // we set the signals with SA_RESTART // so we don't have to worry about EINTR // these methods to be called from the escort (child) void killCmdOrInspect(const bool qCmd) { const pid_t whom(qCmd ? cmdPid : inspectorPid); if(!(qCmd ? cmdCompleted : inspCompleted)) { Kill(whom,qCmd ? SIGTERM : SIGINT); alarm(WAITBETWEENTERMANDKILL); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause();alarm(0); Sigprocmask(SIG_SETMASK,&masque,0); if(!(qCmd ? cmdCompleted : inspCompleted)) { kill(whom,SIGKILL); alarm(WAITBETWEENTERMANDKILL); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause();alarm(0); Sigprocmask(SIG_SETMASK,&masque,0); } } } void tellDispatch(Something what, Completed q = NOT_YET, int h = -1) { Write(escortPipeToDispatch,&what,sizeof(what)); Write(escortPipeToDispatch,&q,sizeof(q)); Write(escortPipeToDispatch,&h,sizeof(h)); } void readFromDispatch(Something &what) { Read(escortPipeFromDispatch,&what,sizeof(what)); } public: // these methods to be called from the dispatch int didEscortTellDispatch(fd_set *pSwitchBoard) { return(FD_ISSET(dispatchPipeFromEscort,pSwitchBoard)); } int reConfigure(fd_set *pSwitchBoard) { FD_SET(dispatchPipeFromEscort,pSwitchBoard); return(1 + dispatchPipeFromEscort); } private: void tellEscort(Something what) { Write(dispatchPipeToEscort,&what,sizeof(what)); } public: void readFromEscort(Something &what, Completed &q, int &h) { Read(dispatchPipeFromEscort,&what,sizeof(what)); Read(dispatchPipeFromEscort,&q,sizeof(q)); Read(dispatchPipeFromEscort,&h,sizeof(h)); } public: // these methods to be called from either escort or dispatch void doInspectCmd() { if(qIAmOnTheDispatchSide) { tellEscort(DO_INSPECT); } else { // do inspect ostringstream buffLogF,buffPid; /*buffCmd << "strace -f -i -p " << cmdPid << " > " << logPath << "/" << jobNum << ".log.inspect." << inspectCount << " 2>&1 ";*/ buffPid << cmdPid; buffLogF<< logPath << "/" << jobNum << ".log.inspect." << inspectCount; inspectCount++; try { if((inspectorPid = Fork())) { alarm(WAITFORINSPECT); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause();alarm(0); Sigprocmask(SIG_SETMASK,&masque,0); killCmdOrInspect(false); tellDispatch(INSPECT_COMPLETE, howInspCompl,inspComplCode); inspCompleted = 0; } else { const int outFile (Open(buffLogF.str().c_str(), O_CREAT|O_TRUNC|O_WRONLY, 0700)); Dup2(outFile,STDOUT_FILENO); Dup2(outFile,STDERR_FILENO); Close(STDIN_FILENO); Close(outFile); Close(escortPipeFromDispatch); Close(escortPipeToDispatch); execl("/usr/bin/strace", "strace", "-f","-p",buffPid.str().c_str(), (char *)0); cerr << "ERROR Could not run inspection " << "/usr/bin/strace -f -p " << buffPid << "\n"; exit(0); } } catch(...) { tellDispatch(FORK_ERROR); } } } void doKillCmd () { if(qIAmOnTheDispatchSide) { tellEscort(DO_KILL); } else { // do kill killCmdOrInspect(true); } } private: void doSomething(Something whatToDo, int *pKTrace = 0) { switch(whatToDo) { case DO_RUN:doRunCmd(); break; case DO_STOP:Kill(cmdPid,SIGSTOP); break; case DO_CONT:Kill(cmdPid,SIGCONT); break; case ALARM_WENT_OFF:if(pKTrace) { (*pKTrace)++; } case DO_INSPECT:doInspectCmd(); break; case DO_KILL:doKillCmd(); tellDispatch(KILL_COMPLETE,howCmdCompl,cmdComplCode); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause(); // all done, just waiting exit(0); case RUN_COMPLETE: tellDispatch(RUN_COMPLETE,howCmdCompl,cmdComplCode); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause(); exit(0); default:tellDispatch(NOT_UNDERSTOOD);break; } } void escortRunningCmd() { for(int kTrace (0) ; kTrace < traceCount;) { try { alarm(traceTimeout); doSomething(waitForEvent(),&kTrace); } catch(const CExc &e) { cerr << e << "\n"; } catch(...) { cerr << "Hickup\n"; } } try { alarm(termTimeout); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause(); Sigprocmask(SIG_SETMASK,&masque,0); killCmdOrInspect(true); tellDispatch(RUN_COMPLETE,howCmdCompl,cmdComplCode); Sigprocmask(SIG_SETMASK,&emptyMasque,0); pause(); // all done, just waiting exit(0); } catch(...) { cerr << "Hickup end.\n"; } exit(1); } public: void doRunCmd() { if(qIAmOnTheDispatchSide) { tellEscort(DO_RUN); } else { if(qWasAlreadyRunning) { tellDispatch(WAS_ALREADY_RUNNING); return; } try { if((cmdPid = Fork())) { qWasAlreadyRunning = true; tellDispatch(IS_RUNNING); escortRunningCmd(); // -> ends here no matter what } else { // -> also ends here no matter what try { ostringstream buffLogFName; buffLogFName << logPath << "/" << jobNum << ".log"; const int outFile (Open(buffLogFName.str().c_str(), O_CREAT|O_TRUNC|O_WRONLY, 0700)); Dup2(outFile,STDOUT_FILENO); Dup2(outFile,STDERR_FILENO); Close(outFile); Close(STDIN_FILENO); Close(escortPipeFromDispatch); Close(escortPipeToDispatch); execl("/bin/bash","bash","-c", commandAndArgs . c_str(),(char *)0); throw CExc(string("execl(): " + commandAndArgs),"doRunCmd()"); } catch (const CExc &e) { cerr << "ERROR While trying to run " << commandAndArgs << " " << e << "\n"; } exit(1); } } catch(...) { tellDispatch(FORK_ERROR); } } } }; } int main(int argc, char * argv []) { if (argc < 1 || argc > 4) throw CExc (string ("\nUsage : ") + argv[0],"main()"); vector escort; const int traceTimeout (3000); const int traceCount (3); const int termTimeout (3000); Signal(SIGCHLD,SIG_IGN); Signal(SIGQUIT,DeroutSig); fd_set switchBoard; struct timeval tWait; while(1) { cout << "batch > " << flush; string cmd, args; cin >> cmd; if(cmd == "quit") { cout << "Ok, bye.\n"; break; } if(cmd == "load") { string args; getline(cin,args); escort . push_back(new Escort(args,escort . size(), traceTimeout,traceCount,termTimeout)); cout << "Ok, loaded job " << escort . size() - 1 << " as '" << args << "'\n"; } else { string jobNstr; if(cmd != "_") { cin >> jobNstr; const unsigned int jobN (atoi(jobNstr . c_str())); if(jobN < 0 || jobN >= escort . size() || escort[jobN] == 0) { cout << "INVALID Job Number " << jobN << " for command '" << cmd << "'\n"; } else if(cmd == "run") { escort[jobN] -> doRunCmd(); } else if(cmd == "kill") { escort[jobN] -> doKillCmd(); } else if(cmd == "inspect") { escort[jobN] -> doInspectCmd(); } else if(cmd == "dump") { cout << "Ok, getting rid of job " << jobN << "...\n"; delete escort[jobN]; escort[jobN] = 0; cout << " Done.\n"; } else { cout << "I got '" << cmd << "'. What's that ?\n"; } } } // end of the stdin input // reconfiguration select FD_ZERO(&switchBoard); int nSwitch (0); for(unsigned int k (0); k < escort . size() ; ++k) { if(escort[k] == 0) continue; int nE (escort[k] -> reConfigure(&switchBoard)); if(nE > nSwitch) { nSwitch = nE; } } if(nSwitch == 0) continue; tWait . tv_sec = 2; tWait . tv_usec = 0; int nReady (Select(nSwitch,&switchBoard,0,0,&tWait)); if(!nReady) { continue; } for(unsigned int k (0); k < escort . size() ; ++k) { if(escort[k] && escort[k] -> didEscortTellDispatch(&switchBoard)) { Something what; Completed did; int how; escort[k] -> readFromEscort(what,did,how); cout << "[Job " << k << "] " << dict[what]; if(how >= 0) { cout << " " << complDict[did] << " " << how; } cout << "\n"; } } } // end of the "shell" loop return(0); }