ENH: Large SGE jobs were overloading on qstat
When large jobs were run with SGE, the accumulation of qstat queries was
causing a massive load on the cluster server and was affecting overall
system performance. This modification queries qstat by userid and stores
information about all of that user's jobs (both running and recently
finished); subsequent queries are answered from the cached values first,
and the cache is refreshed with a system qstat call only periodically, so
that qstat is invoked on a more reasonable time frame. The user can also
supply an external replacement for qstat. This prevents DOS-style load on
the batch processing server caused by continuous queries from many jobs,
which was degrading the performance of the entire server; the excess
polling from the naive approach when jobs finished was also hurting the
dispatching of new jobs.

The following two scripts can be used via plugin_args to provide cached
versions of qstat suitable for running huge jobs:

plugin_args=dict(template=JOB_SCRIPT,
                 qsub_args="-S /bin/bash -cwd -pe smp 1-12 "
                           "-l h_vmem=19G,mem_free=9G "
                           "-o /dev/null -e /dev/null " + CLUSTER_QUEUE,
                 qstatProgramPath="qstat_immediate.sh",
                 qstatCachedProgramPath="qstat_cached.sh")

=qstat_cached.sh===================================================
#!/bin/bash
# \author Hans J. Johnson
# This file provides a wrapper around qstat to ensure that
# the qstat server is not overloaded with too many requests.
#debug_file=/dev/null
debug_file=/tmp/TESTINGLOG
qstat_cache=/tmp/qstat.xml

echo "USING EXTERNAL QSTAT: $@" >> ${debug_file} 2>&1
# Look for a cache file modified within the last few seconds; note the
# leading "-" on -mmin ("less than"), with 5/60 minutes = 5 seconds.
recent_cache=$( find $(dirname ${qstat_cache}) -maxdepth 1 \
    -name $(basename ${qstat_cache}) -mmin -$(echo 5/60 | bc -l) )
if [ -z "${recent_cache}" ]; then
  # Cache is stale or missing: refresh it with a real qstat call.
  DoQstatNow=$(dirname ${0})/qstat_immediate.sh
  ${DoQstatNow} $@
else
  echo "using cache $(date)" >> ${debug_file} 2>&1
  cat ${qstat_cache}
fi
===================================================================

=qstat_immediate.sh================================================
#!/bin/bash
# \author Hans J. Johnson
# This file provides a wrapper around qstat to ensure that
# the qstat server is not overloaded with too many requests.
#debug_file=/dev/null
debug_file=/tmp/TESTINGLOG
qstat_cache=/tmp/qstat.xml

echo "USING EXTERNAL QSTAT: $@" >> ${debug_file} 2>&1
echo "Refreshing $(date)" >> ${debug_file} 2>&1
cacheUpdated=0
while [ ${cacheUpdated} -eq 0 ]; do
  if [ ! -f ${qstat_cache}_lock ]; then
    # Take the lock, write the fresh qstat output to a temp file, move it
    # into place atomically, then release the lock.
    touch ${qstat_cache}_lock
    DoQstatNow=$(which qstat)
    ${DoQstatNow} $@ > ${qstat_cache}_tmp 2>&1
    mv ${qstat_cache}_tmp ${qstat_cache}
    rm ${qstat_cache}_lock
    let cacheUpdated=1
  else
    echo "Waiting for contention lock $(date)" >> ${debug_file} 2>&1
    sleep 1
  fi
done
cat ${qstat_cache}
===================================================================
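As a usage illustration, here is a minimal sketch of wiring these wrappers
into a nipype workflow run on SGE. The workflow object `wf`, the JOB_SCRIPT
template, and the queue arguments are hypothetical placeholders; only the
qstatProgramPath/qstatCachedProgramPath plugin arguments come from this
change:

    import os

    # `wf` is assumed to be an already-constructed nipype Workflow.
    wf.run(plugin='SGE',
           plugin_args=dict(
               template=JOB_SCRIPT,  # hypothetical job-script template
               qsub_args='-S /bin/bash -cwd -q all.q',  # hypothetical args
               # Absolute paths to the two wrapper scripts shown above:
               qstatProgramPath=os.path.abspath('qstat_immediate.sh'),
               qstatCachedProgramPath=os.path.abspath('qstat_cached.sh')))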
hjmjohnson committed Feb 10, 2014
commit 8b0f36b63caaa7c5c94eaed7d02774b3a33a5430
37 changes: 24 additions & 13 deletions nipype/pipeline/plugins/base.py
@@ -303,8 +303,8 @@ def _submit_mapnode(self, jobid):
             self.mapnodesubids[self.depidx.shape[0] + i] = jobid
         self.procs.extend(mapnodesubids)
         self.depidx = ssp.vstack((self.depidx,
-                                  ssp.lil_matrix(np.zeros((numnodes,
-                                                           self.depidx.shape[1])))),
+                                  ssp.lil_matrix(np.zeros(
+                                      (numnodes, self.depidx.shape[1])))),
                                  'lil')
         self.depidx = ssp.hstack((self.depidx,
                                   ssp.lil_matrix(
@@ -349,16 +349,19 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
                 if self._status_callback:
                     self._status_callback(self.procs[jobid], 'start')
                 continue_with_submission = True
-                if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
+                if str2bool(self.procs[jobid].config['execution']
+                            ['local_hash_check']):
                     logger.debug('checking hash locally')
                     try:
                         hash_exists, _, _, _ = self.procs[
                             jobid].hash_exists()
                         logger.debug('Hash exists %s' % str(hash_exists))
                         if (hash_exists and
-                            (self.procs[jobid].overwrite == False or
-                             (self.procs[jobid].overwrite == None and
-                              not self.procs[jobid]._interface.always_run))):
+                            (self.procs[jobid].overwrite == False or
+                             (self.procs[jobid].overwrite == None and
+                              not self.procs[jobid]._interface.always_run)
+                             )
+                            ):
                             continue_with_submission = False
                             self._task_finished_cb(jobid)
                             self._remove_node_dirs()
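The skip condition in this hunk is easier to read in isolation. The
following self-contained sketch mirrors the diff's logic with stand-in
values (hash_exists, overwrite, always_run are hypothetical locals, not
the plugin's attributes):

    hash_exists, overwrite, always_run = True, None, False

    # A job is not resubmitted when cached results exist and the node
    # neither forces an overwrite nor is marked always-run.
    skip_submission = (hash_exists and
                       (overwrite == False or
                        (overwrite == None and not always_run)))
    # skip_submission -> True: reuse cached results, mark task finished.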
@@ -436,7 +439,8 @@ def _remove_node_dirs(self):
         """Removes directories whose outputs have already been used up
         """
         if str2bool(self._config['execution']['remove_node_directories']):
-            for idx in np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]:
+            for idx in np.nonzero(
+                    (self.refidx.sum(axis=1) == 0).__array__())[0]:
                 if idx in self.mapnodesubids:
                     continue
                 if self.proc_done[idx] and (not self.proc_pending[idx]):
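The row-selection idiom in this hunk can be seen on a toy matrix; this is
only an illustration of the numpy/scipy calls, with a made-up refidx:

    import numpy as np
    import scipy.sparse as ssp

    # Rows of refidx whose outputs are no longer referenced (row sum == 0)
    # are the candidates for directory removal.
    refidx = ssp.lil_matrix(np.array([[0, 1], [0, 0], [1, 0]]))
    unreferenced = np.nonzero((refidx.sum(axis=1) == 0).__array__())[0]
    print(unreferenced)  # -> [1]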
@@ -506,9 +510,13 @@ def _get_result(self, taskid):
                        'traceback': None}
         results_file = None
         try:
-            raise IOError(('Job (%s) finished or terminated, but results file '
-                           'does not exist. Batch dir contains crashdump '
-                           'file if node raised an exception' % node_dir))
+            error_message = ('Job id ({0}) finished or terminated, but '
+                             'results file does not exist after ({1}) '
+                             'seconds. Batch dir contains crashdump file '
+                             'if node raised an exception.\n'
+                             'Node working directory: ({2}) '.format(
+                                 taskid, timeout, node_dir))
+            raise IOError(error_message)
         except IOError, e:
             result_data['traceback'] = format_exc()
         else:
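The new message references a ({1})-seconds timeout, implying that
_get_result now waits for the results file before raising. A simplified
sketch of that wait-then-raise pattern (the function name, defaults, and
polling interval are illustrative assumptions, not the plugin's actual
implementation):

    import os
    import time

    def wait_for_results(results_file, timeout=5.0, interval=0.5):
        # Poll until the results file appears or `timeout` seconds elapse.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if os.path.exists(results_file):
                return results_file
            time.sleep(interval)
        raise IOError('Results file (%s) does not exist after (%s) seconds'
                      % (results_file, timeout))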
@@ -582,13 +590,16 @@ def _get_args(self, node, keywords):
             value = getattr(self, "_" + keyword)
             if keyword == "template" and os.path.isfile(value):
                 value = open(value).read()
-            if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
-                if keyword == "template" and os.path.isfile(node.plugin_args[keyword]):
+            if hasattr(node, "plugin_args") and \
+                    isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
+                if keyword == "template" and \
+                        os.path.isfile(node.plugin_args[keyword]):
                     tmp_value = open(node.plugin_args[keyword]).read()
                 else:
                     tmp_value = node.plugin_args[keyword]

-                if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']:
+                if 'overwrite' in node.plugin_args and \
+                        node.plugin_args['overwrite']:
                     value = tmp_value
                 else:
                     value += tmp_value
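The 'overwrite' handling above decides whether a node-level plugin_args
value replaces or extends the plugin-wide default. A small self-contained
example (the default and node values here are hypothetical):

    default_qsub_args = '-q all.q '
    node_plugin_args = {'qsub_args': '-l h_vmem=4G', 'overwrite': False}

    value = default_qsub_args
    tmp_value = node_plugin_args['qsub_args']
    if 'overwrite' in node_plugin_args and node_plugin_args['overwrite']:
        value = tmp_value    # replace the plugin default outright
    else:
        value += tmp_value   # append to the plugin default
    print(value)  # -> '-q all.q -l h_vmem=4G'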