Welcome to PyUNICORE
UNICORE (UNiform Interface to COmputing REsources) offers a ready-to-run system including client and server software. It makes distributed computing and data resources available in a seamless and secure way in intranets and the internet.
PyUNICORE is a Python library providing an API for UNICORE’s REST API , making common tasks like file access, job submission and management, workflow submission and management more convenient, and integrating UNICORE features better with typical Python usage.
In addition, this library contains code for using UFTP (UNICORE FTP) for filesystem mounts with FUSE, a UFTP driver for PyFilesystem and a UNICORE implementation of a Dask Cluster
This project has received funding from the European Union’s Horizon 2020 Framework Programme for Research and Innovation under the Specific Grant Agreement Nos. 720270, 785907 and 945539 (Human Brain Project SGA 1, 2 and 3)
PyUNICORE is Open Source under the BSD License, the source code is on GitHub.
Installation
Install from PyPI with
pip install -U pyunicore
Additional extra packages may be required for your use case:
Using the UFTP fuse driver requires “fusepy”
Using UFTP with pyfilesystem requires “fs”
Creating JWT tokens signed with keys requires the “cryptography” package
You can install (one or more) extras with pip:
pip install -U pyunicore[crypto,fs,fuse]
Getting started
Creating a client for a UNICORE site
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import json
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
# authenticate with username/password
credential = uc_credentials.UsernamePassword("demouser", "test123")
client = uc_client.Client(credential, base_url)
print(json.dumps(client.properties, indent = 2))
Running a job and read result files
my_job = {'Executable': 'date'}
job = client.new_job(job_description=my_job, inputs=[])
print(json.dumps(job.properties, indent = 2))
job.poll() # wait for job to finish
work_dir = job.working_dir
print(json.dumps(work_dir.properties, indent = 2))
stdout = work_dir.stat("/stdout")
print(json.dumps(stdout.properties, indent = 2))
content = stdout.raw().read()
print(content)
Connect to a Registry and list all registered services
registry_url = "https://localhost:8080/REGISTRY/rest/registries/default_registry"
# authenticate with username/password
credential = uc_credentials.UsernamePassword("demouser", "test123")
r = uc_client.Registry(credential, registry_url)
print(r.site_urls)
More examples
Further examples for using PyUNICORE can be found in the “integration-tests” folder in the source code repository.
Authentication
PyUNICORE supports all the authentication options available for UNICORE, so you can use the correct one for the server that you are trying to access.
Basic authentication options
The classes for the supported authentication options are in the pyunicore.credentials package.
Username and password
import pyunicore.credentials as uc_credentials
# authenticate with username/password
credential = uc_credentials.UsernamePassword("demouser", "test123")
This will encode the supplied username/password and add it as an
HTTP header Authorization: Basic ...
to outgoing calls.
Bearer token (OAuth/OIDC)
This will add the supplied token as an HTTP header
Authorization: Bearer ...
to outgoing calls.
import pyunicore.credentials as uc_credentials
# authenticate with Bearer token
token = "..."
credential = uc_credentials.OIDCToken(token)
Basic token
This will add the supplied value as a HTTP header
Authorization: Basic ...
to outgoing calls.
import pyunicore.credentials as uc_credentials
# authenticate with Bearer token
token = "..."
credential = uc_credentials.BasicToken(token)
JWT Token
This is a more complex option that creates a JWT token that is signed with a private key - for example this is usually an authentication option supported by the UFTP Authserver. In this case the user’s UFTP / SSH key is used to sign.
The simplest way to create this credential is to use the create_credential() helper function.
import pyunicore.credentials as uc_credentials
# authenticate with SSH key
uftp_user = "demouser"
identity_file = "~/.uftp/id_uftp"
credential = uc_credentials.create_credential(
username = uftp_user,
identity = identity_file)
The JWTToken
credential can also be used for “trusted services”,
where a service uses its server certificate to sign the token. Of
course this must be enabled / supported by the UNICORE server.
Anonymous access
If for some reason you explicitly want anonymous calls, i.e. NO authentication
(which is treated differently from having invalid credentials!),
you can use the Anonymous
credential class:
import pyunicore.credentials as uc_credentials
# NO authentication
credential = uc_credentials.Anonymous()
This can be useful for simple health checks and the like.
User preferences (advanced feature)
If the user mapping at the UNICORE server gives you access to more than one remote user ID or primary group, you can select one using the user preferences feature of the UNICORE REST API.
The access_info() method shows the result of authentication and authorization.
import json
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
credential = uc_credentials.UsernamePassword("demouser", "test123")
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
client = uc_client.Client(credential, base_url)
print(json.dumps(client.access_info(), indent=2)
You can get access to the user preferences via the Transport
object that every
PyUNICORE resource has.
For example, to select a primary group (from the ones that are available)
client = uc_client.Client(credential, base_url)
client.transport.preferences = "group:myproject1"
Note that (of course) you cannot select a UID/group that is not available, trying that will cause a 403 error.
Creating an authentication token (advanced feature)
For some use cases (like automated workflows) you might want to not store your actual credentials (like passwords or private keys) for security reasons. For this purpose, you can use your (secret) credentials to have the UNICORE server issue a (long-lived) authentication token, that you can then use for your automation tasks without worrying that your secret credentials get compromised.
Note that you still should keep this token as secure as possible, since it would allow anybody who has the token to authenticate to UNICORE with the same permissions and authorization level as your real credentials.
You can access the token issue endpoint using the PyUNICORE client class:
client = uc_client.Client(credential, base_url)
my_auth_token = client.issue_auth_token(lifetime = 3600,
renewable = False,
limited = True)
and later use this token for authentication:
import pyunicore.credentials as uc_credentials
credential = uc_credential.create_token(token=my_auth_token)
client = uc_client.Client(credential, base_url)
- The parameters are
lifetime
: token lifetime in secondsrenewable
: if True, the token can be used to issue a new tokenlimited
: if True, the token is only valid for the server that issued it. If False, the token is valid for all UNICORE servers that the issuing server trusts, i.e. usually those that are in the same UNICORE Registry
UFTP
UFTP (UNICORE FTP) is a fast file transfer toolkit, based on the standard FTP protocol, with an added authentication layer based on UNICORE.
To make a UFTP connection, a user first needs to authenticate to an authentication service, which will produce a one-time password, which is then used to connect to the actual UFTP file server.
UFTP support in PyUNICORE is based on the ftplib standard library.
Basic UFTP usage
Opening an FTP session involves authenticating to an authentication service using UNICORE credentials. Depending on the authentication service, different credentials might be accepted.
Here is a basic example using username/password.
import pyunicore.credentials as uc_credentials
import pyunicore.uftp as uc_uftp
# URL of the authentication service
auth_url = "https://localhost:9000/rest/auth/TEST"
# remote base directory that we want to access
base_directory = "/data"
# authenticate with username/password
credential = uc_credentials.UsernamePassword("demouser", "test123")
uftp_session = uc_uftp.UFTP().connect(credential, auth_url, base_directory)
The object returned by connect() is an ftplib FTP object.
Using UFTP for PyFilesystem
You can create a PyFilesystem FS object either directly in code, or implicitely via a URL.
The convenient way is via URL:
from fs import open_fs
fs_url = "uftp://demouser:test123@localhost:9000/rest/auth/TEST:/data"
uftp_fs = open_fs(fs_url)
The URL format is
uftp://[username]:[password]@[auth-server-url]:[base-directory]?[token=...][identity=...]
The FS driver supports three types of authentication
Username/Password - give username and password
SSH Key - give username and the identity parameter, where identity is the filename of a private key. Specify the password if needed to load the private key
Bearer token - give the token value via the token parameter
Mounting remote filesystems via UFTP
PyUNICORE contains a FUSE driver based on fusepy, allowing you to mount a remote filesystem via UFTP. Mounting is a two step process,
authenticate to an Auth server, giving you the UFTPD host/port and one-time password
run the FUSE driver
The following code example gives you the basic idea:
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.uftp as uc_uftp
import pyunicore.uftpfuse as uc_fuse
_auth = "https://localhost:9000/rest/auth/TEST"
_base_dir = "/opt/shared-data"
_local_mount_dir = "/tmp/mount"
# authenticate
cred = uc_credentials.UsernamePassword("demouser", "test123")
_host, _port, _password = uc_uftp.UFTP().authenticate(cred, _auth, _base_dir)
# run the fuse driver
fuse = uc_fuse.FUSE(
uc_fuse.UFTPDriver(_host, _port, _password), _local_mount_dir, foreground=False, nothreads=True)
Dask integration
PyUNICORE provides the UNICORECluster
class, which is an implementation
of a Dask Cluster, allowing to run the Dask client on your local host (or in
a Jupyter notebook in the Cloud), and have the Dask scheduler and workers
running remotely on the HPC site.
Here is a basic usage example:
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.dask as uc_dask
# Create a UNICORE client for accessing the HPC cluster
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
credential = uc_credentials.UsernamePassword("demouser", "test123")
submitter = uc_client.Client(credential, base_url)
# Create the UNICORECluster instance
uc_cluster = uc_dask.UNICORECluster(
submitter,
queue = "batch",
project = "my-project",
debug=True)
# Start two workers
uc_cluster.scale(2, wait_for_startup=True)
# Create a Dask client connected to the UNICORECluster
from dask.distributed import Client
dask_client = Client(uc_cluster, timeout=120)
That’s it! Now Dask will run its computations using the scheduler and workers started via UNICORE on the HPC site.
Configuration
When creating the UNICORECluster
, a number of parameters can be set via the constructor.
All parameters except for the submitter to be used are OPTIONAL.
submitter: this is either a Client object or an Allocation, which is used to submit new jobs
n_workers: initial number of workers to launch
queue: the batch queue to use
project: the accounting project
threads: worker option controlling the number of threads per worker
processes: worker option controlling the number of worker processes per job (default: 1)
scheduler_job_desc: base job description for launching the scheduler (default: None)
worker_job_desc: base job description for launching a worker (default: None)
local_port: which local port to use for the Dask client (default: 4322)
connect_dashboard: if True, a second forwarding process will be lauched to allow a connection to the dashboard (default: False)
local_dashboard_port: which local port to use for the dashboard (default: 4323)
debug: if True, print some debug info (default: False)
connection_timeout: timeout in seconds while setting up the port forwarding (default: 120)
Customizing the scheduler and workers
By default, the Dask extension will launch the Dask components using server-side applications
called dask-scheduler
and dask-worker
, which need to be defined in the UNICORE IDB.
The job description will look like this:
{
"ApplicationName": "dask-scheduler",
"Arguments": [
"--port", "0",
"--scheduler-file", "./dask.json"
],
"Resources": {
"Queue": "your_queue",
"Project": "your_project"
}
}
If you want to customize this, you can pass in a basic job description when creating
the UNICORECluster
object.
The job descriptions need not contain all command-line arguments, the UNICORECluster
will add them as required. Also, the queue and project will be set if necessary.
For example
# Custom job to start scheduler
sched_jd = {
"Executable" : "conda run -n dask dask-scheduler",
"Resources": {
"Runtime": "2h"
},
"Tags": ["dask", "testing"]
}
# Custom job to start worker
worker_jd = {
"Executable" : "srun --tasks=1 conda run -n dask dask-worker",
"Resources": {
"Nodes": "2"
}
}
# Create the UNICORECluster instance
uc_cluster = uc_dask.UNICORECluster(
submitter,
queue = "batch",
project = "my-project",
scheduler_job_desc=sched_jd,
worker_job_desc=worker_jd
)
Scaling
To control the number of worker processes and threads, the UNICORECluster has the scale() method, as well as two properties that can be set from the constructor, or later at runtime
The scale() method controls how many workers (or worker jobs when using “jobs=…” as argument) are running.
# Start two workers
uc_cluster.scale(2, wait_for_startup=True)
# Or start two worker jobs with 4 workers per job
# and 128 threads per worker
uc_cluster.processes = 4
uc_cluster.threads = 128
uc_cluster.scale(jobs=2)
The dashboard
By default a connection to the scheduler’s dashboard is not possible. To allow connecting to
the dashboard, set connect_dashboard=True
when creating the UNICORECluster
.
The dashboard will then be available at http://localhost:4323
, the port can be changed,
if necessary.
Using an allocation
To speed up the startup and scaling process, it is possible to pre-allocate a multinode batch job (if the server side UNICORE supports this, i.e. runs UNICORE 9.1 and Slurm), and run the Dask components in this allocation.
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.dask as uc_dask
# Create a UNICORE client for accessing the HPC cluster
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
credential = uc_credentials.UsernamePassword("demouser", "test123")
submitter = uc_client.Client(credential, base_url)
# Allocate a 4-node job
allocation_jd = {
"Job type": "ALLOCATE",
"Resources": {
"Runtime": "60m",
"Queue": "batch",
"Project": "myproject"
}
}
allocation = submitter.new_job(allocation_jd)
allocation.wait_until_available()
# Create the UNICORECluster instance using the allocation
uc_cluster = uc_dask.UNICORECluster(allocation, debug=True)
Note that in this case your custom scheduler / worker job descriptions MUST use srun --tasks=1 ...
to make sure that exactly one scheduler / worker is started on one node.
Also make sure to not lauch more jobs than you have nodes - otherwise the new jobs will stay “QUEUED”.
Port forwarding / tunneling
Opens a local server socket for clients to connect to, where traffic gets forwarded to a service on a HPC cluster login (or compute) node. This feature requires UNICORE 9.1.0 or later on the server side.
You can use this feature in two ways
in your own applications via the
pyunicore.client.Job
class.you can also open a tunnel from the command line using the
pyunicore.forwarder
module
Here is an example for a command line tool invocation:
LOCAL_PORT=4322
JOB_URL=https://localhost:8080/DEMO-SITE/rest/core/jobs/some_job_id
REMOTE_PORT=8000
python3 -m pyunicore.forwarder --token <your_auth_token> \
-L $LOCAL_PORT \
$JOB_URL/forward-port?port=REMOTE_PORT \
Your application can now connect to localhost:4322
but all traffic
will be forwarded to port 8000 on the login node.
See
python3 -m pyunicore.forwarder --help
for all options.
License
BSD 3-Clause License
Copyright (c) Human Brain Project, Forschungszentrum Juelich GmbH
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the names of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.