Created
July 28, 2020 04:01
-
-
Save koolay/19f4434143aaf9b1d5c7c8410f0c31d3 to your computer and use it in GitHub Desktop.
demo of arrow-flight+dremio+vaex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
import vaex | |
import time | |
import orjson | |
import os | |
import psutil | |
from pyarrow import flight | |
import pyarrow as pa | |
from graphql.error import GraphQLError, format_error as format_error_default | |
from graphql.execution import ExecutionResult, execute | |
from typing import Any, Callable, Collection, Dict, List, Optional, Type, Union | |
# Small immutable records used by the GraphQL result-encoding helpers.
FormattedResult = namedtuple("FormattedResult", ["result", "status_code"])  # formatted dict + HTTP status
ServerResponse = namedtuple("ServerResponse", ["body", "status_code"])  # encoded body + HTTP status
GraphQLResponse = namedtuple("GraphQLResponse", ["results", "params"])

# Handle on this very process; its memory_info().rss is printed for profiling.
process = psutil.Process(pid=os.getpid())
class HttpDremioClientAuthHandler(flight.ClientAuthHandler):
    """Arrow Flight client auth handler that logs in to Dremio with basic auth.

    During the handshake it writes a serialized ``flight.BasicAuth`` payload
    and keeps whatever the server replies with as the session token, which is
    then exposed through :meth:`get_token`.
    """

    def __init__(self, username, password):
        # Zero-argument super() runs ClientAuthHandler's own initializer;
        # naming ClientAuthHandler as the first argument would skip it and
        # start the MRO lookup at its parent instead.
        super().__init__()
        self.basic_auth = flight.BasicAuth(username, password)
        self.token = None  # filled in by authenticate()

    def authenticate(self, outgoing, incoming):
        """Send the basic-auth payload and store the token the server returns."""
        outgoing.write(self.basic_auth.serialize())
        self.token = incoming.read()

    def get_token(self):
        """Return the token captured by authenticate(), or None before it ran."""
        return self.token
def read_from_dremio(
    username: str = 'admin',
    password: str = 'changeMe!',
    sql: str = '''SELECT * FROM "local_mysql".imdb.title_basics where runtimeMinutes>0 limit 10000''',
    location: str = 'grpc+tcp://localhost:47470',
) -> vaex.dataframe.DataFrameArrays:
    """Run a SQL query on Dremio over Arrow Flight and load the result into vaex.

    Every parameter defaults to the value this demo used to hard-code, so a
    plain ``read_from_dremio()`` call behaves as before.
    NOTE(review): the default credentials are demo values; pass real ones in.

    Args:
        username: Dremio user name.
        password: Dremio password.
        sql: Query to execute.
        location: Flight endpoint URI of the Dremio server.

    Returns:
        A vaex DataFrame built from the Arrow table holding the query result.

    Read/convert timings and the process RSS (bytes) are printed as it runs.
    """
    client = flight.FlightClient(location)
    client.authenticate(HttpDremioClientAuthHandler(username, password))
    info = client.get_flight_info(flight.FlightDescriptor.for_command(sql))

    st = time.time()
    batches = []
    schema = None
    # A result may be split across several endpoints; read all of them
    # instead of only the first so no rows are silently dropped.
    for endpoint in info.endpoints:
        reader = client.do_get(endpoint.ticket)
        schema = reader.schema
        while True:
            try:
                batch, _metadata = reader.read_chunk()
            except StopIteration:
                break
            batches.append(batch)
    print('total: ', len(batches))
    # Supplying the stream schema lets an empty result (zero batches) still
    # become an empty table; from_batches raises when given neither.
    arrow_table = pa.Table.from_batches(batches, schema=schema)
    t2 = time.time()
    print('read take: ', t2 - st)
    print('memory:', process.memory_info().rss)  # in bytes

    df = vaex.from_arrow_table(arrow_table)
    t3 = time.time()
    print('convert take:', t3 - t2)
    print('memory:', process.memory_info().rss)  # in bytes
    print('print take:', time.time() - t3)
    return df
def json_encode(data: Union[Dict, List], pretty: bool = False) -> str:
    """Serialize *data* (a dict or a list) to a JSON string using orjson.

    Naive datetimes are serialized as UTC (``OPT_NAIVE_UTC``) and numpy
    arrays are serialized natively (``OPT_SERIALIZE_NUMPY``). When *pretty*
    is true, the output is indented by two spaces (``OPT_INDENT_2``).
    """
    option = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY
    if pretty:
        option |= orjson.OPT_INDENT_2
    # orjson.dumps returns bytes; this helper's contract is to return str.
    return orjson.dumps(data, option=option).decode('utf-8')
def encode_execution_results(
    execution_results: List[Optional[ExecutionResult]],
    format_error: Callable[[GraphQLError], Dict] = format_error_default,
    is_batch: bool = False,
    encode: Callable[[Dict], Any] = json_encode,
) -> ServerResponse:
    """Serialize ExecutionResults into a ServerResponse.

    Each result is converted to a dict by ``format_execution_result`` (which
    uses *format_error* for its errors), then the payload is serialized with
    *encode*.

    Args:
        execution_results: Results to encode; entries may be None.
        format_error: Converts one GraphQLError into a dict.
        is_batch: If true, encode all formatted results; otherwise encode
            only the first one.
        encode: Serializer applied to the payload (JSON by default).

    Returns:
        ServerResponse(body, status_code). status_code is the largest status
        over all results (200, or 400 if any result was invalid), even when
        only the first result is encoded.

    Raises:
        ValueError: if *execution_results* is empty and *is_batch* is false,
            because there is no first result to return.
    """
    if not execution_results:
        if is_batch:
            # An empty batch is well-formed: encode it as an empty list.
            return ServerResponse(encode([]), 200)
        raise ValueError("encode_execution_results() requires at least one execution result")
    formatted = [format_execution_result(execution_result, format_error) for execution_result in execution_results]
    result, status_codes = zip(*formatted)
    status_code = max(status_codes)
    if not is_batch:
        result = result[0]
    return ServerResponse(encode(result), status_code)
def format_error(error):
    """Format a GraphQLError into a compact dict for the HTTP response.

    Keys:
        msg: the error message.
        loc: "line:column" of the first source location, or None when the
            error has no locations.
        pth: the response path joined with "/", with list indices (ints)
            converted to strings; "" when there is no path.
    """
    locations = error.locations
    if locations:
        first = locations[0]
        loc = "{}:{}".format(first.line, first.column)
    else:
        # locations can be None or empty (error not tied to a source position).
        loc = None
    return {
        "msg": error.message,
        "loc": loc,
        # Paths mix field names (str) and list indices (int); join needs str.
        "pth": "/".join(str(segment) for segment in error.path or []),
    }
def format_execution_result(
    execution_result: Optional[ExecutionResult],
    format_error: Optional[Callable[[GraphQLError], Dict]] = format_error_default,
) -> FormattedResult:
    """Convert one ExecutionResult into a FormattedResult(response_dict, status).

    - None result            -> (None, 200)
    - no errors              -> ({"data": ...}, 200)
    - errors, all with path  -> ({"errors": [...], "data": ...}, 200)
    - any error without path -> ({"errors": [...]}, 400), "data" omitted

    Args:
        execution_result: The result to format, or None.
        format_error: Converts one GraphQLError into a dict; None selects the
            default formatter.
    """
    if format_error is None:
        # The parameter is typed Optional; treat None as "use the default"
        # instead of failing with "'NoneType' object is not callable".
        format_error = format_error_default
    if execution_result is None:
        return FormattedResult(None, 200)

    errors = execution_result.errors
    if not errors:
        return FormattedResult({"data": execution_result.data}, 200)

    response: Dict[str, Any] = {"errors": [format_error(e) for e in errors]}
    # Errors lacking a path are treated as request-level failures
    # (presumably parse/validation errors): answer 400 and omit "data".
    if any(not getattr(e, "path", None) for e in errors):
        return FormattedResult(response, 400)
    response["data"] = execution_result.data
    return FormattedResult(response, 200)
if __name__ == "__main__":
    # Baseline resident memory (bytes) before any data is loaded.
    print(process.memory_info().rss)
    df = read_from_dremio()

    # GraphQL query: first 10 titles with a positive runtime.
    query = """
{
df(where: {runtimeMinutes: {_gt: 0}}){
row(offset: 0, limit: 10) {
titleType
primaryTitle
originalTitle
isAdult
runtimeMinutes
}
}
}"""
    query_started = time.time()
    result = df.graphql.execute(query)
    query_finished = time.time()
    print('query take:', query_finished - query_started)

    body, status_code = encode_execution_results([result], format_error, is_batch=False)
    print('encode take:', time.time() - query_finished)
    print(body)
    print(status_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment