Skip to content

Instantly share code, notes, and snippets.

@jgaskins
Last active January 30, 2022 19:07
Show Gist options
  • Save jgaskins/14826ec20cf415d74bac40da6f9bd82f to your computer and use it in GitHub Desktop.
Save jgaskins/14826ec20cf415d74bac40da6f9bd82f to your computer and use it in GitHub Desktop.
Elasticsearch Example in Crystal
require "http"
require "json"
require "uuid/json"
require "db"
require "interro" # github: jgaskins/interro
require "faker" # github: askn/faker
ES = ElasticSearch::Client.new
pg = DB.open("postgres:///")
Interro.config { |c| c.db = pg }
# Delete+create your index if needed
# ES.index.delete "users"
# ES.index.create "users"
# Uncomment this to populate the DB and ES index
# pg.exec "DROP TABLE IF EXISTS users"
# pg.exec <<-SQL
# CREATE TABLE users(
# id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
# name TEXT NOT NULL,
# created_at TIMESTAMPTZ NOT NULL DEFAULT now()
# )
# SQL
# 1_000.times do
# UserQuery.new.create name: Faker::Name.name
# end
ES.search("users", "Jamie", as: User)
.hits
.hits
struct User
include DB::Serializable # For Interro
include JSON::Serializable # For ElasticSearch
getter id : UUID
getter name : String
getter created_at : Time
end
struct UserQuery < Interro::QueryBuilder(User)
table "users"
def create(name : String)
user = insert name: name
ensure
if user
ES.doc.index "users", id: user.id, doc: user
end
end
end
module ElasticSearch
class Exception < ::Exception
end
class Client
def self.new
new URI.parse("http:///")
end
def initialize(@uri : URI)
@pool = DB::Pool(HTTP::Client).new(max_idle_pool_size: 25) do
http = HTTP::Client.new(@uri.host || "localhost", @uri.port || 9200)
http.before_request do |request|
request.headers.add "content-type", "application/json"
request.headers
end
http
end
end
def index
Index.new(self)
end
def doc
Doc.new(self)
end
def search(index_name : String, simple_query_string query : String, as type : T.class) forall T
response = @pool.checkout(&.post("/#{index_name}/_search", body: {
query: {
simple_query_string: {
query: query,
default_operator: "and",
},
},
}.to_json))
if response.success?
SearchResult(T).from_json response.body
else
raise Exception.new "#{response.status}: #{response.body}"
end
end
protected def post(path : String, body : String)
@pool.checkout(&.post(path, body: body))
end
protected def put(path : String, body : String? = nil)
@pool.checkout(&.put(path, body: body))
end
protected def delete(path : String)
@pool.checkout(&.delete(path))
end
end
struct SearchResult(T)
include JSON::Serializable
@[JSON::Field(converter: ::ElasticSearch::MillisecondsTimeSpan)]
getter took : Time::Span
getter timed_out : Bool
@[JSON::Field(key: "_shards")]
getter shards : Shards
getter hits : Hits(T)
struct Shards
include JSON::Serializable
getter total : Int64
getter successful : Int64
getter skipped : Int64
getter failed : Int64
end
struct Hits(T)
include JSON::Serializable
getter total : Totals
getter max_score : Float64?
getter hits : Array(Hit(T))
end
struct Totals
include JSON::Serializable
getter value : Int64
getter relation : String
end
struct Hit(T)
include JSON::Serializable
@[JSON::Field(key: "_index")]
getter index : String
@[JSON::Field(key: "_type")]
getter type : String
@[JSON::Field(key: "_id")]
getter id : String
@[JSON::Field(key: "_score")]
getter score : Float64
@[JSON::Field(key: "_source")]
getter source : T
end
end
struct Doc
def initialize(@client : Client)
end
def index(index_name : String, doc)
response = @client.post "/#{index_name}/_doc", doc.to_json
if response.success?
# TODO: Make this a concrete type
JSON.parse(response.body)
else
raise Exception.new("#{response.status}: #{response.body}")
end
end
def index(index_name : String, id, doc)
response = @client.post "/#{index_name}/_doc/#{id}", doc.to_json
if response.success?
# TODO: Make this a concrete type
JSON.parse(response.body)
else
raise Exception.new("#{response.status}: #{response.body}")
end
end
end
struct Index
def initialize(@client : Client)
end
def create(name : String)
@client.put "/#{name}"
end
def delete(name : String)
@client.delete "/#{name}"
end
end
module MillisecondsTimeSpan
def self.from_json(pull : JSON::PullParser)
pull.read_int.milliseconds
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment