Skip to content

Instantly share code, notes, and snippets.

@farhad-taran
Created August 18, 2022 19:47
Show Gist options
  • Save farhad-taran/e854aefe35b8dbcdf161f3d4e5f1ee9e to your computer and use it in GitHub Desktop.
Save farhad-taran/e854aefe35b8dbcdf161f3d4e5f1ee9e to your computer and use it in GitHub Desktop.
Analysing DynamoDb data using AWS Glue and Athena

Running queries directly against a DynamoDB table is difficult and performs poorly when your intention is data analysis.

We can stream all data changes from a DynamoDB table into Kinesis, then on to S3, a Glue crawler and eventually Athena. This approach lets us keep an audit log of every data change made to the DynamoDB table and query those changes using Athena.

If, however, we don't need to keep an audit log of all the changes and simply need to run queries against the current data in DynamoDB, we can connect DynamoDB directly to AWS Glue using a Glue crawler and query it with Athena.

below I will share both options:

  • audit log athena:
dynamodb --> kinesis data stream --> kinesis firehose --> s3 --> glue crawler --> glue db --> athena
# Names shared by every resource in the audit-log pipeline. All of them are
# derived from the DynamoDB table name so the whole stack is easy to identify.
locals {
  table-name = "test-table" # DynamoDB table name

  # Athena
  athena-results-s3-name = "${local.table-name}-analytics"
  athena-workgroup-name  = local.table-name # plain reference; "${...}"-only strings are deprecated

  # Glue
  glue-db-name             = "${local.table-name}-glue-db"
  glue-crawler-name        = "${local.table-name}-crawler"
  glue-crawler-role-name   = "${local.table-name}-crawler-role"
  glue-crawler-policy-name = "${local.table-name}-crawler"

  # Kinesis Firehose. The "fireshose" spelling of the two keys below is kept
  # on purpose: other resources in this file reference them by that name.
  firehose-name            = "${local.table-name}-firehose"
  fireshose-s3-bucket-name = "${local.table-name}-firehose-s3-bucket"
  fireshose-log-group-name = "/aws/kinesisfirehose/${local.firehose-name}"
}

# Single customer-managed KMS key used to encrypt the Kinesis stream, the S3
# buckets and the Athena query results in this configuration.
resource "aws_kms_key" "aws_kms_key" {
  description             = "KMS key for whole project"
  deletion_window_in_days = 10

  # Rotate the key material automatically; existing ciphertext stays readable.
  enable_key_rotation = true
}

##################################################################
# dynamodb kinesis stream
##################################################################

# Kinesis data stream that receives the DynamoDB table's change records.
resource "aws_kinesis_stream" "aws_kinesis_stream" {
  name        = "${local.table-name}-data-stream"
  shard_count = 1

  # Server-side encryption with the project KMS key.
  kms_key_id      = aws_kms_key.aws_kms_key.arn
  encryption_type = "KMS"
}

# Registers the Kinesis data stream as the streaming destination of the
# DynamoDB table named in local.table-name.
resource "aws_dynamodb_kinesis_streaming_destination" "aws_dynamodb_kinesis_streaming_destination" {
  table_name = local.table-name
  stream_arn = aws_kinesis_stream.aws_kinesis_stream.arn
}

##################################################################
# kinesis firehose
##################################################################

# Firehose delivery stream that reads the table's Kinesis data stream and
# writes the change records to S3 using dynamic partitioning.
resource "aws_kinesis_firehose_delivery_stream" "aws_kinesis_firehose_delivery_stream" {
  name        = local.firehose-name
  destination = "extended_s3"

  # Use the DynamoDB change stream as the source. Without this block the
  # delivery stream is created as "Direct PUT" and is never fed by the
  # Kinesis data stream, so nothing would ever land in S3.
  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.aws_kinesis_stream.arn
    role_arn           = aws_iam_role.aws_iam_role.arn
  }

  extended_s3_configuration {
    role_arn   = aws_iam_role.aws_iam_role.arn
    bucket_arn = aws_s3_bucket.aws_s3_bucket.arn

    # Example prefix using partitionKeyFromQuery, applicable to JQ processor
    prefix              = var.kinsesis-s3-file-prefix
    error_output_prefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"

    dynamic_partitioning_configuration {
      enabled = true
    }

    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = aws_cloudwatch_log_group.aws_cloudwatch_log_group_firehose.name
      log_stream_name = aws_cloudwatch_log_stream.aws_cloudwatch_log_stream_firehose.name
    }

    # Buffer size chosen with dynamic partitioning in mind, see
    # https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    buffer_size = 64

    processing_configuration {
      enabled = true

      # Multi-record deaggregation processor: split each incoming record
      # into its individual JSON sub-records.
      processors {
        type = "RecordDeAggregation"
        parameters {
          parameter_name  = "SubRecordType"
          parameter_value = "JSON"
        }
      }

      # New line delimiter processor: append a delimiter to every record.
      processors {
        type = "AppendDelimiterToRecord"
      }

      # JQ processor: evaluate the JQ query on each record to extract the
      # partition keys referenced by the S3 prefix.
      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = var.kinsesis-s3-file-key-extractor
        }
      }
    }
  }
}

