use arrow::{
    array::{Int32Array, StringArray},
    datatypes::{DataType, Field, Schema as ArrowSchema},
    record_batch::RecordBatch,
};
use deltalake::operations::DeltaOps;
use deltalake::{builder::DeltaTableBuilder, DeltaTable};
use std::collections::HashMap;
use std::sync::Arc;
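These imports come together in a write path along the following lines. This is a minimal sketch assuming the 0.x deltalake Rust crate API, not code from the original post; the helper name, table URI, and column values are illustrative.

// Hypothetical helper showing how the imports above are typically used:
// build an Arrow RecordBatch and append it to a Delta table via DeltaOps.
async fn write_sample_batch(uri: &str) -> Result<(), deltalake::DeltaTableError> {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .expect("columns match the schema");
    // try_from_uri resolves the table location; write appends the batch.
    let table = DeltaOps::try_from_uri(uri).await?.write(vec![batch]).await?;
    println!("table version after write: {}", table.version());
    Ok(())
}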
use deltalake::operations::create::CreateBuilder;
use deltalake::{DeltaTable, SchemaDataType};
use std::collections::HashMap;

async fn create_table(path: String, backend_config: HashMap<String, String>) -> DeltaTable {
    let builder = CreateBuilder::new()
        .with_location(path)
        .with_storage_options(backend_config)
        .with_column(
            "id",
            // Truncated in the original after the column name; the type,
            // nullability, and metadata arguments are a plausible reconstruction.
            SchemaDataType::primitive("integer".to_string()),
            true,
            None,
        );
    builder.await.expect("failed to create Delta table")
}
# Databricks notebook source
# DBTITLE 1,For retrying the example in same session
spark.conf.unset("spark.databricks.delta.write.txnAppId")
spark.conf.unset("spark.databricks.delta.write.txnVersion")

# COMMAND ----------

# DBTITLE 1,Create sample input table
spark.range(10).write.format("delta").saveAsTable("numbers")
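The two session confs unset above are what drive Delta's idempotent writes; a minimal sketch of how they are typically set before a write (the app id and version values here are illustrative):

spark.conf.set("spark.databricks.delta.write.txnAppId", "example-etl-job")
spark.conf.set("spark.databricks.delta.write.txnVersion", "1")
# A retried write with the same (txnAppId, txnVersion) pair is skipped by Delta.
spark.range(10).write.format("delta").mode("append").saveAsTable("numbers")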
last_commit_version = spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
commit_file = f"{last_commit_version.zfill(20)}.json"
display_stats_for_commit(f"dbfs:/user/hive/warehouse/source/_delta_log/{commit_file}")
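display_stats_for_commit is a helper not shown in this excerpt; a plausible sketch of what it does, assuming it reads the commit's newline-delimited JSON actions and prints the per-file stats carried by each add action:

import json

def display_stats_for_commit(commit_path: str):
    # Each Delta commit file holds one JSON action per line.
    for row in spark.read.text(commit_path).collect():
        action = json.loads(row.value)
        if "add" in action:
            print(action["add"]["path"], action["add"].get("stats"))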
%sql
OPTIMIZE source ZORDER BY (id)
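OPTIMIZE reports metrics such as numFilesAdded and numFilesRemoved; one standard way to review them afterwards is the table history (a sketch, not from the original post):

display(spark.sql("DESCRIBE HISTORY source").select("version", "operation", "operationMetrics"))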
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", str(1024 * 1024 * 160))
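With the target file size capped at 160 MB (the default is 1 GB), re-running the same command rewrites the table into smaller Z-ordered files; a sketch:

spark.sql("OPTIMIZE source ZORDER BY (id)")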
find_stats_for_records_matching_x_or_y(zorder_organised_files, 2, 3)
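Here zorder_organised_files is assumed to have been produced earlier with calculate_data_zorder_placement (defined below), along the lines of zorder_organised_files = calculate_data_zorder_placement(items).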
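The placement and stats helpers below reference DataItem and FileInfo types that this excerpt never defines; minimal assumed versions, just enough to make the functions runnable:

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class DataItem:
    x: int
    y: int

@dataclass
class FileInfo:
    items: List[DataItem] = field(default_factory=list)
    minx: Optional[int] = None
    miny: Optional[int] = None
    maxx: Optional[int] = None
    maxy: Optional[int] = None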
def calculate_data_zorder_placement(items: List[DataItem]):
    files = {}
    for index, item in enumerate(items):
        x, y = item.x, item.y
        xb = f"{x:03b}"
        yb = f"{y:03b}"
        # Interleave the bits of y and x to get the Z-order (Morton) value.
        interleaved = f"{yb[0]}{xb[0]}{yb[1]}{xb[1]}{yb[2]}{xb[2]}"
        zorder = int(interleaved, 2)
        # Four records per file, grouped by Z-order value.
        file_placement = zorder // 4
        if file_placement not in files:
            # Truncated in the original from here; the bookkeeping below is a
            # plausible reconstruction.
            files[file_placement] = FileInfo()
        file = files[file_placement]
        file.items.append(item)
        adjust_file_info(file, item)
    return files
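A usage sketch over the full 8x8 grid implied by the 3-bit coordinates:

items = [DataItem(x, y) for x in range(8) for y in range(8)]
zorder_organised_files = calculate_data_zorder_placement(items)  # 16 files, 4 records each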
def find_stats_for_records_matching_x_or_y(files, x, y):
    files_scanned = 0
    false_positives = 0
    for k in files.keys():
        file = files[k]
        # Skip files whose min/max stats prove they contain neither x nor y.
        if not ((file.minx <= x and file.maxx >= x) or (file.miny <= y and file.maxy >= y)):
            continue
        files_scanned += 1
        for item in file.items:
            if item.x != x and item.y != y:
                # Truncated in the original from here; counting the record as a
                # false positive and printing totals is a plausible reconstruction.
                false_positives += 1
    print(f"files scanned: {files_scanned}, false positives: {false_positives}")
def adjust_file_info(file: FileInfo, item: DataItem):
    # Widen the file's running min/max stats to cover the new item.
    if file.minx is None or file.minx > item.x:
        file.minx = item.x
    if file.miny is None or file.miny > item.y:
        file.miny = item.y
    if file.maxx is None or file.maxx < item.x:
        file.maxx = item.x
    if file.maxy is None or file.maxy < item.y:
        file.maxy = item.y

def calculate_data_linear_placement(items: List[DataItem]):
    # Truncated in the original after the signature; reconstructed to mirror the
    # Z-order version, but packing four records per file in plain insertion order.
    files = {}
    for index, item in enumerate(items):
        file_placement = index // 4
        if file_placement not in files:
            files[file_placement] = FileInfo()
        file = files[file_placement]
        file.items.append(item)
        adjust_file_info(file, item)
    return files
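With both placement strategies reconstructed, the earlier comparison can be run side by side (a sketch; the variable names are illustrative):

linear_files = calculate_data_linear_placement(items)
zorder_files = calculate_data_zorder_placement(items)
# Compare file-skipping and false positives between the two layouts.
find_stats_for_records_matching_x_or_y(linear_files, 2, 3)
find_stats_for_records_matching_x_or_y(zorder_files, 2, 3)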