Created
December 2, 2018 21:15
-
-
Save kokosing/42f3cbffa49ba0cd011eff1e7af85b6e to your computer and use it in GitHub Desktop.
ElasticsearchTableDescriptionProvider
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java | |
index 8c35fc9615..ed0b5adc29 100644 | |
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java | |
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchClient.java | |
@@ -70,20 +70,19 @@ public class ElasticsearchClient | |
private static final Logger LOG = Logger.get(ElasticsearchClient.class); | |
private final ObjectMapper objecMapper = new ObjectMapperProvider().get(); | |
- private final Map<SchemaTableName, ElasticsearchTableDescription> tableDescriptions; | |
+ private final ElasticsearchTableDescriptionProvider tableDescriptions; | |
private final Map<String, TransportClient> clients = new HashMap<>(); | |
private final Map<ElasticsearchTableDescription, List<ColumnMetadata>> tableColumns = new HashMap<>(); | |
private final Duration timeout; | |
@Inject | |
- public ElasticsearchClient(Map<SchemaTableName, ElasticsearchTableDescription> descriptions, Duration requestTimeout) | |
+ public ElasticsearchClient(ElasticsearchTableDescriptionProvider descriptions, Duration requestTimeout) | |
throws IOException | |
{ | |
tableDescriptions = requireNonNull(descriptions, "description is null"); | |
timeout = requireNonNull(requestTimeout, "requestTimeout is null"); | |
- for (Entry<SchemaTableName, ElasticsearchTableDescription> entry : tableDescriptions.entrySet()) { | |
- ElasticsearchTableDescription tableDescription = entry.getValue(); | |
+ for (ElasticsearchTableDescription tableDescription : tableDescriptions.getAllTableDescriptions()) { | |
if (!clients.containsKey(tableDescription.getClusterName())) { | |
Settings settings = Settings.builder().put("cluster.name", tableDescription.getClusterName()).build(); | |
TransportAddress address = new TransportAddress(InetAddress.getByName(tableDescription.getHost()), tableDescription.getPort()); | |
@@ -110,14 +109,14 @@ public class ElasticsearchClient | |
public List<String> listSchemas() | |
{ | |
- return tableDescriptions.keySet().stream() | |
+ return tableDescriptions.getAllSchemaTableNames().stream() | |
.map(SchemaTableName::getSchemaName) | |
.collect(toImmutableList()); | |
} | |
public List<SchemaTableName> listTables(Optional<String> schemaName) | |
{ | |
- return tableDescriptions.keySet() | |
+ return tableDescriptions.getAllSchemaTableNames() | |
.stream() | |
.filter(schemaTableName -> !schemaName.isPresent() || schemaTableName.getSchemaName().equals(schemaName.get())) | |
.collect(toImmutableList()); | |
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java | |
index 73ef0433d5..9a43c860de 100644 | |
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java | |
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorFactory.java | |
@@ -15,29 +15,26 @@ package com.facebook.presto.elasticsearch; | |
import com.facebook.presto.spi.ConnectorHandleResolver; | |
import com.facebook.presto.spi.NodeManager; | |
-import com.facebook.presto.spi.SchemaTableName; | |
import com.facebook.presto.spi.connector.Connector; | |
import com.facebook.presto.spi.connector.ConnectorContext; | |
import com.facebook.presto.spi.connector.ConnectorFactory; | |
import com.facebook.presto.spi.type.TypeManager; | |
import com.google.inject.Injector; | |
import com.google.inject.Scopes; | |
-import com.google.inject.TypeLiteral; | |
import io.airlift.bootstrap.Bootstrap; | |
import io.airlift.json.JsonModule; | |
import java.util.Map; | |
import java.util.Optional; | |
-import java.util.function.Supplier; | |
import static java.util.Objects.requireNonNull; | |
public class ElasticsearchConnectorFactory | |
implements ConnectorFactory | |
{ | |
- private final Optional<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>> tableDescriptionSupplier; | |
+ private final Optional<ElasticsearchTableDescriptionProvider> tableDescriptionSupplier; | |
- ElasticsearchConnectorFactory(Optional<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>> tableDescriptionSupplier) | |
+ ElasticsearchConnectorFactory(Optional<ElasticsearchTableDescriptionProvider> tableDescriptionSupplier) | |
{ | |
this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null"); | |
} | |
@@ -69,10 +66,10 @@ public class ElasticsearchConnectorFactory | |
binder.bind(NodeManager.class).toInstance(context.getNodeManager()); | |
if (tableDescriptionSupplier.isPresent()) { | |
- binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>>() {}).toInstance(tableDescriptionSupplier.get()); | |
+ binder.bind(ElasticsearchTableDescriptionProvider.class).toInstance(tableDescriptionSupplier.get()); | |
} | |
else { | |
- binder.bind(new TypeLiteral<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>>() {}).to(ElasticsearchTableDescriptionSupplier.class).in(Scopes.SINGLETON); | |
+ binder.bind(ElasticsearchTableDescriptionProvider.class).in(Scopes.SINGLETON); | |
} | |
}); | |
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java | |
index c1bcf43e2a..65d5339fce 100644 | |
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java | |
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchConnectorModule.java | |
@@ -14,7 +14,6 @@ | |
package com.facebook.presto.elasticsearch; | |
import com.facebook.presto.decoder.DecoderModule; | |
-import com.facebook.presto.spi.SchemaTableName; | |
import com.facebook.presto.spi.type.Type; | |
import com.facebook.presto.spi.type.TypeManager; | |
import com.fasterxml.jackson.databind.DeserializationContext; | |
@@ -28,8 +27,6 @@ import javax.inject.Inject; | |
import javax.inject.Singleton; | |
import java.io.IOException; | |
-import java.util.Map; | |
-import java.util.function.Supplier; | |
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; | |
import static com.google.common.base.Preconditions.checkArgument; | |
@@ -59,10 +56,10 @@ public class ElasticsearchConnectorModule | |
@Singleton | |
@Provides | |
- public static ElasticsearchClient createElasticsearchClient(ElasticsearchConnectorConfig config, Supplier<Map<SchemaTableName, ElasticsearchTableDescription>> elasticsearchTableDescriptionSupplier) | |
+ public static ElasticsearchClient createElasticsearchClient(ElasticsearchConnectorConfig config, ElasticsearchTableDescriptionProvider elasticsearchTableDescriptionProvider) | |
throws IOException | |
{ | |
- return new ElasticsearchClient(elasticsearchTableDescriptionSupplier.get(), config.getRequestTimeout()); | |
+ return new ElasticsearchClient(elasticsearchTableDescriptionProvider, config.getRequestTimeout()); | |
} | |
private static final class TypeDeserializer | |
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java | |
index d511b5b994..4d27eb0de7 100644 | |
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java | |
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchPlugin.java | |
@@ -30,11 +30,11 @@ import static java.util.Objects.requireNonNull; | |
public class ElasticsearchPlugin | |
implements Plugin | |
{ | |
- private Optional<Supplier<Map<SchemaTableName, ElasticsearchTableDescription>>> tableDescriptionSupplier = Optional.empty(); | |
+ private Optional<ElasticsearchTableDescriptionProvider> tableDescriptionSupplier = Optional.empty(); | |
public ElasticsearchPlugin() {} | |
- public ElasticsearchPlugin(Supplier<Map<SchemaTableName, ElasticsearchTableDescription>> tableDescriptionSupplier) | |
+ public ElasticsearchPlugin(ElasticsearchTableDescriptionProvider tableDescriptionSupplier) | |
{ | |
requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null"); | |
this.tableDescriptionSupplier = Optional.of(tableDescriptionSupplier); | |
diff --git a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java | |
index d913960fe9..3e168e5042 100644 | |
--- a/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java | |
+++ b/presto-elasticsearch/src/main/java/com/facebook/presto/elasticsearch/ElasticsearchTableDescriptionProvider.java | |
@@ -36,13 +36,12 @@ import static com.google.common.base.Preconditions.checkArgument; | |
import static com.google.common.base.Strings.isNullOrEmpty; | |
import static java.nio.file.Files.readAllBytes; | |
-public class DefaultElasticsearchTableDescriptionProvider | |
- implements ElasticsearchTableDescriptionProvider | |
+public class ElasticsearchTableDescriptionProvider | |
{ | |
private final Map<SchemaTableName, ElasticsearchTableDescription> tableDefinitions; | |
@Inject | |
- DefaultElasticsearchTableDescriptionProvider(ElasticsearchConnectorConfig config, JsonCodec<ElasticsearchTableDescription> codec) | |
+ ElasticsearchTableDescriptionProvider(ElasticsearchConnectorConfig config, JsonCodec<ElasticsearchTableDescription> codec) | |
{ | |
this.tableDefinitions = new HashMap<>(); | |
@@ -88,19 +87,16 @@ public class DefaultElasticsearchTableDescriptionProvider | |
return new SchemaTableName(parts.get(0), parts.get(1)); | |
} | |
- @Override | |
public ElasticsearchTableDescription get(SchemaTableName schemaTableName) | |
{ | |
return tableDefinitions.get(schemaTableName); | |
} | |
- @Override | |
public Set<SchemaTableName> getAllSchemaTableNames() | |
{ | |
return tableDefinitions.keySet(); | |
} | |
- @Override | |
public Set<ElasticsearchTableDescription> getAllTableDescriptions() | |
{ | |
return ImmutableSet.copyOf(tableDefinitions.values()); | |
diff --git a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java | |
index fdedaaa3d0..6903fa7fd8 100644 | |
--- a/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java | |
+++ b/presto-elasticsearch/src/test/java/com/facebook/presto/elasticsearch/ElasticsearchQueryRunner.java | |
@@ -35,7 +35,6 @@ import java.net.URL; | |
import java.util.HashMap; | |
import java.util.Map; | |
-import static com.facebook.presto.elasticsearch.ElasticsearchTableDescriptionSupplier.createElasticsearchTableDescriptions; | |
import static com.facebook.presto.testing.TestingSession.testSessionBuilder; | |
import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME; | |
import static com.google.common.io.Resources.getResource; | |
@@ -68,7 +67,7 @@ public final class ElasticsearchQueryRunner | |
embeddedElasticsearch.start(); | |
- Map<SchemaTableName, ElasticsearchTableDescription> tableDescriptions = createTableDescriptions(queryRunner.getCoordinator().getMetadata(), tables); | |
+ ElasticsearchTableDescriptionProvider tableDescriptions = createTableDescriptions(queryRunner.getCoordinator().getMetadata(), tables); | |
installElasticsearchPlugin(queryRunner, tables, tableDescriptions); | |
@@ -89,7 +88,7 @@ public final class ElasticsearchQueryRunner | |
} | |
} | |
- private static Map<SchemaTableName, ElasticsearchTableDescription> createTableDescriptions(Metadata metadata, Iterable<TpchTable<?>> tables) | |
+ private static ElasticsearchTableDescriptionProvider createTableDescriptions(Metadata metadata, Iterable<TpchTable<?>> tables) | |
throws Exception | |
{ | |
JsonCodec<ElasticsearchTableDescription> codec = new CodecSupplier<>(ElasticsearchTableDescription.class, metadata).get(); | |
@@ -103,13 +102,17 @@ public final class ElasticsearchQueryRunner | |
tableNames.add(TPCH_SCHEMA + "." + table.getTableName()); | |
} | |
- return createElasticsearchTableDescriptions(new File(metadataUri), TPCH_SCHEMA, tableNames.build(), codec); | |
+ ElasticsearchConnectorConfig config = new ElasticsearchConnectorConfig() | |
+ .setTableDescriptionDir(new File(metadataUri)) | |
+ .setDefaultSchema(TPCH_SCHEMA) | |
+ .setTableNames(Joiner.on(",").join(tableNames.build())); | |
+ return new ElasticsearchTableDescriptionProvider(config, codec); | |
} | |
- private static void installElasticsearchPlugin(QueryRunner queryRunner, Iterable<TpchTable<?>> tables, Map<SchemaTableName, ElasticsearchTableDescription> tableDescriptions) | |
+ private static void installElasticsearchPlugin(QueryRunner queryRunner, Iterable<TpchTable<?>> tables, ElasticsearchTableDescriptionProvider tableDescriptions) | |
throws Exception | |
{ | |
- ElasticsearchPlugin plugin = new ElasticsearchPlugin(() -> tableDescriptions); | |
+ ElasticsearchPlugin plugin = new ElasticsearchPlugin(tableDescriptions); | |
queryRunner.installPlugin(plugin); | |
ImmutableList.Builder<SchemaTableName> tableNames = ImmutableList.builder(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment