nodeSegUDFgen.scala
import java.io._
import scala.language.reflectiveCalls // for the structural type used by Control.using

/*
Argument 0 is the output file from the following:
vsql -h hostName -U jackg -w <redacted> -d pstl -At -c "select get_projection_segments('jackg.testHashNodes_p1_b0');" -o /tmp/get_projection_segments.out

Argument 1 is the output file from the following.
Run vsql to create an output file of the node names and the private IP addresses used to ssh to.
Typically we run on an Edge Node, then ssh to the private Vertica host IP address to execute local vsql commands:
vsql -h hostName -U jackg -w <redacted> -d pstl -At -c "select b.node_name, a.ip_address from v_monitor.network_interfaces a join v_catalog.nodes b on (a.node_name = b.node_name) where b.node_state = 'UP' and a.interface = 'eno16777736' and a.ip_address_family = 'ipv4' order by a.node_name asc;" -o /tmp/NodeIP.out
*/
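// A minimal usage sketch (the paths are only the example paths carried over from the comments above):
//   scalac nodeSegUDFgen.scala
//   scala nodeSegUDFgen /tmp/get_projection_segments.out /tmp/NodeIP.out
// This writes the generated UDF script to <args(0)>.sql (e.g. /tmp/get_projection_segments.out.sql)
// and the node-name/IP list to InsSelParallel.sh.conf in the same directory as args(0).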
object nodeSegUDFgen {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("ERROR: expected 2 arguments. A path to the get_projection_segments file and a path to the IP/Nodes filename\n")
      System.exit(1)
    }
    import Control._

    val fileIn = args(0) // /tmp/get_projection_segments.out
    val sNodeNamesFilePath = args(1) // /tmp/NodeIP.out
    val fJava = new java.io.File(fileIn)
    val sInsSelParallelConf = fJava.getParent + "/InsSelParallel.sh.conf" // get the parent directory path of the file
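
    // Note: the segmentation UDF below masks the hash value to its low 32 bits
    // (4294967295 = 0xFFFFFFFF), keeping it in the same 0..4294967295 range as the
    // segment boundaries that get_projection_segments reports.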
    val segUDF = s"CREATE OR REPLACE FUNCTION test.segmentation(hashint Integer) RETURN INT AS BEGIN RETURN (4294967295 & hashINT); END;\n"

    case class nodeSeg(index: Int, nodeName: String, segLow: Long, segHigh: Long)

    // To avoid using JDBC we read the get_projection_segments output from a file.
    // The get_projection_segments output file contains 3 rows that we read into arrays of strings.
    // We also read a file containing the node names and internal IP addresses
    // used in our parallel export/import migration code (not shown here).
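    // Illustrative input shapes (example values only, assuming pipe-delimited vsql -At output):
    //   NodeIP.out has one row per node, e.g.  v_pstl_node0001|10.0.0.1
    //   get_projection_segments.out has row 1 = node names, row 2 = segment upper bounds,
    //   row 3 = segment lower bounds, each pipe-delimited, which is what the parsing below expects.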
    import scala.io._
    val bufferedSourceNodeIPs = Source.fromFile(sNodeNamesFilePath)
    val aLinesNodeIP = using(bufferedSourceNodeIPs) { bufferedSourceNodeIPs => bufferedSourceNodeIPs.getLines().toArray }

    import scala.collection.mutable.Map
    val nodeMap = Map[String, String]()
    for (line <- aLinesNodeIP) {
      val aNode: Array[String] = line.split('|').map(_.trim)
      nodeMap += (aNode(0) -> aNode(1))
    }

    val bufferedSource = Source.fromFile(fileIn)
    val aLines = using(bufferedSource) { bufferedSource => bufferedSource.getLines().toArray }
    val aNodes: Array[String] = aLines(0).split('|').map(_.trim)
    val aSegHigh: Array[Long] = aLines(1).split('|').map(_.trim.toLong)
    val aSegLow: Array[Long] = aLines(2).split('|').map(_.trim.toLong)
    // preallocate an array of n case class (null) objects
    val aNodeSeg = new Array[nodeSeg](aNodes.length)
    // fill our array with case class objects
    for ((node, elementNum) <- aNodes.zipWithIndex) {
      aNodeSeg(elementNum) = nodeSeg(elementNum, node, aSegLow(elementNum), aSegHigh(elementNum))
    }
    val aNodeSegSorted = aNodeSeg.sortBy(r => r.segLow)

    val sNodeSegUDFgen_sql = args(0) + ".sql"
    var segNodes = s"CREATE OR REPLACE FUNCTION test.segmentationNode(seg INT) RETURN varchar(128) AS BEGIN RETURN (\nCASE"
    val fNodeSegUDFgen_sql = new File(sNodeSegUDFgen_sql)
    val bwNodeSegUDFgen_sql = new BufferedWriter(new FileWriter(fNodeSegUDFgen_sql))
    val fInsSelParallel_conf = new File(sInsSelParallelConf)
    val bwInsSelParallel_conf = new BufferedWriter(new FileWriter(fInsSelParallel_conf))

    // build the SQL UDF string and the node-name/IP conf file contents
    var sNodeNameAndIPaddr = ""
    for ((node, elementNum) <- aNodeSegSorted.zipWithIndex) {
      sNodeNameAndIPaddr += s"${elementNum + 1}|${node.nodeName}|${nodeMap(node.nodeName)}\n"
      if (node.segHigh < node.segLow) {
        // special case logic for lower and upper bounds from Vertica Eng:
        // this node's segment range wraps around the top of the hash space,
        // so match it as two disjoint intervals
        segNodes += s"\n WHEN ( (seg between 0 and ${node.segHigh}) or (seg >= ${node.segLow})) then '${node.nodeName}'"
      }
      else {
        segNodes += s"\n WHEN (seg between ${node.segLow}\tand ${node.segHigh})\tthen '${node.nodeName}'"
      }
    }
    segNodes += " ELSE 'unk' END);\nEND;\n"
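
    // Shape of the generated SQL (illustrative only; node names and boundaries come from the input files):
    //   CREATE OR REPLACE FUNCTION test.segmentationNode(seg INT) RETURN varchar(128) AS BEGIN RETURN (
    //   CASE
    //    WHEN (seg between 0 and 1431655764) then 'v_pstl_node0001'
    //    WHEN (seg between 1431655765 and 2863311529) then 'v_pstl_node0002'
    //    ... ELSE 'unk' END);
    //   END;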
    using(bwNodeSegUDFgen_sql) {
      bwNodeSegUDFgen_sql => {
        bwNodeSegUDFgen_sql.write(segUDF)
        bwNodeSegUDFgen_sql.append(segNodes)
      }
    }
    println(s"INFO: vSQL UDF scripts created in file (${sNodeSegUDFgen_sql})")

    using(bwInsSelParallel_conf) {
      bwInsSelParallel_conf => {
        bwInsSelParallel_conf.write(sNodeNameAndIPaddr)
      }
    }
    println(s"INFO: InsSelParallel.conf file created (${sInsSelParallelConf})")
  }
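
  // Control.using is a small "loan pattern" helper: it hands the resource to the given
  // function and guarantees close() is called in a finally block, even if the function throws.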
  object Control {
    def using[A <: {def close(): Unit}, B](param: A)(f: A => B): B =
      try {
        f(param)
      }
      finally {
        param.close()
      }
  }
}