This example uses cloudknot to compute matrix-vector products of random matrices and vectors whose entries are drawn from normal distributions with varying standard deviations.
import cloudknot as ck
First, we write the Python function that we want to run on AWS Batch. Note that we import the necessary Python packages inside the function random_mv_prod.
def random_mv_prod(b):
    # Imports live inside the function so they are available when it runs remotely
    import numpy as np
    import pandas as pd
    import s3fs
    import json
    import logging
    import os.path as op
    import nibabel as nib
    import dipy.data as dpd
    import dipy.tracking.utils as dtu
    import dipy.tracking.streamline as dts
    from dipy.io.streamline import save_tractogram, load_tractogram
    from dipy.stats.analysis import afq_profile, gaussian_weights
    from dipy.io.stateful_tractogram import StatefulTractogram
    from dipy.io.stateful_tractogram import Space
    import dipy.core.gradients as dpg
    from dipy.segment.mask import median_otsu

    # Draw a random vector and a random matrix with standard deviation b
    x = np.random.normal(0, b, 1024)
    A = np.random.normal(0, b, (1024, 1024))
    return np.dot(A, x)
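Before sending the function to AWS Batch, it can be worth calling it once locally to confirm that it runs and returns what you expect. The sketch below uses a trimmed copy that keeps only the numpy import, an assumption made so that it runs without the other packages installed. The name random_mv_prod_local is hypothetical.

```python
def random_mv_prod_local(b):
    # Import inside the function, mirroring the convention used above
    import numpy as np

    x = np.random.normal(0, b, 1024)
    A = np.random.normal(0, b, (1024, 1024))
    return np.dot(A, x)


# One local call with a single standard deviation
y = random_mv_prod_local(1.0)
print(y.shape)  # (1024,)
```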
Create a knot using the random_mv_prod function and a job definition memory of 128 MiB.
knot = ck.Knot(name='random-mv-prod', base_image="python:3.7", func=random_mv_prod, memory=128, retries=3)
Submit 17 batch jobs to the knot, one for each value in the input array. The map() method returns a future for the list of results from the batch jobs. You can optionally supply a list of environment variables to pass to each job.
# import numpy since it was only imported in the `random_mv_prod` function above
import numpy as np
# Submit the jobs
result_future = knot.map(np.linspace(0.1, 100, 17), env_vars=[{'name': 'MY_ENV_VAR', 'value': 'foo'}])
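The number of jobs follows from the length of the iterable passed to map(). This assumes one job per element, which is consistent with the 'size': 17 reported in the job status. A quick local check of the inputs, with no AWS calls involved:

```python
import numpy as np

# The same array passed to knot.map() above
stds = np.linspace(0.1, 100, 17)

print(len(stds))  # 17

# Both endpoints are included, so the spacing is (100 - 0.1) / 16
step = stds[1] - stds[0]
print(stds[0], stds[-1], step)
```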
We can query the jobs associated with this knot by calling knot.view_jobs(), which prints a concise summary of each job's ID, name, and status.
# Rerun this cell as often as you like to update your job status info
knot.view_jobs()
Job ID                                  Name                Status
---------------------------------------------------------
565605cc-6c10-45dc-9634-430c92a04d36    random_mv_prod-0    SUBMITTED
We can also inspect individual jobs through knot.jobs, which returns a list of BatchJob instances, one for each submitted job, e.g.:
last_job = knot.jobs[-1]
print(last_job.done)
print(last_job.result(timeout=5))
False
---------------------------------------------------------------------------
CKTimeoutError                            Traceback (most recent call last)
<ipython-input-8-8f32aad30717> in <module>()
      1 print(last_job.done)
----> 2 print(last_job.result(timeout=5))

~/code/projects/cloudknot/cloudknot/aws/batch.py in result(self, timeout)
   1898
   1899         if not self.done:
-> 1900             raise CKTimeoutError(self.job_id)
   1901
   1902         status = self.status

CKTimeoutError: The job with job-id 565605cc-6c10-45dc-9634-430c92a04d36 did not finish within the requested timeout period
last_job.status
{'arrayProperties': {'size': 17, 'statusSummary': {'FAILED': 0, 'PENDING': 0, 'RUNNABLE': 17, 'RUNNING': 0, 'STARTING': 0, 'SUBMITTED': 0, 'SUCCEEDED': 0}}, 'attempts': [], 'status': 'PENDING', 'statusReason': None}
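The status dictionary can be condensed into a one-line progress report. The sketch below works on a copy of the dictionary shown above and makes no AWS calls. The helper name summarize is hypothetical and not part of cloudknot.

```python
# Status dictionary copied from the output above
status = {
    'arrayProperties': {
        'size': 17,
        'statusSummary': {
            'FAILED': 0, 'PENDING': 0, 'RUNNABLE': 17, 'RUNNING': 0,
            'STARTING': 0, 'SUBMITTED': 0, 'SUCCEEDED': 0,
        },
    },
    'attempts': [],
    'status': 'PENDING',
    'statusReason': None,
}


def summarize(status):
    """Return (finished, total, nonzero_counts) for a status dictionary."""
    props = status['arrayProperties']
    summary = props['statusSummary']
    # A job counts as finished once it has either succeeded or failed
    finished = summary['SUCCEEDED'] + summary['FAILED']
    nonzero = {state: n for state, n in summary.items() if n > 0}
    return finished, props['size'], nonzero


finished, total, nonzero = summarize(status)
print(f"{finished}/{total} jobs finished; current states: {nonzero}")
# 0/17 jobs finished; current states: {'RUNNABLE': 17}
```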
Knot.map() returns a future for the list of results, so you can use any of the future's methods to query it, e.g. done() or result().
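To illustrate the semantics of done() and result() without touching AWS, here is a minimal sketch using the standard library's concurrent.futures. It assumes the object returned by map() behaves like a standard concurrent.futures.Future. The function slow_square is a hypothetical stand-in for a long-running job.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_square(x):
    # Hypothetical stand-in for a long-running remote job
    time.sleep(0.5)
    return x * x


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 3)

    try:
        # The timeout is much shorter than the 0.5 s the work needs
        future.result(timeout=0.05)
        timed_out = False
    except FutureTimeoutError:
        timed_out = True

    # Without a timeout, result() blocks until the work completes
    value = future.result()
    finished = future.done()

print(timed_out, finished, value)  # True True 9
```

Calling result() with a short timeout is a cheap way to poll, while calling it without one blocks until the result is ready.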
print(result_future.done())
True
print(result_future.result())
[array([ 0.33573444, -0.17816871, 0.04182915, ..., 0.18977458, 0.18525011, 0.28909461]), array([-2170.50641905, -914.96192525, -2935.42394086, ..., -845.61602121, -1066.55543101, 3786.07847293]), array([ 5518.26250287, 2629.82191285, 3973.26642432, ..., 245.35879457, 2378.91610231, 1440.85016483]), array([ 12145.92971008, -1752.67075638, -2236.9687673 , ..., -10473.04861508, 4948.93624631, -5164.02257178]), array([ 19220.37103598, -11636.39028164, 4435.09553021, ..., 275.34159951, -40660.23318601, -31436.39319062]), array([-12139.67549791, -37191.77517368, 17676.0058413 , ..., 3871.21504177, -15745.62622439, -15256.27259278]), array([-33710.44216824, 26970.72741577, -25699.97672714, ..., 907.34711287, -12405.55383236, 25991.10145291]), array([ 20642.78548984, -51662.87680929, -13446.32639517, ..., -13242.89581277, 8647.19798733, 69037.63436123]), array([ -11077.38603201, 25959.65404178, -123167.99186893, ..., -25486.75276358, 33565.61649554, 68675.4970642 ]), array([-139402.98727787, 35577.84187373, 8167.51240975, ..., -115266.37136274, 53858.43230435, -22372.13590293]), array([ -85660.27928164, -87524.78978449, 5849.47007401, ..., 108583.87339172, -38679.96416028, -6415.63103812]), array([ 67318.53630424, 1914.09187751, -20859.14110294, ..., -156192.64555252, -293051.5218775 , -180415.32923358]), array([ -26414.10480353, -145369.12520887, -82828.38001282, ..., -273897.86414521, -161427.48206156, -88802.77429876]), array([ 107773.50860438, 315763.8425277 , -64905.07963653, ..., 32352.21473818, 191698.54867767, 215704.20427246]), array([ 308656.37474034, 147695.23439019, -26775.28966502, ..., -159280.69577612, -88390.58938526, 290458.81708465]), array([ 108520.36949807, -232753.97901184, 162913.86153282, ..., 70634.99906695, 40860.97626671, -597361.98929967]), array([ 453458.8684153 , 148091.52259826, -430783.77627923, ..., 6989.64826569, 299962.44403145, 370271.46710545])]
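The growth in magnitude across the 17 arrays is what one would expect. Each entry of the product is a sum of n = 1024 products of independent zero-mean normal variables with standard deviation b, so its standard deviation is about b²√n, roughly 0.32 for b = 0.1 and 320,000 for b = 100. The sketch below checks this locally. The seed and the choice of three values of b are assumptions made for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded only so the sketch is repeatable
n = 1024

ratios = []
for b in (0.1, 1.0, 100.0):
    x = rng.normal(0, b, n)
    A = rng.normal(0, b, (n, n))
    observed = np.std(A @ x)
    expected = b**2 * np.sqrt(n)  # equals 32 * b**2 for n = 1024
    ratios.append(observed / expected)
    print(f"b={b:6.1f}  observed std={observed:12.3f}  expected={expected:12.3f}")
```

The observed and expected values should agree to within a few percent, since both the vector norm and the sample standard deviation fluctuate slightly from draw to draw.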
Once you're all done, clobber the knot, including the underlying PARS, the remote repo, and the Docker image.
knot.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)