Created
September 6, 2019 05:08
-
-
Save codebrain/9356a8c6db1c887bbd34706a6e77a66b to your computer and use it in GitHub Desktop.
Building a realtime address search with the Australian G-NAF dataset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed under the Apache License, Version 2.0 (the "License"); | |
// you may not use this file except in compliance with the License. | |
// You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, software | |
// distributed under the License is distributed on an "AS IS" BASIS, | |
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
// See the License for the specific language governing permissions and | |
// limitations under the License. | |
// Ensure the paths to these DLLS are set correctly | |
#r "Elasticsearch.Net.dll" | |
#r "Nest.dll" | |
#r "System.Diagnostics.DiagnosticSource.dll" | |
#r "System.Buffers.dll" | |
open Nest | |
open System | |
open System.Data.SqlClient | |
open System.IO | |
open System.Text.RegularExpressions | |
open System.Globalization | |
open System.Threading | |
// Variables to change
// Root of the unzipped G-NAF release (will require approx 2GB of disk space)
let unzippedGNAFDir = "D:\\AUG19_GNAF_PipeSeparatedValue\\G-NAF"
// SQL Server database the G-NAF files are loaded into (will require approx 12GB of disk space)
let sqlConnectionString = "Server=SQLEXPRESS;Database=gnaf;Integrated Security=true;"
// Elasticsearch endpoint (will require approx 8GB of disk space)
let elasticsearch = Uri("http://localhost:9200")
// Index that addresses are written to and searched in
let elasticsearchIndex = "address"
// Log with timestamp
let logFormat = Printf.TextWriterFormat<int -> int -> int -> string -> unit>("[%02d:%02d:%02d] %s")

/// Writes `message` to stdout, prefixed with the current local time as [HH:MM:SS].
let log message =
    let now = DateTime.Now
    printfn logFormat now.Hour now.Minute now.Second message

/// printf-style counterpart of `log`: formats its arguments, then logs the resulting text.
let logf format = Printf.ksprintf log format
// Address types for Elasticsearch

/// The individual parts of an address; stored under `Component` in each indexed document.
type AddressComponent =
    { BuildingName: string
      Number: string
      Street: string
      Locality: string
      State: string
      Postcode: string }

/// Renders an address as one display line: "<Number> <Street>, <Locality> <State> <Postcode>".
/// Blank parts are omitted, so a missing number does not leave a leading space. A street
/// with trailing whitespace is trimmed before the comma is added, so "Broadway " renders
/// as "Broadway,". BuildingName is not included in the output.
let toDisplay (address: AddressComponent) =
    let street =
        if String.IsNullOrWhiteSpace address.Street then String.Empty
        else address.Street.Trim() + ","
    [ address.Number; street; address.Locality; address.State; address.Postcode ]
    |> List.filter (fun part -> not (String.IsNullOrWhiteSpace part))
    |> String.concat " "
/// A document stored in the Elasticsearch address index.
type Address =
    {
        /// Document id; bulkIndex fills it from the ADDRESS_DETAIL_PID column.
        Id: string
        /// One-line address text built by toDisplay; createIndex maps it as search_as_you_type.
        Display: string
        /// Coordinates of the address; createIndex maps it as geo_point.
        Location: GeoLocation
        /// The structured parts the display text was built from.
        Component: AddressComponent
    }
// SQL operations

/// Creates the G-NAF schema in SQL Server. It runs, in order, the table-creation script,
/// the foreign-key script and the ADDRESS_VIEW script shipped under the release's "Extras"
/// directory. Returns the same connection so the next step can be piped after it.
/// Raises (failwith) if a script's ExecuteNonQuery result is anything other than -1.
let initSql (sqlConnection: SqlConnection) =
    // Reads a script, rewriting "CREATE OR REPLACE VIEW ADDRESS_VIEW" (which SQL Server
    // does not support) into a plain "CREATE VIEW ADDRESS_VIEW".
    let readScript (file: string) =
        File.ReadAllLines(file)
        |> Array.map (fun line -> line.Replace("CREATE OR REPLACE VIEW ADDRESS_VIEW", "CREATE VIEW ADDRESS_VIEW"))
        |> String.concat Environment.NewLine
    let tableScriptsDir = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_TableCreation_Scripts")
    let setupFiles =
        [| Path.Combine(tableScriptsDir, "create_tables_sqlserver.sql")
           Path.Combine(tableScriptsDir, "add_fk_constraints.sql")
           Path.Combine(unzippedGNAFDir, @"Extras\GNAF_View_Scripts\address_view.sql") |]
    log "Initialising SQL database"
    for setupFile in setupFiles do
        // `use` disposes each command; previously every SqlCommand was leaked.
        use command = new SqlCommand(readScript setupFile, sqlConnection)
        // ExecuteNonQuery returns -1 for statements that report no affected rows (DDL),
        // so any other value is treated as a failure.
        if command.ExecuteNonQuery() <> -1 then
            failwithf "Received failure return value for %s" setupFile
    sqlConnection
/// Loads the G-NAF pipe-separated files into SQL Server with BULK INSERT. The Authority
/// Code tables are loaded first, then the Standard (per-state) tables. The target table
/// name is taken from the file name, e.g. "ACT_ADDRESS_DETAIL_psv.psv" -> ADDRESS_DETAIL.
/// Files whose names do not match the expected pattern are skipped and logged.
/// Returns the same connection so the next step can be piped after it.
/// Raises (failwith) if a file inserts zero rows.
let indexSql (sqlConnection: SqlConnection) =
    // Runs one BULK INSERT statement and logs how many rows it inserted.
    let bulkSqlInsert (commandText: string) (table: string) =
        use command = new SqlCommand(commandText, sqlConnection)
        command.CommandTimeout <- 600
        let inserted = command.ExecuteNonQuery()
        if inserted = 0 then failwithf "No records inserted into %s" table
        else logf "Inserted %i records into %s" inserted table

    // Bulk inserts every file in `dir` whose name matches `pattern`; the `table` group names the target table.
    let indexFiles (dir: string, pattern: string) =
        for file in Directory.EnumerateFiles(dir) do
            let fileInfo = FileInfo file
            let rMatch = Regex.Match(fileInfo.Name, pattern)
            if not rMatch.Success then
                // Previously a non-matching file produced "BULK INSERT  FROM ..." (empty table) and a SQL error.
                logf "Skipping %s: file name does not match %s" fileInfo.Name pattern
            else
                let table = rMatch.Groups.["table"].Value
                // BULK INSERT cannot take the file name as a parameter, so escape quotes in the literal.
                let path = fileInfo.FullName.Replace("'", "''")
                // FIRSTROW = 2 skips the header row of each .psv file.
                let bulkInsert = sprintf "BULK INSERT %s FROM '%s' WITH (FIELDTERMINATOR = '|', FIRSTROW = 2)" table path
                bulkSqlInsert bulkInsert table

    // NOTE: "G-NAF AUGUST 2019" is release-specific and must match the unzipped folder name.
    log "Indexing Authority Code data"
    let dataAuthorityDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Authority Code")
    indexFiles (dataAuthorityDir, @"^Authority_Code_(?<table>.*?)_psv\.psv$")
    log "Indexing State data"
    let dataStandardDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Standard")
    indexFiles (dataStandardDir, @"^[^_]*_(?<table>.*?)_psv\.psv$")
    sqlConnection
// Elasticsearch operations

/// Creates the `elasticsearchIndex` index with one primary shard and no replicas.
/// `display` is mapped as search_as_you_type and `location` as geo_point; no other
/// fields are mapped explicitly here.
/// Raises (failwith) if Elasticsearch rejects the request, e.g. because the index already exists.
let createIndex (elasticClient: ElasticClient) =
    let properties = new Properties<Address>()
    properties.Add(PropertyName.op_Implicit "display", new SearchAsYouTypeProperty())
    properties.Add(PropertyName.op_Implicit "location", new GeoPointProperty())
    let mapping = new TypeMapping()
    mapping.Properties <- properties
    let settings = new IndexSettings()
    settings.NumberOfReplicas <- Nullable 0
    settings.NumberOfShards <- Nullable 1
    let createIndexRequest = new CreateIndexRequest(IndexName.op_Implicit elasticsearchIndex)
    createIndexRequest.Settings <- settings
    createIndexRequest.Mappings <- mapping
    logf "Creating index %s" elasticsearchIndex
    let indexResponse = elasticClient.Indices.Create(createIndexRequest)
    // Without this check a failed create was only logged. Bulk indexing then continued
    // against an index that lacks the search_as_you_type / geo_point mapping.
    if not indexResponse.IsValid then
        failwithf "Failed to create index %s: %s" elasticsearchIndex indexResponse.DebugInformation
    logf "Created index %O" indexResponse