# CloudWatch log group for the Firehose delivery stream; events are kept 7 days.
resource "aws_cloudwatch_log_group" "aws_cloudwatch_log_group_firehose" {
  retention_in_days = 7
  name              = local.fireshose-log-group-name
}

# Log stream inside the Firehose log group; referenced by the delivery
# stream's cloudwatch_logging_options.
resource "aws_cloudwatch_log_stream" "aws_cloudwatch_log_stream_firehose" {
  log_group_name = aws_cloudwatch_log_group.aws_cloudwatch_log_group_firehose.name
  name           = "S3Delivery"
}


# Private bucket that Firehose delivers the change records into.
resource "aws_s3_bucket" "aws_s3_bucket" {
  acl    = "private"
  bucket = local.fireshose-s3-bucket-name

  # Default encryption: SSE-KMS with the project key.
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm     = "aws:kms"
        kms_master_key_id = aws_kms_key.aws_kms_key.arn
      }
    }
  }
}

# Role assumed by Kinesis Firehose. The inline policy grants only what the
# delivery stream needs: write its log events, read the source Kinesis stream,
# write objects to the destination bucket and use the project KMS key.
resource "aws_iam_role" "aws_iam_role" {
  name = "${local.table-name}-analysis-role"

  # Trust policy: only the Firehose service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = ""
        Effect = "Allow"
        Action = "sts:AssumeRole"
        Principal = {
          Service = "firehose.amazonaws.com"
        }
      }
    ]
  })

  inline_policy {
    name = "${local.table-name}-analysis-policy"

    policy = jsonencode({
      Version = "2012-10-17"
      Statement = [
        {
          # Delivery/error logging into the dedicated log stream.
          Effect = "Allow"
          Action = [
            "logs:PutLogEvents"
          ]
          Resource = aws_cloudwatch_log_stream.aws_cloudwatch_log_stream_firehose.arn
        },
        {
          # Read-only access to the source stream (previously "kinesis:*").
          Effect = "Allow"
          Action = [
            "kinesis:DescribeStream",
            "kinesis:GetShardIterator",
            "kinesis:GetRecords",
            "kinesis:ListShards"
          ]
          Resource = aws_kinesis_stream.aws_kinesis_stream.arn
        },
        {
          # Object delivery into the destination bucket.
          Effect = "Allow"
          Action = [
            "s3:AbortMultipartUpload",
            "s3:GetBucketLocation",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:PutObject"
          ]
          Resource = [
            aws_s3_bucket.aws_s3_bucket.arn,
            "${aws_s3_bucket.aws_s3_bucket.arn}/*"
          ]
        },
        {
          # Decrypt records from the KMS-encrypted stream and generate data
          # keys for SSE-KMS writes to S3 (previously "kms:*", which also
          # allowed key administration and deletion).
          Effect = "Allow"
          Action = [
            "kms:Decrypt",
            "kms:GenerateDataKey"
          ]
          Resource = [
            aws_kms_key.aws_kms_key.arn
          ]
        }
      ]
    })
  }
}

##################################################################
# glue
##################################################################

# Glue Data Catalog database that the crawler writes its tables into.
resource "aws_glue_catalog_database" "aws_glue_catalog_database" {
  name = local.glue-db-name # "<table-name>-glue-db"
}

# Glue crawler that scans the Firehose delivery bucket and records what it
# finds in the Glue catalog database.
resource "aws_glue_crawler" "aws_glue_crawler" {
  name          = local.glue-crawler-name
  role          = aws_iam_role.aws_iam_role_glue_crawler.arn
  database_name = aws_glue_catalog_database.aws_glue_catalog_database.name

  # Crawl the whole bucket.
  s3_target {
    path = "s3://${aws_s3_bucket.aws_s3_bucket.bucket}"
  }

  # Crawler configuration JSON: partitions inherit their metadata from the table.
  configuration = jsonencode({
    Version = 1.0
    CrawlerOutput = {
      Partitions = {
        AddOrUpdateBehavior = "InheritFromTable"
      }
    }
  })
}

# IAM role the Glue crawler runs as.
resource "aws_iam_role" "aws_iam_role_glue_crawler" {
  name = local.glue-crawler-role-name

  # Trust policy: only the Glue service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = ""
        Effect = "Allow"
        Action = "sts:AssumeRole"
        Principal = {
          Service = "glue.amazonaws.com"
        }
      }
    ]
  })
}

# Permissions for the Glue crawler role. The original policy allowed every
# action on every resource (full administrator access); this one is scoped to
# what a crawler over the Firehose bucket needs.
resource "aws_iam_role_policy" "aws_iam_role_policy_glue_crawler" {
  name = local.glue-crawler-policy-name
  role = aws_iam_role.aws_iam_role_glue_crawler.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Create/update tables and partitions in the Glue Data Catalog.
        Sid      = "GlueCatalogAccess"
        Effect   = "Allow"
        Action   = ["glue:*"]
        Resource = ["*"]
      },
      {
        # Read-only access to the bucket being crawled.
        Sid    = "ReadCrawledBucket"
        Effect = "Allow"
        Action = [
          "s3:GetBucketLocation",
          "s3:ListBucket",
          "s3:GetObject"
        ]
        Resource = [
          aws_s3_bucket.aws_s3_bucket.arn,
          "${aws_s3_bucket.aws_s3_bucket.arn}/*"
        ]
      },
      {
        # The bucket uses SSE-KMS, so objects must be decrypted to be read.
        Sid      = "DecryptCrawledObjects"
        Effect   = "Allow"
        Action   = ["kms:Decrypt"]
        Resource = [aws_kms_key.aws_kms_key.arn]
      },
      {
        # Crawler run logs; same resource pattern as the AWS managed
        # AWSGlueServiceRole policy.
        Sid    = "WriteCrawlerLogs"
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = ["arn:aws:logs:*:*:*:/aws-glue/*"]
      }
    ]
  })
}

##################################################################
# athena
##################################################################
# Private, versioned bucket that stores Athena query results.
resource "aws_s3_bucket" "aws_s3_bucket_analytics" {
  acl    = "private"
  bucket = local.athena-results-s3-name

  # Default encryption: SSE-KMS with the project key.
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm     = "aws:kms"
        kms_master_key_id = aws_kms_key.aws_kms_key.arn
      }
    }
  }

  versioning {
    enabled = true
  }
}

# Athena workgroup whose settings are enforced on clients: results go to the
# analytics bucket under output/ and are encrypted with the project KMS key.
resource "aws_athena_workgroup" "aws_athena_workgroup" {
  name = local.athena-workgroup-name

  configuration {
    publish_cloudwatch_metrics_enabled = true
    enforce_workgroup_configuration    = true

    result_configuration {
      encryption_configuration {
        kms_key_arn       = aws_kms_key.aws_kms_key.arn
        encryption_option = "SSE_KMS"
      }

      output_location = "s3://${aws_s3_bucket.aws_s3_bucket_analytics.bucket}/output/"
    }
  }
}
  • current data athena:
dynamodb --> glue crawler --> glue db --> athena
# Names shared by every resource in the "current data" setup, all derived
# from the DynamoDB table name passed in as var.table-name.
locals {
  table-name = var.table-name

  # Athena
  athena-results-s3-name = "${local.table-name}-analytics"
  athena-workgroup-name  = local.table-name # plain reference; "${...}"-only strings are deprecated

  # Glue
  glue-db-name             = "${local.table-name}-glue-db"
  glue-crawler-name        = "${local.table-name}-crawler"
  glue-crawler-role-name   = "${local.table-name}-crawler-role"
  glue-crawler-policy-name = "${local.table-name}-crawler"
}

