Last active
January 30, 2022 19:07
-
-
Save jgaskins/14826ec20cf415d74bac40da6f9bd82f to your computer and use it in GitHub Desktop.
Elasticsearch Example in Crystal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "http" | |
require "json" | |
require "uuid/json" | |
require "db" | |
require "interro" # github: jgaskins/interro | |
require "faker" # github: askn/faker | |
ES = ElasticSearch::Client.new | |
pg = DB.open("postgres:///") | |
Interro.config { |c| c.db = pg } | |
# Delete+create your index if needed | |
# ES.index.delete "users" | |
# ES.index.create "users" | |
# Uncomment this to populate the DB and ES index | |
# pg.exec "DROP TABLE IF EXISTS users" | |
# pg.exec <<-SQL | |
# CREATE TABLE users( | |
# id UUID PRIMARY KEY DEFAULT gen_random_uuid(), | |
# name TEXT NOT NULL, | |
# created_at TIMESTAMPTZ NOT NULL DEFAULT now() | |
# ) | |
# SQL | |
# 1_000.times do | |
# UserQuery.new.create name: Faker::Name.name | |
# end | |
ES.search("users", "Jamie", as: User) | |
.hits | |
.hits | |
struct User | |
include DB::Serializable # For Interro | |
include JSON::Serializable # For ElasticSearch | |
getter id : UUID | |
getter name : String | |
getter created_at : Time | |
end | |
struct UserQuery < Interro::QueryBuilder(User) | |
table "users" | |
def create(name : String) | |
user = insert name: name | |
ensure | |
if user | |
ES.doc.index "users", id: user.id, doc: user | |
end | |
end | |
end | |
module ElasticSearch | |
class Exception < ::Exception | |
end | |
class Client | |
def self.new | |
new URI.parse("http:///") | |
end | |
def initialize(@uri : URI) | |
@pool = DB::Pool(HTTP::Client).new(max_idle_pool_size: 25) do | |
http = HTTP::Client.new(@uri.host || "localhost", @uri.port || 9200) | |
http.before_request do |request| | |
request.headers.add "content-type", "application/json" | |
request.headers | |
end | |
http | |
end | |
end | |
def index | |
Index.new(self) | |
end | |
def doc | |
Doc.new(self) | |
end | |
def search(index_name : String, simple_query_string query : String, as type : T.class) forall T | |
response = @pool.checkout(&.post("/#{index_name}/_search", body: { | |
query: { | |
simple_query_string: { | |
query: query, | |
default_operator: "and", | |
}, | |
}, | |
}.to_json)) | |
if response.success? | |
SearchResult(T).from_json response.body | |
else | |
raise Exception.new "#{response.status}: #{response.body}" | |
end | |
end | |
protected def post(path : String, body : String) | |
@pool.checkout(&.post(path, body: body)) | |
end | |
protected def put(path : String, body : String? = nil) | |
@pool.checkout(&.put(path, body: body)) | |
end | |
protected def delete(path : String) | |
@pool.checkout(&.delete(path)) | |
end | |
end | |
struct SearchResult(T) | |
include JSON::Serializable | |
@[JSON::Field(converter: ::ElasticSearch::MillisecondsTimeSpan)] | |
getter took : Time::Span | |
getter timed_out : Bool | |
@[JSON::Field(key: "_shards")] | |
getter shards : Shards | |
getter hits : Hits(T) | |
struct Shards | |
include JSON::Serializable | |
getter total : Int64 | |
getter successful : Int64 | |
getter skipped : Int64 | |
getter failed : Int64 | |
end | |
struct Hits(T) | |
include JSON::Serializable | |
getter total : Totals | |
getter max_score : Float64? | |
getter hits : Array(Hit(T)) | |
end | |
struct Totals | |
include JSON::Serializable | |
getter value : Int64 | |
getter relation : String | |
end | |
struct Hit(T) | |
include JSON::Serializable | |
@[JSON::Field(key: "_index")] | |
getter index : String | |
@[JSON::Field(key: "_type")] | |
getter type : String | |
@[JSON::Field(key: "_id")] | |
getter id : String | |
@[JSON::Field(key: "_score")] | |
getter score : Float64 | |
@[JSON::Field(key: "_source")] | |
getter source : T | |
end | |
end | |
struct Doc | |
def initialize(@client : Client) | |
end | |
def index(index_name : String, doc) | |
response = @client.post "/#{index_name}/_doc", doc.to_json | |
if response.success? | |
# TODO: Make this a concrete type | |
JSON.parse(response.body) | |
else | |
raise Exception.new("#{response.status}: #{response.body}") | |
end | |
end | |
def index(index_name : String, id, doc) | |
response = @client.post "/#{index_name}/_doc/#{id}", doc.to_json | |
if response.success? | |
# TODO: Make this a concrete type | |
JSON.parse(response.body) | |
else | |
raise Exception.new("#{response.status}: #{response.body}") | |
end | |
end | |
end | |
struct Index | |
def initialize(@client : Client) | |
end | |
def create(name : String) | |
@client.put "/#{name}" | |
end | |
def delete(name : String) | |
@client.delete "/#{name}" | |
end | |
end | |
module MillisecondsTimeSpan | |
def self.from_json(pull : JSON::PullParser) | |
pull.read_int.milliseconds | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment