A Deep Dive into Distributed.jl
Distributed.jl is a standard library in Julia to do multi-processing and distributed computing. The distributed-computing section in the official julia docs already provides a nice introduction on how to use it. I assume most of you have skimmed through it. So here I'll mainly focus on how this package is designed and implemented, hoping that you'll have a better understanding after this talk.
My talk will be organized in the Q&A style. So feel free to raise more questions in the end of this talk if anything is still unclear to you.
1. How to initialize Distributed.jl?
The easiest way is to start the julia with an extra parameter -p auto. Then based on the number of logical cores on your machine, the same number of julia processors will be created and connected on your machine. Or you can specify the number explicitly.
βββββββββββββββ
ββββββΊ myid() == 2 β
β βββββββββββββββ
βββββββββββββββ β
β julia -p 3 β β βββββββββββββββ
β ββββββΌβββββΊ myid() == 3 β
β myid() == 1 β β βββββββββββββββ
βββββββββββββββ β
β βββββββββββββββ
ββββββΊ myid() == 4 β
βββββββββββββββ
To create multiple processors, we can provide a file like this:
println(run(`cat $(joinpath(@__DIR__, "local_machines"))`))2*127.0.0.1
2*127.0.0.1And set it to the --machine-file argument. In the above file, we use the localhost 127.0.0.1 for simplicity.
βββββββββββββββ
ββββΊ myid() == 2 β
ββββββββββ β βββββββββββββββ
ββββΊ Node 1 ββββ€
β ββββββββββ β βββββββββββββββ
β ββββΊ myid() == 3 β
β βββββββββββββββ
ββββββββββββββββββββββββ β
β julia --machine-file β β
β ββββ€
β myid() == 1 β β βββββββββββββββ
ββββββββββββββββββββββββ β ββββΊ myid() == 4 β
β ββββββββββ β βββββββββββββββ
ββββΊ Node 2 ββββ€
ββββββββββ β βββββββββββββββ
ββββΊ myid() == 5 β
βββββββββββββββOf course, you can also set them dynamically with addprocs.
1.1 Why is there an extra processor created?
It's mentioned in the distributed-computing section from the official doc that:
Communication in Julia is generally "one-sided", meaning that the programmer needs to explicitly manage only one process in a two-process operation.
This means that, by design the processor we created to launch workers will serve as the header. Its main goal is to dispatch jobs to workers instead of doing computing on its own. So in most cases, we just execute code on the header and tell workers what they need to do. That is what "one-sided" means above.
So the header processor is just the extra one we created.
2. How are the workers created?
This can be narrowed down to several more specific questions. But before answering them, let's see what's happening when we load the package with using Distributed first.
function __init__()
init_parallel()
endfunction init_parallel()
start_gc_msgs_task()
# start in "head node" mode, if worker, will override later.
global PGRP
global LPROC
LPROC.id = 1
@assert isempty(PGRP.workers)
register_worker(LPROC)
endLet's ignore the first line for now. We'll discuss start_gc_msgs_task later. Two important global variables are initialized here, the first one is LPROC (Local Processor for short), which serves like an identifier of the current processor. The second one is PGRP, (Processor Group for short). It records the workers we'll create later. It will be initialized with the only one element LPROC here. And that's why you'll see only one worker with id 1 is returned from workers().
julia> using Distributedjulia> workers()1-element Vector{Int64}: 1
The function to create workers is provided by addprocs. By default, it will be dispatched to addprocs_locked(manager::ClusterManager; kw...) (Here ClusterManager is an abstract type, we'll introduce two typical implementations in Distributed.jl soon). Since two main public APIs are involved in its implementation, let's examine the code in detail here:
t_launch = @async launch(manager, params, launched, launch_ntfy)
@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
@async (sleep(1); notify(launch_ntfy))
wait(launch_ntfy)
end
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
@async setup_launched_worker(manager, wconfig, launched_q)
end
end
end
endFirst, it tries to call the launch method implemented by specific ClusterManager asynchronously. Then the main task will periodically check whether the launch task has been finshed or not every second. If not, it'll try to setup the the connection with workers which has already been added into launched by the ClusterManager. Note that the outer @sync will guarantee all the setup_launched_worker calls are finished (all connections between header and worker are initialized).
So what should launch do? As the doc says:
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implemented by cluster managers. For every Julia worker launched by this function, it should append a
WorkerConfigentry tolaunchedand notifylaunch_ntfy. The function MUST exit once all workers, requested bymanagerhave been launched.paramsis a dictionary of all keyword argumentsaddprocswas called with.
But what is WorkerConfig? Its definition in Distributed.jl is specifically tight with two ClusterManager and we'll discuss soon. Basically it describes where and how the header should send messages to (Like a IO stream, or a http connection). Once the ClusterManager has finished initializing the worker processor, we need to register this worker in the header:
- Figure out where to read messages from (
r_s) and write messages to (w_s) worker. - Bind the
finalizerof the worker to tell its cluster manager when it's finalized. - Create an async task to handle messages from the worker.
- Send the header's information to worker so that the worker knows where the header is and how to send messages.
- Wait until the worker confirms and joins the cluster. Otherwise, remove it if time is out.
2.1 How are workers created on my local machine?
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
dir = params[:dir]
exename = params[:exename]
exeflags = params[:exeflags]
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`
for i in 1:manager.np
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker`
io = open(detach(setenv(cmd, dir=dir)), "r+")
write_cookie(io)
wconfig = WorkerConfig()
wconfig.process = io
wconfig.io = io.out
wconfig.enable_threaded_blas = params[:enable_threaded_blas]
push!(launched, wconfig)
end
notify(c)
endPretty straightforward, right? A new julia processor is created and then a worker config is properly set. But how does this new processor handle new messages sent from sender or other workers? Note that there's an extra option --worker in the cmd.
distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machine_file != C_NULL)
if distributed_mode
let Distributed = require(PkgId(UUID((0x8ba89e20_285c_5b6f, 0x9357_94700520ee1b)), "Distributed"))
Core.eval(Main, :(const Distributed = $Distributed))
Core.eval(Main, :(using .Distributed))
end
invokelatest(Main.Distributed.process_opts, opts)
endWith this option, the newly created julia processor will do some extra work for us to have everything properly set. Basically, it will create a new socked connection and handle messages coming in. You can use netstat -tunlp | grep julia to see which ports the workers are using.
2.2 How are workers created across different machines?
To create workers across machines, Distributed.jl provides a SSH based cluster manager. Although the code in the SSHManager looks very complex, the core idea behind is similar to the LocalManager. We run the command to create a julia worker processor through SSH and record the process id. Then we can use this id to kill the worker if required.
2.3 A practical example
In ClusterManagers.jl, several common cluster managers are provided. You should definitely check it first if the default LocalManager and SSHManager don't apply in your environment. Now let's take a close look at an interesting one: ElasticManager.
ββββββββββ
βββ€ worker β
β ββββββββββ
βββββββββββββββ connect β ......
β socket β β ββββββββββ
β ββββββββββββΌββ€ worker β
β - address β β ββββββββββ
β - port β β ......
ββββββ²ββββ¬βββββ β ββββββββββ
β β βββ€ worker β
β β ββββββββββ
β β
β β received new connection
watch β β
new conn β β check cookie
β β
β β push into
β β
ββββ΄ββββΌβββ
β pending β
ββββββ¬βββββ
β
β
β
ββββββΌββββββ
β addprocs β
ββββββββββββAfter initialization, the ElasticManageer created two background tasks: the first one is to watch on new connections, and the second one is to add new processors by reusing the addprocs function in Distributed.jl. In the launch step, it simply take take the pending socket and add it into launched. And in the manage step, it simply maintains a dict of active workers.
3. How do workers communicate?
In the above section, we've mentioned that a worker processor will run start_worker first after initialization, and then wait for messages. But how are messages encoded and interpreted?
βββββββββββββββββββββββ
β ......... β
β βββββββββββββββββββ β
β β BOUNDARY β β
β βββββββββββββββββββ β
β β
β βββββββββββββββββββ β
β β header β β
β β β β
ββββΌββΊ * response_oid β β
β β β * notify_oid β β
β β βββββββββββββββββββ β
β β β
β β βββββββββββββββββββ β
ββββββββββββ β β β β β ββββββββββββββ
β send_msg ββββΌβββΌββΊ builtin message βββΌβββββΊ handle_msg β
ββββββββββββ β β β β β ββββββββββββββ
β β βββββββββββββββββββ β
β β β
β β βββββββββββββββββββ β
ββββΌββΊ BOUNDARY β β
β βββββββββββββββββββ β
β ......... β
βββββββββββββββββββββββ
Messages are serialized into a reader steam seperated by a collection of constant bytes (MSG_BOUNDARY). Each message has two parts, a header and a message body. In Distributed.jl, several predefined types of messages are provided. Each message then be deserialized and dispatched to a specific handle_msg implementation. Each header has exactly two fields, the response_oid and notify_oid. Now we are going to study two key concepts in Distributed.jl: remote call and remote reference.
3.1 Remote reference
In Distributed.jl, each worker has a unique id (myid()). To locate objects created by Distributed.jl, each of them will also have a unique id in that processor.
const REF_ID = Threads.Atomic{Int}(1)
next_ref_id() = Threads.atomic_add!(REF_ID, 1)
struct RRID
whence::Int
id::Int
RRID() = RRID(myid(), next_ref_id())
RRID(whence, id) = new(whence, id)
endAs you can see, by default the id is auto-increment. And the whence is set to the current worker. Remember that each AbstractRemoteRef instance must at least contain these two basic id to locate it.
Future
mutable struct Future <: AbstractRemoteRef
where::Int
whence::Int
id::Int
v::Union{Some{Any}, Nothing}
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) =
(r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))
Future(t::NTuple{4, Any}) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
endA Future is an abstract container of a remote object (it can also reside in the current processor). Beyond the whence and id, it has two extra fields. The where indicates where the underlying value v is stored. Understanding the difference between where and whence is crucial. Let's say we're on worker 1 and would like to do a simple calculation of 1+1 on worker 2. Without the where field, we have to first send the remote calculation message to worker 2. Then the worker create a unique RRID and return it to worker 1. And when we want to fetch the calculation result v, we have to send the fetch command to worker 2 again and wait until the calculation is done and passed the result back to worker 1.
timeline
β
βββββββββββββββββββββββββββββββββ β ββββββββββββββββββββββββββββββββββ
β worker 1 β β β worker 2β
β β β β β
β remotecall F: 1 + 1 ββΌβββΌβββ€βΊ β
β β β β create a unique remote ref: R β
β waiting...... β β β β
β βββββΌβββΌβ pass back R β
β remote_ref R received β β β β
β β β β do the calculation of F β
β do some other calculation β β β β
β β β β ...... β
β ...... β β β β
β β β β β
β fetch result from remote_ref ββΌβββΌβββ€βΊ wait F finish β
β β β β β
β β β β ...... β
β waiting...... β β β β
β βββββΌβββΌβ send back result β
β remote value cached β β β β
β β β β β
βββββββββββββββββββββββββββββββββ β ββββββββββββββββββββββββββββββββββ
βΌBut if we have a where field to record where the calculation happens, then the first round could be reduced.
timeline
β
βββββββββββββββββββββββββββββββ β βββββββββββββββββββββ
β worker 1 β β β worker 2 β
β β β β β
β create remote call F ββΌβββΌβββ€βΊ received F & R β
β and remote ref R β β β β
β β β β do calculation β
β do some other calculation β β β β
β β β β ...... β
β ...... β β β β
β β β β β
β fetch reult from where(R) ββΌβββΌβββ€βΊ wait F finish β
β β β β β
β β β β ...... β
β waiting...... β β β β
β βββββΌβββΌβ send back result β
β remote value cached β β β β
βββββββββββββββββββββββββββββββ β βββββββββββββββββββββ
βΌRemoteChannel is similar, except that the underlying value can not be copied back and forth. We can only check whether it is ready and take! elements from it.
3.2 Remote call
Now let's examine what's happening in the following simple code:
using Distributed
addprocs(2)
x = @spawnat 2 begin
sleep(3)
rand(3)
end
print(x[])First, we create a worker with addprocs(2). Then we try to create a remote call which simply returns a random vector. The @spawnat macro will wrap the following expression into a parameterless function (usually known as a thunk) and turn it into a remotecall. In remotecall, a Future is first created to store the result in the future. Then the whence and id info of the future is extracted to form the message header (a RRID). The thunk is wrapped in a specific CallMsg and forms the message body. The whole message is serialized and written to the worker's r_stream. Note that the remotecall is async, so you can work on some other stuff and fetch the result later. On the worker side, once received a new message, it is deserialized and dispatched to the corresponding handle_msg call. For CallMsg, it will create a temp Channel(1) to store the result and associate it with the RRID in its global client_refs. Now when we try to fetch the result with x[], it will send another message of remotecall_fetch with a dedicated function fetch_ref and wait for the response. Once it receives the result. It will be cached in its :v field, so that future fetch calls will not do the remote call again.
The following up question is, what about the original data on worker 2? Since now we have fetched and cached the value, we won't need it anymore.
Actually we can't remove it from worker 2 directly. Let's say we send the future x to another worker 3 before it fetches the result. If the result is removed from worker 2 immediately in respond to the fetch operation on worker 1. Then the worker 3 will never get the chance to fetch the result. But we still need to remove it sometime, right? Otherwise the memory usage will keep growing. In fact, the worker where the data of the Future resides will keep track of the connected workers of this Future. Everytime the GC hapens with a Future, we'll try to delete itself from the connected clients. And when no connected clients left, we're safe to remove it from client_refs.
Note that some expresions can't be executed through @spawnat (for example using SomePackage). That's why we have several different message types and handle_msg implementations. But the whole pipeline is almost the same.
3.3 WorkerPool and pmap
With the knowledge above, now we know how the Distributed.jl works. But still it is not very easy to use since we have to deal with the worker id directly. That's why WorkerPool and pmap are provided. pmap tries to divide the workload first and the worker pool can help to leverage computing resource more efficiently. This part is relatively easy to read and understand.
Is Distributed.jl perfect?
Well, the standard answer is, there's no perfect design, only traceoffs. In my opinion, Distributed.jl is designed for HPC environments, where all the workers are quite stable. All the functionalities it provides are very fundamental and do not feature usability that much. On top of it, there's a Dagger.jl which is more usable for dynamic graph computing. And I'm also considering implementing a more flexible one. Stay tuned!