#!/usr/bin/env python
# coding: utf-8

# # Parsl: Advanced Features
# 
# In this tutorial we present advanced features of Parsl, including its ability to use multiple execution sites, to scale elastically within and across sites, and its support for fault tolerance.
# 
# ## 1) Multiple Sites
# 
# In the "parsl-introduction" notebook we showed how a configuration object controls the provider and executor used to run a Parsl script. While we showed only a single site, Parsl can distribute a workload over several sites simultaneously. Below we show an example configuration that combines local thread execution and local pilot-job execution. By default, apps may execute on any configured executor. However, you can restrict an app to a specific executor, or set of executors, by passing a list of executor labels to the app decorator. In the following cells, we show a three-stage workflow in which the first app uses local threads, the second uses local pilot jobs, and the third (with no executors specified) may use either.

# First, we define two "sites", which in this example are both local: the first uses threads, and the second uses pilot-job execution via the HighThroughputExecutor. We then load a configuration containing these two executors.

# In[ ]:

from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel

# Define a configuration that combines local threads and local pilot jobs
multi_site_config = Config(
    executors=[
        ThreadPoolExecutor(
            max_threads=8,
            label='local_threads'
        ),
        HighThroughputExecutor(
            label="local_htex",
            worker_debug=True,
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            ),
        )
    ]
)

# Next, we define three apps with the same functionality as in the previous tutorial. The first is restricted to the "local_threads" executor, the second to the "local_htex" executor, and the third has no executor specification, so it can run on any available executor.

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.data_provider.files import File

parsl.load(multi_site_config)

# The generate app runs on the "local_threads" executor
@bash_app(executors=["local_threads"])
def generate(outputs=[]):
    return "echo $(( RANDOM )) &> {}".format(outputs[0].filepath)

# The concat app runs on the "local_htex" executor
@bash_app(executors=["local_htex"])
def concat(inputs=[], outputs=[]):
    return "cat {0} > {1}".format(" ".join(i.filepath for i in inputs), outputs[0].filepath)

# The total app may run on either executor
@python_app
def total(inputs=[]):
    total = 0
    with open(inputs[0].filepath, 'r') as f:
        for l in f:
            total += int(l)
    return total

# Finally, we run the apps and clean up.

# In[ ]:

# Create 5 files with random numbers
output_files = []
for i in range(5):
    output_files.append(generate(outputs=[File('random-%s.txt' % i)]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0] for i in output_files],
            outputs=[File("all.txt")])

# Calculate the sum of the random numbers
result = total(inputs=[cc.outputs[0]])

print(result.result())

parsl.clear()
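# As noted above, an app is not limited to a single executor: the `executors` argument of the app decorator accepts a list of executor labels. The following minimal sketch is an addition to the tutorial; it assumes the `multi_site_config` defined above is still loaded (i.e. run it before the `parsl.clear()` call). It pins an app to both executors explicitly, which with this configuration is equivalent to omitting the `executors` argument altogether.

# In[ ]:

# Sketch: explicitly allow an app to run on either configured executor.
# Assumes the 'local_threads' and 'local_htex' labels from multi_site_config.
@python_app(executors=['local_threads', 'local_htex'])
def say_hello(name):
    return 'Hello, %s!' % name

print(say_hello('multi-executor Parsl').result())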
# ## 2) Elasticity
# 
# As a Parsl script is evaluated, it creates a collection of tasks for asynchronous execution. In most cases the rate at which tasks are generated varies as different stages of the workflow are evaluated. To address this variability, Parsl monitors the flow of tasks and can elastically provision resources in response, within user-specified bounds.
# 
# In the following example, we allow Parsl to provision up to two blocks (`max_blocks=2`), starting with one (`init_blocks=1`). We then set `parallelism` to 0.1, which means that Parsl will favor reusing existing resources over provisioning new ones. You should see that all apps execute on the same process ID. Note: we restrict Parsl to one worker per block (`max_workers=1`).

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

local_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=2,
                parallelism=0.1,
            )
        )
    ]
)

parsl.load(local_htex)

@python_app
def py_hello():
    import time
    import os
    time.sleep(5)
    return "(%s) Hello World!" % os.getpid()

results = {}
for i in range(0, 10):
    results[i] = py_hello()

print("Waiting for results ....")
for i in range(0, 10):
    print(results[i].result())

parsl.clear()

# We now set the `parallelism` option to 1. With this configuration Parsl favors elastic growth, executing as many tasks simultaneously as possible up to the user-defined limits on workers and blocks; you should now typically see more than one process ID in the output. You can vary `max_blocks` and `parallelism` (between 0 and 1) to experiment with different scaling policies.

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

local_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=2,
                parallelism=1,
            )
        )
    ]
)

parsl.load(local_htex)

@python_app
def py_hello():
    import time
    import os
    time.sleep(5)
    return "(%s) Hello World!" % os.getpid()

results = {}
for i in range(0, 10):
    results[i] = py_hello()

print("Waiting for results ....")
for i in range(0, 10):
    print(results[i].result())

parsl.clear()

# ## 3) Fault tolerance and caching
# 
# Workflows are often re-executed for various reasons: workflow or node failure, code errors, or extension of the workflow. It is inefficient to re-execute apps that have successfully completed, so Parsl provides two mechanisms to avoid this: app caching and workflow-level checkpointing.
# 
# ### App Caching
# 
# When developing a workflow, developers often re-execute the same workflow with incremental changes. Often large fragments of the workflow are re-executed even though they have not been modified, which wastes both time and computational resources. App caching solves this problem by caching the results of completed apps so that they can be reused. Caching is enabled by setting `cache=True` in the app decorator. Note: a cached result is returned only when the same function is called with the same name, input arguments, and function body. If any of these change, a new result is computed and returned.
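# To make this caching rule concrete, the sketch below shows one way such a memoization key *could* be computed: hash the function's name, its source code, and its arguments together, so that changing any of them produces a different key. This cell is an illustration of the idea only, not Parsl's actual implementation.

# In[ ]:

# Illustrative sketch (not Parsl internals): a cache key that changes whenever
# the function name, body, or arguments change.
import hashlib
import inspect
import pickle

def memo_key(func, *args, **kwargs):
    h = hashlib.sha256()
    h.update(func.__name__.encode())                         # same name
    h.update(inspect.getsource(func).encode())               # same function body
    h.update(pickle.dumps((args, sorted(kwargs.items()))))   # same arguments
    return h.hexdigest()

def greet(message):
    return message

print(memo_key(greet, "Hello World"))
print(memo_key(greet, "Hello World!"))  # different argument -> different key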
# 
# The following example shows two calls to the `slow_message` app with the same message. You will see that the first call is slow (the app sleeps for 5 seconds), but the second call returns immediately (the cached result is returned, so the app is not actually executed and there is no sleep delay).
# 
# Note: when running this example in a Jupyter notebook, the cached results persist across subsequent executions of the cell.

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

local_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            worker_debug=True,
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            )
        )
    ]
)

parsl.load(local_htex)

@python_app(cache=True)
def slow_message(message):
    import time
    time.sleep(5)
    return message

# The first call to slow_message will compute the value
first = slow_message("Hello World")
print("First: %s" % first.result())

# A second call to slow_message with the same arguments
# returns immediately from the cache
second = slow_message("Hello World")
print("Second: %s" % second.result())

# A third call to slow_message with different arguments
# will take some time to compute the value
third = slow_message("Hello World!")
print("Third: %s" % third.result())

parsl.clear()

# ### Checkpointing
# 
# Parsl's checkpointing model enables workflow state to be saved and later used to resume execution from that point. Checkpointing provides workflow-level fault tolerance, guarding against failure of the Parsl control process.
# 
# Parsl implements an incremental checkpointing model: each checkpoint saves only the state changes since the previous checkpoint. Thus, the full history of a workflow may be distributed across multiple checkpoint files.
# 
# Checkpointing uses app caching to store results, so the same caveats about non-deterministic functions apply: a checkpoint stores the result of an app invocation with a given name, arguments, and function body.
# 
# In this example we demonstrate how to checkpoint workflows automatically as tasks successfully complete. This is enabled in the config by setting `checkpoint_mode` to `task_exit`. Other checkpointing modes are described in the [checkpointing documentation](https://parsl.readthedocs.io/en/latest/userguide/checkpoints.html).

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

local_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            )
        )
    ],
    checkpoint_mode='task_exit',
)

dfk = parsl.load(local_htex)

@python_app(cache=True)
def slow_double(x):
    import time
    time.sleep(2)
    return x * 2

d = []
for i in range(5):
    d.append(slow_double(i))

# wait for results
print([d[i].result() for i in range(5)])

parsl.clear()
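# The other modes listed in the checkpointing documentation are configured the same way. As a minimal sketch (an addition to the tutorial, shown for illustration only), a periodic mode writes checkpoints at a fixed interval rather than at each task exit; the interval is given as an `HH:MM:SS` string. The configuration below assumes the same local executor as above.

# In[ ]:

# Sketch: checkpoint every 30 seconds instead of at each task exit.
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel

periodic_checkpoint_config = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            )
        )
    ],
    checkpoint_mode='periodic',
    checkpoint_period='00:00:30',
)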
# To restart from a previous checkpoint, the DataFlowKernel must be configured with the appropriate checkpoint files. In most cases these are the most recently created checkpoint files. The following approach works with any checkpoint file, irrespective of the checkpointing mode used to create it.
# 
# In this example we reload the most recent checkpoints and run the same workflow again. The results return immediately because there is no need to re-execute any app.

# In[ ]:

from parsl.utils import get_all_checkpoints

local_htex = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            )
        )
    ],
    checkpoint_files=get_all_checkpoints(),
)

parsl.load(local_htex)

# Rerun the same workflow
d = []
for i in range(5):
    d.append(slow_double(i))

# wait for results
print([d[i].result() for i in range(5)])

parsl.clear()

# ## 4) Globus data management
# 
# Parsl uses Globus for wide-area data movement. The following example shows how you can execute an app that takes a Globus-accessible file as input and transfers its output file to a Globus endpoint.
# 
# Note: to run this example you will need to run in a location with a Globus endpoint and make that endpoint known to the configuration. E.g., for Blue Waters you would include the following configuration:
# 
# ```
# local_endpoint = 'd59900ef-6d04-11e5-ba46-22000b92c6ec'
# 
# storage_access=[GlobusStaging(
#     endpoint_uuid=local_endpoint,
#     endpoint_path="/",
#     local_path="/"
# )]
# ```
# 
# Make sure to activate the destination endpoint before running this example. You can activate the endpoint on the Globus website or via the Globus Python SDK.
# 
# The example is set to upload the sorted file to the Globus tutorial endpoint. You can send it to another location by updating the following line:
# 
# ```
# dest_endpoint = 'ddb59aef-6d04-11e5-ba46-22000b92c6ec'
# ```
# 
# You can view the destination endpoint in [Globus](https://app.globus.org/file-manager?origin_id=ddb59aef-6d04-11e5-ba46-22000b92c6ec&origin_path=%2F~%2F).

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.data_provider.files import File
from parsl.config import Config
from parsl.data_provider.globus import GlobusStaging
from parsl.data_provider.data_manager import default_staging
from parsl.executors.threads import ThreadPoolExecutor
import os

# Local endpoint used to transfer the Globus file in for processing
local_endpoint = ''  # Insert your endpoint ID here

# Destination for the sorted file (here Globus Tutorial Endpoint 1)
dest_endpoint = 'ddb59aef-6d04-11e5-ba46-22000b92c6ec'

if not local_endpoint:
    print("You must specify a local endpoint")
else:
    config = Config(
        executors=[
            ThreadPoolExecutor(
                label='local_threads_globus',
                working_dir=os.getcwd(),  # Update to your working directory
                storage_access=default_staging + [GlobusStaging(
                    endpoint_uuid=local_endpoint,
                    endpoint_path='/',
                    local_path='/'
                )],
            )
        ],
    )

    parsl.load(config)

# In[ ]:

@python_app
def sort_strings(inputs=[], outputs=[]):
    with open(inputs[0].filepath, 'r') as u:
        strs = u.readlines()
        strs.sort()
        with open(outputs[0].filepath, 'w') as s:
            for e in strs:
                s.write(e)

# Remote unsorted file stored on the Globus-accessible Eagle data service at ALCF
unsorted_globus_file = File('globus://6a3b7b4d-d874-417a-9fc0-6463258987e9/unsorted.txt')

# Location to send the sorted file after processing
sorted_globus_file = File('globus://%s/~/sorted.txt' % dest_endpoint)

f = sort_strings(inputs=[unsorted_globus_file], outputs=[sorted_globus_file])
print(f.result())

# In[ ]:

parsl.clear()

# ## 5) Monitoring
# 
# Parsl can be configured to capture fine-grained monitoring information about workflows and resource usage.
# 
# To enable monitoring you must install Parsl with the monitoring module and add a monitoring hub to the configuration.
# 
#     $ pip install parsl[monitoring]
# 
# Note: in this example we set the resource monitoring interval to 1 second so that we can capture resource information from short-running tasks. In practice you will likely use a longer interval.

# In[ ]:

import parsl
from parsl import python_app
from parsl.monitoring.monitoring import MonitoringHub
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
from parsl.addresses import address_by_hostname
from parsl.channels import LocalChannel

config = Config(
    executors=[
        HighThroughputExecutor(
            label="local_htex",
            address=address_by_hostname(),
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            ),
        )
    ],
    monitoring=MonitoringHub(
        hub_address=address_by_hostname(),
        hub_port=6553,
        resource_monitoring_interval=1,
    )
)

parsl.load(config)

# Run a simple workflow
@python_app(cache=True)
def slow_double(x):
    import time
    import random
    time.sleep(random.randint(1, 15))
    return x * 2

d = []
for i in range(5):
    d.append(slow_double(i))

# wait for results
print([d[i].result() for i in range(5)])

parsl.clear()

# ### Accessing monitoring data
# 
# Parsl includes a workflow monitoring system and a web-based visualization interface that can be used to monitor workflows during or after execution. The monitoring system stores information in a database, and the web interface provides workflow-, task-, and resource-level views. To view the web interface, run the `parsl-visualize` command.
# 
# Because all monitoring information is stored in a local SQLite database, you can also connect to this database directly from your notebook to view and visualize workflow status. In the examples below we load the information into Pandas dataframes.
# 
# First, connect to the database. By default the database is named 'monitoring.db', although this can be changed in the monitoring configuration.

# In[ ]:

import sqlite3
import pandas as pd

conn = sqlite3.connect('monitoring.db')

# The workflow table contains high-level workflow information such as the name of the workflow, when it was run, and by whom, as well as statistics about the state of its tasks (e.g., completed or failed).

# In[ ]:

df_workflow = pd.read_sql_query('SELECT * from workflow', conn)
df_workflow

# The task table contains information about the individual tasks in a workflow, such as their input and output files, start and completion times, and where each task was executed.

# In[ ]:

run_id = df_workflow['run_id'].iloc[-1]

df_task = pd.read_sql_query('SELECT * from task where run_id="%s"' % run_id, conn)
df_task = df_task.sort_values(by=['task_id'], ascending=False)
df_task

# The status table records the state transitions of each task (e.g., pending, launched, running, done), each with a timestamp.

# In[ ]:

df_status = pd.read_sql_query("SELECT * FROM status WHERE run_id='%s'" % run_id, conn)
df_status

# The resource table contains information about resources used on each node. Resource information is sampled at the rate defined by `resource_monitoring_interval` in the configuration. All resource information is captured using [psutil](https://psutil.readthedocs.io/en/latest/).

# In[ ]:

df_resource = pd.read_sql_query('SELECT * from resource where run_id="%s"' % run_id, conn)
df_resource
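# Because the monitoring data is simply a set of dataframes at this point, it is easy to derive quick summaries before plotting anything. The short sketch below is an addition to the tutorial: using only the `task_id`, `task_status_name`, and `timestamp` columns queried above, it estimates how long each task spent in each state by differencing consecutive status timestamps.

# In[ ]:

# Approximate time spent in each state, per task, by differencing consecutive
# status timestamps (illustrative summary; not part of Parsl itself).
df_s = df_status.sort_values(['task_id', 'timestamp']).copy()
df_s['timestamp'] = pd.to_datetime(df_s['timestamp'])

# Duration of each state = time until the next status change for the same task
df_s['duration'] = df_s.groupby('task_id')['timestamp'].shift(-1) - df_s['timestamp']

state_durations = (
    df_s.dropna(subset=['duration'])
        .groupby(['task_id', 'task_status_name'])['duration']
        .sum()
)
state_durations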
# ### Visualizing monitoring data
# 
# The monitoring data can be visualized directly in the Jupyter notebook using many of the Python graphing libraries. The following examples use Plotly.
# 
# First we show a Gantt chart of task execution: for each task it shows when the task was submitted, when it was executed, and when it completed.

# In[ ]:

import plotly.figure_factory as ff

parsl_tasks = []
time_completed = df_status['timestamp'].max()

for i, task in df_task.iterrows():
    task_id = task['task_id']
    description = "Task ID: {}, app: {}".format(task['task_id'], task['task_func_name'])

    # Build one bar per status interval for this task
    statuses = df_status.loc[df_status['task_id'] == task_id].sort_values(by=['timestamp'])
    last_status = None
    for j, status in statuses.iterrows():
        if last_status is not None:
            last_status_bar = {'Task': description,
                               'Start': last_status['timestamp'],
                               'Finish': status['timestamp'],
                               'Resource': last_status['task_status_name']
                               }
            parsl_tasks.append(last_status_bar)
        last_status = status

    # Close the final status interval at the end of the run
    if last_status is not None:
        last_status_bar = {'Task': description,
                           'Start': last_status['timestamp'],
                           'Finish': time_completed,
                           'Resource': last_status['task_status_name']
                           }
        parsl_tasks.append(last_status_bar)

colors = {'pending': 'rgb(168, 168, 168)',
          'launched': 'rgb(100, 255, 255)',
          'running': 'rgb(0, 0, 255)',
          'exec_done': 'rgb(0, 200, 0)',
          'done': 'rgb(0, 200, 0)',
          'failed': 'rgb(255, 100, 100)',
          }

fig = ff.create_gantt(parsl_tasks,
                      title="",
                      colors=colors,
                      group_tasks=True,
                      show_colorbar=True,
                      index_col='Resource',
                      )
fig['layout']['yaxis']['title'] = 'Task'
fig['layout']['yaxis']['showticklabels'] = False
fig['layout']['xaxis']['title'] = 'Time'
fig.show()

# Here we show side-by-side plots of CPU and memory utilization over time.

# In[ ]:

from plotly.subplots import make_subplots
import plotly.graph_objects as go

fig = make_subplots(rows=1, cols=2, subplot_titles=("CPU", "Memory"))

fig.add_trace(
    go.Scatter(
        x=df_resource['timestamp'],
        y=df_resource['psutil_process_cpu_percent']),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(
        x=df_resource['timestamp'],
        y=df_resource['psutil_process_memory_percent']),
    row=1, col=2
)

xaxis = dict(tickformat='%m-%d\n%H:%M:%S',
             autorange=True,
             title='Time')
yaxis = dict(title="Utilization (%)")

fig.update_layout(height=600, width=800, xaxis=xaxis, showlegend=False)
fig.update_yaxes(range=[0, 100])
fig.show()

# In[ ]:

conn.close()

# ## 6) Debugging
# 
# Debugging parallel dataflows is often more complicated than debugging serial programs. The easiest way to debug a Parsl application is via Python loggers. The following example shows how to enable logging; in this case we log the debug stream to the console. You can also log to a file using the `set_file_logger` function.
# 
# When trying to debug individual apps it is often best to first capture stdout and stderr to files. These files record any text written by the app, which often indicates what the app was doing when something went wrong.

# In[ ]:

import parsl
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config
from parsl import set_stream_logger

# set the stream logger to print debug messages to the console
set_stream_logger()

parsl.load(config)

@python_app
def hello():
    return 'Hello World!'

print(hello().result())

parsl.clear()
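# As a brief sketch of the advice above (this cell is an addition to the tutorial): apps accept `stdout` and `stderr` keyword arguments that redirect an app's output to files, and `set_file_logger` writes the Parsl debug log to a file instead of the console. The file names used here are arbitrary.

# In[ ]:

import parsl
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
from parsl.app.app import bash_app
from parsl import set_file_logger

# Write the Parsl debug log to a file (name is arbitrary) instead of the console
set_file_logger('parsl_debug.log')

parsl.load(Config(executors=[ThreadPoolExecutor(label='local_threads_debug')]))

# Redirect the app's stdout and stderr to files for later inspection
@bash_app
def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'):
    return 'echo "Hello World!"'

echo_hello().result()

# Inspect the captured stdout
with open('echo-hello.stdout', 'r') as f:
    print(f.read())

parsl.clear()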