Skip to content

Instantly share code, notes, and snippets.

@robertainslie
Created April 16, 2026 18:55
Show Gist options
  • Select an option

  • Save robertainslie/f09d2436f2271a71ae0e747938662fdb to your computer and use it in GitHub Desktop.

Select an option

Save robertainslie/f09d2436f2271a71ae0e747938662fdb to your computer and use it in GitHub Desktop.
Custom HubSpot Company Dedupe
# Copy this file to .env and fill in your HubSpot private app token.
# Never commit .env to version control.
HUBSPOT_API_KEY=pat-na1-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

HubSpot Company Deduplication Script

Merges duplicate HubSpot companies into a designated primary record while:

  • Keeping all existing data on the primary intact
  • Backfilling only empty fields on the primary with data from the duplicate
  • Transferring all associated contacts, deals, and activities via HubSpot's merge API
  • Writing a full audit trail to a results CSV and log file

Table of contents

  1. Step 1 — Install Python
  2. Step 2 — Set up your project folder
  3. Step 3 — Install the required library
  4. Step 4 — Set up your HubSpot token
  5. Step 5 — Prepare your CSV file
  6. Step 6 — Run the script
  7. How the merge works
  8. Three modes in detail
  9. Common flags
  10. Output files
  11. Recommended workflow
  12. Troubleshooting

Step 1 — Install Python

Python is the programming language the script is written in. You need version 3.9 or newer.

Check if Python is already installed

Open a Terminal window and run:

python3 --version

How to open Terminal:

  • Mac: Press Command + Space, type Terminal, press Enter
  • Windows: Press Windows key, type cmd or PowerShell, press Enter

If you see something like Python 3.11.4, you're good — skip to Step 2.

If you see command not found or a version older than 3.9, follow the install steps below.

Install Python on Mac

  1. Go to python.org/downloads
  2. Click the big yellow Download Python button
  3. Open the downloaded .pkg file and follow the installer prompts
  4. When it finishes, close and reopen Terminal, then run python3 --version to confirm

Install Python on Windows

  1. Go to python.org/downloads
  2. Click the big yellow Download Python button
  3. Open the downloaded .exe file
  4. Important: On the first screen, check the box that says "Add Python to PATH" before clicking Install
  5. Click Install Now and follow the prompts
  6. When it finishes, close and reopen your command prompt, then run python3 --version to confirm

Step 2 — Set up your project folder

You need a folder on your computer to hold the script and your CSV files. Here's how to create one and navigate to it in Terminal.

Create the folder

You can create the folder the normal way (right-click on your Desktop → New Folder) and name it something like hubspot-dedup. Or do it directly in Terminal:

Mac:

mkdir ~/Desktop/hubspot-dedup

Windows (Command Prompt):

mkdir %USERPROFILE%\Desktop\hubspot-dedup

Download the script

Save the file dedup_companies.py into that folder. If you received it as an attachment, move it there now. If you're copying it from somewhere, make sure it saves with the .py extension (not .py.txt).

Navigate to the folder in Terminal

Terminal always has a "current location" — you need to be inside your project folder before running the script.

Mac:

cd ~/Desktop/hubspot-dedup

Windows:

cd %USERPROFILE%\Desktop\hubspot-dedup

What is cd? It stands for "change directory." Think of it like double-clicking a folder to open it, but in the terminal.

To confirm you're in the right place, run:

ls

(Mac) or

dir

(Windows) — you should see dedup_companies.py listed.

Tip: You can also drag the folder into Terminal after typing cd (with a space) and it will fill in the path automatically on Mac.


Step 3 — Install the required library

The script uses a Python library called requests to communicate with HubSpot's API. You only need to do this once.

It's best practice to use a virtual environment — a self-contained Python installation just for this project that won't interfere with anything else on your computer.

Create and activate a virtual environment

Mac:

python3 -m venv venv
source venv/bin/activate

Windows:

python3 -m venv venv
venv\Scripts\activate

After activation your terminal prompt will change to show (venv) at the start — that means it worked.

Every time you open a new Terminal window to run this script, you'll need to cd into the folder and run the activate command again. The (venv) prefix reminds you when it's active.

Install dependencies

With the virtual environment active, run:

pip install requests python-dotenv

You should see output ending with Successfully installed .... You only need to do this once.


Step 4 — Set up your HubSpot token

The script needs permission to access your HubSpot account. You grant that by creating a Private App in HubSpot and copying its token.

Create the token

  1. Log into HubSpot and click the Settings gear icon (top right)
  2. In the left sidebar, go to Integrations → Private Apps
  3. Click Create a private app
  4. Give it a name like Company Dedup Script
  5. Click the Scopes tab and search for and enable these three scopes:
Scope Why it's needed
crm.objects.companies.read Read company data
crm.objects.companies.write Merge and update companies
crm.schemas.companies.read Read property definitions
  1. Click Create appContinue creating
  2. On the next screen, click Show token and copy the full token — it starts with pat-

Keep this token private. It grants access to your HubSpot data. Don't paste it into emails or shared documents.

Add the token to your .env file

In your hubspot-dedup folder you'll find a file called .env.example. Make a copy of it named .env:

Mac:

cp .env.example .env

Windows:

copy .env.example .env

Open .env in any text editor (Notepad, TextEdit, VS Code) and replace the placeholder with your actual token:

HUBSPOT_API_KEY=pat-na1-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

Save the file. The script reads this automatically every time it runs — you don't need to do anything else.

Why a .env file? It keeps your token in one place on your machine and out of your terminal history. The .gitignore in this folder ensures the file is never accidentally shared if you ever put this folder in version control.


Step 5 — Prepare your CSV file

The script has three modes, each accepting a different CSV format. Choose the one that matches what you have.

Place your CSV file in the same hubspot-dedup folder as the script.

Mode 1 — pairs: You already know which record to keep

Use this when you have a spreadsheet where you've already identified which company ID is the primary (to keep) and which is the duplicate (to merge in).

CSV format — two columns, one pair per row:

primary_company_id,duplicate_company_id
12345678,98765432
11111111,22222222
33333333,44444444

Save this file as something like pairs.csv in your project folder.


Mode 2 — export: You have a full HubSpot export

Use this when you've exported your companies from HubSpot and want the script to automatically find duplicates by matching company names. It will keep the oldest record for each name.

How to export from HubSpot:

  1. Go to CRM → Companies
  2. Optionally filter to just the affected records
  3. Click Actions or the export icon (top right of the table) → Export
  4. Choose CSV format
  5. Make sure these columns are included: Record ID, Company name, Create Date
  6. Download and move the file into your project folder

The script expects HubSpot's default column names. If your export uses different headers, see the flags section below.


Mode 3 — search: You have a list of records to keep, want the script to find duplicates

Use this when you have a list of the specific company IDs you want to keep, and you want the script to search HubSpot for any other companies with the same name.

CSV format — record ID and company name:

record_id,name
12345678,Acme Corp
11111111,Globex Industries

Save this as something like primaries.csv in your project folder.


Step 6 — Run the script

Make sure you have:

  • Terminal open and navigated to your project folder (cd ~/Desktop/hubspot-dedup)
  • Virtual environment active (you see (venv) in your prompt)
  • Your .env file exists in the folder and contains your HUBSPOT_API_KEY
  • Your CSV file in the same folder

Always do a dry run first

A dry run shows you exactly what the script would do without making any changes to HubSpot. Always run this first and review the output before running live.

Mode 1 — pairs:

python3 dedup_companies.py pairs pairs.csv --dry-run

Mode 2 — export:

python3 dedup_companies.py export hs_companies_export.csv --dry-run

Mode 3 — search:

python3 dedup_companies.py search primaries.csv --dry-run

Read through the output carefully. For each pair you should see which company is primary, which is the duplicate, and what fields (if any) would be backfilled.

Run live

Once you're happy with the dry run output, remove --dry-run to execute the merges:

python3 dedup_companies.py pairs pairs.csv

The script will print progress to the screen and write a full log to dedup_companies.log in the same folder.

Passing the token inline (optional override)

If you'd rather not use a .env file, you can pass the token directly on any command with --token:

python3 dedup_companies.py pairs pairs.csv --token pat-na1-xxxxxxxx --dry-run

When both are present, --token takes precedence over .env.


How the merge works

For every primary/duplicate pair the script:

  1. Fetches all properties from both the primary and duplicate company
  2. Identifies backfill fields — properties that are blank on the primary but have a value on the duplicate
  3. Calls the merge API — this carries all contacts, deals, and activities to the primary; the primary's own non-empty values win on any conflict
  4. Updates the primary with the backfill values from step 2, filling gaps without overwriting anything

The script uses HubSpot's official merge endpoint and intentionally avoids HubSpot's built-in duplicate tool.


Three modes in detail

Mode 1 — pairs

You control exactly which ID is primary and which is the duplicate. No name matching happens.

python3 dedup_companies.py pairs pairs.csv

If your CSV uses different column names than the defaults:

python3 dedup_companies.py pairs pairs.csv \
  --primary-col "Keep ID" \
  --duplicate-col "Merge ID"

Mode 2 — export

Groups all rows by company name (case-insensitive). For every group with 2 or more records, the oldest record by Create Date becomes the primary and all others are merged into it.

python3 dedup_companies.py export hs_companies_export.csv

If your export file uses different column headers:

python3 dedup_companies.py export export.csv \
  --id-col "Record ID" \
  --name-col "Name" \
  --date-col "Create date"

Why oldest = primary: The oldest record is most likely to have the most complete history and the most existing associations. If you need a different rule (e.g., keep the record with the most contacts), use pairs mode with a pre-processed CSV instead.


Mode 3 — search

For each row in your CSV the script searches HubSpot for all companies with the exact same name. Any match whose ID differs from your specified primary is treated as a duplicate.

python3 dedup_companies.py search primaries.csv

If your CSV uses different column names:

python3 dedup_companies.py search primaries.csv \
  --id-col "Company ID" \
  --name-col "Company Name"

Rate limit note: HubSpot's Search API allows ~4 requests per second, which is a separate, stricter limit from general API calls. For a list of 100 companies, the search phase alone takes at least 25 seconds. The script shows an estimated wait time when it starts.


Common flags (all modes)

Flag Default Description
--token PAT value from .env Your HubSpot private app token
--dry-run off Preview every action without making any changes
--results-csv FILE dedup_results.csv Name of the output results file

Output files

After running, the script creates two files in your project folder:

dedup_results.csv

A spreadsheet with one row per pair processed:

Column Description
primary_id The company ID that was kept
duplicate_id The company ID that was merged in
status success, dry_run, http_error, or skipped_*
fields_filled Properties that were backfilled onto the primary
error Error details if something went wrong (blank on success)

Open this in Excel or Google Sheets after running to confirm everything went as expected.

dedup_companies.log

A detailed timestamped log of every step the script took. Useful for auditing the run or diagnosing problems.


Recommended workflow

1. Follow Steps 1–4 to get set up (one-time).
2. Prepare your CSV file (Step 5).
3. Run with --dry-run and read the output.
4. Spot-check 2–3 pairs manually in HubSpot to confirm they look right.
5. Run live on just the first 5 rows to test end-to-end.
6. Open dedup_results.csv and confirm all rows show "success".
7. Run the full list.

Troubleshooting

command not found: python3 Python is not installed or not on your PATH. Go back to Step 1. On Windows, make sure you checked "Add Python to PATH" during installation.

ModuleNotFoundError: No module named 'requests' The requests library isn't installed in your current environment. Make sure your virtual environment is active (you should see (venv) in your prompt) and run pip install requests again.

No HubSpot API token found The script can't find your token. Make sure .env exists in the same folder as the script and contains HUBSPOT_API_KEY=pat-.... Alternatively, pass it directly with --token pat-....

Column 'X' not found The column names in your CSV don't match what the script expects. Check the headers in your file (open it in a text editor or Excel) and use --id-col, --name-col, --date-col, --primary-col, or --duplicate-col to point to the correct ones.

Company 12345 not found (already merged or deleted) The duplicate was already merged in a previous run or was deleted. The script logs it as skipped_duplicate_missing and moves on — this is safe to ignore.

HTTP 403 / insufficient scopes Your private app token is missing one of the required scopes. Go back to Step 4 and make sure all three scopes are enabled.

HTTP 429 / rate limit Another process is hitting HubSpot's API at the same time from the same account. Wait a minute and re-run — pairs that already succeeded will not be re-processed (the duplicate no longer exists, so they are skipped cleanly).

Search mode returns unexpected companies The search uses exact name matching, but company names with extra spaces or special characters may behave unexpectedly. Always review the dry-run output before running live.

#!/usr/bin/env python3
"""
HubSpot Company Deduplication Script
Three modes for finding and merging duplicate companies:
pairs — You supply explicit primary/duplicate ID pairs in a CSV.
export — You supply a full HubSpot company export; script finds duplicates by
exact company name match and keeps the oldest record as primary.
search — You supply a list of companies to keep (name + record ID); script
searches HubSpot for same-name companies and merges them in.
In every mode the merge logic is the same:
1. Fetch all properties from both companies.
2. Identify fields that are empty on the primary but have a value on the duplicate.
3. Call HubSpot's merge endpoint (primary wins; all associations transfer).
4. PATCH the primary with the backfill values collected before the merge.
Usage:
python dedup_companies.py pairs pairs.csv --token pat-xxx [--dry-run]
python dedup_companies.py export hs_export.csv --token pat-xxx [--dry-run]
python dedup_companies.py search primaries.csv --token pat-xxx [--dry-run]
"""
from __future__ import annotations
import argparse
import csv
import json
import logging
import os
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import requests
from dotenv import load_dotenv
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
BASE_URL = "https://api.hubapi.com"
# General CRM API: ~100 req / 10 s for private apps → 0.15 s is safe.
RATE_LIMIT_DELAY = 0.15
# Search API: 4 req / s limit (separate quota from CRM reads/writes).
SEARCH_RATE_LIMIT_DELAY = 0.26
# Properties HubSpot manages internally — never attempt to write these.
SKIP_PROPERTIES: set[str] = {
"hs_object_id",
"hs_created_by",
"hs_created_by_user_id",
"hs_createdate",
"createdate",
"hs_lastmodifieddate",
"hs_object_source",
"hs_object_source_id",
"hs_object_source_label",
"hs_merged_object_ids",
"hs_was_imported",
"hs_updated_by_user_id",
"hs_user_ids_of_all_notification_followers",
"hs_user_ids_of_all_notification_unfollowers",
"hs_user_ids_of_all_owners",
"hs_all_owner_ids",
"hs_all_team_ids",
"hs_all_accessible_team_ids",
"hs_read_only",
}
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("dedup_companies.log", encoding="utf-8"),
],
)
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _headers(token: str) -> dict:
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
def get_all_property_definitions(token: str) -> dict[str, dict]:
"""Return all company property definitions keyed by internal API name."""
url = f"{BASE_URL}/crm/v3/properties/companies"
resp = requests.get(url, headers=_headers(token))
resp.raise_for_status()
return {p["name"]: p for p in resp.json().get("results", [])}
def _writable_property_names(all_props: dict[str, dict]) -> list[str]:
"""Return only the property names the API will accept in a PATCH."""
names = []
for name, prop in all_props.items():
if name in SKIP_PROPERTIES:
continue
meta = prop.get("modificationMetadata", {})
if meta.get("readOnlyValue"):
continue
if prop.get("calculated"):
continue
names.append(name)
return names
def get_company_props(company_id: str, token: str, property_names: list[str]) -> dict:
"""
Fetch all requested properties for a single company.
Splits into batches of 250 to stay safely within URL-length limits.
Raises ValueError if the company is not found (already merged/deleted).
"""
merged: dict = {}
for i in range(0, len(property_names), 250):
chunk = property_names[i : i + 250]
url = f"{BASE_URL}/crm/v3/objects/companies/{company_id}"
resp = requests.get(url, headers=_headers(token), params={"properties": ",".join(chunk)})
if resp.status_code == 404:
raise ValueError(f"Company {company_id} not found (already merged or deleted)")
resp.raise_for_status()
merged.update(resp.json().get("properties", {}))
time.sleep(RATE_LIMIT_DELAY)
return merged
def search_companies_by_name(name: str, token: str) -> list[dict]:
"""
Search the CRM for companies whose name exactly matches `name`.
Paginates automatically. Returns a list of raw result objects
with at minimum {id, properties: {name, createdate, domain}}.
"""
url = f"{BASE_URL}/crm/v3/objects/companies/search"
results: list[dict] = []
after: str | None = None
while True:
payload: dict = {
"filterGroups": [
{
"filters": [
{"propertyName": "name", "operator": "EQ", "value": name}
]
}
],
"properties": ["name", "createdate", "domain"],
"limit": 100,
}
if after:
payload["after"] = after
resp = requests.post(url, headers=_headers(token), json=payload)
resp.raise_for_status()
data = resp.json()
results.extend(data.get("results", []))
after = data.get("paging", {}).get("next", {}).get("after")
if not after:
break
time.sleep(SEARCH_RATE_LIMIT_DELAY)
return results
def merge_companies(primary_id: str, duplicate_id: str, token: str) -> dict:
"""
Call HubSpot's merge endpoint.
primaryObjectId's properties win; all associations transfer to primary.
"""
url = f"{BASE_URL}/crm/v3/objects/companies/merge"
resp = requests.post(
url,
headers=_headers(token),
json={"primaryObjectId": str(primary_id), "objectIdToMerge": str(duplicate_id)},
)
resp.raise_for_status()
return resp.json()
def update_company(company_id: str, properties: dict, token: str) -> dict:
"""PATCH a set of properties onto a company."""
url = f"{BASE_URL}/crm/v3/objects/companies/{company_id}"
resp = requests.patch(url, headers=_headers(token), json={"properties": properties})
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# Core merge + backfill logic
# ---------------------------------------------------------------------------
def _is_empty(value) -> bool:
return value is None or (isinstance(value, str) and not value.strip())
def find_enrichment_fields(
primary_props: dict,
duplicate_props: dict,
writable_names: set[str],
) -> dict:
"""
Return {property_name: value} for fields that are empty on the primary
but have a value on the duplicate, restricted to writable properties.
"""
return {
name: val
for name, val in duplicate_props.items()
if name in writable_names
and not _is_empty(val)
and _is_empty(primary_props.get(name))
}
def process_pair(
primary_id: str,
duplicate_id: str,
token: str,
writable_names: set[str],
all_prop_names: list[str],
dry_run: bool,
results_writer,
) -> bool:
"""
Execute the fetch → diff → merge → backfill workflow for one pair.
Returns True on success, False on any error.
"""
row_result = {
"primary_id": primary_id,
"duplicate_id": duplicate_id,
"status": "",
"fields_filled": "",
"error": "",
}
try:
log.info(" Fetching primary %s ...", primary_id)
primary_props = get_company_props(primary_id, token, all_prop_names)
log.info(" Fetching duplicate %s ...", duplicate_id)
try:
dup_props = get_company_props(duplicate_id, token, all_prop_names)
except ValueError as exc:
log.warning(" Skipping — %s", exc)
row_result.update(status="skipped_duplicate_missing", error=str(exc))
results_writer.writerow(row_result)
return False
enrichment = find_enrichment_fields(primary_props, dup_props, writable_names)
fields_str = ", ".join(sorted(enrichment)) if enrichment else "none"
log.info(" Fields to backfill: %s", fields_str)
if dry_run:
log.info(" [DRY RUN] Would merge %s → %s", duplicate_id, primary_id)
if enrichment:
log.info(
" [DRY RUN] Would then PATCH %s with:\n%s",
primary_id,
json.dumps(enrichment, indent=4),
)
row_result.update(status="dry_run", fields_filled=fields_str)
else:
log.info(" Merging %s → %s ...", duplicate_id, primary_id)
merge_companies(primary_id, duplicate_id, token)
time.sleep(RATE_LIMIT_DELAY)
log.info(" Merge complete.")
if enrichment:
log.info(" Backfilling %d field(s) onto %s ...", len(enrichment), primary_id)
update_company(primary_id, enrichment, token)
time.sleep(RATE_LIMIT_DELAY)
log.info(" Backfill complete.")
row_result.update(status="success", fields_filled=fields_str)
results_writer.writerow(row_result)
return True
except requests.HTTPError as exc:
body = (exc.response.text if exc.response is not None else "")[:300]
log.error(" HTTP error: %s — %s", exc, body)
row_result.update(status="http_error", error=f"{exc} | {body}")
results_writer.writerow(row_result)
return False
except Exception as exc:
log.exception(" Unexpected error: %s", exc)
row_result.update(status="error", error=str(exc))
results_writer.writerow(row_result)
return False
def _open_results(path: str):
"""Open a results CSV writer. Returns (file_handle, DictWriter)."""
fh = open(path, "w", newline="", encoding="utf-8")
writer = csv.DictWriter(
fh,
fieldnames=["primary_id", "duplicate_id", "status", "fields_filled", "error"],
)
writer.writeheader()
return fh, writer
def _run_pairs(pairs: list[tuple[str, str]], token: str, writable_names: set[str],
all_prop_names: list[str], dry_run: bool, results_path: str):
"""Process a list of (primary_id, duplicate_id) tuples."""
success = failure = skipped = 0
total = len(pairs)
fh, writer = _open_results(results_path)
try:
for i, (primary_id, duplicate_id) in enumerate(pairs, 1):
log.info("[%d/%d] primary=%s duplicate=%s", i, total, primary_id, duplicate_id)
if not primary_id or not duplicate_id:
log.warning(" Blank ID — skipping row %d", i)
skipped += 1
writer.writerow({
"primary_id": primary_id, "duplicate_id": duplicate_id,
"status": "skipped_missing_ids", "fields_filled": "",
"error": "blank ID in input",
})
continue
if primary_id == duplicate_id:
log.warning(" primary == duplicate (%s) — skipping", primary_id)
skipped += 1
writer.writerow({
"primary_id": primary_id, "duplicate_id": duplicate_id,
"status": "skipped_same_id", "fields_filled": "",
"error": "primary and duplicate are identical",
})
continue
ok = process_pair(primary_id, duplicate_id, token, writable_names,
all_prop_names, dry_run, writer)
if ok:
success += 1
else:
failure += 1
finally:
fh.close()
log.info("")
log.info("=" * 60)
log.info("Done. Success: %d | Failures: %d | Skipped: %d", success, failure, skipped)
log.info("Results → %s", results_path)
log.info("Full log → dedup_companies.log")
# ---------------------------------------------------------------------------
# Path 1: explicit pairs
# ---------------------------------------------------------------------------
def cmd_pairs(args):
"""
Mode: pairs
Input CSV must have two columns: primary company ID and duplicate company ID.
No name matching or searching — you control every pair explicitly.
"""
csv_path = Path(args.csv_file)
if not csv_path.exists():
log.error("File not found: %s", csv_path)
sys.exit(1)
with csv_path.open(newline="", encoding="utf-8-sig") as f:
rows = list(csv.DictReader(f))
if not rows:
log.error("CSV is empty.")
sys.exit(1)
for col in (args.primary_col, args.duplicate_col):
if col not in rows[0]:
log.error("Column '%s' not found in CSV. Available: %s", col, list(rows[0].keys()))
sys.exit(1)
pairs = [
(str(r[args.primary_col]).strip(), str(r[args.duplicate_col]).strip())
for r in rows
]
log.info("Loaded %d pair(s) from %s", len(pairs), csv_path)
token, writable, all_names = _load_properties(args.token)
_run_pairs(pairs, token, writable, all_names, args.dry_run, args.results_csv)
# ---------------------------------------------------------------------------
# Path 2: full export — find duplicates by exact name, keep oldest
# ---------------------------------------------------------------------------
def _parse_hs_date(raw: str) -> datetime | None:
"""
Parse a HubSpot date/timestamp into an aware UTC datetime.
Handles: Unix ms integers, ISO strings, US date strings.
"""
if not raw or not raw.strip():
return None
raw = raw.strip()
# Unix milliseconds (HubSpot API default)
if raw.isdigit() and len(raw) > 10:
return datetime.fromtimestamp(int(raw) / 1000, tz=timezone.utc)
# Try common string formats
for fmt in (
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d",
"%m/%d/%Y",
"%B %d, %Y",
):
try:
dt = datetime.strptime(raw, fmt)
return dt.replace(tzinfo=timezone.utc)
except ValueError:
continue
return None
def cmd_export(args):
"""
Mode: export
Reads a full HubSpot company export CSV.
Groups rows by exact (case-insensitive) company name.
For each group with 2+ records, the oldest (earliest createdate) becomes
primary; all others are merged into it.
"""
csv_path = Path(args.csv_file)
if not csv_path.exists():
log.error("File not found: %s", csv_path)
sys.exit(1)
with csv_path.open(newline="", encoding="utf-8-sig") as f:
rows = list(csv.DictReader(f))
if not rows:
log.error("CSV is empty.")
sys.exit(1)
for col in (args.name_col, args.id_col, args.date_col):
if col not in rows[0]:
log.error("Column '%s' not found. Available: %s", col, list(rows[0].keys()))
sys.exit(1)
# Group by normalised name
groups: dict[str, list[dict]] = defaultdict(list)
for r in rows:
name = r[args.name_col].strip()
if name:
groups[name.lower()].append(r)
duplicate_groups = {k: v for k, v in groups.items() if len(v) > 1}
log.info(
"Found %d duplicate name group(s) across %d total rows.",
len(duplicate_groups),
len(rows),
)
if not duplicate_groups:
log.info("No duplicates found. Nothing to do.")
return
pairs: list[tuple[str, str]] = []
for name_key, group in duplicate_groups.items():
# Sort ascending by createdate so index 0 is the oldest
def sort_key(r):
dt = _parse_hs_date(r.get(args.date_col, ""))
return dt or datetime.max.replace(tzinfo=timezone.utc)
group_sorted = sorted(group, key=sort_key)
primary_id = str(group_sorted[0][args.id_col]).strip()
primary_name = group_sorted[0][args.name_col].strip()
log.info(
" Group '%s': primary=%s duplicates=%s",
primary_name,
primary_id,
[str(r[args.id_col]).strip() for r in group_sorted[1:]],
)
for dup_row in group_sorted[1:]:
dup_id = str(dup_row[args.id_col]).strip()
if dup_id and dup_id != primary_id:
pairs.append((primary_id, dup_id))
log.info("Built %d merge pair(s).", len(pairs))
token, writable, all_names = _load_properties(args.token)
_run_pairs(pairs, token, writable, all_names, args.dry_run, args.results_csv)
# ---------------------------------------------------------------------------
# Path 3: search — user provides primaries, script finds duplicates via API
# ---------------------------------------------------------------------------
def cmd_search(args):
"""
Mode: search
Reads a CSV of companies you want to KEEP (each row has a record ID and name).
For each row, searches HubSpot for all companies with the same exact name.
Any match whose ID differs from your specified primary is treated as a duplicate.
NOTE: The Search API is rate-limited to ~4 req/s. For large lists this mode
will be slower than the others. A progress indicator shows estimated wait time.
"""
csv_path = Path(args.csv_file)
if not csv_path.exists():
log.error("File not found: %s", csv_path)
sys.exit(1)
with csv_path.open(newline="", encoding="utf-8-sig") as f:
rows = list(csv.DictReader(f))
if not rows:
log.error("CSV is empty.")
sys.exit(1)
for col in (args.id_col, args.name_col):
if col not in rows[0]:
log.error("Column '%s' not found. Available: %s", col, list(rows[0].keys()))
sys.exit(1)
token, writable, all_names = _load_properties(args.token)
log.info("Searching HubSpot for duplicates of %d company/companies...", len(rows))
log.info("(Search API rate limit: ~4 req/s — expect ~%.0fs minimum)", len(rows) * SEARCH_RATE_LIMIT_DELAY)
pairs: list[tuple[str, str]] = []
for i, row in enumerate(rows, 1):
primary_id = str(row[args.id_col]).strip()
name = row[args.name_col].strip()
if not primary_id or not name:
log.warning("[%d/%d] Blank id or name — skipping", i, len(rows))
continue
log.info("[%d/%d] Searching for '%s' (keep ID %s) ...", i, len(rows), name, primary_id)
try:
results = search_companies_by_name(name, token)
except requests.HTTPError as exc:
log.error(" Search failed: %s", exc)
continue
duplicates = [r for r in results if str(r["id"]) != str(primary_id)]
if not duplicates:
log.info(" No other companies found with this name.")
continue
log.info(" Found %d duplicate(s): %s", len(duplicates), [r["id"] for r in duplicates])
for dup in duplicates:
pairs.append((primary_id, str(dup["id"])))
# Extra delay after each search call to respect the stricter quota
time.sleep(SEARCH_RATE_LIMIT_DELAY)
if not pairs:
log.info("No duplicate pairs found after searching. Nothing to merge.")
return
log.info("Built %d merge pair(s) from search results.", len(pairs))
_run_pairs(pairs, token, writable, all_names, args.dry_run, args.results_csv)
# ---------------------------------------------------------------------------
# Shared setup: load property definitions
# ---------------------------------------------------------------------------
def _load_properties(token: str) -> tuple[str, set[str], list[str]]:
"""Fetch property definitions and return (token, writable_set, all_names_list)."""
log.info("Fetching company property definitions from HubSpot...")
try:
all_props = get_all_property_definitions(token)
except requests.HTTPError as exc:
log.error("Failed to fetch property definitions: %s", exc)
sys.exit(1)
writable: set[str] = set(_writable_property_names(all_props))
all_names: list[str] = list(all_props.keys())
log.info("Found %d total properties, %d writable.", len(all_names), len(writable))
return token, writable, all_names
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def _add_common_args(p: argparse.ArgumentParser):
p.add_argument(
"--token",
default=os.environ.get("HUBSPOT_API_KEY", ""),
metavar="PAT",
help="HubSpot private app token (or set HUBSPOT_API_KEY in .env)",
)
p.add_argument(
"--dry-run",
action="store_true",
help="Preview every action without making any API changes",
)
p.add_argument(
"--results-csv",
default="dedup_results.csv",
metavar="FILE",
help="Output CSV for per-pair results (default: dedup_results.csv)",
)
def main():
load_dotenv()
parser = argparse.ArgumentParser(
description="Merge duplicate HubSpot companies. Choose a mode with the first argument.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Modes:
pairs Explicit CSV of primary/duplicate ID pairs.
export Full HubSpot company export; script groups by name and keeps the oldest.
search CSV of companies to keep; script searches HubSpot for same-name duplicates.
Examples:
python dedup_companies.py pairs pairs.csv --token pat-xxx --dry-run
python dedup_companies.py export hs_export.csv --token pat-xxx
python dedup_companies.py search primaries.csv --token pat-xxx --dry-run
""",
)
sub = parser.add_subparsers(dest="mode", required=True)
# -- pairs ----------------------------------------------------------------
p_pairs = sub.add_parser("pairs", help="Explicit primary/duplicate ID pairs CSV")
p_pairs.add_argument("csv_file", help="CSV file with pairs")
p_pairs.add_argument("--primary-col", default="primary_company_id",
help="Column name for primary ID (default: primary_company_id)")
p_pairs.add_argument("--duplicate-col", default="duplicate_company_id",
help="Column name for duplicate ID (default: duplicate_company_id)")
_add_common_args(p_pairs)
p_pairs.set_defaults(func=cmd_pairs)
# -- export ---------------------------------------------------------------
p_export = sub.add_parser("export", help="Full HubSpot company export CSV")
p_export.add_argument("csv_file", help="CSV file — HubSpot company export")
p_export.add_argument("--id-col", default="Record ID",
help="Column name for company record ID (default: 'Record ID')")
p_export.add_argument("--name-col", default="Company name",
help="Column name for company name (default: 'Company name')")
p_export.add_argument("--date-col", default="Create Date",
help="Column name for create date (default: 'Create Date')")
_add_common_args(p_export)
p_export.set_defaults(func=cmd_export)
# -- search ---------------------------------------------------------------
p_search = sub.add_parser("search", help="CSV of primaries; script searches HubSpot for duplicates")
p_search.add_argument("csv_file", help="CSV file with companies to keep")
p_search.add_argument("--id-col", default="record_id",
help="Column name for the record ID to keep (default: record_id)")
p_search.add_argument("--name-col", default="name",
help="Column name for company name to search (default: name)")
_add_common_args(p_search)
p_search.set_defaults(func=cmd_search)
args = parser.parse_args()
if not args.token:
log.error(
"No HubSpot API token found. "
"Add HUBSPOT_API_KEY=pat-... to your .env file, or pass --token."
)
sys.exit(1)
if args.dry_run:
log.info("=" * 60)
log.info("DRY RUN MODE — no API changes will be made")
log.info("=" * 60)
args.func(args)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment