#!/usr/bin/env python
# coding: utf-8

# # PYROSETTA.DISTRIBUTED - RosettaScripts/Python Interface Integration

# ## Integration Components

# The python software ecosystem relies on a small set of shared core interfaces utilizing primitive language-native data structures, pure function invocation, and object serialization to provide loosely coupled interoperability between independent software components. Our component, the `pyrosetta.distributed` namespace, utilizes established elements of the Rosetta internal architecture: the Pose model & score representation, RosettaScripts protocols, and Pose serialization.
#
# The adoption of a small set of core interfaces supports integration with an array of scientific computing tools, including interactive development environments, common record-oriented data formats, statistical analysis and machine learning packages, and multiple distributed computing frameworks. The pyrosetta.distributed package provides example integrations with several preferred packages for data analysis (Pandas), distributed computing (Dask), and interactive development (Jupyter Notebook), but is loosely coupled to allow later integration with additional libraries.

# In[1]:

import pyrosetta.distributed

# Distributed components perform default initialization on-demand, but
# custom initialization can be requested via pyrosetta.distributed.maybe_init()

# # Data Structures (pyrosetta.distributed.packed_pose)

# “Primitive” datatypes form a primary interface between many python libraries and, though not strictly defined, typically include the built-in scalar types (string, int, bool, float, ...), key-value dicts, and lists. Libraries operating on more complex user-defined classes often expose routines interconverting to and from primitive datatypes, and primitive datatypes can be efficiently serialized in multiple formats.
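As a minimal sketch of this pattern, using only the standard library (this is not the PyRosetta API; the `ModelRecord` type and its fields are hypothetical stand-ins), a complex object can be exposed as an immutable record of primitive types that serializes cheaply:

```python
# Illustrative sketch (not PyRosetta API): an immutable record of primitive
# types standing in for a heavyweight model object.
import pickle
from dataclasses import dataclass, asdict

@dataclass(frozen=True)
class ModelRecord:
    # Hypothetical fields: a score plus an opaque encoded-model blob.
    total_score: float
    pickled_pose: bytes

record = ModelRecord(total_score=-12.5, pickled_pose=b"...")

# The record is isomorphic to a primitive dict...
as_dict = asdict(record)
assert as_dict == {"total_score": -12.5, "pickled_pose": b"..."}

# ...and round-trips losslessly through a binary serialization format.
restored = pickle.loads(pickle.dumps(record))
assert restored == record
```

This is the design point exploited below: because the record is built from primitives, any serializer or data library can consume it without knowing anything about the underlying model class.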
# For interaction between Rosetta protocol components and external libraries, we developed the `pyrosetta.distributed.packed_pose` namespace. This implements an isomorphism between the Pose object and dict-like records of the molecular model and scores. The Pose class represents a mutable, full-featured molecular model with a non-trivial memory footprint. A Pose may be inexpensively interconverted to a compact binary encoding via recently developed cereal-based serialization in the suite. This serialized format is used to implement the `PackedPose` class, an immutable record containing model scores and the encoded model, which is isomorphic to a dict-based record. Adaptor functions within the packed_pose namespace freely convert between collections of Pose (`packed_pose.to_pose`), PackedPose (`packed_pose.to_packed`), dict-records (`packed_pose.to_dict`), and pandas.DataFrame objects. (Fig 2.A)

# In[2]:

import pyrosetta.distributed.packed_pose as packed_pose
import pyrosetta.distributed.io as io

import requests
import pandas

ubq = io.pose_from_pdbstring(requests.get("https://files.rcsb.org/download/1UBQ.pdb").text)

# Packed pose structures interconvert between multiple datatypes.
display(ubq)
display(packed_pose.to_pose(ubq))
display(packed_pose.to_dict(ubq).keys())

# A dict-record and DataFrame interface provides zero-friction integration with a wide variety of data analysis tools and storage formats. For example, the record-oriented format can be passed through statsmodels- or scikit-learn-based filtering and analysis and written to any json-encoded text file, avro record-oriented storage, or parquet column-oriented storage. The pyrosetta.distributed.io namespace implements functions that mirror the pyrosetta.io namespace, providing conversion between PackedPose and the PDB, MMCIF & Rosetta silent-file formats.
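To make the "json-encoded text file" path concrete, here is a standard-library sketch of writing dict-records in json-lines form; the record fields shown are hypothetical stand-ins, not actual PackedPose score names:

```python
# Sketch: dict-records serialized one-per-line ("json lines"), a format that
# streams, appends, and round-trips without any schema machinery.
import json

records = [
    {"name": "decoy_0", "total_score": -10.2},
    {"name": "decoy_1", "total_score": -11.7},
]

# One json-encoded record per line, ready to append to a text file.
lines = [json.dumps(rec) for rec in records]

# Round-trip: each line parses back into the original dict-record.
assert [json.loads(line) for line in lines] == records
```

The same records drop directly into `pandas.DataFrame.from_records`, which is the interconversion the notebook exercises below.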
# Critically, the PackedPose record format can also be transparently serialized, stored with a minimal memory footprint, and transmitted between processes in a distributed computing context. This allows a distributed system to process PackedPose records as plain data, storing and transmitting a large number of model decoys while only unpacking a small working set into heavyweight Pose objects.

# In[3]:

# Collections of packed pose structures interconvert to pandas DataFrame.
frame_poses = pandas.DataFrame.from_records([packed_pose.to_dict(ubq) for _ in range(5)])
display(frame_poses)

# In[4]:

packed_poses = packed_pose.to_packed(frame_poses)
display(packed_poses)

# # Protocol Components (pyrosetta.distributed.tasks)

# RosettaScripts uses an XML-based DSL to tersely encode molecular modeling protocols with a pipeline-like dataflow. The rosetta_scripts interpreter functions by parsing, XSD-validating, and initializing a single RosettaScripts protocol. It then applies this protocol to input structures repeatedly to produce simulation output. Recent work has expanded support for more complex dataflow, including multi-stage operations and additional logic; however, RosettaScripts is not intended to be a general-purpose programming language.
#
# The pyrosetta.distributed.tasks namespace encapsulates the RosettaScripts interface, allowing the DSL to be utilized within python processes. Protocol components are represented as ‘task’ objects containing an XML-encoded script. Task objects are serializable via the standard pickle interface, and they use a simple caching strategy to perform on-demand initialization of the underlying protocol object as needed for task application.
# In[5]:

import pyrosetta.distributed.tasks.score as score
import pyrosetta.distributed.tasks.rosetta_scripts as rosetta_scripts

# A blank RosettaScripts task
blank_task = rosetta_scripts.SingleoutputRosettaScriptsTask("""
""")
display(blank_task)

# A simple scoring task
score_task = score.ScorePoseTask()
display(score_task)

# The results of filters and scores are available as the PackedPose "scores"
scored_ubq = score_task(ubq)
display(scored_ubq.scores)

# Task components accept any valid pose-equivalent data structure and return immutable PackedPose data structures by (1) deserializing the input into a short-lived Pose object, (2) applying the parsed protocol to the Pose, and (3) serializing the resulting model as a PackedPose. Two task classes, SingleoutputRosettaScriptsTask and MultioutputRosettaScriptsTask, define either a one-to-one function returning a single output or a one-to-many protocol component returning a lazy iterator of outputs. All tasks operate as “pure functions”, returning a modified copy rather than directly manipulating input data structures. (Fig 2.B)

# In[6]:

relax_task = rosetta_scripts.SingleoutputRosettaScriptsTask("""
""")

# Protocol execution does not change the input pose.
# A modified copy is returned.
relaxed_ubq = relax_task(scored_ubq)

print(f"relaxed score: {relaxed_ubq.scores['total_score']}")
print(f"delta score: {relaxed_ubq.scores['total_score'] - scored_ubq.scores['total_score']}")

# # Interactive Analysis and Notebook-based Computing

# Notebook-based interactive analysis, typified by the Jupyter project [18], has become a dominant tool in modern data science software development. In this model, data, code, output, and visualization are combined in a single document which is viewed and edited through a browser-based interface to a remote execution environment.
#
# To facilitate interactive analysis, we extended the PyRosetta Pose interface to expose total, residue one-body, and residue-pair two-body terms of the Rosetta score function as NumPy structured arrays. Combined with the pandas.DataFrame representation offered in pyrosetta.distributed.packed_pose, this provides an expressive interface for interactive model analysis and selection.

# In[7]:

# Pose energies are available via the Energies *_energies_array accessor functions.
source_energies = scored_ubq.pose.energies()
relaxed_energies = relaxed_ubq.pose.energies()

display(relaxed_energies.residue_onebody_energies_array().dtype)

source_frame = pandas.DataFrame.from_records(source_energies.residue_total_energies_array())
relaxed_frame = pandas.DataFrame.from_records(relaxed_energies.residue_total_energies_array())

delta = relaxed_frame - source_frame
delta.index.name = "residue index"
delta[["total_score"]].plot(title="Delta score via relax.")

# We also integrated existing documentation into the pyrosetta.distributed.docs namespace to allow introspection-based exploration of Mover and Filter RosettaScripts components.

# In[8]:

import pyrosetta.distributed.docs as docs

display(dir(docs.filters)[15:20])
display(docs.filters.ChainBreak)

# Existing tools for web-based biomolecular visualization, such as `py3Dmol` and `NGLview`, extend this interface into a fully-featured biomolecular simulation, analysis, and visualization environment. (Fig 5)

# In[9]:

import py3Dmol

view = py3Dmol.view(linked=False, width=600, height=600)
view.addModel(io.to_pdbstring(relaxed_ubq), "pdb")
view.setStyle({'stick':{}})
view.addStyle({'cartoon':{}})
view.zoomTo()

# # Multithreaded and Distributed Execution

# Remote notebook execution has the distinct advantage of allowing a user to access computational resources far beyond the capabilities of a single workstation.
# By using tools such as Dask via the integrations described above, a remote notebook interface can be used to manage a distributed simulation spanning hundreds of cores for rapid model analysis, and it offers a viable alternative to traditional batch-based computing for some classes of simulation.

# In[10]:

import dask
import dask.distributed

# Establish a single-node cluster of worker processes.
# See dask.distributed documentation for multi-node cluster tools.
client = dask.distributed.Client(dask.distributed.LocalCluster())
print(client)

# Rosetta-based simulations frequently involve execution of a large number of independent Monte Carlo sampling trajectories that all begin from a single starting structure; in other words, they are “embarrassingly” or “trivially” parallel. The Rosetta suite implements a job distribution framework to manage I/O and task scheduling for parallelizable workloads of this type; this allows the rosetta_scripts interpreter to operate as a single process or within MPI, BOINC, and other distributed computing frameworks. Semantics of the RosettaScripts language have also evolved to incorporate non-trivial forms of parallelism, including support for multi-stage scatter/gather protocols. Though fully functional, this framework is optimized for operation as a standalone application and does not provide straightforward integration with third-party tools or generalized program logic.
#
# The combination of immutable data structures and pure function interfaces implemented in the pyrosetta.distributed namespace provides an alternative approach to job parallelization by integrating RosettaScripts as a submodule that is compatible with dask.distributed and other task-based distributed computing frameworks. By virtue of its reliance on standard python primitives, the `pyrosetta.distributed` namespace is not tightly coupled to a single execution engine.
# Single-node scheduling may be managed via the standard `multiprocessing` or `concurrent.futures` interfaces, providing a zero-dependency solution for small-scale sampling or analysis tasks. Execution via MPI-based HPC deployments may be managed via the `mpi4py` interface.
#
# To support effective distributed execution, the pyrosetta.distributed namespace is intended to be installed via a build configuration of PyRosetta, provided by the conda packages described above, that supports multithreaded execution. This variant utilizes existing work establishing thread-safety in the suite, and it releases the CPython global interpreter lock when calling compiled Rosetta interfaces. This enables multi-core concurrent execution of independent modeling trajectories via python-managed threads, and allows python-level operations such as network I/O and process heartbeats to occur concurrently with long-running Rosetta API calls.

# In[11]:

# A "delayed" task is distributed on the worker cluster.
delayed_relax = dask.delayed(rosetta_scripts.SingleoutputRosettaScriptsTask("""
"""))

relax_tasks = [delayed_relax(ubq) for _ in range(64)]
display(relax_tasks[:3])

# In[12]:

# Persist, beginning computation on the distributed cluster.
relax_tasks, = dask.persist(relax_tasks)

# In[13]:

# Multi-threaded worker processes begin a distributed relax.
get_ipython().system('top -bn1 | head -n 20')

# In[14]:

# Compute, pulling results from workers when completed.
relax_results, = dask.compute(relax_tasks)

# In[15]:

relax_result_frame = pandas.DataFrame.from_records(packed_pose.to_dict(relax_results))
display(relax_result_frame)
display(relax_result_frame.describe())
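The zero-dependency single-node path mentioned above can be sketched with the standard library alone. Here `perturb` is a hypothetical stand-in for a real relax task; the point is the shape of the workload, not the chemistry. In the multithreaded PyRosetta build, long-running Rosetta calls release the GIL, so the same thread-pool pattern can keep multiple cores busy.

```python
# Sketch: an embarrassingly parallel map of a pure-function task over input
# records using concurrent.futures. perturb() is a hypothetical stand-in for
# a relax task.
from concurrent.futures import ThreadPoolExecutor

def perturb(record):
    # Pure function: returns a modified copy, never mutates its input.
    return {**record, "total_score": record["total_score"] - 1.0}

inputs = [{"name": f"decoy_{i}", "total_score": 0.0} for i in range(8)]

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(perturb, inputs))

assert all(r["total_score"] == -1.0 for r in results)
assert all(inp["total_score"] == 0.0 for inp in inputs)  # inputs unchanged
```

Because the tasks are pure and the records are plain data, swapping this scheduler for `multiprocessing`, `mpi4py`, or the Dask cluster used above changes only the map call, not the task definitions.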