Panama Papers Import Scripts for Neo4j and Docker
(forked from jexp/bulk-neo4j-import-original.sh, May 2016)
# Part 1: offline bulk import with neo4j-import (Neo4j community 3.0.x)
export NEO4J_HOME=${NEO4J_HOME-~/Downloads/neo4j-community-3.0.1}
if [ ! -f data-csv.zip ]; then
  curl -OL https://cloudfront-files-1.publicintegrity.org/offshoreleaks/data-csv.zip
fi
export DATA=${PWD}/import
unzip -o -j data-csv.zip -d $DATA
# strip stray backslashes from Addresses.csv; they act as escape characters and break the CSV quoting
tr -d '\\' < $DATA/Addresses.csv > $DATA/Addresses_fixed.csv
# rewrite the CSV headers into neo4j-import's :ID/:START_ID/:END_ID/:TYPE format
# (note: "sed -i ''" is BSD/macOS syntax; on Linux drop the empty string after -i)
for i in $DATA/[AIEO]*.csv; do echo $i; sed -i '' -e '1,1 s/node_id/node_id:ID/' $i; done
sed -i '' -e '1,1 s/node_1,rel_type,node_2/node_id:START_ID,rel_type:TYPE,node_id:END_ID/' $DATA/all_edges.csv
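# For illustration only (hypothetical column order), the sed calls turn a node header like
#   name,jurisdiction,node_id,sourceID
# into
#   name,jurisdiction,node_id:ID,sourceID
# and the edge header into node_id:START_ID,rel_type:TYPE,node_id:END_ID, which is how
# neo4j-import learns which columns carry node ids and the relationship type.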
rm -rf panama.db
$NEO4J_HOME/bin/neo4j-import --into panama.db --nodes:Address $DATA/Addresses_fixed.csv --nodes:Entity $DATA/Entities.csv --nodes:Intermediary $DATA/Intermediaries.csv --nodes:Officer $DATA/Officers.csv \
--relationships $DATA/all_edges.csv --ignore-empty-strings true --skip-duplicate-nodes true --skip-bad-relationships true --bad-tolerance 1000000 --multiline-fields=true
$NEO4J_HOME/bin/neo4j-shell -path panama.db -c 'MATCH (n)-[r]->(m) RETURN labels(n),type(r),labels(m),count(*);'
# IMPORT DONE in 23s 361ms. Imported:
# 839434 nodes
# 1265690 relationships
# 8211012 properties
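# A sketch for browsing the result, assuming the default 3.0 layout: copy panama.db into
# $NEO4J_HOME/data/databases/, set dbms.active_database=panama.db in conf/neo4j.conf,
# then run: $NEO4J_HOME/bin/neo4j start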
# Part 2: live import into a Dockerized Neo4j 3.0 using LOAD CSV
mkdir -p panama
cd panama
# load apoc from https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/tag/1.0.0
mkdir -p plugins
curl -L https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/latest/apoc-1.0.0.jar -o plugins/apoc-1.0.0.jar
# load public panama-papers files from: https://offshoreleaks.icij.org/pages/database
if [ ! -f data-csv.zip ]; then curl -OL https://cloudfront-files-1.publicintegrity.org/offshoreleaks/data-csv.zip; fi
unzip -o -j data-csv.zip -d import
tr -d '\\' < import/Addresses.csv > import/Addresses_fixed.csv
# derive a quasi-random host port from the current seconds, e.g. 42123
export PORT=`date +%S`123
echo $PORT
export HERE=`pwd`
mkdir -p $HERE/data
rm -rf $HERE/data/*
# start Neo4j 3.0 in Docker, mounting data, import and plugins; capture the container id
export CONTAINER=`docker run \
--name neo4j-panama \
--detach \
--publish=$PORT:7474 \
--volume=$HERE/data:/data \
--volume=$HERE/import:/var/lib/neo4j/import \
--volume=$HERE/plugins:/plugins \
--ulimit=nofile=40000:40000 \
--env=NEO4J_dbms_memory_heap_maxSize=5000 \
--env=NEO4J_dbms_memory_pagecache_size=500M \
neo4j:3.0`
docker ps -f name=neo4j-panama
sleep 5
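# "sleep 5" is a race on slower machines; a sturdier sketch polls the HTTP port until
# Neo4j answers (assumes curl is available on the host):
until curl -s -o /dev/null http://localhost:$PORT; do sleep 1; done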
docker exec -i $CONTAINER /var/lib/neo4j/bin/neo4j-shell -file - << EOF
match (n) detach delete n;
create constraint on (n:Node) assert n.node_id is unique;
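// the unique constraint doubles as an index on :Node(node_id); the MERGE statements
// below depend on it, otherwise every MERGE would scan all nodes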
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///Addresses_fixed.csv" AS row MERGE (n:Node {node_id:row.node_id}) ON CREATE SET n = row, n:Address;
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///Intermediaries.csv" AS row MERGE (n:Node {node_id:row.node_id}) ON CREATE SET n = row, n:Intermediary;
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///Entities.csv" AS row MERGE (n:Node {node_id:row.node_id}) ON CREATE SET n = row, n:Entity;
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///Officers.csv" AS row MERGE (n:Node {node_id:row.node_id}) ON CREATE SET n = row, n:Officer;
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///all_edges.csv" AS row
WITH row WHERE row.rel_type = "intermediary_of"
MATCH (n1:Node) WHERE n1.node_id = row.node_1
MATCH (n2:Node) WHERE n2.node_id = row.node_2
CREATE (n1)-[:INTERMEDIARY_OF]->(n2);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///all_edges.csv" AS row
WITH row WHERE row.rel_type = "officer_of"
MATCH (n1:Node) WHERE n1.node_id = row.node_1
MATCH (n2:Node) WHERE n2.node_id = row.node_2
CREATE (n1)-[:OFFICER_OF]->(n2);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///all_edges.csv" AS row
WITH row WHERE row.rel_type = "registered_address"
MATCH (n1:Node) WHERE n1.node_id = row.node_1
MATCH (n2:Node) WHERE n2.node_id = row.node_2
CREATE (n1)-[:REGISTERED_ADDRESS]->(n2);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///all_edges.csv" AS row
WITH row WHERE row.rel_type = "similar"
MATCH (n1:Node) WHERE n1.node_id = row.node_1
MATCH (n2:Node) WHERE n2.node_id = row.node_2
CREATE (n1)-[:SIMILAR]->(n2);
USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM "file:///all_edges.csv" AS row
WITH row WHERE row.rel_type = "underlying"
MATCH (n1:Node) WHERE n1.node_id = row.node_1
MATCH (n2:Node) WHERE n2.node_id = row.node_2
CREATE (n1)-[:UNDERLYING]->(n2);
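// the five blocks above differ only in the relationship type: CREATE cannot take a
// dynamic type in Cypher, so each type needs its own pass over all_edges.csv; later
// APOC releases offer apoc.create.relationship to fold these into a single pass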
DROP CONSTRAINT ON (n:Node) ASSERT n.node_id IS UNIQUE;
MATCH (n) REMOVE n:Node;
CREATE INDEX ON :Officer(name);
CREATE INDEX ON :Entity(name);
CREATE INDEX ON :Entity(address);
CREATE INDEX ON :Intermediary(name);
CREATE INDEX ON :Address(address);
// stats
MATCH (n)-[r]->(m)
RETURN labels(n),type(r),labels(m),count(*)
ORDER BY count(*) DESC;
schema await
EOF
echo "Neo4j running on $PORT mounted $HERE/data and $HERE/import container: $CONTAINER"
echo "To kill run: docker rm -f $CONTAINER"
// standalone index script; note the first five repeat indexes already created above
CREATE INDEX ON :Intermediary(name);
CREATE INDEX ON :Address(address);
CREATE INDEX ON :Officer(name);
CREATE INDEX ON :Entity(name);
CREATE INDEX ON :Entity(address);
CREATE INDEX ON :Entity(jurisdiction);
CREATE INDEX ON :Entity(incorporation_date);
CREATE INDEX ON :Entity(inactivation_date);
CREATE INDEX ON :Entity(struck_off_date);
CREATE INDEX ON :Entity(service_provider);
CREATE INDEX ON :Entity(original_name);
CREATE INDEX ON :Entity(status);
CREATE INDEX ON :Entity(country_codes);
CREATE INDEX ON :Address(country_codes);
CREATE INDEX ON :Intermediary(country_codes);
CREATE INDEX ON :Officer(country_codes);
// everything below is optional for fun
// mark officers as companies
unwind [" LTD","SURVIVORSHIP"," CORP","LIMITED","INC","FOUNDATION"," S.A.","PORTADOR","TRUST","BEARER","INTERNATIONAL","COMPANY","ANSTALT","INVESTMENTS"," B.V."," AG"] as designation
match (o:Officer)
WHERE NOT o:Company AND toUpper(o.name) CONTAINS designation
SET o:Company;
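// the leading spaces in designations like " LTD" and " AG" are deliberate: they stop
// CONTAINS from matching inside longer words (e.g. "AGENCY" would otherwise match "AG")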
// turn sourceID values into labels for faster filtering
MATCH (n) WHERE n.sourceID = "Panama Papers" and NOT n:PP
SET n:PP;
MATCH (n) WHERE n.sourceID = "Offshore Leaks" and NOT n:OSL
SET n:OSL;
// extract country nodes
CREATE CONSTRAINT ON (c:Country) ASSERT c.code IS UNIQUE;
CALL apoc.periodic.commit("
MATCH (n) WHERE exists(n.country_codes)
WITH n limit 50000
WITH n, split(n.country_codes,';') as codes,split(n.countries,';') as countries
FOREACH (idx in range(0,size(codes)-1) |
MERGE (country:Country {code:codes[idx]}) ON CREATE SET country.name = countries[idx]
MERGE (n)-[:LOCATED_IN]->(country)
)
REMOVE n.country_codes, n.countries
RETURN count(*)
",{});
// create a full-text index
CALL apoc.index.addAllNodes('offshore',{
Officer: ["name"],
Intermediary: ["name","address"],
Address: ["address"],
Entity: ["name", "address", "service_provider", "former_name", "company_type"]});
// top-20 officers
match (o:Officer:PP)
WHERE NOT o:Company
return o.name,
size( (o)-[:OFFICER_OF]->()) as deg
order by deg desc limit 100;