/// Reads every row of ADDRESS_VIEW from SQL Server, converts each into an Address document
/// and indexes the documents into Elasticsearch with NEST's BulkAll helper. Settings:
/// 10,000 documents per bulk request, up to 4 requests in parallel, 2 back-off retries
/// 30s apart, and a refresh once indexing completes. Waits up to 60 minutes.
let bulkIndex (elasticClient: ElasticClient) (sqlConnection: SqlConnection) =
    let timeout = TimeSpan.FromMinutes(60.0)
    let currentPage = ref 0
    let perBulkRequest = Nullable 10000
    let backoffTime = "30s"
    let backoffRetries = Nullable 2
    let parallelism = Nullable 4
    // Created once here; it was previously constructed again for every row read.
    let culture = new CultureInfo("en-AU", false)

    // The column's value as a string, or "" when it is NULL.
    let columnValue (column: string) (reader: SqlDataReader) =
        let ordinal = reader.GetOrdinal(column)
        if reader.IsDBNull(ordinal) then String.Empty
        else reader.[ordinal].ToString()

    // The column read with GetDecimal, which throws on NULL.
    // NOTE(review): assumes LATITUDE/LONGITUDE are always non-NULL decimals — confirm.
    let columnDecimalValue (column: string) (reader: SqlDataReader) =
        reader.GetOrdinal(column) |> reader.GetDecimal

    // Joins the non-empty strings in `parts` with `separator`.
    let joinNonEmpty (separator: string) (parts: string list) =
        parts
        |> List.filter (fun part -> String.length part > 0)
        |> String.concat separator

    // Title-cases a value ("SMITH" -> "Smith") and capitalises the letter after a
    // leading "Mc" ("MCDONALD" -> "McDonald").
    let toTitleCase (value: string) =
        let titleCase = culture.TextInfo.ToTitleCase(value.ToLowerInvariant())
        if titleCase.StartsWith("Mc", StringComparison.Ordinal) then
            "Mc" + culture.TextInfo.ToTitleCase(titleCase.Substring(2))
        else
            titleCase

    // Builds the number part of the address from the LOT_NUMBER, FLAT_NUMBER, NUMBER_FIRST
    // and NUMBER_LAST columns, each combined with its _PREFIX/_SUFFIX columns.
    // Examples: "2/10-12", "Lot 5".
    let addressNumber (reader: SqlDataReader) =
        // "<prefix> <value> <suffix>" for column `item`, skipping empty parts.
        let addressPart item =
            [ columnValue (item + "_PREFIX") reader
              columnValue item reader
              columnValue (item + "_SUFFIX") reader ]
            |> joinNonEmpty " "
        // addressPart surrounded by `before`/`after`, or "" when the part is empty.
        let addressPartWrapped item before after =
            match addressPart item with
            | "" -> ""
            | joined -> before + joined + after
        let lotNumber = addressPartWrapped "LOT_NUMBER" "Lot " ""
        let streetNumber =
            [ addressPartWrapped "FLAT_NUMBER" "" "/"
              addressPart "NUMBER_FIRST"
              addressPartWrapped "NUMBER_LAST" "-" "" ]
            |> joinNonEmpty ""
        // The lot number used to be glued straight onto the street number
        // ("Lot 5" + "10" -> "Lot 510"); keep the two apart with a space.
        joinNonEmpty " " [ lotNumber; streetNumber ]

    // Converts the reader's current row into an AddressComponent.
    let address (reader: SqlDataReader) =
        let street = columnValue "STREET_NAME" reader |> toTitleCase
        // Street type followed by any street suffix, title-cased together.
        let streetType =
            [ columnValue "STREET_TYPE_CODE" reader
              columnValue "STREET_SUFFIX_TYPE" reader ]
            |> joinNonEmpty " "
            |> toTitleCase
        let buildingName = columnValue "BUILDING_NAME" reader |> toTitleCase
        let locality = columnValue "LOCALITY_NAME" reader |> toTitleCase
        let state = columnValue "STATE_ABBREVIATION" reader
        let postcode = columnValue "POSTCODE" reader
        { BuildingName = buildingName
          Number = addressNumber reader
          // Joining only non-empty parts avoids a trailing space when there is no street type.
          Street = joinNonEmpty " " [ street; streetType ]
          Locality = locality
          State = state
          Postcode = postcode }

    use readCommand = new SqlCommand("SELECT * FROM ADDRESS_VIEW", sqlConnection)
    readCommand.CommandTimeout <- 600
    // Disposed when bulkIndex returns, i.e. after Wait below has finished consuming `results`.
    use reader = readCommand.ExecuteReader()
    // Lazy sequence: rows are only read as BulkAll enumerates it.
    let results =
        seq {
            while reader.Read() do
                let id = columnValue "ADDRESS_DETAIL_PID" reader
                let lat = columnDecimalValue "LATITUDE" reader
                let lon = columnDecimalValue "LONGITUDE" reader
                let components = address reader
                let document =
                    { Id = id
                      Display = toDisplay components
                      Location = GeoLocation.TryCreate(float lat, float lon)
                      Component = components }
                yield document
        }

    // Applies the BulkAll settings declared at the top of this function.
    let configure (descriptor: BulkAllDescriptor<Address>) : IBulkAllRequest<Address> =
        let configured =
            descriptor
                .Index(IndexName.op_Implicit elasticsearchIndex)
                .BackOffTime(Time.op_Implicit backoffTime)
                .BackOffRetries(backoffRetries)
                .RefreshOnCompleted()
                .MaxDegreeOfParallelism(parallelism)
                .Size(perBulkRequest)
        configured :> IBulkAllRequest<Address>

    // Called for each bulk response. The count is pages * page size, so it is approximate:
    // the last page may hold fewer documents.
    let reportProgress () =
        let page = Interlocked.Increment(currentPage)
        logf "%i addresses indexed" (page * perBulkRequest.Value)

    log "Bulk indexing into Elasticsearch"
    let bulkAll = elasticClient.BulkAll(results, fun (descriptor: BulkAllDescriptor<Address>) -> configure descriptor)
    bulkAll.Wait(timeout, fun _ -> reportProgress ()) |> ignore
    log "Bulk indexing complete"
/// Prints the Display text of every hit, one per line, followed by the server-side search time.
let displayResults (searchResponse: ISearchResponse<Address>) =
    searchResponse.Hits
    |> Seq.iter (fun hit -> Console.WriteLine hit.Source.Display)
    Console.WriteLine("Took: {0}ms", searchResponse.Took)
/// Interactive demo. It repeatedly prompts for text on the console, runs a bool_prefix
/// multi_match query against `display` and its `_2gram`/`_3gram` subfields, and prints the hits.
/// Stops when the user types "quit" or standard input ends; blank lines are ignored.
/// Returns the client unchanged so further demos can be piped after it.
let searchAsYouTypeDemo (elasticClient: ElasticClient) =
    let search (text: string) =
        let query = new MultiMatchQuery()
        query.Query <- text
        query.Type <- Nullable.op_Implicit TextQueryType.BoolPrefix
        query.Fields <- Fields.op_Implicit "display,display._2gram,display._3gram"
        let request = new SearchRequest<Address>()
        request.Query <- new QueryContainer(query)
        displayResults (elasticClient.Search<Address>(request))

    let rec prompt () =
        Console.Write "\nEnter search (or type 'quit'): "
        // ReadLine returns null at end of input. The old loop never stopped in that case
        // and kept sending searches for null.
        match Console.ReadLine() with
        | null
        | "quit" -> ()
        | line when String.IsNullOrWhiteSpace line -> prompt ()
        | line ->
            search line
            prompt ()

    prompt ()
    elasticClient
/// Demo: runs a geo_distance query on `location` for addresses within 20km of the point
/// (-25.3444, 131.0369) and prints the hits.
let geoSearchDemo (elasticClient: ElasticClient) =
    // NOTE(review): these coordinates appear to be near Uluru, NT — confirm if it matters.
    let centre = new GeoLocation(-25.3444, 131.0369)
    let geoQuery =
        new GeoDistanceQuery(
            Field = Field.op_Implicit "location",
            Distance = Distance.op_Implicit "20km",
            Location = centre)
    let searchRequest = new SearchRequest<Address>(Query = new QueryContainer(geoQuery))
    let geoSearchResponse = elasticClient.Search<Address>(searchRequest)
    displayResults geoSearchResponse
// Execute script
// 1) create the Elasticsearch index,
// 2) load G-NAF into SQL Server and bulk index it into Elasticsearch,
// 3) run the interactive search-as-you-type demo, then the geo-distance demo.
let connectionSettings = new ConnectionSettings(elasticsearch)
// DefaultIndex mutates the settings and returns them for chaining; the return value is not needed.
connectionSettings.DefaultIndex(elasticsearchIndex) |> ignore
let elasticClient = new ElasticClient(connectionSettings)

elasticClient
|> createIndex

let sqlConnection = new SqlConnection(sqlConnectionString)
try
    sqlConnection.Open()
    sqlConnection
    |> initSql
    |> indexSql
    |> bulkIndex elasticClient
finally
    // Dispose also closes the connection. It now runs even when one of the steps above throws.
    sqlConnection.Dispose()

elasticClient
|> searchAsYouTypeDemo
|> geoSearchDemo
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment