Last active
December 29, 2015 14:29
-
-
Save mdavezac/7684651 to your computer and use it in GitHub Desktop.
Trying out Julia + MPI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Locate an MPI implementation. The code below treats an empty string from
# find_library as "not found".
const openmpi = find_library(["libmpi.so", "libmpi.dylib"])
const mpich = find_library(["libmpich.so", "libmpich.dylib"])

# Backend setup. Whichever branch runs has to define everything the rest of the
# file uses: mpi_args, mpi_success, mpi_int, mpi_char, world and self.
if openmpi != ""
    # Opens the Open MPI library with RTLD_GLOBAL, calls MPI_Init and registers
    # an exit hook that calls MPI_Finalize and then closes the library.
    # Returns the library handle.
    function init_openmpi()
        l = Cint[0]
        handle = dlopen(openmpi, RTLD_GLOBAL)
        mpi_init = dlsym(handle, :MPI_Init)
        mpi_finalize_symbol = dlsym(handle, :MPI_Finalize)
        # MPI functions return a C int, hence Cint rather than Int.
        ccall(mpi_init, Cint, (Ptr{Cint}, Ptr{Cint}), l, l)
        function mpi_finalize()
            ccall(mpi_finalize_symbol, Cint, ())
            dlclose(handle)
        end
        atexit(mpi_finalize)
        return handle
    end
    const openmpi_handle = init_openmpi()
    # Bind the address of each ompi_mpi_<type> symbol to a constant mpi_<type>.
    for name = [:int, :long, :float, :double, :char, :unsigned_char, :unsigned_short]
        @eval const $(parse("mpi_$name")) = cglobal(dlsym(openmpi_handle, $(parse(":ompi_mpi_$name"))))
    end
    # Same for the communicators: ompi_mpi_comm_world -> world, ompi_mpi_comm_self -> self.
    for name = [:world, :self]
        @eval const $name = cglobal(dlsym(openmpi_handle, $(parse(":ompi_mpi_comm_$name"))))
    end
    # Creates first argument to ccall
    function mpi_args(name::Symbol)
        return dlsym(openmpi_handle, name)
    end
    const mpi_success = 0
elseif mpich != ""
    # Calls MPI_Init from the MPICH library and registers MPI_Finalize as an
    # exit hook.
    function init_mpich()
        l = Cint[0]
        # @eval evaluates at global scope, so the local buffer must be spliced
        # in with $l; a bare `l` would be looked up as a global and not found.
        @eval begin
            ccall((:MPI_Init, $mpich), Cint, (Ptr{Cint}, Ptr{Cint}), $l, $l)
            function mpi_finalize()
                ccall((:MPI_Finalize, $mpich), Cint, ())
            end
        end
        atexit(mpi_finalize)
    end
    init_mpich()
    # MPICH handles are integer constants; values taken from MPICH's mpi.h
    # (NOTE(review): confirm against the installed mpi.h). They are passed
    # through the Ptr{Void} argument slots exactly like `world` already was.
    const world = 0x44000000
    const self = 0x44000001
    const mpi_char = 0x4c000101
    const mpi_int = 0x4c000405
    # Creates first argument to ccall. Uses the library that find_library
    # actually located, which may be the .dylib rather than the .so.
    function mpi_args(name::Symbol)
        return (name, mpich)
    end
    const mpi_success = 0
else
    # Fail here with a clear message instead of hitting an undefined mpi_args
    # further down.
    error("could not find an MPI library (looked for libmpi and libmpich)")
end
# Signals that an MPI call returned something other than mpi_success.
type MPIException <: Exception
end
# Both queries are generated through @eval so that the ccall target returned
# by mpi_args and the world communicator are spliced into the definitions.
@eval begin
    # Size of the world communicator as reported by MPI_Comm_size.
    function _world_size()
        count = zeros(Cint, 1)
        status = ccall($(mpi_args(:MPI_Comm_size)), Cint, (Ptr{Void}, Ptr{Cint}), $world, count)
        status == mpi_success || throw(MPIException)
        return count[1]
    end

    # Rank reported by MPI_Comm_rank, shifted by one so that ranks start at 1.
    function _world_rank()
        rank = zeros(Cint, 1)
        status = ccall($(mpi_args(:MPI_Comm_rank)), Cint, (Ptr{Void}, Ptr{Cint}), $world, rank)
        status == mpi_success || throw(MPIException)
        return rank[1] + 1
    end
end

# Queried once, when the file is loaded.
const world_size = _world_size()
const world_rank = _world_rank()
# Collects ip addresses across all processes
#
# Runs two MPI_Allgather rounds: the first exchanges the length of each
# process' NUL-terminated address string, the second exchanges the strings
# themselves in fixed-size slots of `maxsize` characters.
#
# Returns a String array with one entry per rank, in rank order.
# Throws MPIException if either MPI_Allgather does not return mpi_success.
function get_ip_addresses()
    # Address of the machine, NUL-terminated so it can be cut out of its slot
    address = "$(string(Base.getipaddr()))\0"
    # Send size of the data
    ownsize = Cint[convert(Cint, length(address))]
    allsizes = Cint[0 for i = 1:world_size]
    status = @eval ccall(
        $(mpi_args(:MPI_Allgather)),
        Cint,
        (Ptr{Cint}, Cint, Ptr{Void}, Ptr{Cint}, Cint, Ptr{Void}, Ptr{Void}),
        $ownsize, 1, $mpi_int, $allsizes, 1, $mpi_int, world
    )
    if status != mpi_success throw(MPIException) end
    # Now collect addresses
    maxsize = maximum(allsizes)
    # The send count is maxsize, so the send buffer must be at least that long:
    # pad the local address with NULs instead of letting MPI read past its end.
    padded = address * repeat("\0", maxsize - length(address))
    # Receive buffer; the ccall below hands it to MPI as the destination.
    addresses = repeat("\0", maxsize*world_size)
    status = @eval ccall(
        $(mpi_args(:MPI_Allgather)),
        Cint,
        (Ptr{Cchar}, Cint, Ptr{Void}, Ptr{Cchar}, Cint, Ptr{Void}, Ptr{Void}),
        $padded, $maxsize, $mpi_char, $addresses, $maxsize, $mpi_char, world
    )
    if status != mpi_success throw(MPIException) end
    # Every rank contributed maxsize characters, so rank i's slot starts at
    # (i-1)*maxsize + 1 no matter how long its own address is.
    hosts = String[]
    for i = 1:world_size
        offset = (i - 1)*maxsize + 1
        slot = addresses[offset:offset+maxsize-1]
        push!(hosts, slot[1:findfirst(slot, '\0')-1])
    end
    hosts
