Skip to main content

Shakudo API

Notebook_Common

notebook_common is part of the Shakudo Platform Hyperplane API that contains convenience functions for Dask and pipeline jobs. It contains functions to manage Dask clusters, pipeline jobs, and Slack messages, and GraphQL operations.

Dask

Dask is a flexible open source distributed framework for parallel computing. It has similar APIs to NumPy and Pandas, is an ideal choice for parallelizing NumPy, Pandas and List based code.

quickstart_dask()

Use quickstart_dask to quickly spin up a Dask cluster using t-shirt sizes. Returns a tuple [Client, KubeCluster].

from hyperplane.notebook_common import quickstart_dask
client, cluster = quickstart_dask(
num_workers = 4,
size = 'hyperplane-med-high-mem'
)

Parameters

NameTypeDescription
num_workers
Required
integerNumber of workers
size
Required
stringPre-configured worker pools

Pre-configured Worker pools

The preconfigured worker pools are the following

NameWorker PoolAllocatable coresAllocatable ram
hyperplane-xs-high-memPOOL_4_323.57.0
hyperplane-smallPOOL_8_87.05.0
hyperplane-small-mid-memPOOL_8_167.512.0
hyperplane-small-high-memPOOL_8_647.558.0
hyperplane-medPOOL_16_1615.012.0
hyperplane-med-mid-memPOOL_16_3215.027.0
hyperplane-med-high-memPOOL_16_12815.0110.0
hyperplane-largePOOL_32_3228.027.0
hyperplane-xxl-high-memPOOL_96_76894.0675.0

initialize_dask_cluster()

Initialize a distributed DASK cluster. Returns a tuple [Client, KubeCluster]. You may use the returned client and cluster like any other dask cluster.

from hyperplane.notebook_common import initialize_dask_cluster
client, cluster = initialize_dask_cluster(
num_workers:int=2,
local_mode:bool=False,
worker_spec_yaml:str=WORKER_SPEC_TEMPLATE_1_1,
timeout:int=1200,
nthreads:int=1,
nprocs:int=15, ram_gb_per_proc:float=0.7,
cores_per_worker:int=15,
scheduler_deploy_mode:str="remote",
dashboard_port:str="random",
logging:str="quiet"
)

Parameters

NameTypeDescription
num_workersinteger(Default value: 2) Number of Dask worker nodes.
local_modeboolWhether to use local cluster or distributed KubeCluster
worker_spec_yamlstringA string YAML for cluster configs
timeoutintegerTime limit (seconds) for a scheduler to wait to connect before returning a timeout error
nthreadsintegerNumber of threads per worker in your cluster
nprocsintegerNumber of processes per worker in your cluster
ram_gb_per_procfloatGB of Ram per process, per worker
cores_per_workerintegerNumber of cores per worker
scheduler_deploy_modestringWhere to deploy the scheduler (remote in its own worker, or locally in jhub). Choose remote when the Dask graph
dashboard_portstringChoose a port number for your dashboard, or leave as "random" to have a random port, which will not conflict
loggingstringLogging level for printouts when initializing. Available options are verbose or quiet.
note

The number of dask workers in the cluster will be the num_workers x num_procs. Shakudo platform will automatically choose the closest pool from the pre-configured node pool based on the combination of parameters specified.

Example

from hyperplane import notebook_common as nc
client, cluster = nc.initialize_dask_cluster(num_workers=2)
from hyperplane import notebook_common as nc
client, cluster = nc.initialize__dask_cluster(
num_workers=2,
nthreads=1,
nprocs=15,
ram_gb_per_proc=0.7,
cores_per_worker=15
)

daskpool_candidates

Use daskpool_candidates when you'd like to access the list of available dask pools to choose from to spin up a Dask cluster.

candidates = nc.daskpool_candidates
candidates

get_dask_cluster()

Retrieve a Dask cluster. Use this function if there's a Dask cluster that's already spun up that you would like to connect.

from hyperplane import notebook_common as nc
client = nc.get_dask_cluster("dask_cluster_name")
client

Parameters

NameTypeDescription
dask_cluster_namestringName of Dask cluster

To retrieve the Dask cluster name, navigate to the Ray & Dask tab on the platform and click the copy button in the table column Cluster Name.


cluster.close() & client.close()

Use cluster.close() and client.close() to destroy or shut down a dask cluster after it is no longer needed to free up resources. The platform comes with an automatic garbage collection functionality - if you forget to close the cluster the platform will automatically close it after a few minutes of idle time.

Starting a cluster and shutting it down:

from hyperplane import notebook_common as nc
client, cluster = nc.initialize_dask_cluster(num_workers=2)

cluster.close()
client.close()

Retrieving a forgotten Dask cluster and closing it:

from hyperplane import notebook_common as nc
client = nc.get_dask_cluster("dask-cluster-with-some-random-hash")

cluster.close()
client.close()

client.restart()

Use client.restart whenever you want to clean up dask memory.


client.restart()

note

Dask remembers every line of code that was run since initializing the cluster. If you'd like to edit a line of code after it's already been run once, then restart the dask client to ensure that the script runs smoothly.


Pipeline Jobs

There are many ways pipeline jobs can be controlled: dashboard interface, GraphQL Playground, and Hyperplane API notebook_commons. You can submit, cancel, get output, and check status on jobs from your Sessions.

submit_pipeline_job

Use this function to submit a job from your Sessions. See Create a pipeline for details on submission query fields. The function returns a job id and runID.

Navigate to the Shakudo Platform Jobs tab and filter by the returned job ID if you'd like to check that the job has been successfully submitted.

from hyperplane import notebook_common as nc
newjob = await nc.submit_pipeline_job(
jobName = 'name_of_newjob',
pipelineYamlPath = 'yaml_path.yaml',
jobType = 'basic',
timeout = 1800,
active_timeout = 1800,
max_retries = 2,
parameters = {
"a":1,
"b":1
}
)
newjob

Parameters

NameTypeDescription
jobNamestringcustom name for your job
pipelineYamlPath
Required
integer(Default value: 2) Number of Dask worker nodes.
jobTypestringJob (EC) type to use
timeoutintegerMaximum time that the pipeline may run, starting from the moment of job submission
active_timeoutintegerMaximum time that the pipeline may run once it is picked up
max_retriesintegerNumber of times to retry the job if the job run has failed or timed out.
parametersdictionaryKey value pairs for any parameters in your script you'd like to overwrite for this pipeline job

checkjobs()

Check status on jobs using job IDs. Returns a summary of job status and links to Dask dashboards if Dask is used.

ids = ['a-job-id', 'another-job-id']
res = await nc.checkjobs(ids, loop = True)
print(res)

Output will look like the following:

#### Jobs summary
0 / 2 in progress

0 / 2 pending

2 / 2 processed

2 done | 0 timed out | 0 cancelled | 0 failed

Progress: 100.0%

#### Dask dashboards
a-job-id done None
another-job-id done None

Parameters

NameTypeDescription
ids
Required
listList of job IDs to check status
loopboolean(Default value: False) True will refresh the output every 5 seconds until all jobs are processed
intervalintegerRefresh frequency for loop = True

cancel_pipeline_jobs()

Use cancel_pipeline_jobs() to cancel a pipeline job from Sessions. Returns {'id': 'job-id', 'status': 'cancelled}

await nc.cancel_pipeline_job('job-id')

Parameters

NameTypeDescription
jobID
Required
stringID of pipeline job to cancel