A Deep Dive into Distributed.jl
Distributed.jl is a Julia standard library for multi-processing and distributed computing. The distributed-computing section of the official Julia docs already provides a nice introduction on how to use it, and I assume most of you have skimmed through it. So here I'll mainly focus on how this package is designed and implemented, hoping that you'll have a better understanding after this talk.
My talk is organized in a Q&A style, so feel free to raise more questions at the end if anything is still unclear to you.
1. How to initialize Distributed.jl?
The easiest way is to start julia with the extra option -p auto. Then one Julia worker process is created and connected for each logical core on your machine. Or you can specify the number explicitly.
                       ┌─────────────┐
                  ┌───►│ myid() == 2 │
                  │    └─────────────┘
┌─────────────┐   │
│ julia -p 3  │   │    ┌─────────────┐
│             ├───┼───►│ myid() == 3 │
│ myid() == 1 │   │    └─────────────┘
└─────────────┘   │
                  │    ┌─────────────┐
                  └───►│ myid() == 4 │
                       └─────────────┘
To create workers on multiple nodes, we can provide a machine file like this:
println(run(`cat $(joinpath(@__DIR__, "local_machines"))`))
2*127.0.0.1
2*127.0.0.1
And pass it to the --machine-file argument. In the above file, we use localhost (127.0.0.1) for both nodes for simplicity, and the 2* prefix asks for two workers on each node.
                                               ┌─────────────┐
                                           ┌──►│ myid() == 2 │
                              ┌────────┐   │   └─────────────┘
                          ┌──►│ Node 1 ├───┤
                          │   └────────┘   │   ┌─────────────┐
                          │                └──►│ myid() == 3 │
                          │                    └─────────────┘
┌──────────────────────┐  │
│ julia --machine-file │  │
│                      ├──┤
│ myid() == 1          │  │                    ┌─────────────┐
└──────────────────────┘  │                ┌──►│ myid() == 4 │
                          │   ┌────────┐   │   └─────────────┘
                          └──►│ Node 2 ├───┤
                              └────────┘   │   ┌─────────────┐
                                           └──►│ myid() == 5 │
                                               └─────────────┘
Of course, you can also add them dynamically with addprocs.
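As a minimal sketch of the dynamic route (assuming a fresh session on a single machine, with no workers added yet), workers can be added and removed from inside Julia:

```julia
using Distributed

# Start two extra worker processes on the local machine.
# addprocs returns the ids of the workers it created.
new_ids = addprocs(2)

# The process we started from keeps id 1; the workers get their own ids.
println("my id:      ", myid())
println("worker ids: ", workers())
println("nprocs:     ", nprocs())

# Workers can be removed again once they are no longer needed.
rmprocs(new_ids)
println("nprocs after rmprocs: ", nprocs())
```

The variable name new_ids is only for illustration; addprocs and rmprocs are the public functions doing the work.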
1.1 Why is there an extra process created?
The distributed-computing section of the official docs mentions that:
Communication in Julia is generally "one-sided", meaning that the programmer needs to explicitly manage only one process in a two-process operation.
This means that, by design, the process we started (the one that launches the workers) serves as the head process. Its main job is to dispatch work to the workers rather than to do the computing itself. So in most cases, we just execute code on the head process and tell the workers what they need to do. That is what "one-sided" means above.
So the head process is just the extra one we created.
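A small sketch of what "one-sided" looks like in practice, assuming one local worker is enough to make the point (the variable names are illustrative):

```julia
using Distributed

pids = addprocs(1)
w = pids[1]

# Only the process with id 1 runs this code. The worker needs no matching
# "receive" call: it simply executes whatever it is sent.
remote_id = remotecall_fetch(myid, w)
local_id = myid()

println("asked worker $w, it reports myid() == $remote_id")
println("the calling process has myid() == $local_id")

rmprocs(pids)
```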
2. How are the workers created?
This can be narrowed down to several more specific questions. But before answering them, let's first see what happens when we load the package with using Distributed.
function __init__()
    init_parallel()
end

function init_parallel()
    start_gc_msgs_task()
    # start in "head node" mode, if worker, will override later.
    global PGRP
    global LPROC
    LPROC.id = 1
    @assert isempty(PGRP.workers)
    register_worker(LPROC)
end
Let's ignore the first line for now. We'll discuss start_gc_msgs_task later. Two important global variables are initialized here. The first one is LPROC (a LocalProcess), which serves as the identifier of the current process. The second one is PGRP (a ProcessGroup), which records the workers we'll create later. At this point it contains only one element, LPROC. That's why workers() returns a single worker with id 1.
julia> using Distributed
julia> workers()
1-element Vector{Int64}:
 1
Workers are created with addprocs. By default, it dispatches to addprocs_locked(manager::ClusterManager; kw...). (Here ClusterManager is an abstract type; we'll introduce two typical implementations in Distributed.jl soon.) Since two main public APIs are involved in its implementation, let's examine the code in detail here:
t_launch = @async launch(manager, params, launched, launch_ntfy)
@sync begin
    while true
        if isempty(launched)
            istaskdone(t_launch) && break
            @async (sleep(1); notify(launch_ntfy))
            wait(launch_ntfy)
        end
        if !isempty(launched)
            wconfig = popfirst!(launched)
            let wconfig=wconfig
                @async setup_launched_worker(manager, wconfig, launched_q)
            end
        end
    end
end
First, the launch method implemented by the specific ClusterManager is called asynchronously. Then the main task loops: whenever launched is empty, it stops if the launch task has finished, and otherwise waits until launch_ntfy is notified (either by the ClusterManager or by a one-second timer). Each worker the ClusterManager has already added to launched is popped and handed to an asynchronous setup_launched_worker call, which sets up the connection to it. Note that the outer @sync guarantees that all the setup_launched_worker calls have finished (all connections between the head process and the workers are initialized).
So what should launch do? As the doc says:
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implemented by cluster managers. For every Julia worker launched by this function, it should append a WorkerConfig entry to launched and notify launch_ntfy. The function MUST exit once all workers, requested by manager have been launched. params is a dictionary of all keyword arguments addprocs was called with.
But what is WorkerConfig? Its definition in Distributed.jl is tightly coupled with the two built-in ClusterManager implementations, which we'll discuss soon. Basically it describes where and how the head process should send messages (like an IO stream or a network connection). Once the ClusterManager has finished initializing the worker process, we need to register this worker in the head process:
- Figure out where to read messages from (r_s) and where to write messages to (w_s) for the worker.
- Bind the finalizer of the worker to tell its cluster manager when it's finalized.
- Create an async task to handle messages from the worker.
- Send the head process's information to the worker so that the worker knows where the head process is and how to send messages to it.
- Wait until the worker confirms and joins the cluster. Otherwise, remove it after a timeout.
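To make WorkerConfig a bit less abstract, here is a small sketch that only inspects the type. It assumes the current layout of the struct, where every field is optional and starts out as nothing; the host and port values are made up for illustration.

```julia
using Distributed

# A fresh WorkerConfig has every field unset;
# the cluster manager fills in only what it knows.
wconfig = WorkerConfig()
names = fieldnames(WorkerConfig)
println(names)

all_unset = all(name -> getfield(wconfig, name) === nothing, names)
println("all fields unset: ", all_unset)

# A manager that knows the worker's address could set these directly
# (illustrative values only, nothing is listening there).
wconfig.host = "127.0.0.1"
wconfig.port = 9009
```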
2.1 How are workers created on my local machine?
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    dir = params[:dir]
    exename = params[:exename]
    exeflags = params[:exeflags]
    bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`

    for i in 1:manager.np
        cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
        io = open(detach(setenv(cmd, dir=dir)), "r+")
        write_cookie(io)

        wconfig = WorkerConfig()
        wconfig.process = io
        wconfig.io = io.out
        wconfig.enable_threaded_blas = params[:enable_threaded_blas]
        push!(launched, wconfig)
    end

    notify(c)
end
Pretty straightforward, right? A new Julia process is created and then a worker config is properly set. But how does this new process handle messages sent from the head process or other workers? Note that there's an extra option --worker in the cmd.
distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machine_file != C_NULL)
if distributed_mode
    let Distributed = require(PkgId(UUID((0x8ba89e20_285c_5b6f, 0x9357_94700520ee1b)), "Distributed"))
        Core.eval(Main, :(const Distributed = $Distributed))
        Core.eval(Main, :(using .Distributed))
    end

    invokelatest(Main.Distributed.process_opts, opts)
end
With this option, the newly created Julia process does some extra work to have everything properly set. Basically, it opens a new socket and handles the messages coming in. You can use netstat -tunlp | grep julia to see which ports the workers are using.
2.2 How are workers created across different machines?
To create workers across machines, Distributed.jl provides an SSH-based cluster manager. Although the code in SSHManager looks very complex, the core idea behind it is similar to LocalManager. We run the command to create a Julia worker process through SSH and record the process id. Then we can use this id to kill the worker if required.
2.3 A practical example
In ClusterManagers.jl, several common cluster managers are provided. You should definitely check it first if the default LocalManager and SSHManager don't apply in your environment. Now let's take a close look at an interesting one: ElasticManager.
                                         ┌────────┐
                                       ┌─┤ worker │
                                       │ └────────┘
          ┌─────────────┐   connect    │   ......
          │   socket    │              │ ┌────────┐
          │             ├──────────────┼─┤ worker │
          │ - address   │              │ └────────┘
          │ - port      │              │   ......
          └────▲───┬────┘              │ ┌────────┐
               │   │                   └─┤ worker │
               │   │                     └────────┘
               │   │
               │   │ received new connection
   watch       │   │
   new conn    │   │ check cookie
               │   │
               │   │ push into
               │   │
            ┌──┴───▼──┐
            │ pending │
            └────┬────┘
                 │
                 │
                 │
            ┌────▼─────┐
            │ addprocs │
            └──────────┘
After initialization, the ElasticManager creates two background tasks: the first one watches for new connections, and the second one adds new processes by reusing the addprocs function in Distributed.jl. In the launch step, it simply takes the pending socket and adds it to launched. And in the manage step, it simply maintains a dict of active workers.
3. How do workers communicate?
In the above section, we've mentioned that a worker process first runs start_worker after initialization and then waits for messages. But how are messages encoded and interpreted?
                 ┌─────────────────────┐
                 │      .........      │
                 │ ┌─────────────────┐ │
                 │ │    BOUNDARY     │ │
                 │ └─────────────────┘ │
                 │                     │
                 │ ┌─────────────────┐ │
                 │ │ header          │ │
                 │ │                 │ │
              ┌──┼─► * response_oid  │ │
              │  │ │ * notify_oid    │ │
              │  │ └─────────────────┘ │
              │  │                     │
              │  │ ┌─────────────────┐ │
┌──────────┐  │  │ │                 │ │    ┌────────────┐
│ send_msg ├──┼──┼─► builtin message ├─┼────► handle_msg │
└──────────┘  │  │ │                 │ │    └────────────┘
              │  │ └─────────────────┘ │
              │  │                     │
              │  │ ┌─────────────────┐ │
              └──┼─►    BOUNDARY     │ │
                 │ └─────────────────┘ │
                 │      .........      │
                 └─────────────────────┘
Messages are serialized into a stream and separated by a constant sequence of bytes (MSG_BOUNDARY). Each message has two parts, a header and a message body. Each header has exactly two fields, response_oid and notify_oid. For the body, Distributed.jl provides several predefined message types. Each message is deserialized and then dispatched to a specific handle_msg implementation. Now we are going to study two key concepts in Distributed.jl: remote call and remote reference.
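The framing described above can be imitated with a toy sketch built on the Serialization standard library. Everything prefixed with Toy or toy_ is hypothetical and only mirrors the idea (header, then body, then boundary); it is not the code or the byte layout Distributed.jl actually uses.

```julia
using Serialization

# Hypothetical boundary bytes; the real constant is MSG_BOUNDARY in Distributed.jl.
const TOY_BOUNDARY = UInt8[0xde, 0xad, 0xbe, 0xef]

struct ToyHeader
    response_oid::Tuple{Int,Int}  # (whence, id) pair, standing in for an RRID
    notify_oid::Tuple{Int,Int}
end

struct ToyCallMsg
    f::Function
    args::Tuple
end

# Sender side: header first, then the body, then the boundary marker.
function toy_send_msg(io::IO, header::ToyHeader, msg)
    serialize(io, header)
    serialize(io, msg)
    write(io, TOY_BOUNDARY)
end

# Receiver side: read the parts back in the same order and check the boundary.
function toy_recv_msg(io::IO)
    header = deserialize(io)
    msg = deserialize(io)
    boundary = read(io, length(TOY_BOUNDARY))
    boundary == TOY_BOUNDARY || error("corrupted stream")
    return header, msg
end

# One handler method per message type, selected by multiple dispatch.
toy_handle_msg(msg::ToyCallMsg, header::ToyHeader) = msg.f(msg.args...)

io = IOBuffer()
toy_send_msg(io, ToyHeader((1, 1), (0, 0)), ToyCallMsg(+, (1, 1)))
seekstart(io)
header, msg = toy_recv_msg(io)
result = toy_handle_msg(msg, header)
println(result)
```

Dispatching on the message type is what lets a new kind of message be added by defining one more struct and one more handler method.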
3.1 Remote reference
In Distributed.jl, each worker has a unique id (myid()). To locate objects created by Distributed.jl, each of them also has a unique id within its process.
const REF_ID = Threads.Atomic{Int}(1)
next_ref_id() = Threads.atomic_add!(REF_ID, 1)

struct RRID
    whence::Int
    id::Int

    RRID() = RRID(myid(), next_ref_id())
    RRID(whence, id) = new(whence, id)
end
As you can see, by default the id is auto-incremented and whence is set to the current worker. Remember that each AbstractRemoteRef instance must contain at least these two ids so that it can be located.
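A quick way to see these two ids is to create a couple of references locally, here using Future, the reference type discussed next. This is only a sketch; it assumes nothing else creates remote references in between, and it reads the whence and id fields directly.

```julia
using Distributed

# Two futures created locally, without any worker involved.
f1 = Future()
f2 = Future()

# whence is the id of the process that created the reference,
# id is the per-process counter value it was given.
println((f1.whence, f1.id))
println((f2.whence, f2.id))
```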
Future
mutable struct Future <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int
    v::Union{Some{Any}, Nothing}

    Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) =
        (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))
    Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4])  # Useful for creating dummy, zeroed-out instances
end
A Future is an abstract container of a remote object (which can also reside in the current process). Beyond whence and id, it has two extra fields: where, which records the process that holds the underlying value, and v, a local cache of that value once it has been fetched. Understanding the difference between where and whence is crucial. Let's say we're on worker 1 and would like to do a simple calculation of 1+1 on worker 2. Without the where field, we have to first send the remote calculation message to worker 2. Then worker 2 creates a unique RRID and returns it to worker 1. And when we want to fetch the calculation result v, we have to send the fetch command to worker 2 again and wait until the calculation is done and the result is passed back to worker 1.
                                timeline
                                   │
┌───────────────────────────────┐  │  ┌────────────────────────────────┐
│ worker 1                      │  │  │ worker 2                       │
│                               │  │  │                                │
│ remotecall F: 1 + 1          ─┼──┼──┤►                               │
│                               │  │  │ create a unique remote ref: R  │
│ waiting......                 │  │  │                                │
│                             ◄─┼──┼──┤ pass back R                    │
│ remote_ref R received         │  │  │                                │
│                               │  │  │ do the calculation of F        │
│ do some other calculation     │  │  │                                │
│                               │  │  │ ......                         │
│ ......                        │  │  │                                │
│                               │  │  │                                │
│ fetch result from remote_ref ─┼──┼──┤► wait F finish                 │
│                               │  │  │                                │
│                               │  │  │ ......                         │
│ waiting......                 │  │  │                                │
│                             ◄─┼──┼──┤ send back result               │
│ remote value cached           │  │  │                                │
│                               │  │  │                                │
└───────────────────────────────┘  │  └────────────────────────────────┘
                                   ▼
But if we have a where field to record where the calculation happens, then the first round trip can be eliminated.
                              timeline
                                 │
┌─────────────────────────────┐  │  ┌───────────────────┐
│ worker 1                    │  │  │ worker 2          │
│                             │  │  │                   │
│ create remote call F       ─┼──┼──┤► received F & R   │
│ and remote ref R            │  │  │                   │
│                             │  │  │ do calculation    │
│ do some other calculation   │  │  │                   │
│                             │  │  │ ......            │
│ ......                      │  │  │                   │
│                             │  │  │                   │
│ fetch result from where(R) ─┼──┼──┤► wait F finish    │
│                             │  │  │                   │
│                             │  │  │ ......            │
│ waiting......               │  │  │                   │
│                           ◄─┼──┼──┤ send back result  │
│ remote value cached         │  │  │                   │
└─────────────────────────────┘  │  └───────────────────┘
                                 ▼
RemoteChannel is similar, except that the underlying value is not cached on the side that reads it. We interact with it like a channel: we can check whether it is ready, put! elements into it and take! elements from it.
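A short sketch of that channel-like behaviour, assuming a RemoteChannel hosted on the current process so that no extra worker is needed:

```julia
using Distributed

# A RemoteChannel wraps an ordinary Channel that lives on one process
# (here the current one, since no pid is passed).
rc = RemoteChannel(() -> Channel{Int}(4))

put!(rc, 10)
put!(rc, 20)

ready_before = isready(rc)
first_item = take!(rc)    # removes 10 from the channel
second_item = take!(rc)   # removes 20 from the channel
ready_after = isready(rc)

println((ready_before, first_item, second_item, ready_after))
```

Unlike a Future, the same RemoteChannel can be written to and read from many times.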
3.2 Remote call
Now let's examine what's happening in the following simple code:
using Distributed
addprocs(2)
x = @spawnat 2 begin
    sleep(3)
    rand(3)
end
print(x[])
First, we create two workers with addprocs(2). Then we create a remote call which simply returns a random vector. The @spawnat macro wraps the following expression into a parameterless function (usually known as a thunk) and turns it into a remotecall. In remotecall, a Future is first created to hold the result later. Then the whence and id of the future (an RRID) are extracted to form the message header. The thunk is wrapped in a CallMsg and forms the message body. The whole message is serialized and written to the worker's w_stream. Note that remotecall is asynchronous, so you can work on other things and fetch the result later.
On the worker side, once a new message is received, it is deserialized and dispatched to the corresponding handle_msg call. For CallMsg, the worker creates a temporary Channel(1) to store the result and associates it with the RRID in its table of references (PGRP.refs). Now when we fetch the result with x[], another message is sent through remotecall_fetch with a dedicated function fetch_ref, and we wait for the response. Once the result is received, it is cached in the :v field of the Future, so that later fetch calls will not do the remote call again.
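To connect the macro with the thunk it builds, the sketch below writes the same remote call twice: once with @spawnat and once by hand with remotecall. It assumes a single local worker; the variable names are illustrative.

```julia
using Distributed

pids = addprocs(1)
w = pids[1]

# The macro form: the expression is wrapped into a zero-argument closure.
x = @spawnat w begin
    sleep(1)
    myid()
end

# Roughly the same call written by hand with an explicit thunk.
y = remotecall(() -> (sleep(1); myid()), w)

# Both calls return a Future right away; fetch blocks until the value arrives.
x_val = fetch(x)
y_val = fetch(y)

# Fetching the same Future again returns the same value.
x_again = fetch(x)

println((x_val, y_val, x_again))
rmprocs(pids)
```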
The follow-up question is, what about the original data on worker 2? Since we have now fetched and cached the value, we won't need it there anymore.
Actually we can't remove it from worker 2 directly. Let's say we send the future x to another worker 3 before worker 3 fetches the result. If the result were removed from worker 2 immediately in response to the fetch operation on worker 1, then worker 3 would never get the chance to fetch the result. But we still need to remove it at some point, right? Otherwise the memory usage would keep growing. In fact, the worker where the data of the Future resides keeps track of the clients connected to this Future. Every time a Future is garbage collected on a client, that client asks to be deleted from the set of connected clients. And when no connected clients are left, it is safe to remove the data from the owner's table of references (PGRP.refs).
Note that some expressions can't be executed through @spawnat (for example using SomePackage). That's why we have several different message types and handle_msg implementations. But the whole pipeline is almost the same.
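For the using case specifically, the usual tool is the @everywhere macro, which evaluates an expression at top level on every process. A minimal sketch, assuming one local worker and using the LinearAlgebra standard library as the package to load:

```julia
using Distributed

pids = addprocs(1)
w = pids[1]

# Load a standard library on every process, including the new worker.
@everywhere using LinearAlgebra

# The worker can now resolve `norm` when the closure runs there.
n = remotecall_fetch(() -> norm([3.0, 4.0]), w)
println(n)

rmprocs(pids)
```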
3.3 WorkerPool and pmap
With the knowledge above, we now know how Distributed.jl works. But it is still not very easy to use, since we have to deal with worker ids directly. That's why WorkerPool and pmap are provided. pmap divides the workload among the workers, and the worker pool hands out idle workers so that computing resources are used more efficiently. This part of the code is relatively easy to read and understand.
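A short sketch of both, assuming two local workers. The first call relies on the default pool, the second passes an explicit WorkerPool built from the new worker ids:

```julia
using Distributed

pids = addprocs(2)

# pmap picks a free worker for each element, so no worker id appears here.
squares = pmap(x -> x^2, 1:5)
println(squares)

# An explicit pool restricts the work to a chosen set of workers.
pool = WorkerPool(pids)
used = pmap(_ -> myid(), pool, 1:6)
println(used)

rmprocs(pids)
```

The second result shows which worker handled each element; the exact distribution depends on scheduling, so it can differ from run to run.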
Is Distributed.jl perfect?
Well, the standard answer is that there's no perfect design, only trade-offs. In my opinion, Distributed.jl is designed for HPC environments, where all the workers are quite stable. The functionality it provides is very fundamental and does not focus much on usability. On top of it, there's Dagger.jl, which is more convenient for dynamic graph computing. And I'm also considering implementing a more flexible one. Stay tuned!