Skip to content

Instantly share code, notes, and snippets.

@fblundun
Last active May 13, 2016 07:33
Show Gist options
  • Save fblundun/867b48d9510836531de79193f3f86d82 to your computer and use it in GitHub Desktop.
Save fblundun/867b48d9510836531de79193f3f86d82 to your computer and use it in GitHub Desktop.
Bad Rows Documentation

This is a draft pending the release of Release 81

How to fix up and reprocess invalid events

Overview

When a raw event fails Scala Hadoop Enrich validation, it is written out to a bad rows bucket in a format which looks something like this:

{
  "line": "2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...",
  "errors": [{
    "level": "error",
    "message": "Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository"
  }]
}

The line field is the original TSV input to the enrichment process; the errors field is an array of errors describing why the event failed validation.

The Scala Hadoop Bad Rows jar lets you extract the line field and mutate it using custom JavaScript to fix whatever was wrong with it so you can reprocess it by running Scala Hadoop Enrich again.

A use case

Scala Hadoop Enrich will fail incoming events originating on a page with a noncompliant URL. So you might want to reprocess all bad rows which failed for this reason.

To do this, you need to write a process function which takes two arguments: the raw line TSV string and an array of error message strings. It should return null for bad rows which you don't want to reprocess, and a fixed-up raw line TSV string otherwise.

To do this you could use the following JavaScript:

function process(event, errors) {
	// Only reprocess if:
	// 1. there is only one validation error and
	// 2. the error references RFC 2396, which specifies what makes a URL valid.
	if (errors.length < 2 && /RFC 2396/.test(errors[0])) {
		var fields = tsvToArray(event);
		fields[9] = 'http://www.placeholder.com';
		return arrayToTsv(fields);
	} else {
		return null;
	}
}