# Customer-managed KMS key used to encrypt the Athena results bucket and the
# Athena query results in this configuration.
resource "aws_kms_key" "aws_kms_key" {
  description             = "KMS key for whole project"
  deletion_window_in_days = 10

  # Rotate the key material automatically; existing ciphertext stays readable.
  enable_key_rotation = true
}

##################################################################
# glue
##################################################################

# Glue Data Catalog database that receives the crawler's table definitions.
resource "aws_glue_catalog_database" "aws_glue_catalog_database" {
  name = local.glue-db-name # "<table-name>-glue-db"
}

# Glue crawler that reads the DynamoDB table directly and records what it
# finds in the Glue catalog database.
resource "aws_glue_crawler" "aws_glue_crawler" {
  name          = local.glue-crawler-name
  role          = aws_iam_role.aws_iam_role_glue_crawler.arn
  database_name = aws_glue_catalog_database.aws_glue_catalog_database.name

  # The crawl target is the DynamoDB table itself, addressed by name.
  dynamodb_target {
    path = local.table-name
  }

  # Crawler configuration JSON: partitions inherit their metadata from the table.
  configuration = jsonencode({
    Version = 1.0
    CrawlerOutput = {
      Partitions = {
        AddOrUpdateBehavior = "InheritFromTable"
      }
    }
  })
}

# IAM role the Glue crawler runs as.
resource "aws_iam_role" "aws_iam_role_glue_crawler" {
  name = local.glue-crawler-role-name

  # Trust policy: only the Glue service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = ""
        Effect = "Allow"
        Action = "sts:AssumeRole"
        Principal = {
          Service = "glue.amazonaws.com"
        }
      }
    ]
  })
}

# Permissions for the Glue crawler role. The original policy allowed every
# action on every resource (full administrator access); this one is scoped to
# what a crawler over a single DynamoDB table needs.
resource "aws_iam_role_policy" "aws_iam_role_policy_glue_crawler" {
  name = local.glue-crawler-policy-name
  role = aws_iam_role.aws_iam_role_glue_crawler.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Create/update tables in the Glue Data Catalog.
        Sid      = "GlueCatalogAccess"
        Effect   = "Allow"
        Action   = ["glue:*"]
        Resource = ["*"]
      },
      {
        # Read the table's schema and items. The table is not managed in this
        # configuration, so its ARN is built from the name (any region/account).
        # NOTE(review): if the table is encrypted with a customer-managed KMS
        # key, kms:Decrypt on that key must be granted as well.
        Sid    = "ReadCrawledTable"
        Effect = "Allow"
        Action = [
          "dynamodb:DescribeTable",
          "dynamodb:Scan"
        ]
        Resource = ["arn:aws:dynamodb:*:*:table/${local.table-name}"]
      },
      {
        # Crawler run logs; same resource pattern as the AWS managed
        # AWSGlueServiceRole policy.
        Sid    = "WriteCrawlerLogs"
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = ["arn:aws:logs:*:*:*:/aws-glue/*"]
      }
    ]
  })
}

##################################################################
# athena
##################################################################
# Private, versioned bucket that stores Athena query results.
resource "aws_s3_bucket" "aws_s3_bucket_analytics" {
  acl    = "private"
  bucket = local.athena-results-s3-name

  # Default encryption: SSE-KMS with the project key.
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm     = "aws:kms"
        kms_master_key_id = aws_kms_key.aws_kms_key.arn
      }
    }
  }

  versioning {
    enabled = true
  }
}

# Athena workgroup whose settings are enforced on clients: results go to the
# analytics bucket under output/ and are encrypted with the project KMS key.
resource "aws_athena_workgroup" "aws_athena_workgroup" {
  name = local.athena-workgroup-name

  configuration {
    publish_cloudwatch_metrics_enabled = true
    enforce_workgroup_configuration    = true

    result_configuration {
      encryption_configuration {
        kms_key_arn       = aws_kms_key.aws_kms_key.arn
        encryption_option = "SSE_KMS"
      }

      output_location = "s3://${aws_s3_bucket.aws_s3_bucket_analytics.bucket}/output/"
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment