Deduplication Plan: test/integration/test_job_cache.py

Summary

Metric              Before       After
Total Lines         160          ~110
Duplicated Lines    ~70 (44%)    ~10 (9%)
Test Methods        2            2
Helper Methods      0            3

Refactoring Steps

Step 1: Extract _set_hda_to_failed_metadata() helper

Extract the identical 7-line DB manipulation block into a helper method.

def _set_hda_to_failed_metadata(self, hda_id: str):
    """Set an HDA to failed_metadata state via direct DB manipulation."""
    database_id = self._get(f"configuration/decode/{hda_id}").json()["decoded_id"]
    hda_model = self.sa_session.scalar(
        select(HistoryDatasetAssociation).where(HistoryDatasetAssociation.id == database_id)
    )
    assert hda_model
    hda_model.state = HistoryDatasetAssociation.states.FAILED_METADATA
    self.sa_session.add(hda_model)
    self.sa_session.commit()
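
This helper presumes the test module already imports SQLAlchemy's select construct and Galaxy's HistoryDatasetAssociation model. If the current file lacks them, the refactor would also need roughly the following (import paths assumed from standard SQLAlchemy and Galaxy layouts):

# Imports assumed by _set_hda_to_failed_metadata (likely already present
# in the existing test module):
from sqlalchemy import select

from galaxy.model import HistoryDatasetAssociation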

Step 2: Extract _run_and_verify_cache_hit() helper

Extract the repeated run → wait → run-cached → wait → verify pattern.

def _run_and_verify_cache_hit(self, tool_id: str, inputs: dict, history_id: str) -> str:
    """Run tool, then run again with cache, verify cache was used. Returns first job ID."""
    first_run = self.dataset_populator.run_tool(
        tool_id=tool_id, inputs=inputs, history_id=history_id
    )
    first_job_id = first_run["jobs"][0]["id"]
    self.dataset_populator.wait_for_job(first_job_id)

    cached_run = self.dataset_populator.run_tool(
        tool_id=tool_id, inputs=inputs, history_id=history_id, use_cached_job=True
    )
    cached_job_id = cached_run["jobs"][0]["id"]
    self.dataset_populator.wait_for_job(cached_job_id)

    cached_job_details = self.dataset_populator.get_job_details(cached_job_id, full=True).json()
    assert cached_job_details["copied_from_job_id"] == first_job_id, "Should have used cached job"
    return first_job_id
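
The returned job id also lets a caller layer extra assertions onto the original (non-cached) run. A minimal sketch, reusing the get_job_details call shown above (the "state" check is an illustrative assumption, not part of the plan):

# Hypothetical caller-side check using the returned job id:
first_job_id = self._run_and_verify_cache_hit("cat1", inputs, history_id)
first_job = self.dataset_populator.get_job_details(first_job_id, full=True).json()
assert first_job["state"] == "ok"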

Step 3: Extract _verify_cache_excluded_with_failed_metadata() helper

Extract the final verification block.

def _verify_cache_excluded_with_failed_metadata(self, tool_id: str, inputs: dict, history_id: str):
    """Run tool with cache enabled, verify cache NOT used due to failed_metadata."""
    result = self.dataset_populator.run_tool_raw(
        tool_id=tool_id, inputs=inputs, history_id=history_id, use_cached_job=True
    ).json()
    assert "errors" in result and result["errors"], \
        "Tool should fail when input HDA is in failed_metadata state"
    assert "failed_metadata" in str(result["errors"]), \
        "Error should mention failed_metadata state"

Step 4: Simplify test methods

Before (test_job_cache_excludes_failed_metadata_hda): 62 lines. After: ~12 lines.

def test_job_cache_excludes_failed_metadata_hda(self):
    """Test that job cache lookup excludes HDAs in failed_metadata state."""
    with self.dataset_populator.test_history() as history_id:
        hda = self.dataset_populator.new_dataset(history_id, content="test content")
        hda_id = hda["id"]
        inputs = {"input1": {"src": "hda", "id": hda_id}}

        self._run_and_verify_cache_hit("cat1", inputs, history_id)
        self._set_hda_to_failed_metadata(hda_id)
        self._verify_cache_excluded_with_failed_metadata("cat1", inputs, history_id)

Before (test_job_cache_excludes_failed_metadata_hdca_element): 69 lines. After: ~16 lines.

def test_job_cache_excludes_failed_metadata_hdca_element(self):
    """Test that job cache lookup excludes HDCAs with elements in failed_metadata state."""
    with self.dataset_populator.test_history() as history_id:
        create_response = self.dataset_collection_populator.create_list_in_history(
            history_id, contents=["content1\n", "content2\n"], wait=True
        ).json()
        hdca_id = create_response["output_collections"][0]["id"]

        hdca_details = self.dataset_populator.get_history_collection_details(history_id, content_id=hdca_id)
        first_element_hda_id = hdca_details["elements"][0]["object"]["id"]

        inputs = {"input1": {"src": "hdca", "id": hdca_id}}

        self._run_and_verify_cache_hit("cat_list", inputs, history_id)
        self._set_hda_to_failed_metadata(first_element_hda_id)
        self._verify_cache_excluded_with_failed_metadata("cat_list", inputs, history_id)

Final File Structure

class TestJobCacheFiltering(integration_util.IntegrationTestCase):
    # ... existing setUp, sa_session ...

    # Helper methods (new)
    def _set_hda_to_failed_metadata(self, hda_id: str): ...
    def _run_and_verify_cache_hit(self, tool_id: str, inputs: dict, history_id: str) -> str: ...
    def _verify_cache_excluded_with_failed_metadata(self, tool_id: str, inputs: dict, history_id: str): ...

    # Test methods (simplified)
    def test_job_cache_excludes_failed_metadata_hda(self): ...
    def test_job_cache_excludes_failed_metadata_hdca_element(self): ...

Risk Assessment

  • Risk Level: Low
  • Testing: Run the existing tests after refactoring; behavior is unchanged (see the sketch below).
  • Reversibility: Easy; inline the helpers back into the test methods if needed.
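
A sketch of that testing step: re-run just this module after the refactor. Hypothetical invocation via pytest's Python API (the file path comes from this plan's title; the -k filter matches both test names above):

# Re-run only this module's cache-exclusion tests (sketch):
import pytest

pytest.main(["test/integration/test_job_cache.py", "-k", "job_cache_excludes"])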