The Snowplow raw event TSV format is based on the Cloudfront Access Log format, whose details are available here. The tenth field is the one which contains the URL of the web page which sent the event, so we need to extract that field and replace it with something which will pass validation (in this case http://www.placeholder.com). tsvToArray and arrayToTsv are helper functions which are automatically provided.

Finally, since your function needs to be passed to EMR as an argument, you need to base64-encode it. Our encoded script looks like this:

ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7CiAgICAvLyBPbmx5IHJlcHJvY2VzcyBpZjoKICAgIC8vIDEuIHRoZXJlIGlzIG9ubHkgb25lIHZhbGlkYXRpb24gZXJyb3IgYW5kCiAgICAvLyAyLiB0aGUgZXJyb3IgcmVmZXJlbmNlcyBSRkMgMjM5Niwgd2hpY2ggc3BlY2lmaWVzIHdoYXQgbWFrZXMgYSBVUkwgdmFsaWQuCiAgICBpZiAoZXJyb3JzLmxlbmd0aCA8IDIgJiYgL1JGQyAyMzk2Ly50ZXN0KGVycm9yc1swXSkpIHsKICAgICAgICB2YXIgZmllbGRzID0gdHN2VG9BcnJheShldmVudCk7CiAgICAgICAgZmllbGRzWzldID0gJ2h0dHA6Ly93d3cucGxhY2Vob2xkZXIuY29tJ1w7CiAgICAgICAgcmV0dXJuIGFycmF5VG9Uc3YoZmllbGRzKTsKICAgIH0gZWxzZSB7CiAgICAgICAgcmV0dXJuIG51bGw7CiAgICB9Cn0K

Determining the input

The bad rows bucket is organized like this:

s3://path/to/bad/
    run=2015-10-07-15-25-53/
        part-00000
        part-00001
        part-00002
        part-00003
    run=2015-10-08-15-25-53/
        part-00000
        part-00001
        part-00002
        part-00003
    run=2015-10-09-15-25-53/
        part-00000
        part-00001
        part-00002
        part-00003

In other words, all the bad rows created by a given enrichment job end up in the same bucket, and that bucket is timestamped based on when the job was begun.

The bad rows job takes as its --input argument a list of comma-separated S3 bucket patterns. The way these patterns work is described here. So to run the job on all bad rows from between the 6th of September and the 9th of September 2015, and also on all bad rows from 2016, you would use the following argument:

s3://path/to/bad/run=2015-09-0[6-9]-*,s3://path/to/bad/run=2016-*

Running the jar

Assuming you have the AWS CLI installed, here is an example of how to create a cluster and run the jar:

aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{
	"KeyName":"yourkeyname",
	"InstanceProfile":"EMR_EC2_DefaultRole",
	"AvailabilityZone":"us-east-1d",
	"EmrManagedSlaveSecurityGroup":"sg-2f9aba4b",
	"EmrManagedMasterSecurityGroup":"sg-2e9aba4a"
}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.1.0 --log-uri 's3n://my-badrows/newlogs/' --steps '[
{
	"Args":[
		"com.snowplowanalytics.hadoop.scalding.SnowplowBadRowsJob",
		"--input",
		"s3://my-out/enriched/bad/run=2015-12-*,s3://my-out/enriched/bad/run=2016-01-*",
		"--output",
		"s3://my-badrows/out",
		"--script",
		"ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7CiAgICAvLyBPbmx5IHJlcHJvY2VzcyBpZjoKICAgIC8vIDEuIHRoZXJlIGlzIG9ubHkgb25lIHZhbGlkYXRpb24gZXJyb3IgYW5kCiAgICAvLyAyLiB0aGUgZXJyb3IgcmVmZXJlbmNlcyBSRkMgMjM5Niwgd2hpY2ggc3BlY2lmaWVzIHdoYXQgbWFrZXMgYSBVUkwgdmFsaWQuCiAgICBpZiAoZXJyb3JzLmxlbmd0aCA8IDIgJiYgL1JGQyAyMzk2Ly50ZXN0KGVycm9yc1swXSkpIHsKICAgICAgICB2YXIgZmllbGRzID0gdHN2VG9BcnJheShldmVudCk7CiAgICAgICAgZmllbGRzWzldID0gJ2h0dHA6Ly93d3cucGxhY2Vob2xkZXIuY29tJ1w7CiAgICAgICAgcmV0dXJuIGFycmF5VG9Uc3YoZmllbGRzKTsKICAgIH0gZWxzZSB7CiAgICAgICAgcmV0dXJuIG51bGw7CiAgICB9Cn0K"
	],
	"Type":"CUSTOM_JAR",
	"ActionOnFailure":"CONTINUE",
	"Jar":"s3://snowplow-hosted-assets/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0-rc1.jar",
	"Properties":"",
	"Name":"Fix up bad rows"
}]' --name 'clicluster' --instance-groups '[
	{
		"InstanceCount":1,
		"InstanceGroupType":"MASTER",
		"InstanceType":"m1.medium",
		"Name":"MASTER"
	},
	{
		"InstanceCount":2,
		"InstanceGroupType":"CORE",
		"InstanceType":"m1.medium",
		"Name":"CORE"
	}
]'

This assumes that:

  • You have created an EC2 Key Pair named yourkeyname
  • Your bad rows bucket is s3://my-out/bad/enriched/
  • You have created a bucket called s3://my-badrows/newlogs for EMR logs
  • You have a bucket called s3://my-badrows into which the mutated bad rows will be written
  • You want to reprocess all bad rows for the months of December 2015 and January 2016 (represented by the patterns 2015-12-* and 2016-01-* respectively).

Checking the result

If the job runs without errors, the fixed-up raw events will be available in s3://my-badrows/out for reprocessing.

TODO: flesh this out

Caveats

Reprocessing bad rows in this way can cause duplicate events in a couple of ways.

First, an enrichment job can fail partway through having written out some bad rows. When the job is run again, those bad rows will be duplicated, and if you run Scala Hadoop Bad Rows on the output of both jobs, you will end up with duplicate raw events.

Second, a bad row may contain a POST request with multiple events in it, of which not all are necessarily bad. If you run Scala Hadoop Bad Rows on such a POST request and write out all the events it contains, you will end up with duplicate events.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment