Dask integration

PyUNICORE provides the UNICORECluster class, an implementation of a Dask Cluster that allows you to run the Dask client on your local host (or in a Jupyter notebook in the Cloud), while the Dask scheduler and workers run remotely on the HPC site.

Here is a basic usage example:

import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.dask as uc_dask

# Create a UNICORE client for accessing the HPC cluster
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
credential = uc_credentials.UsernamePassword("demouser", "test123")
submitter = uc_client.Client(credential, base_url)

# Create the UNICORECluster instance

uc_cluster = uc_dask.UNICORECluster(
    submitter,
    queue="batch",
    project="my-project",
    debug=True)

# Start two workers
uc_cluster.scale(2, wait_for_startup=True)

# Create a Dask client connected to the UNICORECluster

from dask.distributed import Client
dask_client = Client(uc_cluster, timeout=120)

That’s it! Now Dask will run its computations using the scheduler and workers started via UNICORE on the HPC site.
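As a quick sanity check, you can now run a small computation through the Dask client. This sketch uses only standard Dask APIs (dask.array) and assumes the cluster from the example above is up:

import dask.array as da

# Build a chunked random array and compute its mean on the remote workers
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())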

Configuration

When creating the UNICORECluster, a number of parameters can be set via the constructor. All parameters except the submitter are OPTIONAL (a combined example is sketched after this list).

  • submitter: this is either a Client object or an Allocation, which is used to submit new jobs

  • n_workers: initial number of workers to launch

  • queue: the batch queue to use

  • project: the accounting project

  • threads: worker option controlling the number of threads per worker

  • processes: worker option controlling the number of worker processes per job (default: 1)

  • scheduler_job_desc: base job description for launching the scheduler (default: None)

  • worker_job_desc: base job description for launching a worker (default: None)

  • local_port: which local port to use for the Dask client (default: 4322)

  • connect_dashboard: if True, a second forwarding process will be launched to allow a connection to the dashboard (default: False)

  • local_dashboard_port: which local port to use for the dashboard (default: 4323)

  • debug: if True, print some debug info (default: False)

  • connection_timeout: timeout in seconds while setting up the port forwarding (default: 120)
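For illustration, here is a constructor call combining several of these options. The concrete values (queue name, project, thread and process counts) are assumptions and should be adapted to your site:

uc_cluster = uc_dask.UNICORECluster(
    submitter,
    n_workers=2,              # launch two workers initially
    queue="batch",
    project="my-project",
    threads=8,                # 8 threads per worker
    processes=1,              # one worker process per job
    connect_dashboard=True,   # also forward the dashboard port
    connection_timeout=180)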

Customizing the scheduler and workers

By default, the Dask extension will launch the Dask components using server-side applications called dask-scheduler and dask-worker, which need to be defined in the UNICORE IDB.

The generated job description (here, for the scheduler) will look like this:

{
    "ApplicationName": "dask-scheduler",
    "Arguments": [
        "--port", "0",
        "--scheduler-file", "./dask.json"
    ],
    "Resources": {
        "Queue": "your_queue",
        "Project": "your_project"
    }
}

If you want to customize this, you can pass in a basic job description when creating the UNICORECluster object.

The job descriptions need not contain all command-line arguments; the UNICORECluster will add them as required. The queue and project will also be set if necessary.

For example:

# Custom job to start scheduler

sched_jd = {
    "Executable": "conda run -n dask dask-scheduler",
    "Resources": {
        "Runtime": "2h"
    },
    "Tags": ["dask", "testing"]
}

# Custom job to start worker

worker_jd = {
    "Executable": "srun --tasks=1 conda run -n dask dask-worker",
    "Resources": {
        "Nodes": "2"
    }
}

# Create the UNICORECluster instance
uc_cluster = uc_dask.UNICORECluster(
    submitter,
    queue="batch",
    project="my-project",
    scheduler_job_desc=sched_jd,
    worker_job_desc=worker_jd)

Scaling

To control the number of worker processes and threads, the UNICORECluster has the scale() method, as well as the two properties processes and threads, which can be set from the constructor or changed later at runtime.

The scale() method controls how many workers (or worker jobs, when using the jobs=... argument) are running.

# Start two workers
uc_cluster.scale(2, wait_for_startup=True)

# Or start two worker jobs with 4 workers per job
# and 128 threads per worker
uc_cluster.processes = 4
uc_cluster.threads = 128
uc_cluster.scale(jobs=2)
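To check that the workers have registered with the scheduler, you can query the Dask client; this uses the standard dask.distributed API, nothing PyUNICORE-specific:

# Number of workers currently known to the scheduler
print(len(dask_client.scheduler_info()["workers"]))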

The dashboard

By default, a connection to the scheduler’s dashboard is not possible. To allow connecting to the dashboard, set connect_dashboard=True when creating the UNICORECluster. The dashboard will then be available at http://localhost:4323; the port can be changed via the local_dashboard_port parameter if necessary.
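For example, a minimal sketch (the port value 4444 is an arbitrary assumption):

uc_cluster = uc_dask.UNICORECluster(
    submitter,
    connect_dashboard=True,
    local_dashboard_port=4444)  # assumed free local port; default is 4323

The dashboard would then be reachable at http://localhost:4444.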

Using an allocation

To speed up the startup and scaling process, it is possible to pre-allocate a multi-node batch job (if the server-side UNICORE supports this, i.e. runs UNICORE 9.1 and Slurm) and run the Dask components in this allocation.

import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.dask as uc_dask

# Create a UNICORE client for accessing the HPC cluster
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
credential = uc_credentials.UsernamePassword("demouser", "test123")
submitter = uc_client.Client(credential, base_url)

# Allocate a 4-node job
allocation_jd = {
    "Job type": "ALLOCATE",
    "Resources": {
        "Runtime": "60m",
        "Queue": "batch",
        "Project": "myproject"
    }
}

allocation = submitter.new_job(allocation_jd)
allocation.wait_until_available()

# Create the UNICORECluster instance using the allocation

uc_cluster = uc_dask.UNICORECluster(allocation, debug=True)

Note that in this case your custom scheduler / worker job descriptions MUST use srun --tasks=1 ... to make sure that exactly one scheduler / worker is started per node.
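For example, a minimal sketch of custom job descriptions for use inside an allocation, assuming dask-scheduler and dask-worker are on the PATH (recall that missing command-line arguments, such as the scheduler file, are added automatically):

# Run exactly one scheduler / worker per node via srun
sched_jd = {"Executable": "srun --tasks=1 dask-scheduler"}
worker_jd = {"Executable": "srun --tasks=1 dask-worker"}

uc_cluster = uc_dask.UNICORECluster(
    allocation,
    scheduler_job_desc=sched_jd,
    worker_job_desc=worker_jd)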

Also make sure not to launch more jobs than you have nodes; otherwise the new jobs will stay “QUEUED”.