import json
import os
import time
from datafed.CommandLib import API
Instantiate the DataFed API and set context to the Training project:
df_api = API()
df_api.setContext('p/trn001')
Enter your username to work within your personal Collection.
parent_collection = ''  # your username here
Copy over the ID for the record you created in the previous notebook.
record_id = ''  # the record ID goes here
Shortly, we will learn how to upload data to the record we just created. For demonstration purposes, we will just create a simple text file and use it as the raw data for the Data Record.
DataFed does not impose any restrictions on the file extension / format of the raw data.
with open('./raw_data.txt', mode='w') as file_handle:
    file_handle.write('This is some data')
Always ensure that your Globus endpoint is active and that your files are located in a directory that is visible to the Globus endpoint.
dataPut()
Uploading data files to DataFed is done using the dataPut() command.
put_resp = df_api.dataPut(record_id,
'./raw_data.txt',
wait=True, # Waits until transfer completes.
)
print(put_resp)
We get two components in the response:
- information about the Data Record itself
- the task - more on this later
The dataPut() method initiates a Globus transfer on our behalf from the machine where the file is present to wherever the default data repository is located. In this case, the file was in our local file system and on the same machine where we are executing the command.
The above data file was specified by its relative local path, so DataFed used our pre-configured default Globus endpoint to find the data file. As long as we have the ID of any active Globus endpoint that we have authenticated access to, we can transfer data from that endpoint by giving its full absolute file path - even if that file system is not attached to the local machine. Look for more information on this in later examples.
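For example, a hypothetical call like the sketch below could pull a file from another endpoint by prefixing the absolute path with that endpoint's ID; the UUID and path shown here are placeholders, not real values:
# Hypothetical: replace the UUID with a real endpoint ID and use a path visible to that endpoint
put_resp = df_api.dataPut(record_id,
                          'aa1b2c3d-1234-5678-9abc-def012345678/full/path/to/raw_data.txt',
                          wait=True)
print(put_resp)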
Let's view the data record now that we've uploaded our data. Pay attention to the ext and source fields, which should now be populated:
dv_resp = df_api.dataView(record_id)
print(dv_resp)
DataFed is also capable of retrieving data stored in a DataFed repository and placing it in the local or another Globus-visible file system via the dataGet() function.
Let us download the content of the data record we have been working on so far for demonstration purposes.
The current version of DataFed has a bug where dataGet() only accepts a list of Data Record or Collection IDs. Until the next version, users are recommended to put their singular ID into a list for dataGet().
get_resp = df_api.dataGet([record_id], # currently only accepts a list of IDs / aliases
'.', # directory where data should be downloaded
orig_fname=False, # do not name file by its original name
wait=True, # Wait until Globus transfer completes
)
print(get_resp)
In the response we only get back information about the data transfer task - more on this shortly.
The bug in dataGet() also reveals its capability to download multiple data records or even Collections.
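For example, a call like the hypothetical sketch below would download several items in a single task; record_id_2 here is a placeholder name for the ID of another existing Data Record or Collection:
# Hypothetical: record_id_2 stands in for the ID of another Data Record or Collection
get_resp = df_api.dataGet([record_id, record_id_2],
                          '.',               # destination directory
                          orig_fname=False,  # name downloaded files by their record IDs
                          wait=True)         # wait until the Globus transfer completes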
Let's confirm that the data has been downloaded successfully:
os.listdir('.')
expected_file_name = os.path.join('.', record_id.split('d/')[-1]) + '.txt'
print('Does a file with this name: ' + expected_file_name + ' exist?')
print(os.path.exists(expected_file_name))
A DataFed task may itself contain / be responsible for several Globus file transfers, potentially from / to multiple locations.
DataFed makes it possible to check on the status of transfer tasks in an easy and programmatic manner.
Before we learn more about tasks, let's first try to get the id of the task in get_resp from the recent dataGet() call:
task_id = get_resp[0].task[0].id
print(task_id)
We can get more information about a given transfer via the taskView() function:
task_resp = df_api.taskView(task_id)
print(task_resp)
We get a new kind of message - a TaskDataReply.
Key fields to keep an eye on:
- status
- msg
- source
- dest
If we are interested in monitoring tasks, triggering activities, or subsequent steps of workflows based on transfers, we would need to know how to get the status property from the TaskDataReply:
task_resp[0].task[0].status
Even though the message above says TS_SUCCEEDED, we see that this task status corresponds to the integer 3.
Cheat sheet for interpreting task statuses:
- 2: in progress
- 3: complete
- anything else: problem
A future version of DataFed may change the nature of the output / type for the status property. In general, the exact return object types and nomenclature may evolve with DataFed.
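For convenience, this cheat sheet could be wrapped in a small helper; the sketch below is our own (the function name is illustrative) and the mapping may need updating if the status values change in future versions:
def describe_task_status(status_code):
    # Map DataFed task status integers to human-readable strings,
    # following the cheat sheet above; anything unexpected is flagged as a problem.
    if status_code == 2:
        return 'in progress'
    if status_code == 3:
        return 'complete'
    return 'problem (status code: {})'.format(status_code)
print(describe_task_status(task_resp[0].task[0].status))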
We can request a listing of all our recently initiated tasks:
df_api.taskList()
The output of this listing will be very helpful for the Exercise below.
Let's say that we want to run a series of simulations where one or more parameters are varied and each simulation is run with a unique set of parameters. Let's also assume that our eventual goal is to build a surrogate model for the computationally expensive simulation using machine learning. So, we want to capture the metadata and data associated with the series of simulations to train the machine learning model later on.
We have set up skeleton functions and code snippets to help you mimic the data management for such a simulation. We would like you to take what you have learnt so far and fill in the blanks.
Here, we mimic a computationally "expensive" simulation with a function that simply sleeps for a few seconds.
def expensive_simulation():
    time.sleep(5)
    # Yes, this simulation is deterministic and always results in the same result:
    path_to_results = 'sdss#public/uufs/chpc.utah.edu/common/home/sdss/dr10/apogee/spectro/data/55574/55574.md5sum'
    # The simulation uses the same combination of parameters
    metadata = {'a': 1, 'b': 2, 'c': 3.14}
    return path_to_results, metadata
Define a function that:
1. creates a new Data Record with the provided metadata (as a dictionary) and other details,
2. extracts the record id,
3. puts the raw data into the record,
4. extracts and returns the task ID.
Feel free to print any messages that may help you track things.
Pay attention to the wait keyword argument when putting the raw data into the record. If you get stuck, one possible way to fill in the skeleton is sketched right after it.
def capture_data(simulation_index, # integer - counter to signify the Nth simulation in the series
                 metadata, # dictionary - combination of parameters used for this simulation
                 raw_data_path, # string - path to the raw data file that needs to be put into the record
                 parent_collection=parent_collection, # string - Collection to create this Data Record in
                 ):
    # 1. Create a new Data Record with the metadata and use the simulation index to provide a unique title
    # 2. Extract the record ID from the response
    # 3. Put the raw data into this record
    # 4. Extract the ID for the data transfer task
    # 5. Return the task ID
    pass
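If you get stuck, here is a minimal sketch of one possible implementation. The function name capture_data_example and the record title scheme are our own choices, and the exact attribute path to the task ID in the dataPut() response may differ slightly between DataFed versions, so inspect the response if the access below fails:
def capture_data_example(simulation_index, metadata, raw_data_path,
                         parent_collection=parent_collection):
    # 1. Create a new Data Record; the simulation index makes the title unique
    dc_resp = df_api.dataCreate('Simulation_' + str(simulation_index),
                                metadata=json.dumps(metadata),
                                parent_id=parent_collection)
    # 2. Extract the record ID from the response
    this_rec_id = dc_resp[0].data[0].id
    # 3. Put the raw data into this record; wait=False lets the upload proceed in the background
    put_resp = df_api.dataPut(this_rec_id, raw_data_path, wait=False)
    # 4. Extract the ID for the data transfer task
    #    (in some DataFed versions this may be put_resp[0].task[0].id instead)
    task_id = put_resp[0].task.id
    # 5. Return the task ID
    return task_id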
Try out this function to make sure it works. See what it does on the DataFed web portal.
capture_data(14, {'param_1': 456, 'param_2': 'Hello', 'param_3': [1, -2.34, 54]})
We will want a simple function to monitor the status of all the data upload tasks. Define a function that accepts a list of task IDs and returns their statuses after looking them up in DataFed.
def check_xfer_status(task_ids):
    # put singular task ID into a list
    if isinstance(task_ids, str):
        task_ids = [task_ids]
    # Create a list to hold the statuses of each of the tasks
    # Iterate over each of the task IDs
    # For each task ID, get detailed information about it
    # Extract the task status from the detailed information
    # Append this status to the list of statuses
    # Return the list of statuses
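A minimal sketch of one way to complete this function, using the taskView() call demonstrated above (the name check_xfer_status_example is our own):
def check_xfer_status_example(task_ids):
    # Put a singular task ID into a list
    if isinstance(task_ids, str):
        task_ids = [task_ids]
    # Create a list to hold the statuses of each of the tasks
    statuses = []
    # Iterate over each of the task IDs
    for this_task_id in task_ids:
        # For each task ID, get detailed information about it
        task_resp = df_api.taskView(this_task_id)
        # Extract the task status and append it to the list of statuses
        statuses.append(task_resp[0].task[0].status)
    # Return the list of statuses
    return statuses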
Try out your function using the IDs of the tasks from the recent dataPut() and dataGet() calls.
check_xfer_status(...)
Use the three functions defined above to mimic the process of exploring a parameter space using simulations, where for each iteration, we:
1. run a simulation,
2. capture the data + metadata into DataFed,
3. monitor the data upload tasks.
xfer_tasks = list()
for ind in range(3):
    print('Starting simulation #{}'.format(ind))
    # Run the simulation.
    path_to_results, metadata = expensive_simulation()
    # Capture the data and metadata into DataFed
    task_id = capture_data(ind, metadata, path_to_results)
    # Append the task ID for this data upload into xfer_tasks
    xfer_tasks.append(task_id)
    # Print out the status of the data transfers
    print('Transfer status(es): {}'.format(check_xfer_status(xfer_tasks)))
    print('')
print('Simulations complete')
time.sleep(10)
print('Transfer status(es): {}'.format(check_xfer_status(xfer_tasks)))
What happens if you set the wait keyword argument in dataPut() to True?
Users are advised to perform data orchestration (especially large data uploads / downloads) outside the scope of heavy / parallel computation in order to avoid wasting precious wall time on compute clusters.