Let's import the page and categorylinks tables from the plwiki database.

Sqoop will need JAVA_HOME set. We (ops) should make this the default in all user shells. #TODO

export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64
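If you're not sure which JDK path to use on the host you're on, something like this will point you at it, and you can stick the export in your ~/.bashrc so it survives new shells:

# Find where the java binary actually lives; JAVA_HOME is the JDK directory above its (jre/)bin.
readlink -f "$(which java)"

# Optionally persist it for future shells.
echo 'export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64' >> ~/.bashrc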

page

We'll use the page table from plwiki as our example. The page table looks like:

mysql -uresearch -p -hanalytics-store.eqiad.wmnet -e 'show create table page' plwiki

CREATE TABLE `page` (
  `page_id` int(8) unsigned NOT NULL AUTO_INCREMENT,
  `page_namespace` int(11) NOT NULL DEFAULT '0',
  `page_title` varbinary(255) NOT NULL DEFAULT '',
  `page_restrictions` tinyblob NOT NULL,
  `page_counter` bigint(20) unsigned NOT NULL DEFAULT '0',
  `page_is_redirect` tinyint(1) unsigned NOT NULL DEFAULT '0',
  `page_is_new` tinyint(1) unsigned NOT NULL DEFAULT '0',
  `page_random` double unsigned NOT NULL DEFAULT '0',
  `page_touched` varbinary(14) NOT NULL DEFAULT '',
  `page_links_updated` varbinary(14) DEFAULT NULL,
  `page_latest` int(8) unsigned NOT NULL DEFAULT '0',
  `page_len` int(8) unsigned NOT NULL DEFAULT '0',
  `page_no_title_convert` tinyint(1) NOT NULL DEFAULT '0',
  `page_content_model` varbinary(32) DEFAULT NULL,
  PRIMARY KEY (`page_id`),
  UNIQUE KEY `name_title` (`page_namespace`,`page_title`),
  KEY `page_random` (`page_random`),
  KEY `page_len` (`page_len`),
  KEY `page_redirect_namespace_len` (`page_is_redirect`,`page_namespace`,`page_len`)
) ENGINE=TokuDB AUTO_INCREMENT=3267239 DEFAULT CHARSET=binary `compression`='tokudb_zlib'

Sqoop won't be able to import this table into Hive automatically, since it has a hard time with varbinary, blob, and similar column types. Instead, we have to write a custom query for the import: we'll skip the blobs entirely and cast the varbinaries to strings. We'll use the following query:

SELECT
  -- List all column names you want imported.
  -- Sqoop will automatically create a table with these columns.
  a.page_id AS page_id,
  CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title,  -- we want page_title, so cast it to a CHAR
  a.page_counter AS page_counter,
  a.page_is_redirect as page_is_redirect,
  a.page_is_new as page_is_new,
  a.page_len as page_len
  -- Skip everything else (blobs, etc.).
FROM page a
-- $CONDITIONS allows Sqoop to split the import into multiple mappers during the import.
-- This will be much faster if you are importing a large table.
WHERE $CONDITIONS;
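If you want to sanity check a query like this before handing it to Sqoop, you can run it against MySQL directly, swapping $CONDITIONS for a no-op predicate. A quick sketch, using the same research connection as above and only a couple of the columns:

mysql -uresearch -p -hanalytics-store.eqiad.wmnet -e '
SELECT
  a.page_id AS page_id,
  CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title,
  a.page_len AS page_len
FROM page a
WHERE 1=1   -- stands in for $CONDITIONS
LIMIT 5;' plwiki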

We'll now pass this query to the sqoop import command.

sqoop import                                                        \
  --connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/plwiki      \
  --verbose                                                         \
  --target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX)      \
  --delete-target-dir                                               \
  --username=research --password $password                          \
  --query '
SELECT
  a.page_id AS page_id,
  CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title,
  a.page_counter AS page_counter,
  a.page_is_redirect as page_is_redirect,
  a.page_is_new as page_is_new,
  a.page_len as page_len
FROM page a
WHERE $CONDITIONS
'                                                                   \
--split-by a.page_id                                                \
--hive-partition-key project                                        \
--hive-partition-value plwiki                                       \
--hive-import                                                       \
--hive-database otto                                                \
--create-hive-table                                                 \
--hive-table page

--target-dir will be a temporary HDFS directory used during the import.

--split-by tells Sqoop which field to split the data on across mappers. This should probably just be the primary key.

--hive-partition-key and --hive-partition-value let you set a static Hive partition for this import. I've decided to store page data for all projects in this one table, and to hardcode the partition project=plwiki for this import. That way you can select from the page table and pick the project you want in the WHERE clause (see the example below).

The remaining hive options tell Sqoop that you want the data imported from MySQL to be stored in a Hive table.
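For example, once the import finishes, the project partition behaves like any other column in a WHERE clause:

hive --database otto -e 'select page_title from page where project = "plwiki" limit 5;'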

Let's see if it worked!

hive --database otto -e 'show create table page';
...
CREATE  TABLE `page0`(
  `page_id` bigint,
  `page_title` string,
  `page_counter` bigint,
  `page_is_redirect` boolean,
  `page_is_new` boolean,
  `page_len` bigint)
COMMENT 'Imported by sqoop on 2014/09/23 16:54:52'
PARTITIONED BY (
  `project` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\u0001'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://analytics-hadoop/user/hive/warehouse/otto.db/page0'
TBLPROPERTIES (
  'numPartitions'='1',
  'numFiles'='5',
  'transient_lastDdlTime'='1411491294',
  'totalSize'='2777',
  'numRows'='0',
  'rawDataSize'='0')
hive --database otto -e 'select page_title from page limit 10';
...
Aromorfoza
Azot
Amedeo_Avogadro
Uwierzytelnienie
Autoryzacja
Alfabet
AMP
Wieloprocesorowość_asynchroniczna
Apla
Asembler

categorylinks

Same deal.
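
If you want to check the column types first, the same SHOW CREATE TABLE trick works:

mysql -uresearch -p -hanalytics-store.eqiad.wmnet -e 'show create table categorylinks' plwiki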

First our MySQL query:

SELECT
  a.cl_from AS cl_from,
  cast(a.cl_to AS CHAR(255) CHARSET utf8) AS cl_to,
  cast(a.cl_sortkey AS CHAR(230) CHARSET utf8) AS cl_sortkey,
  a.cl_timestamp AS cl_timestamp,
  cast(a.cl_sortkey_prefix AS CHAR(255) CHARSET utf8) AS cl_sortkey_prefix,
  cast(a.cl_collation AS CHAR(32) CHARSET utf8) AS cl_collation,
  cast(a.cl_type AS CHAR(6) CHARSET utf8) AS cl_type
FROM categorylinks a
WHERE $CONDITIONS
;

And now, to sqoop it:

sqoop import                                                        \
  --connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/plwiki      \
  --verbose                                                         \
  --target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX)      \
  --delete-target-dir                                               \
  --username=research --password $password                          \
  --query '
SELECT
  a.cl_from AS cl_from,
  cast(a.cl_to AS CHAR(255) CHARSET utf8) AS cl_to,
  cast(a.cl_sortkey AS CHAR(230) CHARSET utf8) AS cl_sortkey,
  a.cl_timestamp AS cl_timestamp,
  cast(a.cl_sortkey_prefix AS CHAR(255) CHARSET utf8) AS cl_sortkey_prefix,
  cast(a.cl_collation AS CHAR(32) CHARSET utf8) AS cl_collation,
  cast(a.cl_type AS CHAR(6) CHARSET utf8) AS cl_type
FROM categorylinks a
WHERE $CONDITIONS
'                                                                   \
--split-by a.cl_from                                                \
--hive-partition-key project                                        \
--hive-partition-value plwiki                                       \
--hive-import                                                       \
--hive-database otto                                                \
--create-hive-table                                                 \
--hive-table categorylinks

Let's check it:

hive --database otto -e 'show create table categorylinks';
...
CREATE  TABLE `categorylinks0`(
  `cl_from` bigint,
  `cl_to` string,
  `cl_sortkey` string,
  `cl_timestamp` string,
  `cl_sortkey_prefix` string,
  `cl_collation` string,
  `cl_type` string)
COMMENT 'Imported by sqoop on 2014/09/23 17:14:40'
PARTITIONED BY (
  `project` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\u0001'
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://analytics-hadoop/user/hive/warehouse/otto.db/categorylinks0'
TBLPROPERTIES (
  'numPartitions'='1',
  'numFiles'='5',
  'transient_lastDdlTime'='1411492482',
  'totalSize'='5198',
  'numRows'='0',
  'rawDataSize'='0')
hive --database otto -e 'select distinct(cl_to), cl_type from categorylinks where project="plwiki" and cl_type = "file" limit 10';
...
Byłe_ilustracje_na_medal	file
Grafiki_do_przetłumaczenia	file
Herby_miast_ukraińskich	file
Ilustracje_na_medal	file
Lokalnie_załadowane_pliki	file
Pliki_o_nieznanym_statusie_prawnym	file
Pliki_oczekujące_na_przeniesienie_do_Commons	file
Portal_Chrześcijaństwo/Ilustracja_miesiąca	file
Portal_Katolicyzm/Ilustracja_tygodnia	file

Cool!

Querying page, categorylinks...and...webrequests!

hive --database otto

SELECT
    p.page_title, count(*) as cnt
FROM page p
INNER JOIN 
    categorylinks cl ON cl.cl_from = p.page_id
    AND cl.cl_type = 'file'
INNER JOIN
    wmf_raw.webrequest w ON regexp_extract(w.uri_path, '^.*/(.+)$', 1) = p.page_title
    AND w.webrequest_source = 'upload' and w.year=2014 and w.month=9 and w.day=22 and w.hour=18
    AND cl.project = 'plwiki'
    AND p.project  = 'plwiki'
GROUP BY p.page_title
ORDER BY cnt desc
LIMIT 1000
;
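If you want to hang on to the results, one easy way is to put the query in a file and redirect hive's output, which comes back tab-separated. (page_image_requests.hql here is just a made-up filename; paste the query above into it.)

hive --database otto -f page_image_requests.hql > page_image_requests.tsv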

Importing another project into existing tables

Just change the --hive-partition-value and drop the --create-hive-table flag:

# Import trwiki page table data
sqoop import                                                        \
  --connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/trwiki      \
  --verbose                                                         \
  --target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX)      \
  --delete-target-dir                                               \
  --username=research --password $password                          \
  --query '
SELECT
  a.page_id AS page_id,
  CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title,
  a.page_counter AS page_counter,
  a.page_is_redirect as page_is_redirect,
  a.page_is_new as page_is_new,
  a.page_len as page_len
FROM page a
WHERE $CONDITIONS
'                                                                   \
--split-by a.page_id                                                \
--hive-partition-key project                                        \
--hive-partition-value trwiki                                       \
--hive-import                                                       \
--hive-database otto                                                \
--hive-table page



# Import trwiki categorylinks table data
sqoop import                                                        \
  --connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/trwiki      \
  --verbose                                                         \
  --target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX)      \
  --delete-target-dir                                               \
  --username=research --password $password                          \
  --query '
SELECT
  a.cl_from AS cl_from,
  cast(a.cl_to AS CHAR(255) CHARSET utf8) AS cl_to,
  cast(a.cl_sortkey AS CHAR(230) CHARSET utf8) AS cl_sortkey,
  a.cl_timestamp AS cl_timestamp,
  cast(a.cl_sortkey_prefix AS CHAR(255) CHARSET utf8) AS cl_sortkey_prefix,
  cast(a.cl_collation AS CHAR(32) CHARSET utf8) AS cl_collation,
  cast(a.cl_type AS CHAR(6) CHARSET utf8) AS cl_type
FROM categorylinks a
WHERE $CONDITIONS
'                                                                   \
--split-by a.cl_from                                                \
--hive-partition-key project                                        \
--hive-partition-value trwiki                                       \
--hive-import                                                       \
--hive-database otto                                                \
--hive-table categorylinks
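
A quick sanity check that both projects ended up in the same tables:

hive --database otto -e 'show partitions page; show partitions categorylinks;'
hive --database otto -e 'select page_title from page where project = "trwiki" limit 5;'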