Note that Parsl is not effective if multiple CPU cores aren't available because Parsl's ability to execute tasks in parallel is depenedent on the availability multiple cores.
import multiprocessing
print('Cores available: {}'.format(multiprocessing.cpu_count()))
Cores available: 4
We'll be using the htex configuration for Parsl. Read more here.
import numpy as np
import parsl
import os
from parsl.app.app import python_app, bash_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
),
)
],
)
parsl.load(config)
<parsl.dataflow.dflow.DataFlowKernel at 0x10c51edd0>
The block method of matrix multiplication between two matrices A and B of sizes sXq and rXs is as follows:
We carry out the operation above for two matrices A and B.
matrix_a = [[2,3,4,5],[4,5,6,7],[4,5,6,7]]
matrix_b = [[3,4],[4,5],[6,7],[8,9]]
'''
A python app for multiplying corresponding elements
'''
@python_app
def multiply(inputs=[]):
return inputs[0]*inputs[1]
@python_app
def sum_elements(inputs=[]):
return sum(inputs)
'''
A Function for two matrices that uses the block method to multiply them.
'''
def multiply_matrices(A,B):
C = [[0 for _ in range(len(B[0]))] for _ in range(len(A))] # Result matrix
for q in range(len(A)):
for r in range(len(B[0])):
s = len(A[0])
elements = []
for i in range(s):
elements.append(multiply(inputs=[A[q][i],B[i][r]])) # Multiplying elements
elements = [i.result() for i in elements] # evaluation for each row
C[q][r] = sum_elements(inputs=elements)
C = [[element.result() for element in row] for row in C]
return C
'''
Example implementation of the two given matrices
'''
multiply_matrices(matrix_a, matrix_b)
[[82, 96], [124, 146], [124, 146]]
We build a simple implementation of markov chains using Parsl. A Markov Chain is a chain of samples obtained using a transition matrix. Each sample is only dependent on the sample before it. We'll run 5 different chains for different amount of steps.
This will use the Parsl code for the block multiplication of matrices.
def multiply_matrices_for_markov(A,B):
'''
This function takes two matrices A and B, and uses the matrix multiplication in Parsl to compute C.
Here, C = AxB
'''
C = [[0 for _ in range(len(B[0]))] for _ in range(len(A))] # Result matrix
for q in range(len(A)):
for r in range(len(B[0])):
s = len(A[0])
elements = []
for i in range(s):
elements.append(multiply(inputs=[A[q][i],B[i][r]]))
# Multiplying elements
elements = [i.result() for i in elements]
# evaluation for each row
C[q][r] = sum_elements(inputs=elements)
# Implementing each element as sum of the multiplied elements
return C
transition_matrix = np.array([[0.5,0.1,0.3],[0.25,0.1,0.1],[0.25,0.8,0.6]])
'''
This is the transition probability kernel that maps
the probability for going from one state to the other
'''
initial_state = np.array([[5],[6],[1]])
matrices = []
for num_steps in [50,100,200,300,400,500]: # For different burn-in points
new_matrix = initial_state
for i in range(num_steps):
'''
Evaluating the matrices sequentially
'''
new_matrix = multiply_matrices_for_markov(transition_matrix,new_matrix)
matrices.append(new_matrix)
# Collecting the final matrix at the end of the markov process
final_results = []
for matrix in matrices:
matrix = [[element.result() for element in row] for row in matrix]
# Evaluating the final results
final_results.append(matrix)
final_results # Six different final states
[[[4.048192771084336], [1.8072289156626502], [6.14457831325301]], [[4.048192771084336], [1.8072289156626502], [6.14457831325301]], [[4.048192771084336], [1.8072289156626502], [6.14457831325301]], [[4.048192771084336], [1.8072289156626502], [6.14457831325301]], [[4.048192771084336], [1.8072289156626502], [6.14457831325301]], [[4.048192771084336], [1.8072289156626502], [6.14457831325301]]]
The Mandelbrot Fractal Set is a figure made by the repetition of the same pattern at different scales. Each element is a replica of the same function plotted at a smaller scale. Since the evaluation of the function and conversion to the coordinates can be done in parallel, we can use Parsl to execute them in parallel threads.
Code adapted from: https://www.geeksforgeeks.org/mandelbrot-fractal-set-visualization-in-python/
Let's run the Mandelbrot Fractal Set for a single pixel first:
@python_app
def mandelbrot(x, y):
from numpy import complex, array
import colorsys
'''
Internal function for converstion to the RGB type
tuple that can be added to the image.
'''
def rgb_conversion(i):
color = 255 * array(colorsys.hsv_to_rgb(i / 255.0, 1.0, 0.5))
return tuple(color.astype(int))
c0 = complex(x, y)
c = 0
for i in range(1, 1000):
if abs(c) > 2:
return rgb_conversion(i)
c = c * c + c0
return (0, 0, 0)
def mandelbrot_non_parallel(x,y):
from numpy import complex, array
import colorsys
'''
Internal function for converstion to the RGB type
tuple that can be added to the image
'''
def rgb_conversion(i):
color = 255 * array(colorsys.hsv_to_rgb(i / 255.0, 1.0, 0.5))
return tuple(color.astype(int))
c0 = complex(x, y)
c = 0
for i in range(1, 1000):
if abs(c) > 2:
return rgb_conversion(i)
c = c * c + c0
return (0, 0, 0)
# Evaluation for a single pixel.
mandelbrot(40,10).result()
(127, 6, 0)
Let us now execute the code using the Parsl function for an entire image
from PIL import Image
from numpy import complex, array
import colorsys
import time
import matplotlib.pyplot as plt
start = time.time()
# Setting up the image
WIDTH = 256
img = Image.new('RGB', (WIDTH, int(WIDTH / 2)))
pixels_raw = [[0 for _ in range(img.size[1])] for _ in range(img.size[0])]
for x in range(img.size[0]):
for y in range(img.size[1]):
'''
Looping through the image width and height to update the
raw pixels list with the output of the function
'''
a = (x - (0.75 * WIDTH)) / (WIDTH / 4)
b = (y - (WIDTH / 4)) / (WIDTH / 4)
pixels_raw[x][y] = mandelbrot(a, b)
pixels_raw = [[i.result() for i in x] for x in pixels_raw]
# Projecting the raw pixels onto the image
pixels = img.load()
for x in range(img.size[0]):
for y in range(img.size[1]):
pixels[x,y] = pixels_raw[x][y]
end = time.time()
import time
start2 = time.time()
# Setting up the image
WIDTH = 256
img = Image.new('RGB', (WIDTH, int(WIDTH / 2)))
pixels_raw = [[0 for _ in range(img.size[1])] for _ in range(img.size[0])]
for x in range(img.size[0]):
for y in range(img.size[1]):
# Looping through the image width and height to update the
# raw pixels list with the output of the function
a = (x - (0.75 * WIDTH)) / (WIDTH / 4)
b = (y - (WIDTH / 4)) / (WIDTH / 4)
pixels_raw[x][y] = mandelbrot_non_parallel(a, b)
# Projecting the raw pixels onto the image
pixels = img.load()
for x in range(img.size[0]):
for y in range(img.size[1]):
pixels[x,y] = pixels_raw[x][y]
end2 = time.time()
print('Time taken with Parsl: {}'.format(end-start))
print('Time taken with Parsl: {}'.format(end2-start2))
# Plotting the image of the projects of all the function evaluations
plt.figure()
plt.imshow(img)
plt.show()