Skip to content

Instantly share code, notes, and snippets.

@mdavezac
Last active December 29, 2015 14:29
Show Gist options
  • Save mdavezac/7684651 to your computer and use it in GitHub Desktop.
Save mdavezac/7684651 to your computer and use it in GitHub Desktop.
Trying out julia + mpi
const openmpi = find_library(["libmpi.so", "libmpi.dylib"])
const mpich = find_library(["libmpich.so", "libmpich.dylib"])
if openmpi != ""
function init_openmpi()
l = Cint[0]
handle = dlopen(openmpi, RTLD_GLOBAL)
mpi_init = dlsym(handle, :MPI_Init)
mpi_finalize_symbol = dlsym(handle, :MPI_Finalize)
a = ccall(mpi_init, Int, (Ptr{Cint}, Ptr{Cint}), l, l)
function mpi_finalize()
a = ccall(mpi_finalize_symbol, Int, ())
dlclose(handle)
end
atexit(mpi_finalize)
return handle
end
const openmpi_handle = init_openmpi()
for name = [:int, :long, :float, :double, :char, :unsigned_char, :unsigned_short]
@eval const $(parse("mpi_$name")) = cglobal(dlsym(openmpi_handle, $(parse(":ompi_mpi_$name"))))
end
for name = [:world, :self]
@eval const $name = cglobal(dlsym(openmpi_handle, $(parse(":ompi_mpi_comm_$name"))))
end
# Creates first argument to ccall
function mpi_args(name::Symbol)
return dlsym(openmpi_handle, name)
end
const mpi_success = 0
elseif mpich != ""
function init_mpich()
l = Cint[0]
@eval begin
a = @eval ccall((:MPI_Init, $mpich), Int, (Ptr{Cint}, Ptr{Cint}), l, l)
function mpi_finalize()
a = ccall((:MPI_Finalize, $mpich), Int, ())
end
end
atexit(mpi_finalize)
end
init_mpich()
const world = 0x44000000
# Creates first argument to ccall
function mpi_args(name::Symbol)
return (name, "libmpich.so")
end
end
type MPIException <: Exception end
@eval begin
function _world_size()
l = Cint[0]
result = ccall($(mpi_args(:MPI_Comm_size)), Cint, (Ptr{Void}, Ptr{Cint}), $world, l)
if result != mpi_success throw(MPIException) end
return l[1]
end
function _world_rank()
l = Cint[0]
result = ccall($(mpi_args(:MPI_Comm_rank)), Cint, (Ptr{Void}, Ptr{Cint}), $world, l)
if result != mpi_success throw(MPIException) end
return l[1] + 1
end
end
const world_rank = _world_rank()
const world_size = _world_size()
# Collects ip addresses across all processes
function get_ip_addresses()
# Address of the machine
address = "$(string(Base.getipaddr()))\0"
# Send size of the data
ownsize = Cint[convert(Cint, length(address))]
allsizes = Cint[0 for i = 1:world_size]
result = @eval ccall(
$(mpi_args(:MPI_Allgather)),
Cint,
(Ptr{Cint}, Cint, Ptr{Void}, Ptr{Cint}, Cint, Ptr{Void}, Ptr{Void}),
$ownsize, 1, $mpi_int, $allsizes, 1, $mpi_int, world
)
if result != mpi_success throw(MPIException) end
# Now collect addresses
maxsize = maximum(allsizes)
addresses = repeat("\0", maxsize*world_size)
result = @eval ccall(
$(mpi_args(:MPI_Allgather)),
Cint,
(Ptr{Cchar}, Cint, Ptr{Void}, Ptr{Cchar}, Cint, Ptr{Void}, Ptr{Void}),
$address, $maxsize, $mpi_char, $addresses, $maxsize, $mpi_char, world
)
if result != mpi_success throw(MPIException) end
result = String[]
j = 1
for i = 1:world_size
address = addresses[j:j+allsizes[i]-1]
j += allsizes[i]
push!(result, address[1:findfirst(address, '\0')-1])
end
result
end
# Start mpi workers
# If a worker, then only returns at end of program.
# If root node, then returns ports and hostnames
function transform_mpi_to_workers()
hosts = get_ip_addresses()
if world_rank == 1
port = Cint[0]
ports = Cint[0 for i = 1: world_size]
result = @eval ccall(
$(mpi_args(:MPI_Gather)),
Cint,
(Ptr{Cchar}, Cint, Ptr{Void}, Ptr{Cchar}, Cint, Ptr{Void}, Cint, Ptr{Void}),
$port, 1, $mpi_int, $ports, 1, $mpi_int, 0, world
)
return collect(zip(hosts[2:end], ports[2:end]))
else
(actual_port,sock) = listenany(uint16(9009))
port = Cint[actual_port]
result = @eval ccall(
$(mpi_args(:MPI_Gather)),
Cint,
(Ptr{Cchar}, Cint, Ptr{Void}, Ptr{Cchar}, Cint, Ptr{Void}, Cint, Ptr{Void}),
$port, 1, $mpi_int, $port, 1, $mpi_int, 0, world
)
sock.ccb = Base.accept_handler
Base.disable_threaded_libs()
ccall(:jl_install_sigint_handler, Void, ())
global const Scheduler = current_task()
try
Base.check_master_connect(60.0)
Base.event_loop(false)
catch err
print(STDERR, "unhandled exception on $(myid()): $(err)\nexiting.\n")
end
close(sock)
exit(0)
end
end
immutable MPIManager <: ClusterManager
launch::Function
manage::Function
MPIManager() = new(launch_mpi_workers, manage_mpi_workers)
end
# Pretends to launch mpi workers.
# Gets requisite information
launch_mpi_workers(cman::MPIManager, np::Integer, config::Dict) =
(:host_port, map((x) -> tuple(x..., config), transform_mpi_to_workers()))
# Does a little cluster management.
function manage_mpi_workers(id::Integer, config::Dict, op::Symbol)
if op == :interrupt
@spawnat id throw(InterruptException())
end
end
function addprocs_mpi(cman::MPIManager = MPIManager())
if world_size == 1 return end
config={ :dir=>JULIA_HOME,
:exename=>nothing,
:exeflags=>nothing,
:tunnel=>false,
:sshflags=>nothing
}
Base.add_workers(Base.PGRP, Base.start_cluster_workers(world_size - 1, config, cman))
end
macro mpi_barrier(comm)
quote
result = ccall($(mpi_args(:MPI_Barrier)), Cint, (Ptr{Void}, ), $comm)
if result != mpi_success throw(MPIException) end
end
end
addresses = get_ip_addresses()
for rank = 1:world_size
if rank == world_rank println("$world_rank/$world_size: $addresses ") end
@mpi_barrier world
end
addprocs_mpi()
if world_rank == 1
println("Hello")
@spawn println("my world, $world_rank")
print("$(nprocs())/$(workers())")
interrupt()
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment