Last active
August 29, 2015 14:17
-
-
Save BastienDurel/790577ae3dca57dff4c7 to your computer and use it in GitHub Desktop.
inproc test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// -*- mode: c++ -*- | |
#if !defined TEST_INPROC_COMMON_INCLUDED | |
#define TEST_INPROC_COMMON_INCLUDED 1 | |
#include <stdexcept> | |
#include <cstring> | |
#include <zmq.h> | |
#if ZMQ_VERSION_MAJOR < 4 | |
#define zmq_ctx_term zmq_ctx_destroy | |
#endif | |
#if __cplusplus < 201103L | |
#define constexpr const | |
#endif | |
#define WM_VALUE 100000000 | |
extern void * ctx; | |
inline void ERR(std::string&& err) throw(std::exception) { | |
err.append(": ").append(strerror(errno)); | |
throw std::runtime_error(err.c_str()); | |
} | |
struct msg { | |
zmq_msg_t _m; | |
msg() { int rc = zmq_msg_init (&_m); if (rc != 0) ERR("zmq_msg_init"); } | |
explicit msg(int size) { int rc = zmq_msg_init_size (&_m, size); if (rc != 0) ERR("zmq_msg_init_size"); } | |
explicit msg(size_t size) { int rc = zmq_msg_init_size (&_m, (int)size); if (rc != 0) ERR("zmq_msg_init_size"); } | |
msg(const msg&) = delete; | |
~msg() { int rc = zmq_msg_close (&_m); if (rc != 0) ERR("zmq_msg_close"); } | |
operator zmq_msg_t*() { return &_m; } | |
}; | |
inline void set_keepalive_options(void* sock) { | |
#if defined DOKEEPALIVE && DOKEEPALIVE > 0 | |
int rc; | |
const int keepalive = 1; // use TCP SO_KEEPALIVE | |
const int ka_idle = 1; // Start keeplives after this period | |
const int ka_cnt = 1; // Interval between keepalives | |
const int ka_int = 1; // Number of keepalives before death | |
rc = zmq_setsockopt (sock, ZMQ_TCP_KEEPALIVE, &keepalive, sizeof(keepalive)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
rc = zmq_setsockopt (sock, ZMQ_TCP_KEEPALIVE_IDLE, &ka_idle, sizeof(ka_idle)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
rc = zmq_setsockopt (sock, ZMQ_TCP_KEEPALIVE_CNT, &ka_cnt, sizeof(ka_cnt)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
rc = zmq_setsockopt (sock, ZMQ_TCP_KEEPALIVE_INTVL, &ka_int, sizeof(ka_int)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
#endif | |
} | |
inline void* connect_socket(const char* endpoint) { | |
auto sock = zmq_socket(ctx, ZMQ_PUSH); | |
if (!sock) | |
ERR("zmq_socket"); | |
const int wm = WM_VALUE; | |
int rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &wm, sizeof(wm)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
set_keepalive_options(sock); | |
rc = zmq_connect(sock, endpoint); | |
if (rc != 0) | |
ERR("zmq_connect"); | |
return sock; | |
} | |
inline void* connect_socket_pull(const char* endpoint) { | |
auto sock = zmq_socket(ctx, ZMQ_PULL); | |
if (!sock) | |
ERR("zmq_socket"); | |
const int wm = WM_VALUE; | |
int rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &wm, sizeof(wm)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
set_keepalive_options(sock); | |
rc = zmq_connect(sock, endpoint); | |
if (rc != 0) | |
ERR("zmq_connect"); | |
return sock; | |
} | |
inline void* connect_socket_push(const char* endpoint) { | |
return connect_socket(endpoint); | |
} | |
inline void* bind_socket(const char* endpoint) { | |
auto sock = zmq_socket(ctx, ZMQ_PULL); | |
if (!sock) | |
ERR("zmq_socket"); | |
const int wm = WM_VALUE; | |
int rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &wm, sizeof(wm)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
set_keepalive_options(sock); | |
rc = zmq_bind(sock, endpoint); | |
if (rc != 0) | |
ERR("zmq_connect"); | |
return sock; | |
} | |
inline void* bind_socket_pull(const char* endpoint) { | |
return bind_socket(endpoint); | |
} | |
inline void* bind_socket_push(const char* endpoint) { | |
auto sock = zmq_socket(ctx, ZMQ_PUSH); | |
if (!sock) | |
ERR("zmq_socket"); | |
const int wm = WM_VALUE; | |
int rc = zmq_setsockopt(sock, ZMQ_RCVHWM, &wm, sizeof(wm)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
set_keepalive_options(sock); | |
rc = zmq_bind(sock, endpoint); | |
if (rc != 0) | |
ERR("zmq_connect"); | |
return sock; | |
} | |
#endif// ~TEST_INPROC_COMMON_INCLUDED |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="utf-8"?> | |
<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> | |
<ItemGroup Label="ProjectConfigurations"> | |
<ProjectConfiguration Include="Debug|x64"> | |
<Configuration>Debug</Configuration> | |
<Platform>x64</Platform> | |
</ProjectConfiguration> | |
<ProjectConfiguration Include="Release|x64"> | |
<Configuration>Release</Configuration> | |
<Platform>x64</Platform> | |
</ProjectConfiguration> | |
</ItemGroup> | |
<PropertyGroup Label="Globals"> | |
<ProjectGuid>{27E6F8CB-2F11-49C0-83E3-4C5AE66157D0}</ProjectGuid> | |
<Keyword>Win32Proj</Keyword> | |
<RootNamespace>inproc</RootNamespace> | |
</PropertyGroup> | |
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> | |
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration"> | |
<ConfigurationType>Application</ConfigurationType> | |
<UseDebugLibraries>true</UseDebugLibraries> | |
<PlatformToolset>v120</PlatformToolset> | |
<CharacterSet>Unicode</CharacterSet> | |
</PropertyGroup> | |
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration"> | |
<ConfigurationType>Application</ConfigurationType> | |
<UseDebugLibraries>false</UseDebugLibraries> | |
<PlatformToolset>v120</PlatformToolset> | |
<WholeProgramOptimization>true</WholeProgramOptimization> | |
<CharacterSet>Unicode</CharacterSet> | |
</PropertyGroup> | |
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> | |
<ImportGroup Label="ExtensionSettings"> | |
</ImportGroup> | |
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets"> | |
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> | |
</ImportGroup> | |
<ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="PropertySheets"> | |
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> | |
</ImportGroup> | |
<PropertyGroup Label="UserMacros" /> | |
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> | |
<LinkIncremental>true</LinkIncremental> | |
<IncludePath>C:\Program Files\ZeroMQ 4.0.4\include;$(IncludePath)</IncludePath> | |
<LibraryPath>C:\Program Files\ZeroMQ 4.0.4\lib;$(LibraryPath)</LibraryPath> | |
</PropertyGroup> | |
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> | |
<LinkIncremental>false</LinkIncremental> | |
<IncludePath>C:\Program Files\ZeroMQ 4.0.4\include;$(IncludePath)</IncludePath> | |
<LibraryPath>C:\Program Files\ZeroMQ 4.0.4\lib;$(LibraryPath)</LibraryPath> | |
</PropertyGroup> | |
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> | |
<ClCompile> | |
<PrecompiledHeader> | |
</PrecompiledHeader> | |
<WarningLevel>Level3</WarningLevel> | |
<Optimization>Disabled</Optimization> | |
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> | |
</ClCompile> | |
<Link> | |
<SubSystem>Console</SubSystem> | |
<GenerateDebugInformation>true</GenerateDebugInformation> | |
<AdditionalDependencies>libzmq-v120-mt-gd-4_0_4.lib;%(AdditionalDependencies)</AdditionalDependencies> | |
</Link> | |
</ItemDefinitionGroup> | |
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> | |
<ClCompile> | |
<WarningLevel>Level3</WarningLevel> | |
<PrecompiledHeader> | |
</PrecompiledHeader> | |
<Optimization>MaxSpeed</Optimization> | |
<FunctionLevelLinking>true</FunctionLevelLinking> | |
<IntrinsicFunctions>true</IntrinsicFunctions> | |
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> | |
</ClCompile> | |
<Link> | |
<SubSystem>Console</SubSystem> | |
<GenerateDebugInformation>true</GenerateDebugInformation> | |
<EnableCOMDATFolding>true</EnableCOMDATFolding> | |
<OptimizeReferences>true</OptimizeReferences> | |
<AdditionalDependencies>libzmq-v120-mt-4_0_4.lib;%(AdditionalDependencies)</AdditionalDependencies> | |
</Link> | |
</ItemDefinitionGroup> | |
<ItemGroup> | |
<Text Include="ReadMe.txt" /> | |
</ItemGroup> | |
<ItemGroup> | |
<ClInclude Include="stdafx.h" /> | |
<ClInclude Include="targetver.h" /> | |
</ItemGroup> | |
<ItemGroup> | |
<ClCompile Include="main.cpp" /> | |
</ItemGroup> | |
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> | |
<ImportGroup Label="ExtensionTargets"> | |
</ImportGroup> | |
</Project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <chrono> | |
#include <thread> | |
#include <memory> | |
#include <atomic> | |
#include <cassert> | |
#include "common.h" | |
#if !defined NUM_THREADS | |
constexpr int NUM_THREADS { 2 }; | |
#endif | |
volatile bool thr_start { false }; | |
std::atomic_int init_count { 0 }; | |
#if defined WORK | |
#define MSG_COUNT 5000 | |
#else | |
#define MSG_COUNT 10000000 | |
#endif | |
void* ctx { nullptr }; | |
constexpr const char * endpoint { "inproc://foo" }; | |
void push_worker(int num); | |
void pull_worker(int& count); | |
int main() | |
{ | |
ctx = zmq_ctx_new(); | |
assert(ctx); | |
std::thread pull[NUM_THREADS]; | |
int count[NUM_THREADS] = {0}; | |
for (int i = 0; i < NUM_THREADS; ++i) { | |
pull[i] = std::thread(pull_worker, std::ref(count[i])); | |
} | |
std::thread push(push_worker, MSG_COUNT); | |
constexpr int to_init = NUM_THREADS + 1; | |
while (init_count.load() < to_init) | |
std::this_thread::yield(); | |
auto start = std::chrono::high_resolution_clock::now(); | |
thr_start = true; | |
std::this_thread::yield(); | |
push.join(); | |
for (int i = 0; i < NUM_THREADS; ++i) { | |
pull[i].join(); | |
} | |
int rc = zmq_ctx_term(ctx); | |
if (rc != 0) | |
ERR("zmq_ctx_term"); | |
auto end = std::chrono::high_resolution_clock::now(); | |
using namespace std::chrono; | |
std::cout << "done in " | |
<< duration_cast<milliseconds>(end - start).count() | |
<< " ms for " << NUM_THREADS << " threads" << std::endl; | |
std::cout << "Counts: " << std::endl; | |
int total = 0; | |
for (int i = 0; i < NUM_THREADS; ++i) { | |
total += count[i]; | |
std::cout << "Thread# " << i << ": " << count[i] << std::endl; | |
} | |
std::cout << "Total: " << total << std::endl; | |
#if defined _MSC_VER | |
int k; std::cin >> k; | |
#endif | |
return 0; | |
} | |
void _push_worker(int num) { | |
auto sock = bind_socket_push(endpoint); | |
const std::string hello{"hello"}; | |
int rc { 0 }; | |
init_count++; | |
while (!thr_start) | |
;// spinlock without yield, should start first | |
for (int i = 0; i < num; ++i) { | |
msg message{hello.size()}; | |
memmove(zmq_msg_data(message), hello.c_str(), hello.size()); | |
const int size = zmq_msg_size(message); | |
rc = zmq_msg_send(message, sock, 0); | |
if (rc != size) | |
ERR("zmq_msg_send"); | |
} | |
const std::string end{"end"}; | |
for (int i = 0; i < NUM_THREADS; ++i) { | |
msg message{end.size()}; | |
memmove(zmq_msg_data(message), end.c_str(), end.size()); | |
const int size = zmq_msg_size(message); | |
rc = zmq_msg_send(message, sock, 0); | |
if (rc != size) | |
ERR("zmq_msg_send"); | |
} | |
rc = zmq_close(sock); | |
if (rc != 0) | |
ERR("zmq_close"); | |
} | |
void push_worker(int num) { | |
try { | |
_push_worker(num); | |
} | |
catch (const std::exception& ex) { | |
std::cerr << "[front_worker]" << ex.what() << std::endl; | |
} | |
} | |
void _pull_worker(int& count) { | |
auto sock = connect_socket_pull(endpoint); | |
int rc { 0 }; | |
int lin = 200; | |
bool run = true; | |
rc = zmq_setsockopt(sock, ZMQ_LINGER, &lin, sizeof(lin)); | |
if (rc != 0) | |
ERR("zmq_setsockopt"); | |
init_count++; | |
while (!thr_start) | |
std::this_thread::yield(); | |
while (run) { | |
zmq_msg_t message; | |
rc = zmq_msg_init(&message); | |
if (rc != 0) | |
ERR("zmq_msg_init"); | |
//constexpr int flags = ZMQ_DONTWAIT; | |
constexpr int flags = 0; | |
rc = zmq_msg_recv(&message, sock, flags); | |
if (rc < 0) { | |
if (errno == ETERM) | |
break; | |
if (errno != EAGAIN) | |
ERR("zmq_msg_recv"); | |
rc = zmq_msg_close (&message); | |
if (rc != 0) | |
ERR("zmq_msg_close"); | |
std::this_thread::yield(); | |
continue; | |
} | |
char* m = static_cast<char*>(zmq_msg_data(&message)); | |
if (rc == 3 && strncmp(m, "end", 3) == 0) { | |
run = false; | |
} | |
else | |
++count; | |
#if defined WORK | |
std::this_thread::sleep_for(std::chrono::milliseconds{1}); | |
#endif | |
rc = zmq_msg_close (&message); | |
if (rc != 0) | |
ERR("zmq_msg_close"); | |
} | |
rc = zmq_close(sock); | |
if (rc != 0) | |
ERR("zmq_close"); | |
} | |
void pull_worker(int& count) { | |
try { | |
_pull_worker(count); | |
} | |
catch (const std::exception& ex) { | |
std::cerr << "[pull_worker]" << ex.what() << std::endl; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SRC=main.cpp | |
OBJ=$(SRC:.cpp=.o) | |
OPT=-g -O2 | |
ifneq ($(THREADS),) | |
OPT+=-DNUM_THREADS=$(THREADS) | |
endif | |
ifeq ($(WORK),1) | |
OPT+=-DWORK | |
endif | |
CXXFLAGS=$(shell pkg-config --cflags libzmq) --std=c++11 $(OPT) | |
LDFLAGS=$(shell pkg-config --libs libzmq) $(OPT) | |
NAME=inproc-test | |
all: $(NAME) | |
test: $(NAME) | |
./$(NAME) | |
$(NAME): $(OBJ) | |
$(CXX) -o $@ $(OBJ) $(LDFLAGS) | |
$(OBJ): common.h | |
clean: | |
rm -f $(OBJ) *~ | |
fclean: clean | |
rm -f $(NAME) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment