Skip to content

Instantly share code, notes, and snippets.

@andriika
Last active April 19, 2024 08:25
Show Gist options
  • Save andriika/e9f3c34c4e29ace79806af5c2f318a88 to your computer and use it in GitHub Desktop.
Save andriika/e9f3c34c4e29ace79806af5c2f318a88 to your computer and use it in GitHub Desktop.
Apache Calcite get started example. Enables SQL over in-memory json objects stored in java.util.Map.

Apache Calcite getting-started example. Enables SQL queries over in-memory JSON objects stored in a java.util.Map.

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.23.0</version>
</dependency>
        Class.forName("org.apache.calcite.jdbc.Driver");

        Properties info = new Properties();
        info.setProperty("lex", "JAVA");
        Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        SchemaPlus rootSchema = calciteConnection.getRootSchema();
        Schema schema = new CustomSchema();
        rootSchema.add("hr", schema);

        Statement statement = calciteConnection.createStatement();
        ResultSet rs = statement.executeQuery("select * from hr.employees as e where e.age >= 30");

        while (rs.next()) {
            long id = rs.getLong("id");
            String name = rs.getString("name");
            int age = rs.getInt("age");
            System.out.println("id: " + id + "; name: " + name + "; age: " + age);
        }

        rs.close();
        statement.close();
        connection.close();
package poc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * Schema that exposes a single table named {@code employees}.
 *
 * <p>The table is backed by a static in-memory map whose keys are employee ids and whose
 * values are JSON objects carrying the {@code name} and {@code age} fields.
 */
public class CustomSchema extends AbstractSchema {

    /** Factory for the JSON object nodes stored in {@link #EMPLOYEES}. */
    private static final ObjectMapper JSON = new ObjectMapper();

    /** Sample data: employee id mapped to that employee's JSON document. */
    private static final Map<Object, ObjectNode> EMPLOYEES = new HashMap<>();

    static {
        addEmployee(1L, "john", 30);
        addEmployee(2L, "jane", 25);
        addEmployee(3L, "cole", 50);
    }

    /** Builds one employee JSON document and stores it under the given id. */
    private static void addEmployee(long id, String name, int age) {
        ObjectNode employee = JSON.createObjectNode();
        employee.put("name", name);
        employee.put("age", age);
        EMPLOYEES.put(Long.valueOf(id), employee);
    }

    /**
     * Returns the tables of this schema.
     *
     * @return a one-entry map from the table name {@code employees} to a new
     *         {@link CustomTable} over the sample data
     */
    @Override
    protected Map<String, Table> getTableMap() {
        Table employeesTable = new CustomTable(EMPLOYEES);
        return Collections.singletonMap("employees", employeesTable);
    }
}
package poc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import commons.StreamIterable;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CustomTable extends AbstractTable implements ScannableTable {
private final Map<Object, ObjectNode> data;
private final List<String> fieldNames;
private final List<SqlTypeName> fieldTypes;
public CustomTable(Map<Object, ObjectNode> data) {
this.data = data;
List<String> names = new ArrayList<>();
names.add("id");
names.add("name");
names.add("age");
this.fieldNames = names;
List<SqlTypeName> types = new ArrayList<>();
types.add(SqlTypeName.BIGINT);
types.add(SqlTypeName.VARCHAR);
types.add(SqlTypeName.INTEGER);
this.fieldTypes = types;
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
// https://github.com/apache/calcite/blob/fa8349069d141d3c75bafa06d5fb8800711ec8d6/example/csv/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator.java#L111
List<RelDataType> types = fieldTypes.stream().map(typeFactory::createSqlType).collect(Collectors.toList());
return typeFactory.createStructType(types, fieldNames);
}
@Override
public Enumerable<Object[]> scan(DataContext root) {
Stream<Object[]> dataStream = data.entrySet().stream().map(this::toObjectArray);
return Linq4j.asEnumerable(new StreamIterable<>(dataStream));
}
private Object[] toObjectArray(Map.Entry<Object, ObjectNode> item) {
Object[] res = new Object[fieldNames.size()];
res[0] = item.getKey();
for (int i = 1; i < fieldNames.size(); i++) {
JsonNode v = item.getValue().get(fieldNames.get(i));
SqlTypeName type = fieldTypes.get(i);
switch (type) {
case VARCHAR:
res[i] = v.textValue();
break;
case INTEGER:
res[i] = v.intValue();
break;
default:
throw new RuntimeException("unsupported sql type: " + type);
}
}
return res;
}
}
@arjunsk
Copy link

arjunsk commented Nov 30, 2022

Thank you so much for this!

You missed adding this file.

/**
 * Adapts a {@code Stream} to the {@code Iterable} interface so it can be used where an
 * {@code Iterable} is required (for example in a for-each loop).
 *
 * <p>The wrapped stream can be consumed only once, so {@link #iterator()} may be called
 * only once per instance; a second call fails with the stream's
 * {@code IllegalStateException}.
 *
 * @param <T> element type of the stream
 */
public final class StreamIterable<T> implements Iterable<T> {

    private final Stream<T> stream;

    /**
     * Wraps the given stream. Public so that callers in other packages can use
     * {@code new StreamIterable<>(stream)} as well as {@link #of(Stream)}.
     *
     * @param stream the stream to iterate; must not be null
     * @throws NullPointerException if {@code stream} is null
     */
    public StreamIterable(Stream<T> stream) {
        // Fail fast here instead of at the first iterator() call.
        if (stream == null) {
            throw new NullPointerException("stream");
        }
        this.stream = stream;
    }

    /**
     * Returns an iterator over the elements of the wrapped stream.
     *
     * @return the stream's iterator
     */
    @Override
    public Iterator<T> iterator() {
        return stream.iterator();
    }

    /**
     * Static factory equivalent to the constructor.
     *
     * @param stream the stream to iterate; must not be null
     * @param <T> element type of the stream
     * @return a new iterable wrapping {@code stream}
     */
    public static <T> StreamIterable<T> of(Stream<T> stream) {
        return new StreamIterable<>(stream);
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment