Here is an example of running the Louvain algorithm; it is basically what I did in this post: https://siwei.io/nebula-livejournal/
Nebula Console:
CREATE SPACE louvain (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256)) COMMENT = "louvain test";
:sleep 20
USE louvain;
CREATE TAG user() COMMENT = 'user';
CREATE EDGE relation(weight int) COMMENT = 'user relation';
INSERT VERTEX user() VALUE "110":(), "211": (), "312": (), "413": (), "514":(), "615":();
INSERT VERTEX user() VALUE "710":(), "811": (), "912": (), "1013": (), "1114":(), "1215":();
INSERT VERTEX user() VALUE "1310":(), "1411": (), "1512": (), "1613": (), "1714":(), "1815":();
INSERT VERTEX user() VALUE "1910":(), "2011": (), "2112": (), "2213": (), "2314":(), "2415":();
INSERT VERTEX user() VALUE "2510":(), "2611": (), "2712": (), "2813": (), "2914":(), "3015":();
INSERT VERTEX user() VALUE "3110":(), "3211": (), "3312": (), "3413": (), "3514":(), "3615":();
INSERT VERTEX user() VALUE "3710":(), "3811": (), "3912": (), "4013": (), "4114":(), "4215":();
INSERT VERTEX user() VALUE "4310":(), "4411": (), "4512": (), "4613": (), "4714":(), "4815":();
INSERT VERTEX user() VALUE "4910":(), "5011": (), "5112": (), "5213": (), "5314":(), "5415":();
INSERT VERTEX user() VALUE "5510":(), "5611": (), "5712": (), "5813": (), "5914":(), "6015":();
INSERT EDGE relation(weight) VALUES "110"->"211":(1);
INSERT EDGE relation(weight) VALUES "312"->"710":(2);
INSERT EDGE relation(weight) VALUES "811"->"912":(3);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(4);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(5);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(6);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(7);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(8);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(9);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(10);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(11);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(12);
INSERT EDGE relation(weight) VALUES "2813"->"6015" :(13);
INSERT EDGE relation(weight) VALUES "2914"->"3015":(14);
INSERT EDGE relation(weight) VALUES "3110"->"3211":(15);
INSERT EDGE relation(weight) VALUES "3312"->"3413":(16);
INSERT EDGE relation(weight) VALUES "3514"->"3615":(17);
INSERT EDGE relation(weight) VALUES "3710"->"3811":(18);
INSERT EDGE relation(weight) VALUES "3912"->"4013":(19);
INSERT EDGE relation(weight) VALUES "4114"->"4215":(20);
INSERT EDGE relation(weight) VALUES "413"->"514":(21);
INSERT EDGE relation(weight) VALUES "615"->"710":(22);
INSERT EDGE relation(weight) VALUES "811"->"912":(23);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(24);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(25);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(26);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(27);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(28);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(29);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(30);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(31);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(32);
INSERT EDGE relation(weight) VALUES "2813"->"2914":(33);
INSERT EDGE relation(weight) VALUES "3015"->"3110":(34);
INSERT EDGE relation(weight) VALUES "3211"->"3312":(35);
INSERT EDGE relation(weight) VALUES "3413"->"3514":(36);
INSERT EDGE relation(weight) VALUES "3615"->"3710":(37);
INSERT EDGE relation(weight) VALUES "3811"->"3912":(38);
INSERT EDGE relation(weight) VALUES "4013"->"4114":(39);
INSERT EDGE relation(weight) VALUES "4215"->"4310":(40);
INSERT EDGE relation(weight) VALUES "4411"->"4512":(41);
INSERT EDGE relation(weight) VALUES "4613"->"4714":(42);
INSERT EDGE relation(weight) VALUES "4815"->"4910":(43);
INSERT EDGE relation(weight) VALUES "5011"->"5112":(44);
INSERT EDGE relation(weight) VALUES "5213"->"5314":(45);
INSERT EDGE relation(weight) VALUES "5415"->"710":(46);
INSERT EDGE relation(weight) VALUES "811"->"912":(47);
INSERT EDGE relation(weight) VALUES "1013"->"1114":(48);
INSERT EDGE relation(weight) VALUES "1215"->"1310":(49);
INSERT EDGE relation(weight) VALUES "1411"->"1512":(50);
INSERT EDGE relation(weight) VALUES "1613"->"1714":(51);
INSERT EDGE relation(weight) VALUES "1815"->"1910":(52);
INSERT EDGE relation(weight) VALUES "2011"->"2112":(53);
INSERT EDGE relation(weight) VALUES "2213"->"2314":(54);
INSERT EDGE relation(weight) VALUES "2415"->"2510":(55);
INSERT EDGE relation(weight) VALUES "2611"->"2712":(56);
INSERT EDGE relation(weight) VALUES "2813"->"2914":(57);
INSERT EDGE relation(weight) VALUES "3015"->"3110":(58);
INSERT EDGE relation(weight) VALUES "3211"->"3312":(59);
INSERT EDGE relation(weight) VALUES "3413"->"3514":(60);
INSERT EDGE relation(weight) VALUES "3615"->"3710":(61);
INSERT EDGE relation(weight) VALUES "3811"->"3912":(62);
INSERT EDGE relation(weight) VALUES "4013"->"4114":(63);
INSERT EDGE relation(weight) VALUES "4215"->"5510":(64);
INSERT EDGE relation(weight) VALUES "5611"->"5712":(65);
INSERT EDGE relation(weight) VALUES "5813"->"5914":(66);
Create the algorithm environment in Docker:
cd ~
mkdir -p test/nebula-algorithm
cd test/nebula-algorithm
docker run --name spark-master --network nebula-docker-compose_nebula-net \
-h spark-master -e ENABLE_INIT_DAEMON=false -d \
-v ${HOME}/test/nebula-algorithm/:/root \
bde2020/spark-master:2.4.5-hadoop2.7
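The --network flag is what lets Spark reach the Nebula metad and graphd containers by their internal IPs. You can double-check that spark-master joined the network, and read off the container IPs used as metaAddress in algo-louvain.conf below (the network name comes from the docker-compose project and may differ on your machine):

# list the containers attached to the compose network together with their IPs
docker network inspect nebula-docker-compose_nebula-net \
  -f '{{range .Containers}}{{.Name}} {{.IPv4Address}}{{println}}{{end}}'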
wget https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/2.6.2/nebula-algorithm-2.6.2.jar
vim algo-louvain.conf  # the full file is listed under "ref:" at the end of this post
docker exec -it spark-master bash
cd /root
/spark/bin/spark-submit --master "local" --conf spark.rpc.askTimeout=6000s \
--class com.vesoft.nebula.algorithm.Main \
--driver-memory 16g nebula-algorithm-2.6.2.jar \
-p algo-louvain.conf
...
22/01/10 10:34:05 INFO TaskSetManager: Starting task 0.0 in stage 796.0 (TID 1000, localhost, executor driver, partition 0, ANY, 7767 bytes)
22/01/10 10:34:05 INFO Executor: Running task 0.0 in stage 796.0 (TID 1000)
22/01/10 10:34:05 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 2 local blocks and 0 remote blocks
22/01/10 10:34:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
22/01/10 10:34:05 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
22/01/10 10:34:05 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
22/01/10 10:34:05 INFO FileOutputCommitter: Saved output of task 'attempt_20220110103405_0796_m_000000_1000' to file:/output/_temporary/0/task_20220110103405_0796_m_000000
22/01/10 10:34:05 INFO SparkHadoopMapRedUtil: attempt_20220110103405_0796_m_000000_1000: Committed
22/01/10 10:34:05 INFO Executor: Finished task 0.0 in stage 796.0 (TID 1000). 1654 bytes result sent to driver
22/01/10 10:34:05 INFO TaskSetManager: Finished task 0.0 in stage 796.0 (TID 1000) in 120 ms on localhost (executor driver) (1/1)
22/01/10 10:34:05 INFO TaskSchedulerImpl: Removed TaskSet 796.0, whose tasks have all completed, from pool
22/01/10 10:34:05 INFO DAGScheduler: ResultStage 796 (csv at AlgoWriter.scala:53) finished in 0.147 s
22/01/10 10:34:05 INFO DAGScheduler: Job 22 finished: csv at AlgoWriter.scala:53, took 0.309399 s
22/01/10 10:34:05 INFO FileFormatWriter: Write Job 658734e4-ce53-4ca8-92cd-6d7f9421fc54 committed.
22/01/10 10:34:05 INFO FileFormatWriter: Finished processing stats for write job 658734e4-ce53-4ca8-92cd-6d7f9421fc54.
22/01/10 10:34:05 INFO SparkContext: Invoking stop() from shutdown hook
22/01/10 10:34:05 INFO SparkUI: Stopped Spark web UI at http://spark-master:4040
22/01/10 10:34:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/10 10:34:06 INFO MemoryStore: MemoryStore cleared
22/01/10 10:34:06 INFO BlockManager: BlockManager stopped
22/01/10 10:34:06 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/10 10:34:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/10 10:34:06 INFO SparkContext: Successfully stopped SparkContext
22/01/10 10:34:06 INFO ShutdownHookManager: Shutdown hook called
22/01/10 10:34:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-0bdbb853-cbc8-4876-827c-937e1748f6a9
22/01/10 10:34:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-0ab58035-9c32-4b2e-a649-4a34d67a4d06
bash-5.0# ls -l /output/
total 4
-rw-r--r-- 1 root root 0 Jan 10 10:34 _SUCCESS
-rw-r--r-- 1 root root 192 Jan 10 10:34 part-00000-01c6be1f-b8be-4e68-b708-156bab8ec2b6-c000.csv
bash-5.0# head /output/part-00000-01c6be1f-b8be-4e68-b708-156bab8ec2b6-c000.csv
_id,louvain
6015,2813
2914,2813
2813,2813
3514,3015
3413,3015
3312,3015
4114,3015
3211,3015
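The resultPath /output lives inside the spark-master container. A small follow-up sketch to copy it out to the host and summarize the community sizes from the CSV (assuming the _id,louvain header shown above):

# copy the result directory to the host
docker cp spark-master:/output ./louvain-output
# count members per community, largest first (FNR > 1 skips each file's header)
awk -F, 'FNR > 1 {n[$2]++} END {for (c in n) print c, n[c]}' ./louvain-output/part-*.csv | sort -k2 -nr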
ref:
algo-louvain.conf
{
  # Spark related config
  spark: {
    app: {
      name: louvain
      # spark.app.partitionNum
      partitionNum: 10
    }
    master: local
  }

  data: {
    # data source. Options: nebula, csv, json
    source: nebula
    # data sink: the algorithm result will be written into this sink. Options: nebula, csv, text
    sink: csv
    # whether the algorithm uses edge weights
    hasWeight: true
  }

  # NebulaGraph related config
  nebula: {
    # data source from Nebula. This nebula.read config takes effect only when data.source is nebula.
    read: {
      # Nebula metad server addresses, multiple addresses separated by commas
      metaAddress: "172.20.0.3:9559"
      # Nebula space
      space: louvain
      # Nebula edge types; multiple labels mean data from multiple edge types will be unioned together
      labels: ["relation"]
      # Nebula edge property name for each edge type; this property is used as the weight column for the algorithm.
      # Make sure the weightCols correspond to the labels.
      weightCols: ["weight"]
    }
    # algorithm result sink into Nebula. This nebula.write config takes effect only when data.sink is nebula.
    write: {
      # Nebula graphd server addresses, multiple addresses separated by commas
      graphAddress: "graphd:9669"
      # Nebula metad server addresses, multiple addresses separated by commas
      metaAddress: "172.20.0.3:9559,172.20.0.4:9559,172.20.0.2:9559"
      user: root
      pswd: nebula
      # Nebula space name
      space: algo
      # Nebula tag name; the algorithm result will be written into this tag
      tag: louvain
      type: insert
    }
  }

  local: {
    # data source from a local file. This local.read config takes effect only when data.source is csv or json.
    read: {
      filePath: "hdfs://10.1.1.168:9000/edge/work_for.csv"
      # srcId column
      srcId: "_c0"
      # dstId column
      dstId: "_c1"
      # weight column
      #weight: "col3"
      # whether the csv file has a header
      header: false
      # csv file's delimiter
      delimiter: ","
    }
    # algorithm result sink into a local file. This local.write config takes effect only when data.sink is csv or text.
    write: {
      resultPath: /output/
    }
  }

  algorithm: {
    # the algorithm to execute, pick one from [pagerank, louvain, connectedcomponent,
    # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
    # betweenness]
    executeAlgo: louvain

    # Louvain parameters
    louvain: {
      maxIter: 20
      internalIter: 10
      tol: 0.5
    }
    # connected component parameters
    connectedcomponent: {
      maxIter: 20
    }
    # LabelPropagation parameters
    labelpropagation: {
      maxIter: 20
    }
    # ShortestPaths parameters
    shortestpaths: {
      # several vertices from which to compute the shortest path to all vertices
      landmarks: "1"
    }
    # vertex degree statistics parameters
    degreestatic: {}
    # KCore parameters
    kcore: {
      maxIter: 10
      degree: 1
    }
    # TriangleCount parameters
    trianglecount: {}
    # Betweenness centrality parameters
    betweenness: {
      maxIter: 5
    }
  }
}
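Here data.sink is csv, so the result lands in /output. If you flip data.sink to nebula instead, the target space and tag from the write block must exist beforehand. A hedged sketch of preparing them (I assume the result property is named after the algorithm, matching the _id,louvain CSV header above, and that a string property fits these vids; please check the nebula-algorithm docs for your version):

nebula-console -addr 127.0.0.1 -port 9669 -u root -p nebula \
  -e 'CREATE SPACE IF NOT EXISTS algo (partition_num=15, replica_factor=1, vid_type=FIXED_STRING(256));'
sleep 20  # give the new space two heartbeat cycles to become usable
nebula-console -addr 127.0.0.1 -port 9669 -u root -p nebula \
  -e 'USE algo; CREATE TAG IF NOT EXISTS louvain(louvain string);'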