J9Qc@s@dZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z ddl mZddlmZddlmZddlmZdd lmZdd lmZydd lmZWn!ek r)dd lmZnXdd lmZdd lmZyddl m!Z!Wnek r}ddl!Z!nXe"Z#ej$e%Z&e"a'aa(a)a*a+de,fdYZ-dZ.dfdYZ/defdYZ0dZ1de fdYZ2dZ3dZ4defdYZ5dS(s Overview ======== The multiprocess plugin enables you to distribute your test run among a set of worker processes that run tests in parallel. This can speed up CPU-bound test runs (as long as the number of work processeses is around the number of processors or cores available), but is mainly useful for IO-bound tests that spend most of their time waiting for data to arrive from someplace else. .. note :: See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional documentation and examples. Use of this plugin on python 2.5 or earlier requires the multiprocessing_ module, also available from PyPI. .. _multiprocessing : http://code.google.com/p/python-multiprocessing/ How tests are distributed ========================= The ideal case would be to dispatch each test to a worker process separately. This ideal is not attainable in all cases, however, because many test suites depend on context (class, module or package) fixtures. The plugin can't know (unless you tell it -- see below!) if a context fixture can be called many times concurrently (is re-entrant), or if it can be shared among tests running in different processes. Therefore, if a context has fixtures, the default behavior is to dispatch the entire suite to a worker as a unit. Controlling distribution ^^^^^^^^^^^^^^^^^^^^^^^^ There are two context-level variables that you can use to control this default behavior. If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True`` in the context, and the plugin will dispatch tests in suites bound to that context as if the context had no fixtures. This means that the fixtures will execute concurrently and multiple times, typically once per test. If a context's fixtures can be shared by tests running in different processes -- such as a package-level fixture that starts an external http server or initializes a shared database -- then set ``_multiprocess_shared_ = True`` in the context. These fixtures will then execute in the primary nose process, and tests in those contexts will be individually dispatched to run in parallel. How results are collected and reported ====================================== As each test or suite executes in a worker process, results (failures, errors, and specially handled exceptions like SkipTest) are collected in that process. When the worker process finishes, it returns results to the main nose process. There, any progress output is printed (dots!), and the results from the test run are combined into a consolidated result set. When results have been received for all dispatched tests, or all workers have died, the result summary is output as normal. Beware! ======= Not all test suites will benefit from, or even operate correctly using, this plugin. For example, CPU-bound tests will run more slowly if you don't have multiple processors. There are also some differences in plugin interactions and behaviors due to the way in which tests are dispatched and loaded. In general, test loading under this plugin operates as if it were always in directed mode instead of discovered mode. For instance, doctests in test modules will always be found when using this plugin with the doctest plugin. But the biggest issue you will face is probably concurrency. Unless you have kept your tests as religiously pure unit tests, with no side-effects, no ordering issues, and no external dependencies, chances are you will experience odd, intermittent and unexplainable failures and errors when using this plugin. This doesn't necessarily mean the plugin is broken; it may mean that your test suite is not safe for concurrency. New Features in 1.1.0 ===================== * functions generated by test generators are now added to the worker queue making them multi-threaded. * fixed timeout functionality, now functions will be terminated with a TimedOutException exception when they exceed their execution time. The worker processes are not terminated. * added ``--process-restartworker`` option to restart workers once they are done, this helps control memory usage. Sometimes memory leaks can accumulate making long runs very difficult. * added global _instantiate_plugins to configure which plugins are started on the worker processes. iN(tTextTestRunner(tfailure(tloader(tPlugin(tbytes_(tTextTestResult(t ContextSuite(t test_address(t_WritelnDecorator(tEmpty(twarn(tStringIOtTimedOutExceptioncBseZddZdZRS(s Timed OutcCs ||_dS(N(tvalue(tselfR ((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyt__init__scCs t|jS(N(treprR (R((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyt__str__s(t__name__t __module__RR(((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR s cCsyddlm}matjtjtj}|}tjtj||j|j|j|j |j f\aaaa a Wnt k rt dt nXdS(Ni(tManagertProcesssKmultiprocessing module is not available, multiprocess plugin cannot be used(tmultiprocessingRRtsignaltSIGINTtSIG_IGNtQueuetPooltEventtValuetArrayt ImportErrorR tRuntimeWarning(Rtoldtm((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyt _import_mps 7 tTestLetcBs,eZdZdZdZdZRS(cCsIy|j|_Wntk r&nX|j|_t||_dS(N(tidt_idtAttributeErrortshortDescriptiont_short_descriptiontstrt_str(Rtcase((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRs  cCs|jS(N(R&(R((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR%scCs|jS(N(R)(R((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR(scCs|jS(N(R+(R((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRs(RRRR%R(R(((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR$s   t MultiProcesscBs>eZdZdZiZdZdZdZdZRS(sF Run tests in multiple processes. Requires processing module. ic Cs|jdddd|jddddd d d d |jd ddd|jddddd dd d|jdddd|jdtddd ddS(s0 Register command-line options. s --processestactiontstoretdefaulttNOSE_PROCESSESitdesttmultiprocess_workerstmetavartNUMthelpsNSpread test run among this many processes. Set a number equal to the number of processors or cores in your machine for best results. Pass a negative number to have the number of processes automatically set to the number of cores. Passing 0 means to disable parallel testing. Default is 0 unless NOSE_PROCESSES is set. [NOSE_PROCESSES]s--process-timeouttNOSE_PROCESS_TIMEOUTi tmultiprocess_timeouttSECONDSsfSet timeout for return of results from each test runner process. Default is 10. [NOSE_PROCESS_TIMEOUT]s--process-restartworkert store_truetNOSE_PROCESS_RESTARTWORKERtmultiprocess_restartworkersIf set, will restart each worker process once their tests are done, this helps control memory leaks from killing the system. [NOSE_PROCESS_RESTARTWORKER]N(t add_optiontgettFalse(Rtparsertenv((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pytoptionss cCs[y|jjdWntk r'nXt|dsDt|_dS|jrQdS||_yt|j }Wnt t fk rd}nX|rWt t dkrt|_dS|dkryddl}|j}Wqtk rt|_dSXnt|_||j_ t|j}||j_t|j}||j_t|jdtgetattrR?RQt_multiprocess_can_split_tcollecttaddtasktlen( RRot testQueuettaskst to_teardowntresultR,t ancestorstant test_addr((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR}s<  !     c Cstdtd}tdtj}t}tdtd||||||||j|jtj |j f } || _ || _ || _ tjtjt} | jtjtj| | S(Ntcttdttargettargs(RRttimeRRR]RXRWtpickletdumpsRJt currentaddrt currentstarttkeyboardCaughtRtSIGILLRctstart( RtiworkerRt resultQueuet shouldStopRRRRtpR!((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyt startProcessEs(      c Cstjd||tj|jjj|}|dk rF|}n|jjj|j }|dk rv||_ nt }t }g}g}g}g} t } |j } t j } |j|||| | tjd|jjxVt|jjD]B} |j| ||| | }|j|tjd| dqWt|}|jj}d}yxY|rtjdt|||y|jd|\}}}}tjd||t|yYy|j|Wn$tk rtjd||nX|t|7}|j|WnAtk rhtjd |tjd tt|d nX|j||g|j| ||jjr| j r| j Pn|jj!r2tjd |||j"dd| j# r2|j$ r2tjd ||j|||| | ||nXyYxe| D]]}tjd|y|j?Wq5t9t:fk ruq5| j<|t=j>q5Xq5Wt j }| j@| jA| ||jjjB| |dkrtjdx3|D](}|j(r|jCddt&qqWnxbt'|D]T\}}|j(r+tjd ||j"|j(rtjd|qq+q+WWn`t9t:fk rtj;dx'|D]}|j(r|j2qqW|r|qnX| S(s Execute the test (which may be a test suite). If the test is a suite, distribute it out among as many processes as have been configured, at as fine a level as is possible given the context fixtures defined in the suite or any sub-suites. s%s.run(%s) (%s)sStarting %s workerssStarted worker process %sis5Waiting for results (%s/%s tasks), next timeout=%.3fsttimeouts1Results received for worker %d, %s, new tasks: %ds)worker %s failed to remove from tasks: %ssGot result for unknown task? %ss current: %sisjoining worker %ss!starting new process on worker %ss8Timed out with %s tasks pending (empty testQueue=%r): %stasciig?sLworker %d has finished its work item, but is not exiting? do we wait for it?stimed out worker %s: %sRsterminating worker %ssAll workers deadsCompleted %s tasks (%s remain)s4parent received ctrl-c when waiting for test resultss#Tearing down shared fixtures for %ssTell all workers to stoptSTOPtblocksfailed to join worker %ss=parent received ctrl-c when shutting down: stop all processesN(DRiRjtostgetpidRJtpluginst prepareTestRNtsetOutputStreamRZRRt _makeResultRR}R3trangeRRyRR8R>tremoveRMR textendRFR*tlistt consolidatet stopOnErrort wasSuccessfultsetR<tjointis_settemptyR R?t enumeratetis_aliveRRR RRQRtcleart waitkilltimeterrort terminatetkilltpidRRtsleeptminRtRutinfoRxRvRwttearDownt printErrorst printSummarytfinalizetput(RRotwrappertwrappedRRRt completedRSRRRRtiRt total_taskst nexttimeoutt thrownErrorRtaddrt newtask_addrst batch_resultt any_alivetwt worker_addrttimeprocessingt startkilltimeteR,tstopRI((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pytrun[s                    $    )          #   $!#               cCsd}t|tjjrHt|jdrHd|j_|jj}nt j |}|j ||fdt |dk r|t |7}n|dk r|j|n|S(NtargR(RNRlRmR,RnRGRot descriptorRR\taddressRR?R*Ry(RRR,RR((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR~s'   cCst|dr'|j\}}}n:t|drQt|j\}}}ntd|g}|dkr|dkrtd|q|j|n=tjj |\}}|j dr|}n|j||dk r|j|ndj t t |S(NRRqsUnable to convert %s to addresssUnaddressable case %sRt:(RGRRRqt ExceptionRNRyRtpathtsplitt startswithRtmapR*(R,tfiletmodtcalltpartstdirnametbasename((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR!s"     ccs t|dr+t|jdts+dSnt|trL|j|jsrt|dt srt|tj  rt|trt |}t |dkrt|ddd|jkr|d}qn|Vn0x-|D]%}x|j |D] }|VqWqWdS(NRqt_multiprocess_t can_splitii(RGR{RqRQRlRt hasFixturest checkCanSplittunittestt TestSuiteRRRNRh(RRot containedR,tbatch((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRh:s"   cCs$|s tSt|dtr tStS(s8 Callback that we use to check whether the fixtures found in a context or ancestor are ones we care about. Contexts can tell us that their fixtures are reentrant by setting _multiprocess_can_split_. So if we see that, we return False to disregard those fixtures. R|(R?R{RQ(Rqtfixt((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRZs cCs,t|dd}|stSt|dtS(NRqRg(R{RNR?(RR,Rq((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRrjscCs1tjd|y|\}}}}}Wn;tk rftjd|tjtj|dSX|jj||j |7_ |j j ||j j |xn|j D]`\}\} } } ||jkrg| | f|j|d }nd }t j|||td |_t"j#t$j%||j|||j||fnJ|rd }nd }t j ||||j|||j||f|sqnt&k r/td |_t jd|natd |_t jd|t"j#t$j%||j|||j||fnXj'rPqqWt j d|dS(NsWorker %s executing, pid=%dRJcsjdjS(NR(R>R8((RJR(s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyR>scsPtt}|dddjd}jj|}|rL|S|S(Nt descriptionsiR[RJ(RR R[RtprepareTestResult(RZRt plug_result(RJR(s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyt makeResults  c Ssg|jD]\}}t||f^q }g|jD]\}}t||f^q8}i}x^|jjD]M\}\}}} g|D]\}}t||f^q|| f||RRRRRRoRRtmsg((RJRRs=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRs              &  % "   & RcBs;eZdZdZdZdZdZdZdZ RS(s Context suite that never fires shared fixtures. When a context sets _multiprocess_shared_, fixtures in that context are executed by the main process. Using this suite class prevents them from executing in the runner process as well. cCs0t|dtrdStt|j|dS(NRg(R{R?ReRt setupContext(RRq((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRscCs0t|dtrdStt|j|dS(NRg(R{R?ReRtteardownContext(RRq((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRscCs5tjdt|||j|jrD|j|||}}n ||}}y|jWn:tk run'd|_|j||j dSXz>x7|jD],}t |t j j r|jdk r|j|j_n |j|_|j|_|j|_|jr#tjdPny||Wqtk r}t |t}|rad}nd}tj|||ttt|tjdf}|jjj|||j|||sqqXqWWdt|_y|jWn9tk r n&d|_|j||j nXXdS( s5Run tests in suite inside of suite fixtures. s#suite %s (%s) run called, tests: %stsetupNtstoppings(Timeout when running test %s in suite %ss2KeyboardInterrupt when running test %s in suite %sitteardown(RiRjR%t_testst resultProxyRsRtt error_contextRxt _exc_infoRlRmR,RnRRNRoRRRR R*RvRwRJRRQthas_runR(RRtorigRoRRR R((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRs\              N( RRR_RNRRRRRR(((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyRs  (6R_tloggingRRvRt tracebackRRRt nose.caseRmt nose.coreRRRtnose.plugins.baseRtnose.pyversionRt nose.resultRt nose.suiteRt nose.utilRtunittest.runnerRRRR twarningsR t cStringIOR RNRt getLoggerRRiRRRRRRtR R#R$R-RcR\R]RR(((s=/sys/lib/python2.7/site-packages/nose/plugins/multiprocess.pyt^sP            _ r a