Shakudo API

Notebook_Common

notebook_common is the part of the Shakudo Platform Hyperplane API that contains convenience functions for Dask and pipeline jobs. It provides functions to manage Dask clusters, pipeline jobs, Slack messages, and GraphQL operations.

Dask

Dask is a flexible, open source distributed framework for parallel computing. Its APIs mirror NumPy and Pandas, making it an ideal choice for parallelizing NumPy-, Pandas-, and list-based code.

quickstart_dask()

Use quickstart_dask to quickly spin up a Dask cluster using t-shirt sizes. Returns a tuple (Client, KubeCluster).

from hyperplane.notebook_common import quickstart_dask

client, cluster = quickstart_dask(
    num_workers=4,
    size='hyperplane-med-high-mem'
)

Parameters

Name | Type | Description
num_workers (Required) | integer | Number of workers
size (Required) | string | Pre-configured worker pool name

Pre-configured Worker pools

The pre-configured worker pools are the following:

Name | Worker Pool | Allocatable cores | Allocatable RAM (GB)
hyperplane-xs-high-mem | POOL_4_32 | 3.5 | 7.0
hyperplane-small | POOL_8_8 | 7.0 | 5.0
hyperplane-small-mid-mem | POOL_8_16 | 7.5 | 12.0
hyperplane-small-high-mem | POOL_8_64 | 7.5 | 58.0
hyperplane-med | POOL_16_16 | 15.0 | 12.0
hyperplane-med-mid-mem | POOL_16_32 | 15.0 | 27.0
hyperplane-med-high-mem | POOL_16_128 | 15.0 | 110.0
hyperplane-large | POOL_32_32 | 28.0 | 27.0
hyperplane-xxl-high-mem | POOL_96_768 | 94.0 | 675.0
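
For example, to target one of the pools above by name (a minimal sketch following the quickstart_dask example; the pool name is taken from the table):

from hyperplane.notebook_common import quickstart_dask

# 2 workers on the hyperplane-small pool (POOL_8_8: ~7 allocatable cores, ~5 GB RAM each)
client, cluster = quickstart_dask(
    num_workers=2,
    size='hyperplane-small'
)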

initialize_dask_cluster()

Initialize a distributed Dask cluster. Returns a tuple (Client, KubeCluster). You can use the returned client and cluster like any other Dask cluster.

from hyperplane.notebook_common import initialize_dask_cluster

client, cluster = initialize_dask_cluster(
    num_workers=2,
    local_mode=False,
    worker_spec_yaml=WORKER_SPEC_TEMPLATE_1_1,
    timeout=1200,
    nthreads=1,
    nprocs=15,
    ram_gb_per_proc=0.7,
    cores_per_worker=15,
    scheduler_deploy_mode="remote",
    dashboard_port="random",
    logging="quiet"
)

Parameters

Name | Type | Description
num_workers | integer | (Default: 2) Number of Dask worker nodes
local_mode | bool | Whether to use a local cluster or a distributed KubeCluster
worker_spec_yaml | string | A YAML string with the cluster configuration
timeout | integer | Time limit (seconds) the scheduler waits for a connection before returning a timeout error
nthreads | integer | Number of threads per worker in your cluster
nprocs | integer | Number of processes per worker in your cluster
ram_gb_per_proc | float | GB of RAM per process, per worker
cores_per_worker | integer | Number of cores per worker
scheduler_deploy_mode | string | Where to deploy the scheduler: "remote" (in its own worker) or locally in the JupyterHub session
dashboard_port | string | Port number for your dashboard, or "random" for a randomly chosen port that will not conflict with other dashboards
logging | string | Logging level for printouts during initialization: "verbose" or "quiet"
note

The total number of Dask workers in the cluster will be num_workers x nprocs. The Shakudo platform automatically chooses the closest pre-configured node pool based on the combination of parameters specified.

Example

A minimal cluster with default settings:

from hyperplane import notebook_common as nc
client, cluster = nc.initialize_dask_cluster(num_workers=2)

A cluster with explicit worker sizing:

from hyperplane import notebook_common as nc
client, cluster = nc.initialize_dask_cluster(
    num_workers=2,
    nthreads=1,
    nprocs=15,
    ram_gb_per_proc=0.7,
    cores_per_worker=15
)
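
As a quick sanity check, you can confirm the total worker count once the cluster is up (a sketch that assumes the returned client is a standard dask.distributed Client, as in the examples above):

# With num_workers=2 and nprocs=15, expect 2 x 15 = 30 Dask workers
n_workers = len(client.scheduler_info()["workers"])
print(f"Connected to {n_workers} Dask workers")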

daskpool_candidates

Use daskpool_candidates to list the available Dask pools you can choose from when spinning up a Dask cluster.

from hyperplane import notebook_common as nc

candidates = nc.daskpool_candidates
candidates

get_dask_cluster()

Retrieve a Dask cluster. Use this function when a Dask cluster has already been spun up and you would like to connect to it.

from hyperplane import notebook_common as nc
client = nc.get_dask_cluster("dask_cluster_name")
client

Parameters

NameTypeDescription
dask_cluster_namestringName of Dask cluster

To retrieve the Dask cluster name, navigate to the Ray & Dask tab on the platform and click the copy button in the table column Cluster Name.
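
Once retrieved, the client can be used like any other Dask client (a minimal sketch; the cluster name is a placeholder):

from hyperplane import notebook_common as nc

client = nc.get_dask_cluster("dask-cluster-with-some-random-hash")  # placeholder name

# Run a small computation on the existing cluster
future = client.submit(sum, range(100))
print(future.result())  # 4950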


cluster.close() & client.close()

Use cluster.close() and client.close() to shut down a Dask cluster once it is no longer needed and free up its resources. The platform also includes automatic garbage collection: if you forget to close the cluster, the platform will close it after a few minutes of idle time.

Starting a cluster and shutting it down:

from hyperplane import notebook_common as nc
client, cluster = nc.initialize_dask_cluster(num_workers=2)

cluster.close()
client.close()

Retrieving a forgotten Dask cluster and closing it:

from hyperplane import notebook_common as nc
client = nc.get_dask_cluster("dask-cluster-with-some-random-hash")

# shut down the remote scheduler and its workers via the retrieved client
client.shutdown()

client.restart()

Use client.restart() whenever you want to clean up Dask memory.


client.restart()

note

Dask remembers every line of code that has run since the cluster was initialized. If you'd like to edit a line of code after it has already run once, restart the Dask client to ensure the script runs smoothly.
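
A typical edit-and-rerun workflow might look like this (a sketch assuming client is the dask.distributed Client returned when the cluster was initialized):

# Run some Dask work
result = client.submit(sum, [1, 2, 3]).result()

# ...edit your code, then clear the cluster's memory and cached state before re-running
client.restart()

result = client.submit(sum, [1, 2, 3, 4]).result()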


Pipeline Jobs

Pipeline jobs can be controlled in several ways: the dashboard interface, the GraphQL Playground, and the Hyperplane API notebook_common. You can submit jobs, cancel them, get their output, and check their status from your Sessions.

submit_pipeline_job

Use this function to submit a job from your Sessions. See Create a pipeline for details on submission query fields. The function returns a job id and runID.

Navigate to the Shakudo Platform Jobs tab and filter by the returned job ID if you'd like to check that the job has been successfully submitted.

from hyperplane import notebook_common as nc

newjob = await nc.submit_pipeline_job(
    jobName='name_of_newjob',
    pipelineYamlPath='yaml_path.yaml',
    jobType='basic',
    timeout=1800,
    active_timeout=1800,
    max_retries=2,
    parameters={
        "a": 1,
        "b": 1
    }
)
newjob

Parameters

Name | Type | Description
jobName | string | Custom name for your job
pipelineYamlPath (Required) | string | Path to the pipeline YAML file
jobType | string | Job (EC) type to use
timeout | integer | Maximum time the pipeline may run, counted from the moment of job submission
active_timeout | integer | Maximum time the pipeline may run once it is picked up
max_retries | integer | Number of times to retry the job if a run fails or times out
parameters | dictionary | Key-value pairs for any parameters in your script you'd like to overwrite for this pipeline job

checkjobs()

Check the status of jobs using their job IDs. Returns a summary of the jobs' statuses and links to Dask dashboards if Dask is used.

from hyperplane import notebook_common as nc

ids = ['a-job-id', 'another-job-id']
res = await nc.checkjobs(ids, loop=True)
print(res)

Output will look like the following:

#### Jobs summary
0 / 2 in progress

0 / 2 pending

2 / 2 processed

2 done | 0 timed out | 0 cancelled | 0 failed

Progress: 100.0%

#### Dask dashboards
a-job-id done None
another-job-id done None

Parameters

Name | Type | Description
ids (Required) | list | List of job IDs to check the status of
loop | boolean | (Default: False) If True, refreshes the output every 5 seconds until all jobs are processed
interval | integer | Refresh frequency (seconds) when loop=True
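
Putting submission and monitoring together, a sketch might look like the following. It assumes the object returned by submit_pipeline_job exposes the job ID under an 'id' key; the exact shape of the return value may differ.

from hyperplane import notebook_common as nc

newjob = await nc.submit_pipeline_job(
    jobName='nightly_etl',              # hypothetical job name
    pipelineYamlPath='yaml_path.yaml',
    jobType='basic',
    timeout=1800
)

# Poll every 10 seconds until the job is processed (assumes the job ID is at newjob['id'])
await nc.checkjobs([newjob['id']], loop=True, interval=10)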

cancel_pipeline_job()

Use cancel_pipeline_job() to cancel a pipeline job from Sessions. Returns {'id': 'job-id', 'status': 'cancelled'}.

await nc.cancel_pipeline_job('job-id')

Parameters

Name | Type | Description
jobID (Required) | string | ID of the pipeline job to cancel

GraphQL

Submit a GraphQL query using the graphql_operation() function.

Example for submitting a job

from hyperplane import notebook_common as nc

gql_query = '''
mutation {
  createShakudoPipelineJob(
    jobName: "unruffled_ardinghelli",
    jobType: "basic",
    timeout: 1800,
    activeTimeout: 1800,
    maxRetries: 2,
    pipelineYamlPath: "yaml_path.yaml",
    defaultCommands: true,
    gitInit: true,
    commitId: "",
    branchName: "",
    parameters: { create: [
      { key: "a", value: "1" },
    ]}
  ) {
    id
    jobName
    schedule
    parameters {
      key
      value
    }
    noGitInit
    noHyperplaneCommands
  }
}
'''
gql_queries = [gql_query]
results = await nc.graphql_operation(gql_queries)

Parameters

Name | Type | Description
gql_queries (Required) | Union[str, List[str]] | GraphQL query (or list of queries) for the relevant operation
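
Because gql_queries accepts either a single string or a list, you can also batch several operations in one call (a sketch reusing the gql_query defined above; the shape of results is assumed to line up with the submitted queries):

# A single query string also works
result = await nc.graphql_operation(gql_query)

# Or batch several operations in one call (illustrative: the same mutation twice)
results = await nc.graphql_operation([gql_query, gql_query])
for r in results:
    print(r)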

Ray_Common

ray_common is the part of the Shakudo Platform Hyperplane API that contains convenience functions to manage Ray clusters. Extensions to the base Ray framework, including Ray Tune, Ray Spark, Ray with RAPIDS, and more, are also supported.

Ray

Ray is an open source distributed computing framework with strong support for deep learning and reinforcement learning. It has a rich set of libraries and integrations built on a flexible distributed execution framework, making it an ideal choice for parallelizing model training and hyperparameter tuning.

quickstart_ray()

Use quickstart_ray to quickly spin up a Ray cluster using t-shirt sizes (the sizes are the same as for the Dask quickstart).

from hyperplane import ray_common as rc

ray_cluster = rc.quickstart_ray(
    num_workers=4,
    size='hyperplane-med-high-mem'
)

Parameters

Name | Type | Description
num_workers (Required) | integer | Number of workers
size (Required) | object | Pre-configured worker pool name

initialize_ray_cluster()

Initialize a distributed Ray cluster with more customizability. You can also run this function to clean up the Ray nodes and re-initialize them.

from hyperplane import ray_common as rc

ray_cluster = rc.initialize_ray_cluster(
    num_workers=4,
    cpu_core_per_worker=4,
    ram_gb_per_worker=4,
    n_gpus=0
)

Parameters

Name | Type | Description
num_workers | integer | (Default: 2) Number of Ray nodes to initialize
cpu_core_per_worker | integer | Number of CPU cores in each Ray node
ram_gb_per_worker | float | Memory size in GB for each Ray node
n_gpus | integer | Number of NVIDIA GPUs in each Ray node (if n_gpus > 0, cpu_core_per_worker and ram_gb_per_worker are ignored)
use_existing | boolean | (Default: False) Whether to connect to/reinitialize an existing Ray cluster or spin up a new one
note

If you are aiming for a specific pool, set cpu_core_per_worker to the pool's allocatable cores and ram_gb_per_worker to its allocatable RAM. For example, to target a POOL_16_16 worker, you could use the following cluster initialization.

from hyperplane import ray_common as rc

ray_cluster = rc.initialize_ray_cluster(
    num_workers=4,
    cpu_core_per_worker=15,
    ram_gb_per_worker=12
)
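
For GPU workloads, n_gpus takes precedence over the CPU and RAM settings (a minimal sketch using only the parameters documented above):

from hyperplane import ray_common as rc

# Request 2 Ray nodes with 1 NVIDIA GPU each; cpu_core_per_worker and
# ram_gb_per_worker are ignored when n_gpus > 0
ray_cluster = rc.initialize_ray_cluster(
    num_workers=2,
    n_gpus=1
)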

stop_ray_cluster()

Use stop_ray_cluster to shut down a Ray cluster. After computation is finished, it's a good idea to shut down the distributed cluster and release its resources back to the node pool. If any Ray nodes are left hanging, the Shakudo Platform's garbage collection will automatically shut down the Ray workers when the Session or job finishes.

from hyperplane import ray_common as rc
rc.stop_ray_cluster(ray_cluster)

Parameters

Name | Type | Description
ray_cluster (Required) | object | Ray cluster to shut down

get_ray_cluster()

Reconnect to a Ray cluster by retrieving it with get_ray_cluster. Use this function if you've already spun up a Ray cluster and want to connect to the same cluster (for example, from another notebook in the same Session).

from hyperplane import ray_common as rc
rc.get_ray_cluster(extra_workers = 1)

Parameters

Name | Type | Description
extra_workers | integer | (Default: 0) Number of nodes to add to the existing cluster; added nodes use the same specification as the original cluster

There are two ways to reconnect to Ray clusters; using get_ray_cluster() is the simpler and recommended one.

You can also use initialize_ray_cluster() to accomplish the same thing. Note that cpu_core_per_worker and ram_gb_per_worker must match the values used when the cluster was originally initialized.

from hyperplane import ray_common as rc

ray_cluster = rc.initialize_ray_cluster(
    num_workers=0,
    cpu_core_per_worker=15,
    ram_gb_per_worker=12,
    use_existing=True
)

find_ray_workers

Use the find_ray_workers() function to see whether any Ray workers are already spun up. Returns a list of running Ray workers.

from hyperplane import ray_common as rc
rc.find_ray_workers()
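
A common pattern is to reuse an existing cluster when workers are already running and spin up a new one otherwise (a sketch; it assumes find_ray_workers() returns an empty list when nothing is running):

from hyperplane import ray_common as rc

if rc.find_ray_workers():
    # Workers already exist: reconnect to the running cluster
    rc.get_ray_cluster()
else:
    # Nothing running yet: start a fresh cluster
    ray_cluster = rc.initialize_ray_cluster(num_workers=2)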

Utils

get_service_url

The get_service_url function returns the internal service URL of a running service on the platform. This is useful when another service or job needs to call an existing service: using the internal URL bypasses authentication and lowers latency.

from hyperplane import utils
import requests
model_inference_url = utils.get_service_url('model_inference_endpoint')
requests.get(model_inference_url)

Slack

Shakudo's Hyperplane API includes a function that allows jobs to post messages to your Slack channel. You can use this to get notified when a job finishes, or even to post job results. On Shakudo there are two ways to send Slack notification messages: with a Slack token or with a webhook.

Method 1: Slack token

To use the Slack alert integration you first need to set up a Slack token. A Slack token can be set up by following this tutorial on Slack to create a quick pre-configured App. Once the App is created and installed to the workspace, the token can be found in the Install App tab on the App's homepage. The token looks like this: xoxb-694301530724-2549825675367-Zn4NNP34r3c7aN3EkPDLMiNX

Method 2: Slack Webhook URL

To send messages to Slack with webhooks, first follow this tutorial on Slack to create a new Slack App and obtain a webhook URL, which looks like this: https://hooks.slack.com/services/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

note

You can also ask the Shakudo Platform admin to add or change the WEBHOOK_URL variable in your environment permanently.

SlackHelper

Initialize a SlackHelper object after specifying your Slack token.

Method 1: Token method

# Specify your Slack token
import os
os.environ["SLACK_TOKEN"] = "your-slack-t0ken"

# Initialize the Slack helper
from hyperplane import utils
sh = utils.SlackHelper()
note

When using Method 2 (Slack webhook URL), you do not need to initialize SlackHelper. Just add the webhook URL as an environment variable.

import os
os.environ['WEBHOOK_URL']='your_webhook_url'

post_message

Post a Slack message, using either the Slack token or the webhook URL.

Method 1: Token method

from hyperplane import utils
sh = utils.SlackHelper()
sh.post_message('Testing!', '#some-channel')

Method 2: Webhook method

#Post a message
from hyperplane import utils
utils.post_message('Testing!', '#some-channel')

Parameters

Name | Type | Description
message (Required) | string | Message to send (can include formatting)
channel | string | (Default: "") A public channel to post the message to. If left empty, defaults to the webhook URL's default channel
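
For example, a job could notify a channel when a long-running step completes (a sketch using the token method; the channel name and message text are placeholders, and Slack's mrkdwn formatting is assumed to be passed through):

from hyperplane import utils

sh = utils.SlackHelper()

# Post a formatted completion message to a placeholder channel
sh.post_message(
    "*Nightly ETL finished* :white_check_mark:\nRows processed: 1,204,233",
    "#data-alerts"
)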

post_table()

Post a Slack message in table format, using either the Slack token or the webhook URL.

Method 1: Token method

from hyperplane import utils
sh = utils.SlackHelper()

message_dict = {
    "key1": "value1",
    "key2": "value2"
}
sh.post_table(message_dict, channel="alert-channel")

Method 2: Webhook method

from hyperplane import utils

message_dict = {
    "key1": "value1",
    "key2": "value2"
}
utils.post_table(message_dict, channel="alert-channel")

Parameters

Name | Type | Description
message_dict (Required) | dictionary | Dictionary of values to send in table format
channel | string | (Default: "") A public channel to post the message to. If left empty, defaults to the webhook URL's default channel

NLP models

Hyperplane comes with some pre-packaged NLP models for common tasks. The Hyperplane NLP tools can be accessed with:

from hyperplane import hyper_nlp as nlp

Extract topics and themes

Find the main topics and themes in a list of sentences, documents, or texts with the function get_topics().

Returns: Topic_Model, List[str]

get_topics(data:List[str], ntopic:int=10, sample_size:int=50000, method:str="USE" )

Parameters

Name | Type | Description
data (Required) | List[str] | A list of strings (sentences or phrases) from which to discover topics and themes
ntopic | integer | The number of topics/themes to return
sample_size | integer | The number of samples to discover topics from
method | string | The theme-extraction method. Choose from TFIDF, LDA, BERT, LDA_BERT, fasttext, USE

Example:

%matplotlib inline  ## use this in a notebook to display charts

nlp.get_topics(
    ["I need to buy some green and red apples.",
     "Oranges are also useful for juices",
     "I really like bananas and fruits",
     "You need to clean up your car.",
     "I am running out of gas"],
    ntopic=2,
    sample_size=50,
    method="USE"
)

The function will return the topic model and a list of top words per cluster:

(<topic_discovery.Topic_Model at 0x7f88abc8e9a0>,
array([['out', 'of', 'I', 'am', 'running', 'gas', 'You', 'need', 'to',
'clean'],
['I', 'and', 'Oranges', 'are', 'also', 'useful', 'for', 'juices',
'need', 'to']], dtype=object))

Extract text

Use the function extract_qa() to ask a question about a piece of text and extract an arbitrary field from it.

Returns: List[Dict]

from hyperplane import hyper_nlp as nlp
nlp.extract_qa(text:str, question:str, topk:int, return_context:bool, context_window:int)

Parameters

Name | Type | Description
text (Required) | string | Context string from which the answer will be extracted
question (Required) | string | A question to ask based on the context
topk | integer | The number of top answers to return per question
return_context | boolean | Whether or not to return context around the answer
context_window | integer | If return_context is True, how much of the surrounding context to return

Example:

text = '''
Hyperplane is an end-to-end platform designed to take AI teams from ideation to production at breakthrough speeds. We built Hyperplane because we needed a powerful platform for our scientists to design, develop, deploy and maintain their own work in production.

Why Us?

The Shakudo team grew out of advanced AI organizations across the industry. After having seen, tried and used every product out there, we came to the conclusion that there is a gap to be filled, and Hyperplane was born.

What does this mean for you?

If you are scaling up an AI organization, starting up an AI-powered product, or looking to get your existing solutions faster and more reliably to production, Hyperplane may be for you.
'''

questions = [
"What does Hyperplane do?"
]

nlp.extract_qa(text, questions[0])

This will return:

[{'score': 0.04723832756280899,
'start': 51,
'end': 115,
'answer': 'take AI teams from ideation to production at breakthrough speeds'}]

Save and load

To save serializable models, you can use the following:

nlp.save_model(model=tm, filename="tm_model")

where tm is a serializable object.

To load an existing model saved with hyper_nlp, use the following:

loaded_model = nlp.load_model(filename='tm_model')

Find urls

Use the function find_urls() to extract URLs from a string.

Returns: List[str]

nlp.find_urls(s:str)

Parameters

Name | Type | Description
s (Required) | string | A string in which to search for URLs
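
A quick example (the input string is illustrative):

from hyperplane import hyper_nlp as nlp

nlp.find_urls("The docs live at https://docs.example.org and the product page is https://www.example.com")
# Expected to return something like ['https://docs.example.org', 'https://www.example.com']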

Remove stopwords

Use the function clean_text() to remove stopwords from a string of text.

Returns a string with stopwords removed.

nlp.clean_text(s:str, remove_list:List[str]=en_stop_words)

Parameters

Name | Type | Description
s (Required) | string | A string to clean by removing stopwords
remove_list | List[str] | A list of strings to include as stopwords to remove
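
A quick example using the default stopword list:

from hyperplane import hyper_nlp as nlp

# Strip common English stopwords from a sentence
nlp.clean_text("I need to buy some green and red apples")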

Extract text from pdfs

Use the extract_digital_pdf() function to extract text from digital PDFs.

Returns a string of text.

extract_digital_pdf(filepath:str, auto_clean_threshold:int=0)

Parameters

Name | Type | Description
filepath (Required) | string | File path of the digital PDF to extract
auto_clean_threshold | integer | Threshold for removing words that are too short. Keep at 0 to keep all non-stopword words; use any positive integer to remove words with fewer than auto_clean_threshold letters
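
A quick example (a sketch; the file path is a placeholder, and the function is assumed to be exposed through hyper_nlp like the other NLP helpers):

from hyperplane import hyper_nlp as nlp

# Extract all text from a digital (non-scanned) PDF, keeping every non-stopword word
text = nlp.extract_digital_pdf("reports/annual_report.pdf", auto_clean_threshold=0)
print(text[:500])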
