Let's import the page and category links table from the plwiki database.
Sqoop needs JAVA_HOME set. We (ops) should make sure this is set for all user shells. #TODO
export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64
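Until that is set up globally, a simple workaround (just standard shell setup, nothing Sqoop-specific) is to persist the variable in your own profile:
echo 'export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64' >> ~/.bashrc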
We'll use the page table from plwiki as our example. The page table looks like:
mysql -uresearch -p -hanalytics-store.eqiad.wmnet -e 'show create table page' plwiki
CREATE TABLE `page` (
`page_id` int(8) unsigned NOT NULL AUTO_INCREMENT,
`page_namespace` int(11) NOT NULL DEFAULT '0',
`page_title` varbinary(255) NOT NULL DEFAULT '',
`page_restrictions` tinyblob NOT NULL,
`page_counter` bigint(20) unsigned NOT NULL DEFAULT '0',
`page_is_redirect` tinyint(1) unsigned NOT NULL DEFAULT '0',
`page_is_new` tinyint(1) unsigned NOT NULL DEFAULT '0',
`page_random` double unsigned NOT NULL DEFAULT '0',
`page_touched` varbinary(14) NOT NULL DEFAULT '',
`page_links_updated` varbinary(14) DEFAULT NULL,
`page_latest` int(8) unsigned NOT NULL DEFAULT '0',
`page_len` int(8) unsigned NOT NULL DEFAULT '0',
`page_no_title_convert` tinyint(1) NOT NULL DEFAULT '0',
`page_content_model` varbinary(32) DEFAULT NULL,
PRIMARY KEY (`page_id`),
UNIQUE KEY `name_title` (`page_namespace`,`page_title`),
KEY `page_random` (`page_random`),
KEY `page_len` (`page_len`),
KEY `page_redirect_namespace_len` (`page_is_redirect`,`page_namespace`,`page_len`)
) ENGINE=TokuDB AUTO_INCREMENT=3267239 DEFAULT CHARSET=binary `compression`='tokudb_zlib'
Sqoop won't be able to automatically import this table into Hive, as it has a hard time with varbinaries, blobs, and other binary types. We have to write a custom query to import the table. We will skip importing blobs and cast varbinaries to strings. We'll use the following query:
SELECT
-- List all column names you want imported.
-- Sqoop will automatically create a table with these columns.
a.page_id AS page_id,
CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title, -- we want page_title, so cast it to a CHAR
a.page_counter AS page_counter,
a.page_is_redirect as page_is_redirect,
a.page_is_new as page_is_new,
a.page_len as page_len
-- skip everything else...?
FROM page a
-- $CONDITIONS allows Sqoop to split the import into multiple mappers during the import.
-- This will be much faster if you are importing a large table.
WHERE $CONDITIONS;
We'll now pass this query to the sqoop import command.
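Before doing so, you can optionally preview what the query returns by running it against MySQL yourself, substituting 1=1 for $CONDITIONS (Sqoop fills in its own split predicates there at run time). A quick sanity check might look like:
mysql -uresearch -p -hanalytics-store.eqiad.wmnet -e 'SELECT a.page_id, CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title FROM page a WHERE 1=1 LIMIT 5' plwiki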
sqoop import \
--connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/plwiki \
--verbose \
--target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX) \
--delete-target-dir \
--username=research --password $password \
--query '
SELECT
a.page_id AS page_id,
CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title,
a.page_counter AS page_counter,
a.page_is_redirect as page_is_redirect,
a.page_is_new as page_is_new,
a.page_len as page_len
FROM page a
WHERE $CONDITIONS
' \
--split-by a.page_id \
--hive-partition-key project \
--hive-partition-value plwiki \
--hive-import \
--hive-database otto \
--create-hive-table \
--hive-table page
--target-dir will be a temporary HDFS directory used during the import.
--split-by tells Sqoop which field to split the data on across mappers. This should probably just be the primary key.
--hive-partition-key and --hive-partition-value let you set a static Hive partition for this import. I've decided to store all page data in this one table, and to hardcode the partition project=plwiki for this import. This will allow you to select from the page table and specify the project you want to work with in the WHERE clause.
The remaining Hive options tell Sqoop that you want the data imported from MySQL to be stored in a Hive table.
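With that partition in place, queries can restrict themselves to a single wiki. For example (a sketch, assuming the import above succeeded):
hive --database otto -e 'SELECT COUNT(*) FROM page WHERE project="plwiki"';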
Let's see if it worked!
hive --database otto -e 'show create table page';
...
CREATE TABLE `page0`(
`page_id` bigint,
`page_title` string,
`page_counter` bigint,
`page_is_redirect` boolean,
`page_is_new` boolean,
`page_len` bigint)
COMMENT 'Imported by sqoop on 2014/09/23 16:54:52'
PARTITIONED BY (
`project` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://analytics-hadoop/user/hive/warehouse/otto.db/page0'
TBLPROPERTIES (
'numPartitions'='1',
'numFiles'='5',
'transient_lastDdlTime'='1411491294',
'totalSize'='2777',
'numRows'='0',
'rawDataSize'='0')
hive --database otto -e 'select page_title from page limit 10';
...
Aromorfoza
Azot
Amedeo_Avogadro
Uwierzytelnienie
Autoryzacja
Alfabet
AMP
Wieloprocesorowość_asynchroniczna
Apla
Asembler
Same deal for the categorylinks table.
First our MySQL query:
SELECT
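-- As with the page table, cast the varbinary columns to strings so Sqoop can handle them.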
a.cl_from AS cl_from,
cast(a.cl_to AS CHAR(255) CHARSET utf8) AS cl_to,
cast(a.cl_sortkey AS CHAR(230) CHARSET utf8) AS cl_sortkey,
a.cl_timestamp AS cl_timestamp,
cast(a.cl_sortkey_prefix AS CHAR(255) CHARSET utf8) AS cl_sortkey_prefix,
cast(a.cl_collation AS CHAR(32) CHARSET utf8) AS cl_collation,
cast(a.cl_type AS CHAR(6) CHARSET utf8) AS cl_type
FROM categorylinks a
WHERE $CONDITIONS
;
And now, to sqoop it:
sqoop import \
--connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/plwiki \
--verbose \
--target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX) \
--delete-target-dir \
--username=research --password $password \
--query '
SELECT
a.cl_from AS cl_from,
cast(a.cl_to AS CHAR(255) CHARSET utf8) AS cl_to,
cast(a.cl_sortkey AS CHAR(230) CHARSET utf8) AS cl_sortkey,
a.cl_timestamp AS cl_timestamp,
cast(a.cl_sortkey_prefix AS CHAR(255) CHARSET utf8) AS cl_sortkey_prefix,
cast(a.cl_collation AS CHAR(32) CHARSET utf8) AS cl_collation,
cast(a.cl_type AS CHAR(6) CHARSET utf8) AS cl_type
FROM categorylinks a
WHERE $CONDITIONS
' \
--split-by a.cl_from \
--hive-partition-key project \
--hive-partition-value plwiki \
--hive-import \
--hive-database otto \
--create-hive-table \
--hive-table categorylinks
Let's check it:
hive --database otto -e 'show create table categorylinks';
...
CREATE TABLE `categorylinks0`(
`cl_from` bigint,
`cl_to` string,
`cl_sortkey` string,
`cl_timestamp` string,
`cl_sortkey_prefix` string,
`cl_collation` string,
`cl_type` string)
COMMENT 'Imported by sqoop on 2014/09/23 17:14:40'
PARTITIONED BY (
`project` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\u0001'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://analytics-hadoop/user/hive/warehouse/otto.db/categorylinks0'
TBLPROPERTIES (
'numPartitions'='1',
'numFiles'='5',
'transient_lastDdlTime'='1411492482',
'totalSize'='5198',
'numRows'='0',
'rawDataSize'='0')
hive --database otto -e 'select distinct(cl_to), cl_type from categorylinks where project="plwiki" and cl_type = "file" limit 10';
...
Byłe_ilustracje_na_medal file
Grafiki_do_przetłumaczenia file
Herby_miast_ukraińskich file
Ilustracje_na_medal file
Lokalnie_załadowane_pliki file
Pliki_o_nieznanym_statusie_prawnym file
Pliki_oczekujące_na_przeniesienie_do_Commons file
Portal_Chrześcijaństwo/Ilustracja_miesiąca file
Portal_Katolicyzm/Ilustracja_tygodnia file
Cool!
Now we can join the imported page and categorylinks tables against webrequest data, e.g. to count upload (image) requests per file page title on plwiki:
hive --database otto
SELECT
p.page_title, count(*) as cnt
FROM page p
INNER JOIN
categorylinks cl ON cl.cl_from = p.page_id
AND cl.cl_type = 'file'
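-- join to upload webrequests by extracting the file name from the end of the request path and matching it against the page title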
INNER JOIN
wmf_raw.webrequest w ON regexp_extract(w.uri_path, '^.*/(.+)$', 1) = p.page_title
AND w.webrequest_source = 'upload' and w.year=2014 and w.month=9 and w.day=22 and w.hour=18
AND cl.project = 'plwiki'
AND p.project = 'plwiki'
GROUP BY p.page_title
ORDER BY cnt desc
LIMIT 1000
;
To import another wiki (here trwiki) into the same Hive tables, just change the --hive-partition-value and drop the --create-hive-table flag:
# Import trwiki page table data
sqoop import \
--connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/trwiki \
--verbose \
--target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX) \
--delete-target-dir \
--username=research --password $password \
--query '
SELECT
a.page_id AS page_id,
CAST(a.page_title AS CHAR(255) CHARSET utf8) AS page_title,
a.page_counter AS page_counter,
a.page_is_redirect as page_is_redirect,
a.page_is_new as page_is_new,
a.page_len as page_len
FROM page a
WHERE $CONDITIONS
' \
--split-by a.page_id \
--hive-partition-key project \
--hive-partition-value trwiki \
--hive-import \
--hive-database otto \
--hive-table page
# Import trwiki categorylinks table data
sqoop import \
--connect jdbc:mysql://s1-analytics-slave.eqiad.wmnet/trwiki \
--verbose \
--target-dir /tmp/$(mktemp -u -p '' -t ${USER}_sqoop_XXXXXX) \
--delete-target-dir \
--username=research --password $password \
--query '
SELECT
a.cl_from AS cl_from,
cast(a.cl_to AS CHAR(255) CHARSET utf8) AS cl_to,
cast(a.cl_sortkey AS CHAR(230) CHARSET utf8) AS cl_sortkey,
a.cl_timestamp AS cl_timestamp,
cast(a.cl_sortkey_prefix AS CHAR(255) CHARSET utf8) AS cl_sortkey_prefix,
cast(a.cl_collation AS CHAR(32) CHARSET utf8) AS cl_collation,
cast(a.cl_type AS CHAR(6) CHARSET utf8) AS cl_type
FROM categorylinks a
WHERE $CONDITIONS
' \
--split-by a.cl_from \
--hive-partition-key project \
--hive-partition-value trwiki \
--hive-import \
--hive-database otto \
--hive-table categorylinks
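Once both imports have finished, each table should have two project partitions. A quick way to check (this should list project=plwiki and project=trwiki, assuming both imports succeeded):
hive --database otto -e 'show partitions page';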