Skip to content
Snippets Groups Projects

Scale jobtime

Merged Michiel Cottaar requested to merge scale-jobtime into main
4 files
+ 48
3
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 39
2
@@ -338,6 +338,16 @@ class JobList:
@@ -338,6 +338,16 @@ class JobList:
raise ValueError("One or more of the jobs have failed.")
raise ValueError("One or more of the jobs have failed.")
logger.info("Successfully finished running all jobs using Dask.")
logger.info("Successfully finished running all jobs using Dask.")
 
def scale_jobtime(self, scaling):
 
"""
 
Scale the submit job times of all jobs by `scaling`.
 
 
This will only affect jobs submitted to the cluster.
 
"""
 
for job in self.jobs:
 
job.scale_jobtime(scaling)
 
return self
 
class JobParent:
class JobParent:
"""Parent for `SingleJob` and `BatchJob`.
"""Parent for `SingleJob` and `BatchJob`.
@@ -520,7 +530,7 @@ class SingleJob(JobParent):
@@ -520,7 +530,7 @@ class SingleJob(JobParent):
self.input_targets = input_targets
self.input_targets = input_targets
self.output_targets = output_targets
self.output_targets = output_targets
self.optional_targets = set(optionals)
self.optional_targets = set(optionals)
self.submit_params = submit_params
self.submit_params = dict(submit_params)
self.batch = batch
self.batch = batch
for target in self.input_targets:
for target in self.input_targets:
target.required_by.add(self)
target.required_by.add(self)
@@ -573,6 +583,15 @@ class SingleJob(JobParent):
@@ -573,6 +583,15 @@ class SingleJob(JobParent):
result = JobList(tree, [self], {}).copy()
result = JobList(tree, [self], {}).copy()
return self.batch, self.submit_params, result
return self.batch, self.submit_params, result
 
def scale_jobtime(self, scaling):
 
"""
 
Scale the submit job time in place by `scaling`.
 
 
This will only affect jobs submitted to the cluster.
 
"""
 
if "jobtime" in self.submit_params.keys():
 
self.submit_params["jobtime"] *= scaling
 
def get_target(filename: Path, all_targets, from_template=None) -> "FileTarget":
def get_target(filename: Path, all_targets, from_template=None) -> "FileTarget":
"""
"""
@@ -780,6 +799,15 @@ class BatchJob(JobParent):
@@ -780,6 +799,15 @@ class BatchJob(JobParent):
as_tuples = set.intersection(*[{(key, value) for key, value in job.set_parameters.items()} for job in self.torun])
as_tuples = set.intersection(*[{(key, value) for key, value in job.set_parameters.items()} for job in self.torun])
self.set_parameters = {key: value for key, value in as_tuples}
self.set_parameters = {key: value for key, value in as_tuples}
 
def copy(self, targets: Dict[str, "FileTarget"]):
 
"""
 
Create a copy of the `BatchJob` to be included in a new :class:`JobList`.
 
 
`targets` contain the set of FileTargets recognised by this new `JobList`.
 
This will be updated based on the input/output targets of each job within this batch.
 
"""
 
return BatchJob(*[job.copy(targets) for job in self.torun])
 
@property
@property
def batch(self, ):
def batch(self, ):
b = self.torun[0].batch
b = self.torun[0].batch
@@ -797,7 +825,7 @@ class BatchJob(JobParent):
@@ -797,7 +825,7 @@ class BatchJob(JobParent):
@property
@property
def submit_params(self, ):
def submit_params(self, ):
def sum_value(name, *times):
def sum_value(name, *times):
total = sum(int(t) for t in times if t is not None)
total = sum(t for t in times if t is not None)
return None if total == 0 else total
return None if total == 0 else total
def max_value(name, *rams):
def max_value(name, *rams):
@@ -852,6 +880,15 @@ class BatchJob(JobParent):
@@ -852,6 +880,15 @@ class BatchJob(JobParent):
value = re.sub(r'[^\w\s-]', '', name).strip().lower()
value = re.sub(r'[^\w\s-]', '', name).strip().lower()
return base + re.sub(r'[-\s]+', '-', value)
return base + re.sub(r'[-\s]+', '-', value)
 
def scale_jobtime(self, scaling):
 
"""
 
Scale the submit job time in place by `scaling`.
 
 
This will only affect jobs submitted to the cluster.
 
"""
 
for job in self.torun:
 
job.scale_jobtime(scaling)
 
def __repr__(self, ):
def __repr__(self, ):
"""Print job as a function call."""
"""Print job as a function call."""
return f"Batch({self.torun})"
return f"Batch({self.torun})"
Loading