Skip to content

Instantly share code, notes, and snippets.

@kokosing
Created December 2, 2018 21:15
Show Gist options
  • Save kokosing/42f3cbffa49ba0cd011eff1e7af85b6e to your computer and use it in GitHub Desktop.
Save kokosing/42f3cbffa49ba0cd011eff1e7af85b6e to your computer and use it in GitHub Desktop.
ElasticsearchTableDescriptionProvider
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java
index 8c35fc9615..ed0b5adc29 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java
@@ -70,20 +70,19 @@ public class ElasticsearchClient
private static final Logger LOG = Logger.get(ElasticsearchClient.class);
private final ObjectMapper objecMapper = new ObjectMapperProvider().get();
- private final Map<SchemaTableName, ElasticsearchTableDescription> tableDescriptions;
+ private final ElasticsearchTableDescriptionProvider tableDescriptions;
private final Map<String, TransportClient> clients = new HashMap<>();
private final Map<ElasticsearchTableDescription, List<ColumnMetadata>> tableColumns = new HashMap<>();
private final Duration timeout;
@Inject
- public ElasticsearchClient(Map<SchemaTableName, ElasticsearchTableDescription> descriptions, Duration requestTimeout)
+ public ElasticsearchClient(ElasticsearchTableDescriptionProvider descriptions, Duration requestTimeout)
throws IOException
{
tableDescriptions = requireNonNull(descriptions, "description is null");
timeout = requireNonNull(requestTimeout, "requestTimeout is null");
- for (Entry<SchemaTableName, ElasticsearchTableDescription> entry : tableDescriptions.entrySet()) {
- ElasticsearchTableDescription tableDescription = entry.getValue();
+ for (ElasticsearchTableDescription tableDescription : tableDescriptions.getAllTableDescriptions()) {
if (!clients.containsKey(tableDescription.getClusterName())) {
Settings settings = Settings.builder().put("cluster.name", tableDescription.getClusterName()).build();
TransportAddress address = new TransportAddress(InetAddress.getByName(tableDescription.getHost()), tableDescription.getPort());
@@ -110,14 +109,14 @@ public class ElasticsearchClient
public List<String> listSchemas()
{
- return tableDescriptions.keySet().stream()
+ return tableDescriptions.getAllSchemaTableNames().stream()
.map(SchemaTableName::getSchemaName)
.collect(toImmutableList());
}
public List<SchemaTableName> listTables(Optional<String> schemaName)
{
- return tableDescriptions.keySet()
+ return tableDescriptions.getAllSchemaTableNames()
.stream()
.filter(schemaTableName -> !schemaName.isPresent() || schemaTableName.getSchemaName().equals(schemaName.get()))
.collect(toImmutableList());
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java
index 73ef0433d5..9a43c860de 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java
@@ -15,29 +15,26 @@ package com.facebook.presto.elasticsearch;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.NodeManager;
-import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.type.TypeManager;
import com.google.inject.Injector;
import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import java.util.Map;
import java.util.Optional;
-import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
public class ElasticsearchConnectorFactory
implements ConnectorFactory
{
- private final Optional<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>> tableDescriptionSupplier;
+ private final Optional<ElasticsearchTableDescriptionProvider> tableDescriptionSupplier;
- ElasticsearchConnectorFactory(Optional<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>> tableDescriptionSupplier)
+ ElasticsearchConnectorFactory(Optional<ElasticsearchTableDescriptionProvider> tableDescriptionSupplier)
{
this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
}
@@ -69,10 +66,11 @@ public class ElasticsearchConnectorFactory
binder.bind(NodeManager.class).toInstance(context.getNodeManager());
if (tableDescriptionSupplier.isPresent()) {
- binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>>() {}).toInstance(tableDescriptionSupplier.get());
+ binder.bind(ElasticsearchTableDescriptionProvider.class).toInstance(tableDescriptionSupplier.get());
}
else {
- binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>>() {}).to(ElasticsearchTableDescriptionSupplier.class).in(Scopes.SINGLETON);
+ // The provider is now a concrete class: use an untargetted binding, because Guice rejects a binding linked to its own key.
+ binder.bind(ElasticsearchTableDescriptionProvider.class).in(Scopes.SINGLETON);
}
});
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java
index c1bcf43e2a..65d5339fce 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java
@@ -14,7 +14,6 @@
package com.facebook.presto.elasticsearch;
import com.facebook.presto.decoder.DecoderModule;
-import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.fasterxml.jackson.databind.DeserializationContext;
@@ -28,8 +27,6 @@ import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
-import java.util.Map;
-import java.util.function.Supplier;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static com.google.common.base.Preconditions.checkArgument;
@@ -59,10 +56,10 @@ public class ElasticsearchConnectorModule
@Singleton
@Provides
- public static ElasticsearchClient createElasticsearchClient(ElasticsearchConnectorConfig config, Supplier<Map<SchemaTableName, ElasticsearchTableDescription>> elasticsearchTableDescriptionSupplier)
+ public static ElasticsearchClient createElasticsearchClient(ElasticsearchConnectorConfig config, ElasticsearchTableDescriptionProvider elasticsearchTableDescriptionProvider)
throws IOException
{
- return new ElasticsearchClient(elasticsearchTableDescriptionSupplier.get(), config.getRequestTimeout());
+ return new ElasticsearchClient(elasticsearchTableDescriptionProvider, config.getRequestTimeout());
}
private static final class TypeDeserializer
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java
index d511b5b994..4d27eb0de7 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java
@@ -30,11 +30,11 @@ import static java.util.Objects.requireNonNull;
public class ElasticsearchPlugin
implements Plugin
{
- private Optional<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>> tableDescriptionSupplier = Optional.empty();
+ private Optional<ElasticsearchTableDescriptionProvider> tableDescriptionSupplier = Optional.empty();
public ElasticsearchPlugin() {}
- public ElasticsearchPlugin(Supplier<Map<SchemaTableName, ElasticsearchTableDescription>> tableDescriptionSupplier)
+ public ElasticsearchPlugin(ElasticsearchTableDescriptionProvider tableDescriptionSupplier)
{
requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
this.tableDescriptionSupplier = Optional.of(tableDescriptionSupplier);
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java
index d913960fe9..3e168e5042 100644
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java
@@ -36,13 +36,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.nio.file.Files.readAllBytes;
-public class DefaultElasticsearchTableDescriptionProvider
- implements ElasticsearchTableDescriptionProvider
+public class ElasticsearchTableDescriptionProvider
{
private final Map<SchemaTableName, ElasticsearchTableDescription> tableDefinitions;
@Inject
- DefaultElasticsearchTableDescriptionProvider(ElasticsearchConnectorConfig config, JsonCodec<ElasticsearchTableDescription> codec)
+ ElasticsearchTableDescriptionProvider(ElasticsearchConnectorConfig config, JsonCodec<ElasticsearchTableDescription> codec)
{
this.tableDefinitions = new HashMap<>();
@@ -88,19 +87,16 @@ public class DefaultElasticsearchTableDescriptionProvider
return new SchemaTableName(parts.get(0), parts.get(1));
}
- @Override
public ElasticsearchTableDescription get(SchemaTableName schemaTableName)
{
return tableDefinitions.get(schemaTableName);
}
- @Override
public Set<SchemaTableName> getAllSchemaTableNames()
{
return tableDefinitions.keySet();
}
- @Override
public Set<ElasticsearchTableDescription> getAllTableDescriptions()
{
return ImmutableSet.copyOf(tableDefinitions.values());
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java
index fdedaaa3d0..6903fa7fd8 100644
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java
@@ -35,7 +35,6 @@ import java.net.URL;
import java.util.HashMap;
import java.util.Map;
-import static com.facebook.presto.elasticsearch.ElasticsearchTableDescriptionSupplier.createElasticsearchTableDescriptions;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static com.google.common.io.Resources.getResource;
@@ -68,7 +67,7 @@ public final class ElasticsearchQueryRunner
embeddedElasticsearch.start();
- Map<SchemaTableName, ElasticsearchTableDescription> tableDescriptions = createTableDescriptions(queryRunner.getCoordinator().getMetadata(), tables);
+ ElasticsearchTableDescriptionProvider tableDescriptions = createTableDescriptions(queryRunner.getCoordinator().getMetadata(), tables);
installElasticsearchPlugin(queryRunner, tables, tableDescriptions);
@@ -89,7 +88,7 @@ public final class ElasticsearchQueryRunner
}
}
- private static Map<SchemaTableName, ElasticsearchTableDescription> createTableDescriptions(Metadata metadata, Iterable<TpchTable<?>> tables)
+ private static ElasticsearchTableDescriptionProvider createTableDescriptions(Metadata metadata, Iterable<TpchTable<?>> tables)
throws Exception
{
JsonCodec<ElasticsearchTableDescription> codec = new CodecSupplier<>(ElasticsearchTableDescription.class, metadata).get();
@@ -103,13 +102,17 @@ public final class ElasticsearchQueryRunner
tableNames.add(TPCH_SCHEMA + "." + table.getTableName());
}
- return createElasticsearchTableDescriptions(new File(metadataUri), TPCH_SCHEMA, tableNames.build(), codec);
+ ElasticsearchConnectorConfig config = new ElasticsearchConnectorConfig()
+ .setTableDescriptionDir(new File(metadataUri))
+ .setDefaultSchema(TPCH_SCHEMA)
+ .setTableNames(Joiner.on(",").join(tableNames.build()));
+ return new ElasticsearchTableDescriptionProvider(config, codec);
}
- private static void installElasticsearchPlugin(QueryRunner queryRunner, Iterable<TpchTable<?>> tables, Map<SchemaTableName, ElasticsearchTableDescription> tableDescriptions)
+ private static void installElasticsearchPlugin(QueryRunner queryRunner, Iterable<TpchTable<?>> tables, ElasticsearchTableDescriptionProvider tableDescriptions)
throws Exception
{
- ElasticsearchPlugin plugin = new ElasticsearchPlugin(() -> tableDescriptions);
+ ElasticsearchPlugin plugin = new ElasticsearchPlugin(tableDescriptions);
queryRunner.installPlugin(plugin);
ImmutableList.Builder<SchemaTableName> tableNames = ImmutableList.builder();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment