How to aggregate search results over specific fields (Bucket Aggregations), compute metrics on the resulting buckets (Metrics Aggregations), and filter buckets on those metrics (Pipeline Aggregations).
This is the search scenario (what we would like to catch):
Detect a potential web sweep (an attacker looking for listening HTTP servers in the network). If a single IP tries to connect to too many hosts on the same port, it may indicate suspicious activity.
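As a rough illustration of the heuristic, the following Python sketch counts the distinct destination hosts each source IP contacted on a given port and flags the sources above a threshold. The `find_sweepers` helper, the sample records and the default values are hypothetical; only the field names `src_ip`, `dest_ip` and `dest_port` are taken from the query built in this document.

```python
from collections import defaultdict


def find_sweepers(events, port=80, threshold=10):
    """Return {src_ip: distinct target count} for sources above the threshold.

    `events` is an iterable of dicts with src_ip, dest_ip and dest_port keys
    (a hypothetical in-memory stand-in for the indexed documents).
    """
    targets_by_source = defaultdict(set)
    for event in events:
        if event["dest_port"] == port:
            # A set keeps each destination once, so its size is a distinct count.
            targets_by_source[event["src_ip"]].add(event["dest_ip"])
    return {
        src: len(targets)
        for src, targets in targets_by_source.items()
        if len(targets) > threshold
    }


# Made-up sample data: one source probing 12 hosts, another hitting one host 50 times.
events = [
    {"src_ip": "10.0.0.5", "dest_ip": f"192.168.1.{i}", "dest_port": 80}
    for i in range(1, 13)
]
events += [{"src_ip": "10.0.0.9", "dest_ip": "192.168.1.1", "dest_port": 80}] * 50

print(find_sweepers(events))  # → {'10.0.0.5': 12}
```

The busy but single-target source is not flagged, because the number of distinct targets matters here, not the number of connections.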
Search for all documents whose dest_port field matches the value 80 over the last three full days (from the start of the day three days ago to the end of yesterday).
"size": 0,
"query": {
  "bool": {
    "must": [
      {
        "bool": {
          "must": [
            { "match": { "dest_port": 80 } }
          ]
        }
      }
    ],
    "filter": [
      {
        "range": {
          "@timestamp": {
            "gte": "now-3d/d",
            "lte": "now-1d/d"
          }
        }
      }
    ]
  }
},
Aggregate using a Terms Aggregation on the fields src_ip.keyword and dest_ip.keyword. The src_ip aggregation is named attacker_ip and the dest_ip aggregation, nested under it, is named target_ip.
We obtain the set of target_ip values for each attacker_ip.
"aggs": {
  "attacker_ip": {
    "terms": {
      "field": "src_ip.keyword"
    },
    "aggs": {
      "target_ip": {
        "terms": {
          "field": "dest_ip.keyword"
        }
      }
    }
  }
}
Next, count the targets. This calculation uses a Cardinality Aggregation, placed under the attacker_ip aggregation.
We obtain the (approximate) number of distinct target_ip values for each attacker_ip.
"target_ip_count": {
  "cardinality": {
    "field": "dest_ip.keyword"
  }
}
The buckets are then filtered using a Pipeline Aggregation (bucket_selector). It is placed under the attacker_ip aggregation, after the target_ip and target_ip_count aggregations.
We keep only the attacker_ip buckets whose target_ip_count value is greater than 10.
"target_ip_bucket_filter": {
  "bucket_selector": {
    "buckets_path": {
      "totalTargetIP": "target_ip_count"
    },
    "script": "params.totalTargetIP > 10"
  }
}
The complete request body:
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "must": [
              { "match": { "dest_port": 80 } }
            ]
          }
        }
      ],
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-3d/d",
              "lte": "now-1d/d"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "attacker_ip": {
      "terms": {
        "field": "src_ip.keyword"
      },
      "aggs": {
        "target_ip": {
          "terms": {
            "field": "dest_ip.keyword"
          }
        },
        "target_ip_count": {
          "cardinality": {
            "field": "dest_ip.keyword"
          }
        },
        "target_ip_bucket_filter": {
          "bucket_selector": {
            "buckets_path": {
              "totalTargetIP": "target_ip_count"
            },
            "script": "params.totalTargetIP > 10"
          }
        }
      }
    }
  }
}
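For scripted use, the request body can be assembled from parameters and the surviving buckets read back from the response. The sketch below is a minimal Python example using only the standard library. `build_sweep_query`, `suspicious_sources`, their parameter names and the sample response are hypothetical, and the parser assumes the usual `aggregations` / `buckets` layout of a terms aggregation response. The sketch flattens the nested `bool` clause (which should not change what is matched) and adds a `terms_size` parameter: a terms aggregation returns only its top 10 buckets by default, so sources outside that top 10 would most likely never be evaluated by the `bucket_selector`.

```python
import json


def build_sweep_query(port=80, min_targets=10, gte="now-3d/d", lte="now-1d/d",
                      terms_size=1000):
    """Build the search body from this document with adjustable parameters."""
    return {
        "size": 0,  # only aggregations are needed, not the hits themselves
        "query": {
            "bool": {
                # Flattened form of the nested bool/must used in the document.
                "must": [{"match": {"dest_port": port}}],
                "filter": [{"range": {"@timestamp": {"gte": gte, "lte": lte}}}],
            }
        },
        "aggs": {
            "attacker_ip": {
                # terms_size is an assumption; tune it to the data volume.
                "terms": {"field": "src_ip.keyword", "size": terms_size},
                "aggs": {
                    "target_ip": {"terms": {"field": "dest_ip.keyword"}},
                    "target_ip_count": {"cardinality": {"field": "dest_ip.keyword"}},
                    "target_ip_bucket_filter": {
                        "bucket_selector": {
                            "buckets_path": {"totalTargetIP": "target_ip_count"},
                            "script": f"params.totalTargetIP > {int(min_targets)}",
                        }
                    },
                },
            }
        },
    }


def suspicious_sources(response):
    """Map each remaining attacker_ip bucket key to its distinct target count."""
    buckets = response.get("aggregations", {}).get("attacker_ip", {}).get("buckets", [])
    return {bucket["key"]: bucket["target_ip_count"]["value"] for bucket in buckets}


body = build_sweep_query(port=80, min_targets=10)
print(json.dumps(body, indent=2))

# Hypothetical, hand-written response fragment used only to exercise the parser.
sample_response = {
    "aggregations": {
        "attacker_ip": {
            "buckets": [
                {"key": "10.0.0.5", "doc_count": 40, "target_ip_count": {"value": 12}}
            ]
        }
    }
}
print(suspicious_sources(sample_response))
```

The printed body can be sent to the `_search` endpoint of the relevant index with any HTTP client; the port and threshold can then be varied without editing the JSON by hand.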
Merged the whole Gist into a single Markdown document.
Updated code description.