Created
June 13, 2018 01:44
-
-
Save takaishi/730ff0ae0ba77edffb2c2e5c4bee879f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/CMakeLists.txt b/CMakeLists.txt | |
index 510202c7..64669273 100644 | |
--- a/CMakeLists.txt | |
+++ b/CMakeLists.txt | |
@@ -48,6 +48,7 @@ option(FLB_SQLDB "Enable SQL embedded DB" No) | |
option(FLB_HTTP_SERVER "Enable HTTP Server" No) | |
option(FLB_BACKTRACE "Enable stacktrace support" Yes) | |
option(FLB_LUAJIT "Enable Lua Scripting support" Yes) | |
+option(FLB_MRUBY "Enable mruby Scripting support" Yes) | |
# Metrics: Experimental Feature, disabled by default on 0.12 series | |
# but enabled in the upcoming 0.13 release. Note that development | |
@@ -121,6 +122,7 @@ option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes) | |
option(FLB_FILTER_RECORD_MODIFIER "Enable record_modifier filter" Yes) | |
option(FLB_FILTER_NEST "Enable nest filter" Yes) | |
option(FLB_FILTER_LUA "Enable Lua scripting filter" Yes) | |
+option(FLB_FILTER_MRUBY "Enable Lua scripting filter" Yes) | |
# Enable all features | |
if(FLB_ALL) | |
@@ -190,6 +192,7 @@ include_directories( | |
lib/sha1 | |
lib/msgpack-2.1.3/include | |
lib/luajit-2.0.5/src/ | |
+ lib/mruby/src/ | |
${MONKEY_INCLUDE_DIR} | |
) | |
@@ -441,6 +444,20 @@ if(FLB_LUAJIT) | |
FLB_DEFINITION(FLB_HAVE_LUAJIT) | |
endif() | |
+if(FLB_MRUBY) | |
+ set(MRUBY_PATH ${CMAKE_CURRENT_SOURCE_DIR}/lib/mruby) | |
+ ExternalProject_Add(mruby | |
+ SOURCE_DIR ${MRUBY_PATH} | |
+ CONFIGURE_COMMAND pwd | |
+ BUILD_COMMAND cd ${MRUBY_PATH} && rake all | |
+ INSTALL_COMMAND cp ${MRUBY_PATH}/build/host/lib/libmruby.a "${CMAKE_CURRENT_BINARY_DIR}/lib/libmruby.a") | |
+ add_library(libmruby STATIC IMPORTED GLOBAL) | |
+ set_target_properties(libmruby PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/lib/libmruby.a") | |
+ add_dependencies(libmruby mruby) | |
+ include_directories("${CMAKE_CURRENT_SOURCE_DIR}/lib/mruby/include/") | |
+ FLB_DEFINITION(FLB_HAVE_MRUBY) | |
+endif() | |
+ | |
# Pthread Local Storage | |
# ===================== | |
# By default we expect the compiler already support thread local storage | |
diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt | |
index 5aeda7fc..96089a55 100644 | |
--- a/plugins/CMakeLists.txt | |
+++ b/plugins/CMakeLists.txt | |
@@ -159,6 +159,10 @@ if(FLB_LUAJIT) | |
REGISTER_FILTER_PLUGIN("filter_lua") | |
endif() | |
+if(FLB_MRUBY) | |
+ REGISTER_FILTER_PLUGIN("filter_mruby") | |
+endif() | |
+ | |
REGISTER_FILTER_PLUGIN("filter_record_modifier") | |
REGISTER_FILTER_PLUGIN("filter_nest") | |
REGISTER_FILTER_PLUGIN("filter_modify") | |
diff --git a/plugins/filter_mruby/CMakeLists.txt b/plugins/filter_mruby/CMakeLists.txt | |
new file mode 100644 | |
index 00000000..8aeb9303 | |
--- /dev/null | |
+++ b/plugins/filter_mruby/CMakeLists.txt | |
@@ -0,0 +1,5 @@ | |
+set(src | |
+ mruby_config.c | |
+ mruby.c) | |
+ | |
+FLB_PLUGIN(filter_mruby "${src}" "m") | |
diff --git a/plugins/filter_mruby/mruby.c b/plugins/filter_mruby/mruby.c | |
new file mode 100644 | |
index 00000000..e17d656a | |
--- /dev/null | |
+++ b/plugins/filter_mruby/mruby.c | |
@@ -0,0 +1,177 @@ | |
+#include <fluent-bit/flb_config.h> | |
+#include <msgpack.h> | |
+#include <fluent-bit.h> | |
+#include <fluent-bit/flb_time.h> | |
+#include <mruby/hash.h> | |
+#include <mruby/include/mruby/variable.h> | |
+#include <mruby/include/mruby/array.h> | |
+ | |
+#include "mruby_config.h" | |
+ | |
+#define FLB_FILTER_MODIFIED 1 | |
+#define FLB_FILTER_NOTOUCH 2 | |
+ | |
+void mrb_tommsgpack(mrb_state *state, mrb_value value, msgpack_packer *pck) | |
+{ | |
+ enum mrb_vtype type = mrb_type(value); | |
+ | |
+ if (mrb_undef_p(value) || mrb_nil_p(value)) { | |
+ printf("undef or nil"); | |
+ } | |
+ | |
+ switch (type) { | |
+ case MRB_TT_STRING: { | |
+ char *c = RSTRING_PTR(value); | |
+ msgpack_pack_str(pck, strlen(c)); | |
+ msgpack_pack_str_body(pck, c, strlen(c)); | |
+ break; | |
+ } | |
+ case MRB_TT_HASH: { | |
+ mrb_value keys = mrb_hash_keys(state, value); | |
+ int len = RARRAY_LEN(keys); | |
+ msgpack_pack_map(pck, len); | |
+ for (int i = 0; i < len; i++) { | |
+ mrb_value key = mrb_ary_ref(state, keys, i); | |
+ mrb_tommsgpack(state, key, pck); | |
+ mrb_tommsgpack(state, mrb_hash_get(state, value, key), pck); | |
+ } | |
+ break; | |
+ } | |
+ } | |
+} | |
+ | |
+mrb_value msgpack_obj_to_mrb_value(mrb_state *mrb, msgpack_object *record) | |
+{ | |
+ int size, i; | |
+ char *s; | |
+ mrb_value mrb_v; | |
+ | |
+ switch(record->type) { | |
+ case MSGPACK_OBJECT_STR: | |
+ s = flb_malloc(record->via.str.size); | |
+ strncpy(s, record->via.str.ptr, record->via.str.size); | |
+ s[record->via.str.size] = '\0'; | |
+ mrb_v = mrb_str_new_cstr(mrb, s); | |
+ break; | |
+ case MSGPACK_OBJECT_MAP: | |
+ size = record->via.map.size; | |
+ if (size != 0) { | |
+ msgpack_object_kv *p = record->via.map.ptr; | |
+ for (i = 0; i < size; i++) { | |
+ msgpack_object *key = &(p+i)->key; | |
+ msgpack_object *val = &(p+i)->val; | |
+ mrb_v = mrb_hash_new(mrb); | |
+ mrb_hash_set(mrb, mrb_v, msgpack_obj_to_mrb_value(mrb, key), msgpack_obj_to_mrb_value(mrb, val)); | |
+ } | |
+ } | |
+ break; | |
+ default: | |
+ break; | |
+ } | |
+ return mrb_v; | |
+} | |
+ | |
+static int cb_mruby_init(struct flb_filter_instance *f_ins, | |
+ struct flb_config *config, | |
+ void *data) | |
+{ | |
+ struct mruby_filter *ctx; | |
+ struct mf_t *mf; | |
+ mrb_value obj; | |
+ | |
+ // Create mrb_state | |
+ mf = flb_calloc(1, sizeof(struct mf_t)); | |
+ mf->mrb_state = mrb_open(); | |
+ mf->mrb_state->ud = mf; | |
+ | |
+ // Create context | |
+ ctx = flb_calloc(1, sizeof(struct mruby_filter)); | |
+ ctx->mf = mf; | |
+ ctx->call = flb_filter_get_property("call", f_ins); | |
+ | |
+ // Load mruby script | |
+ FILE* fp = fopen(flb_filter_get_property("script", f_ins), "r"); | |
+ obj = mrb_load_file(mf->mrb_state, fp); | |
+ ctx->mf->obj = obj; | |
+ fclose(fp); | |
+ | |
+ // Set context | |
+ flb_filter_set_context(f_ins, ctx); | |
+ | |
+ return 0; | |
+} | |
+ | |
+static int cb_mruby_filter(void *data, size_t bytes, | |
+ char *tag, int tag_len, | |
+ void **out_buf, size_t *out_bytes, | |
+ struct flb_filter_instance *f_ins, | |
+ void *filter_context, | |
+ struct flb_config *config) | |
+{ | |
+ struct mruby_filter *ctx = filter_context; | |
+ | |
+ size_t off = 0; | |
+ double ts; | |
+ struct flb_time t; | |
+ msgpack_object *p; | |
+ msgpack_sbuffer tmp_sbuf; | |
+ msgpack_packer tmp_pck; | |
+ msgpack_unpacked result; | |
+ | |
+ msgpack_sbuffer_init(&tmp_sbuf); | |
+ msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); | |
+ | |
+ msgpack_unpacked_init(&result); | |
+ while (msgpack_unpack_next(&result, data, bytes, &off)) { | |
+ msgpack_packer data_pck; | |
+ msgpack_sbuffer data_sbuf; | |
+ mrb_state *mrb_state; | |
+ mrb_value value; | |
+ | |
+ mrb_state = ctx->mf->mrb_state; | |
+ | |
+ msgpack_sbuffer_init(&data_sbuf); | |
+ msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); | |
+ | |
+ flb_time_pop_from_msgpack(&t, &result, &p); | |
+ ts = flb_time_to_double(&t); | |
+ | |
+ ctx->mf->ts = ts; | |
+ ctx->mf->tag = tag; | |
+ ctx->mf->record = p; | |
+ | |
+ value = mrb_funcall(mrb_state, ctx->mf->obj, ctx->call, 3, mrb_str_new_cstr(mrb_state, tag), mrb_float_value(mrb_state, ts), msgpack_obj_to_mrb_value(mrb_state, p)); | |
+ | |
+ msgpack_pack_array(&tmp_pck, 2); | |
+ flb_time_from_double(&t, ts); | |
+ flb_time_append_to_msgpack(&t, &tmp_pck, 0); | |
+ mrb_tommsgpack(mrb_state, value, &tmp_pck); | |
+ | |
+ msgpack_sbuffer_destroy(&data_sbuf); | |
+ } | |
+ msgpack_unpacked_destroy(&result); | |
+ | |
+ *out_buf = tmp_sbuf.data; | |
+ *out_bytes = tmp_sbuf.size; | |
+ | |
+ return FLB_FILTER_MODIFIED; | |
+} | |
+ | |
+static int cb_mruby_exit(void *data, struct flb_config *config) | |
+{ | |
+ struct mruby_filter *ctx; | |
+ | |
+ ctx = data; | |
+ mrb_close(ctx->mf->mrb_state); | |
+ free(ctx->mf); | |
+ return 0; | |
+} | |
+ | |
+struct flb_filter_plugin filter_mruby_plugin = { | |
+ .name = "mruby", | |
+ .description = "mruby Scriptiong Filter", | |
+ .cb_init = cb_mruby_init, | |
+ .cb_filter = cb_mruby_filter, | |
+ .cb_exit = cb_mruby_exit, | |
+ .flags = 0 | |
+}; | |
\ No newline at end of file | |
diff --git a/plugins/filter_mruby/mruby_config.c b/plugins/filter_mruby/mruby_config.c | |
new file mode 100644 | |
index 00000000..d5e24c6c | |
--- /dev/null | |
+++ b/plugins/filter_mruby/mruby_config.c | |
@@ -0,0 +1 @@ | |
+#include "mruby_config.h" | |
\ No newline at end of file | |
diff --git a/plugins/filter_mruby/mruby_config.h b/plugins/filter_mruby/mruby_config.h | |
new file mode 100644 | |
index 00000000..02ef4280 | |
--- /dev/null | |
+++ b/plugins/filter_mruby/mruby_config.h | |
@@ -0,0 +1,25 @@ | |
+#include <fluent-bit/flb_info.h> | |
+#include <fluent-bit/flb_filter.h> | |
+#include <fluent-bit/flb_luajit.h> | |
+#include <fluent-bit/flb_sds.h> | |
+ | |
+#include "mruby.h" | |
+#include "mruby/compile.h" | |
+#include "mruby/string.h" | |
+ | |
+typedef struct mf_t { | |
+ double ts; | |
+ char *tag; | |
+ msgpack_object *record; | |
+ mrb_value obj; | |
+ struct mrb_state *mrb_state; | |
+ | |
+} mf; | |
+ | |
+struct mruby_filter { | |
+ flb_sds_t call; | |
+ struct mf_t *mf; | |
+}; | |
+ | |
+MRB_API mrb_value mrb_str_new_cstr(mrb_state*, const char*); | |
+mrb_value msgpack_obj_to_mrb_value(mrb_state*, msgpack_object*); | |
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt | |
index 4796ce5e..7a7be35b 100644 | |
--- a/src/CMakeLists.txt | |
+++ b/src/CMakeLists.txt | |
@@ -136,6 +136,12 @@ if(FLB_LUAJIT) | |
"libluajit") | |
endif() | |
+if(FLB_MRUBY) | |
+ set(extra_libs | |
+ ${extra_libs} | |
+ "libmruby") | |
+endif() | |
+ | |
if(FLB_SQLDB) | |
set(src | |
${src} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment