Skip to content

Instantly share code, notes, and snippets.

@stettix
Created November 28, 2018 09:59
Show Gist options
  • Save stettix/201a5030e2aa7f5c1d507ccd27a2e746 to your computer and use it in GitHub Desktop.
Save stettix/201a5030e2aa7f5c1d507ccd27a2e746 to your computer and use it in GitHub Desktop.
import spark.implicits._
// Input data
val identity =
spark
.table("clean.identity")
.select("id", "account_created", "last_active_date", "primary_email_address")
.cache()
val consentSummary = spark
.table("clean.consent_summary")
.select("identity_id", "jobs")
.cache()
val brazeJobConsentUsersFull = spark.read
.option("header", true)
.csv("s3://data-tech-temp/jan/braze-investigation/Jan_Test_export-2.csv")
.cache()
val brazeJobConsentUsers = brazeJobConsentUsersFull
.select($"user_id".as("external_id"), $"Appboy ID".as("braze_id"), $"email", $"consents_jobs_opted_in")
val dlJobConsentUsersToUploadQuery =
"""select *
|from clean.braze_users
|where consents_jobs_opted_in = true
""".stripMargin
val dlJobConsentUsersToUpload = spark.sql(dlJobConsentUsersToUploadQuery).cache()
// Diffs
val usersInBrazeNotDl = brazeJobConsentUsers
.join(dlJobConsentUsersToUpload.select($"external_id.id".as("external_id")), Seq("external_id"), "left_anti")
.cache()
val usersInDlNotBraze =
dlJobConsentUsersToUpload
.select($"external_id.id".as("external_id"), $"email")
.join(brazeJobConsentUsers, Seq("external_id"), "left_anti")
.cache()
// Investigating diffs.
// 1. Join extraneous users in Braze with identity to see how many match up
val brazeExtraUsersVsIdentity =
usersInBrazeNotDl
.cache()
.join(identity, usersInBrazeNotDl("external_id") === identity("id"), "left_outer")
val numExtraBrazeUsersThatAreNotInIdentity = brazeExtraUsersVsIdentity.where("id is null").count()
// > 20
val numExtraBrazeUsersThatAreInIdentity = brazeExtraUsersVsIdentity.where("id is not null").count()
// > 1149
// Find why the 1149 that are still in 'identity' have job consents in Braze.
// First, check their job consent value in clean.consent_summary
val knownExtraBrazeUsersWithConsents =
brazeExtraUsersVsIdentity
.where("id is not null")
.join(consentSummary, brazeExtraUsersVsIdentity("id") === consentSummary("identity_id"), "left_outer")
.cache()
val numExtraBrazeUsersWithAnyFormOfJobConsents =
knownExtraBrazeUsersWithConsents.where("jobs is not null")
// > 0
// So ALL of these users had 'null' in consent_summary.
// Had they given job consent before?
// Have a look in older versions of clean.consents to find out.
def consentsDataFrame(dateStr: String) = spark.sql(s"""
|select identity_id, consents.jobs.consented
|from clean.consents
|where export_date = date '$dateStr'""".stripMargin)
// The first date we have braze_users data for is 2018-11-18, so try this first.
val oldConsents = consentsDataFrame("2018-11-18")
.cache()
val knownExtraBrazeUsersWithOldConsents =
brazeExtraUsersVsIdentity
.where("id is not null")
.join(oldConsents, brazeExtraUsersVsIdentity("id") === oldConsents("identity_id"), "left_outer")
.cache()
val numExtraBrazeUsersWithOldConsents = knownExtraBrazeUsersWithOldConsents.where("consented = true").count()
// > 1143
// Also: 6 nulls in at this date
// So, these extra users DID have job consents at some point but don't in the latest data.
// Now the question is: why did we not action deletions for the ones who removed their consent?
// To answer this, plot the number of consents present over the date range.
val dateRange = Stream
.range(0, 100)
.map(n => LocalDate.of(2018, 11, 18).plusDays(n))
.takeWhile(_ isBefore LocalDate.of(2018, 11, 27))
.toList
def numConsentsForDate(date: LocalDate): Long = {
val consentsForDate = consentsDataFrame(date.toString)
val usersWithConsentsForDate =
brazeExtraUsersVsIdentity
.where("id is not null")
.join(consentsForDate, brazeExtraUsersVsIdentity("id") === consentsForDate("identity_id"), "left_outer")
val numActualConsentsForDate = usersWithConsentsForDate.where("consented = true").count()
println(s"For date $date, $numActualConsentsForDate of the users had a value of 'true' for the job consent")
numActualConsentsForDate
}
val numConsentsByDate = dateRange.map(date => date -> numConsentsForDate(date))
// Dump counts as CSV
println(s"date,count")
numConsentsByDate.foreach {
case (date, count) =>
println(s"$date,$count")
}
// date,count
// 2018-11-18,1143
// 2018-11-19,1146
// 2018-11-20,1146
// 2018-11-21,1148
// 2018-11-22,1148
// 2018-11-23,991
// 2018-11-24,700
// 2018-11-25,269
// 2018-11-26,3
// So we see that these users HAVE been removing their consent steadily over this date range.
// Yet somehow this has not been reflected in Braze.
// If all the removed consents had happened on a single date, we might suspect that the job had a one-off failure.
// But as there should have been deletions happening on many dates, there seems to be something systematic
// wrong with deleting attributes off users.
// A hypothesis for what went wrong:
//
// - We're not handling the case when a consent flag is 'null' (as opposed to false).
// There are two options for what to do in this case: send a 'null' value to Braze, which will delete the user property,
// or infer a 'false' value and send this to Braze. What does our code do in this case?
// First, check what the job consent values are in braze_users for these 1,143 users:
val interestingIdentityIds = brazeExtraUsersVsIdentity
.select("id")
.where("id is not null")
val brazeUsers = spark
.table("clean.braze_users")
.select($"external_id.id".as("external_id"), $"consents_jobs_opted_in")
.cache()
val brazeUsersContentForInterestingIds =
brazeUsers
.join(interestingIdentityIds, interestingIdentityIds("id") === brazeUsers("external_id"))
brazeUsersContentForInterestingIds.where("consents_jobs_opted_in is null").count()
// > 0
// There are actually NO null values for consents_jobs_opted_in in clean.braze_users
// The values uploaded to braze are always Some(true) or Some(false), never None.
// (See BrazeUserAttributesSource.createUserAttributes())
// So that hypothesis can't hold.
// Some other candidate hypotheses:
//
// - Braze aren't correctly handling these updates
// - We didn't apply diffs for these users (the job failing to handle some diffs)
// To test these, it would be useful to know if there were any users that DID change their job consent
// from true to null/false in the same time period and see if ANY of these were updated correctly in Braze.
// First, find all users who DID give job consent on 2018-11-18 and DIDN'T give job consent by 2018-11-26
val consentsAtStart = consentsDataFrame("2018-11-18")
.withColumnRenamed("consented", "consented_at_start")
.cache()
val consentsAtEnd = consentsDataFrame("2018-11-26")
.withColumnRenamed("consented", "consented_at_end")
.cache()
val changedConsents = consentsAtStart
.join(consentsAtEnd, "identity_id")
.where("consented_at_start = true and (consented_at_end = false or consented_at_end is null)")
val numChangedConsents = changedConsents.count()
// > 1399
val changedConsentsToFalse = consentsAtStart
.join(consentsAtEnd, "identity_id")
.where("consented_at_start = true and consented_at_end = false")
val numChangedConsentsToFalse = changedConsentsToFalse.count()
// > 1399
val changedConsentsToNull = consentsAtStart
.join(consentsAtEnd, "identity_id")
.where("consented_at_start = true and consented_at_end is null")
val numChangedConsentsToNull = changedConsentsToNull.count()
// > 0
// Now for these users, see how many of them have the correct job consent setting in Braze.
val brazeStateForChangedConsents = brazeJobConsentUsers.join(
changedConsents,
brazeJobConsentUsers("external_id") === changedConsents("identity_id"),
"full_outer")
val numMatchedAtStart = brazeStateForChangedConsents.where("consents_jobs_opted_in = consented_at_start").count()
// > 1141
val numMatchingAtEnd = brazeStateForChangedConsents.where("consents_jobs_opted_in = consented_at_end")
// > 0
// This suggests that NONE of the cases where users change their consent to false have actually been picked up in Braze.
// Which suggests this is not an intermittent problem.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment