Parallel computing is a programming method that harnesses the power of multiple processors (or cores) at once. Once of concern only to programmers of large supercomputers, it is relevant everywhere today: modern computers almost always have multi-core processors. However:
At the heart of efficient parallel code is fast serial code!!
using Hwloc
Hwloc.num_physical_cores()
(Note that `Sys.CPU_THREADS` may or may not be equal to the number above: it indicates the number of logical CPU threads, i.e. physical cores times hyperthreads per core.)
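For comparison, the logical thread count can be queried directly from Julia's standard `Sys` module:

```julia
Sys.CPU_THREADS  # number of logical CPU threads on this machine
```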
Naive expectation: I have 4 cores, give me my 4x speedup!
If $p$ is the fraction of a code that can be parallelized, then the maximal theoretical speedup from parallelizing on $n$ cores is given by Amdahl's law: $F(n) = 1/(1-p + p/n)$.
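A quick numeric check of the formula above: even with 95% of the code parallelized, the speedup saturates at $1/(1-p) = 20$, no matter how many cores we add.

```julia
# Amdahl's law: maximal theoretical speedup on n cores for parallel fraction p
F(p, n) = 1 / (1 - p + p / n)

F(0.95, 4)      # ≈ 3.48, far from the naive 4x
F(0.95, 10^6)   # ≈ 20.0, the hard limit 1/(1 - p)
```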
using Plots
F(p,n) = 1/(1-p + p/n)
pl = plot()
for p in reverse(sort(vcat(0.2:0.2:1, [0.9, 0.95])))
plot!(pl, n -> F(p,n), 1:16, lab="$(Int(p*100))%", lw=2,
legend=:topleft, xlab="number of cores", ylab="parallel speedup", frame=:box)
end
pl
Julia documentation link: Parallel computing
There are many types of parallelism, some of which are (from micro to macro):

- instruction-level parallelism (SIMD)
- multi-threading (shared memory)
- asynchronous tasks (coroutines)
- multi-core processing (shared system memory)
- distributed processing (separate memory domains)

Julia provides (more or less) native support for all of these forms of parallel processing (same order as above):

- `@simd` and SIMD.jl
- `Base.Threads.@threads` (experimental since 2015, but seems to be fine)
- `@async`, `@sync`, `Channel`
- `@spawnat`, `@fetch`, `RemoteChannel`, `SharedArray`, etc.
- `@spawnat`, `@fetch`, `RemoteChannel`, `DArray`, MPI.jl, etc.

With scientific computing in mind, we will mainly focus on how to distribute a process across multiple cores or machines (our thp cluster, for example), that is, multi-core processing and distributed processing. But before we can do so, we have to learn how to control Julia's control flow through tasks.
By default, Julia waits for every command to finish and run everything sequentially.
Tasks are a control flow feature that allows computations to be suspended and resumed in a flexible manner. This feature is sometimes called by other names, such as coroutines, green or lightweight threads and cooperative multitasking.
To me, the name cooperative multitasking is the most descriptive. Tasks are managed/scheduled by Julia and can sometimes be run in a quasi-parallel fashion.
An important use case is asynchronous I/O, which is typically slow. Examples are network requests, file operations, and communication between processes.
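Before turning to the macros below, here is a minimal sketch of what a task is at the lowest level: an object that can be created, scheduled, waited on, and whose result can be fetched.

```julia
# create a task (it does nothing until scheduled)
t = Task(() -> (sleep(0.1); 21 * 2))
istaskstarted(t)   # false
schedule(t)        # hand it to Julia's scheduler; returns immediately
wait(t)            # block until t has finished
fetch(t)           # retrieve the task's return value: 42
```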
How do we execute commands asynchronously?
`@async` and `@sync`

(Based on this stackoverflow answer.)
?@async
What this means is that, for whatever falls within its scope, Julia will start a task and then proceed to whatever comes next in the script without waiting for the task to complete.
@time sleep(2);
@time @async sleep(2)
Julia allows the script to proceed (and the `@time` macro to fully execute) without waiting for the task (in this case, sleeping for two seconds) to complete.
We can use the `@sync` macro to synchronize, that is to wait for, all encapsulated tasks (see `?@sync`).
@time @sync @async sleep(2)
Of course, here it doesn't make much sense to write `@sync @async`; we could simply drop it altogether.
A better example is the following.
@time @sync begin
@async sleep(2.0)
@async sleep(2.0)
end
@sync begin
@async (sleep(2); println("Today is reverse day!"))
@async (sleep(1); println(" class!"))
@async print("Hello")
end;
Distributed computing in Julia means having multiple separate Julia instances running on different cores on the same or different machines.
Data movement and communication between processes is explicit.
Let's focus on the multi-core case (your laptop/desktop) and save some cluster fun for later.
Julia uses a master-worker paradigm for its native distributed parallelism.
One master process coordinates all the worker processes, which perform the actual computations.
By default, Julia starts with one process on one core. If this single process is all we have, then it is both the master and the worker.
using Distributed # Loading all tools that we need for distributed computing
nprocs()
nworkers() # the master is considered a worker as long as there are no real workers
To increase the number of workers, i.e. Julia processes, from within a Julia session we can use `addprocs`. Alternatively, when starting Julia from the command line, one can use the `-p` option. For example,

julia -p 4

will start Julia with 5 processes: 1 master and 4 workers.
addprocs(4) # I have 4 cores, so let's add 4 worker processes.
Every process has a Julia-internal pid (process id). The master is always 1. You can get the workers' pids from `workers()`.
workers()
Note that the 4 workers' pids aren't necessarily 2, 3, 4, and 5. Let's remove the processes and add them once more.
rmprocs(workers()) # rmprocs(array of pids of worker processes to remove)
nworkers() # only the master is left
addprocs(4)
workers()
`@spawn`, `@spawnat`, `@fetch`, `@fetchfrom`, `@everywhere`, ...

To execute commands and start computations on workers we can use the following macros:

- `@spawn`: run a command or a code block on any worker and return a `Future` to its result. It's basically a version of `@async` for remote processes.
- `@spawnat`: same as `@spawn`, but one can choose a specific worker by providing its pid.

Example: Let's say we would like to generate a random matrix on one of the workers.
@spawn rand(2,2) # basically @async for remote process, i.e. returns immediately
result = @spawn rand(2,2)
fetch(result) # blocks, like @sync
Because the combination of spawning and fetching is so common, there is `@fetch`, which combines them.
@fetch rand(2,2)
Which worker did the work?
@fetch begin
println(myid());
rand(2,2)
end
Using `@spawnat` and `@fetchfrom` we can delegate the work to a specific worker.
@fetchfrom 7 begin
println(myid());
rand(2,2)
end
We can use `@sync` as a blocker to wait for all workers to complete their tasks.
@sync begin
pids = workers()
@spawnat pids[1] (sleep(2); println("Today is reverse day!"))
@spawnat pids[2] (sleep(1); println(" class!"))
@spawnat pids[3] println("Hello")
end;
println("Done!")
Ok, now that we understand all that, let's delegate a complicated calculation to a worker.
using Random
function complicated_calculation()
sleep(1) # so complex that it takes a long time :)
randexp(5)
end
@fetch complicated_calculation()
What happened?
Think of every worker as a separate Julia instance.
We only defined `complicated_calculation()` on the master process. The function doesn't exist on any of the workers yet. The macro `@everywhere` comes to the rescue.
@everywhere begin # execute this block on all workers
using Random
function complicated_calculation()
sleep(1)
randexp(5) # lives in Random
end
end
@fetch complicated_calculation()
There is a crucial difference between the following two pieces of code. Can you guess what it is?
function method1()
A = rand(100,100)
B = rand(100,100)
C = @fetch A^2 * B^2
end
function method2()
C = @fetch rand(100,100)^2 * rand(100,100)^2
end
Let's benchmark them.
using BenchmarkTools
@btime method1();
@btime method2();
Method 1 is slower because `A` and `B` are created on the master process and transferred to a worker, where they are squared and multiplied before the result is finally transferred back to the master. Method 2, on the other hand, creates, squares, and multiplies the random matrices entirely on the worker process and only sends the result back to the master. Hence, `method1` transfers 3x as much data between the master and the worker!
Data movement is crucial!
In this toy example, it's rather easy to identify the faster method.
In a real program, however, understanding data movement does require more thought and likely some measurement.
For example, if the first process needs matrix `A` in a follow-up computation, then the first method might be better after all. Or, if computing `A` is expensive and only the current process holds the result, then moving it to another process might be unavoidable.
To understand why thinking about data is important it's instructive to look at the time scales involved in data access.
(taken from https://www.prowesscorp.com/computer-latency-at-a-human-scale/)
myglobal = 4
function whohas(s::String)
@everywhere begin
var = Symbol($s)
if isdefined(Main, var)
println("$var exists.")
else
println("Doesn't exist.")
end
end
nothing
end
whohas("myglobal")
@fetchfrom 6 myglobal+2
whohas("myglobal")
Globals get copied to workers and continue to exist as globals even after the call.
This could lead to memory accumulation if many globals are used (just as it would in a single Julia session).
It's better to avoid them.
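A sketch of the preferable pattern: wrap the computation in a function so the value travels as part of a closure rather than becoming a global on the worker (the name `add_two_remotely` is made up for this example).

```julia
using Distributed
nworkers() == 1 && addprocs(2)  # assumption: no workers have been added yet

# x is captured by the closure and shipped with the remote call;
# no global binding is created on the worker
function add_two_remotely(x)
    @fetchfrom first(workers()) x + 2
end

add_two_remotely(4)  # 6
```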
`Channel` and `RemoteChannel`

Channels in Julia are constructs to explicitly exchange data between workers.
They implement the `put!`, `take!`, `fetch`, `isready`, and `wait` methods.
# ?Channel
ch = Channel{Int}(5) # a channel that can hold up to 5 integers
isready(ch) # something in the channel?
put!(ch, 3)
isready(ch)
take!(ch)
isready(ch)
put!(ch, 4)
fetch(ch) # like take!, but doesn't remove the item from the channel
take!(ch)
Be careful: `take!` and `put!` are blocking if the channel is empty or full, respectively!
isready(ch)
# take!(ch) if we execute this, while isready(ch) == false, the current Julia session will hang.
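This blocking behavior is precisely what makes channels useful for producer/consumer patterns. A small sketch: the producer task runs in the background and blocks whenever the buffer is full, until the consumer takes items out.

```julia
# the producer task fills the channel; the channel is closed automatically
# when the task finishes
producer = Channel{Int}(2) do ch  # buffer of size 2
    for i in 1:5
        put!(ch, i)               # blocks while the buffer is full
    end
end

collect(producer)  # consume until the channel is closed: [1, 2, 3, 4, 5]
```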
`RemoteChannel`

A `Channel` is local to a process. Worker 2 cannot directly refer to a `Channel` on worker 3 and vice versa. A `RemoteChannel`, however, can put and take values across workers. A `RemoteChannel` can be thought of as a handle to a `Channel`.
Any process with a reference to a `RemoteChannel` can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a `RemoteChannel` is associated with. The process id (pid) associated with a `RemoteChannel` identifies the process where the backing store, i.e. the backing `Channel`, exists.
nworkers()
addprocs(4)
?RemoteChannel
# creates a channel on the second worker process
# create a RemoteChannel handle to this channel on the master process
const mychannel = RemoteChannel(()->Channel{Int}(10), workers()[2])
whohas("mychannel")
# One could create a global constant mychannel everywhere
@everywhere const mychannel = $mychannel
whohas("mychannel")
However, as we said many times before, one should generally try to avoid globals. The following is preferable.
function do_something()
rc = RemoteChannel(()->Channel{Int}(10)) # lives on the master
@sync for p in workers()
@spawnat p put!(rc, myid())
end
rc
end
r = do_something()
isready(r)
while isready(r)
@show take!(r)
end
The ecosystem also contains a couple of tools that make data transfer even simpler. See for example ParallelDataTransfer.jl.
`@distributed` and `pmap`

So far we have seen the building blocks for distributed computing in Julia. Having scientific computing in mind, one might not always want to think about how to distribute the work and explicitly spawn tasks. Also, fortunately, many useful parallel computations do not require (much) data movement. A common example is a direct Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. (We'll get to that later!)

Julia provides convenient constructs for

- distributed loops (`@distributed`)
- parallel maps (`pmap`)

Let's explore these!
Distributed loops (`@distributed`)

using Distributed, BenchmarkTools; rmprocs(workers()); addprocs(4); nworkers()
# serial version - count heads in a series of coin tosses
function add_serial(n)
c = 0
for i = 1:n
c += rand(Bool)
end
c
end
@btime add_serial(200_000_000);
This is trivially parallelizable since the loop iterations are independent of each other. We can distribute coin tosses over a couple of workers.
Afterwards we combine the results, that is, we sum them up. The combination step is generally called a reduction, and in this case the reducer function is `+` (summation).
To distribute the for loop over worker processes Julia provides the `@distributed` macro:
?@distributed
# distributed version
function add_distributed(n)
c = @distributed (+) for i in 1:n
Int(rand(Bool))
end
c
end
@btime add_distributed(200_000_000);
The distributed version is about 4x faster, which is all we could hope for.
Let's see who is doing the work
# verbose distributed version
function add_distributed(n)
c = @distributed (+) for i in 1:n
x = Int(rand(Bool))
println(x);
x
end
c
end
add_distributed(8);
Apparently, the work is evenly distributed among the workers. By using `@distributed` we let Julia decide how to split up the work; we can't control the partitioning ourselves.
A common mistake when using `@distributed` is the following:
function f(n)
a = 0
@distributed (+) for i in 1:n
a += 1
end
a
end
a = f(10);
What do you expect the value of `a` to be?
a
We can (sort of) see what's happening by making everything global
a = 0
@distributed (+) for i in 1:10
println("1")
global a += 1
end;
@everywhere @show a
The variable `a` gets copied to the worker processes because it is referenced in the distributed loop. Every worker then increments its own copy of `a`. However, we do not save the result of the reduction (the sum) but instead return `a` from the master process, which hasn't been altered at all.
Corrected version:
function f2(n)
a = @distributed (+) for i in 1:n
1
end
a
end
a = f2(10)
Similar to the mistake above, the following example might not have the effect one expects. Why?
a = zeros(10)
@distributed for i = 1:10
a[i] = i
end
@everywhere @show a
Note that `@distributed` without a reducer function returns a `Task`. It is basically a distributed version of `@spawn` for all the iterations.
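A sketch of what that means in practice: without a reducer we get a `Task` back and must explicitly wait on it before relying on any side effects.

```julia
using Distributed
nworkers() == 1 && addprocs(2)  # assumption: workers may still need to be added

t = @distributed for i in 1:4
    println("iteration $i handled by worker $(myid())")
end
t isa Task  # true; the loop may still be running in the background
wait(t)     # block until all iterations have completed
```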
`SharedArray`s

To actually make all processes operate on the same array, one can use a `SharedArray`. Note that a `SharedArray` only works if the processes live on the same host.
The constructor of a `SharedArray` is

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

which creates an `N`-dimensional shared array of a (bits) type `T` and size `dims` across the processes specified by `pids`.
(If an `init` function of signature `initfn(S::SharedArray)` is specified, it is called on all the participating workers. You can specify that each worker runs the `init` function on a distinct portion of the array, thereby parallelizing initialization.)
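A minimal sketch of the `init` variant: each participating worker fills only its own local portion of the array, here with its pid so we can see who wrote what.

```julia
using Distributed
nworkers() == 1 && addprocs(2)  # assumption: workers may still need to be added
@everywhere using SharedArrays

# every worker writes its pid into the chunk given by localindices
S = SharedArray{Float64}((12,); init = A -> A[localindices(A)] .= myid())
```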
@everywhere using SharedArrays # must be loaded everywhere
A = rand(2,3)
S = SharedArray(A)
Ok, now that we know how to create and fill our `SharedArray`, we can write a parallel fill function:
function fill_shared_problematic(N)
S = SharedMatrix{Float64}(N,N)
@distributed for i in 1:length(S)
S[i] = i
end
S
end
fill_shared_problematic(3)
Why is the method in its current form problematic? Try to find out yourself by going to larger `N` and, for example, inspecting the minimum of the returned `SharedArray`!
Going to larger matrix sizes....
function fill_shared_problematic(N)
S = SharedMatrix{Int64}(N,N)
@distributed for i in 1:length(S)
S[i] = i
end
S
end
S = fill_shared_problematic(100)
minimum(S)
Note how sometimes the array isn't completely filled but still contains zeros. This is because the `@distributed` loop returns immediately; the filling simply hasn't finished yet!
Check again!
minimum(S)
We can use `@sync` to synchronize our distributed for loop.
function fill_shared_problematic(N)
S = SharedMatrix{Int64}(N,N)
@sync @distributed for i in 1:length(S) # added @sync here
S[i] = i
end
S
end
S = fill_shared_problematic(100)
minimum(S)
Ok, let's benchmark this for a larger matrix size
# regular array
function fill_regular(N)
A = Matrix{Int64}(undef,N,N)
for i in 1:length(A)
A[i] = i
end
A
end
@time fill_regular(10000);
# shared array
function fill_shared(N)
S = SharedMatrix{Int64}(N,N)
@sync @distributed for i in 1:length(S)
S[i] = i
end
S
end
@time fill_shared(10000);
This is of course just filling an array. If each element required an actual calculation, distributing the work across workers could pay off.
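For instance, a sketch with real per-element work (the kernel `slow_kernel` is a made-up stand-in for an expensive computation):

```julia
using Distributed
nworkers() == 1 && addprocs(2)  # assumption: workers may still need to be added
@everywhere using SharedArrays

# made-up stand-in for an expensive per-element computation
@everywhere slow_kernel(i) = sum(sin, 1:(1 + i % 1000))

function fill_shared_work(N)
    S = SharedVector{Float64}(N)
    @sync @distributed for i in 1:length(S)
        S[i] = slow_kernel(i)   # workers fill the shared array in parallel
    end
    S
end

@time fill_shared_work(100_000);
```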
`pmap`

Sometimes we merely wish to apply a function to all elements of a collection. For those cases, Julia provides the `pmap` (parallel map) function.
Say, we want to compute the singular values of a bunch of larger matrices in parallel.
using Distributed, BenchmarkTools, Hwloc; rmprocs(workers()); addprocs(Hwloc.num_physical_cores()); nworkers()
@everywhere using LinearAlgebra
M = Matrix{Float64}[rand(200,200) for i = 1:10];
pmap(svdvals, M)
# Check that really all of the workers participated
pmap(m->begin println(myid()); svdvals(m) end, M);
function svds_loop(M)
svds = Vector{Vector{Float64}}(undef, 10)
for (i, m) in enumerate(M)
svds[i] = svdvals(m)
end
svds
end
@time svds_loop(M);
@time svdvals.(M);
@time pmap(svdvals, M);
Julia's `pmap` is designed for the case where each function call does a large amount of work and/or the workload is uneven, since jobs are handed out to workers dynamically (load balancing). In contrast, `@distributed` can handle situations where each iteration is tiny, perhaps merely summing two numbers, since the iteration range is split up statically in advance.
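A sketch of this difference with deliberately uneven (made-up) workloads:

```julia
using Distributed
nworkers() == 1 && addprocs(2)  # assumption: two workers suffice to see the effect

# one long job and four short ones (sleep times in seconds)
workloads = [2.0, 0.5, 0.5, 0.5, 0.5]

# pmap hands out jobs dynamically: one worker takes the 2 s job while the other
# processes the four short ones, so the wall time is close to 2 s rather than
# the ~3 s of a static half/half split
@time pmap(t -> (sleep(t); t), workloads)
```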
So far we have worked on multiple cores on a single machine, your laptop for example.
Processes can live on other machines as well! This allows us to distribute our computation across computer clusters.
In principle, the plan of action is the same as in the multi-core case. However, we have to take into account the different memory situation: in particular, data movement is expensive and we won't be able to use `SharedArray`s.
rmprocs(workers()) # fresh start
Adding processes on different machines is not much harder than adding them on your local machine. In the following we will take the last example, calculating singular values of a bunch of matrices, and distribute it over multiple computers in our thp network.
In Julia, starting worker processes is handled by ClusterManagers, for example:

- `LocalManager`: automatically used when running `addprocs(i::Integer)`; we have implicitly used it already!
- `SSHManager`: automatically used when running `addprocs(hostnames::Array)`.

Other cluster managers, for SLURM, PBS, and more, are provided in ClusterManagers.jl.
In principle, starting processes on other computers can be done with `addprocs(["l93", "l94"])`, where `"l93"` and `"l94"` are hostnames. The only requirement is passwordless ssh access to all specified hosts.
Demonstrate in terminal from thp node
using Distributed
addprocs(["l93", "l94"])
@everywhere println(gethostname())
One can also start multiple processes on different machines:
addprocs([("l93", 2), ("l94", 3)]) # starts 2 workers on l93 and 3 workers on l94
# Use :auto to start as many processes as CPUs are available
By default, `addprocs` expects the julia executable at the same path as on the master machine (remember: workers are independent Julia processes). It will also try to `cd` to the same folder.
In my case this would be
@show pwd();
@show Sys.BINDIR;
Neither folder exists in my thp account (those are linux machines!), so I'll have to tell Julia to use different paths. Also, as per the thp cluster guidelines, one (!) must (!) run computations on other thp computers with the `nice -19` priority setting!
`nice -19` workers and specifying directories

As you can see from `?addprocs`, `addprocs` takes a bunch of keyword arguments, two of which are of particular importance:

- `dir`: working directory of the worker process
- `exename`: path to the julia executable (potentially augmented with pre-commands)

params = (exename=`nice -19 /home/bauer/bin/julia-1.5.3/bin/julia --project=/home/bauer/JuliaNRW21`, dir="/home/bauer")
addprocs([("l93", :auto)]; params...)
@everywhere println(gethostname())
rmprocs(workers())
Ok, let's get some resources :)
machines = ["l93", "l94", "l96"];
procs_per_machine = :auto; # :auto for n = # cpus
jobs = [(m,procs_per_machine) for m in machines]
addprocs(jobs; params...)
@everywhere println(gethostname())
@everywhere using LinearAlgebra
@time x = pmap(svdvals, M);
Distributed arrays (`DArray`)

GitHub: https://github.com/JuliaParallel/DistributedArrays.jl

In a `DArray`, each process has local access to just a chunk of the data, and no two processes share the same chunk. Processes can live on different hosts.
Distributed arrays are useful if, for example, the data doesn't fit into the memory of a single machine, or if expensive calculations can be performed in parallel on parts of the array.
@everywhere using DistributedArrays, LinearAlgebra
M = Matrix{Float64}[rand(200,200) for i = 1:10];
D = distribute(M)
Which workers hold parts of D?
procs(D)
Which parts do they hold?
localpart(D) # the master doesn't hold anything
# Which parts do they hold?
for p in workers()
display(@fetchfrom p localpart(D))
display(@fetchfrom p DistributedArrays.localindices(D)) # DistributedArrays. necessary because of SharedArrays above
end
@time Msvd = map(svdvals, M);
@time Dsvd = map(svdvals, D);
@time Psvd = pmap(svdvals, M);
Msvd ≈ Dsvd
Dsvd ≈ Psvd
But remember, for small operations the data movement can (and will) exceed the benefit of parallelizing the computation!
@time map(sum, M);
@time map(sum, D);
# Stop worker processes!
rmprocs(workers())