end
# Start mpi workers
# If a worker, then only returns at end of program.
# If root node, then returns ports and hostnames, as an array of (host, port)
# tuples covering ranks 2 and up.
# Throws MPIException if MPI_Gather does not return mpi_success.
function transform_mpi_to_workers()
    # Called on every rank; only the root branch below uses the result.
    hosts = get_ip_addresses()
    if world_rank == 1
        port = Cint[0]
        ports = Cint[0 for i = 1:world_size]
        # Buffers are Cint arrays, so they are declared Ptr{Cint}, matching the
        # MPI_Allgather signature used for the size exchange.
        result = @eval ccall(
            $(mpi_args(:MPI_Gather)),
            Cint,
            (Ptr{Cint}, Cint, Ptr{Void}, Ptr{Cint}, Cint, Ptr{Void}, Cint, Ptr{Void}),
            $port, 1, $mpi_int, $ports, 1, $mpi_int, 0, world
        )
        if result != mpi_success throw(MPIException) end
        # Entry 1 belongs to the root itself, which is not a worker.
        return collect(zip(hosts[2:end], ports[2:end]))
    else
        (actual_port, sock) = listenany(uint16(9009))
        port = Cint[actual_port]
        # Report the listening port to the root (MPI root index 0).
        result = @eval ccall(
            $(mpi_args(:MPI_Gather)),
            Cint,
            (Ptr{Cint}, Cint, Ptr{Void}, Ptr{Cint}, Cint, Ptr{Void}, Cint, Ptr{Void}),
            $port, 1, $mpi_int, $port, 1, $mpi_int, 0, world
        )
        if result != mpi_success throw(MPIException) end
        # Worker start-up: accept the master's connection and run the event loop.
        sock.ccb = Base.accept_handler
        Base.disable_threaded_libs()
        ccall(:jl_install_sigint_handler, Void, ())
        global const Scheduler = current_task()
        try
            Base.check_master_connect(60.0)
            Base.event_loop(false)
        catch err
            print(STDERR, "unhandled exception on $(myid()): $(err)\nexiting.\n")
        end
        close(sock)
        exit(0)
    end
end
# Cluster manager whose launch/manage callbacks are the MPI-specific functions
# defined in this file.
immutable MPIManager <: ClusterManager
    launch::Function
    manage::Function

    function MPIManager()
        return new(launch_mpi_workers, manage_mpi_workers)
    end
end
# Pretends to launch mpi workers.
# Gets requisite information: each entry returned by transform_mpi_to_workers
# is extended with `config`, and the list is tagged :host_port.
# `np` is accepted but not used.
function launch_mpi_workers(cman::MPIManager, np::Integer, config::Dict)
    endpoints = transform_mpi_to_workers()
    workers = map(endpoints) do hostport
        tuple(hostport..., config)
    end
    return (:host_port, workers)
end
# Does a little cluster management.
# Only :interrupt is acted upon: an InterruptException is thrown on worker `id`.
# Any other `op` is ignored.
function manage_mpi_workers(id::Integer, config::Dict, op::Symbol)
    if op != :interrupt
        return nothing
    end
    return @spawnat id throw(InterruptException())
end
# Registers every MPI rank other than the current one as a Julia worker.
# Does nothing when the world communicator holds a single process.
function addprocs_mpi(cman::MPIManager = MPIManager())
    nworkers = world_size - 1
    if nworkers == 0
        return
    end
    settings = {
        :dir => JULIA_HOME,
        :exename => nothing,
        :exeflags => nothing,
        :tunnel => false,
        :sshflags => nothing
    }
    launched = Base.start_cluster_workers(nworkers, settings, cman)
    Base.add_workers(Base.PGRP, launched)
end
# Expands to a call of MPI_Barrier on communicator `comm`, throwing
# MPIException unless the call returns mpi_success.
macro mpi_barrier(comm)
    quote
        # esc(comm) makes the communicator expression resolve in the caller's
        # scope; unescaped, macro hygiene would look it up in this module only.
        result = ccall($(mpi_args(:MPI_Barrier)), Cint, (Ptr{Void}, ), $(esc(comm)))
        if result != mpi_success throw(MPIException) end
    end
end
# Demo driver.
addresses = get_ip_addresses()

# Ranks take turns printing, with a barrier after each turn
# (presumably to keep the output in rank order).
for rank = 1:world_size
    rank == world_rank && println("$world_rank/$world_size: $addresses ")
    @mpi_barrier world
end

# Turn the other MPI ranks into Julia workers.
addprocs_mpi()

if world_rank == 1
    println("Hello")
    @spawn println("my world, $world_rank")
    print("$(nprocs())/$(workers())")
    interrupt()
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment