Skip to content

Instantly share code, notes, and snippets.

@jonnymaserati
Created December 9, 2024 10:43
Show Gist options
  • Save jonnymaserati/daca90a4dc2f04a50d48c477be4a606d to your computer and use it in GitHub Desktop.
Save jonnymaserati/daca90a4dc2f04a50d48c477be4a606d to your computer and use it in GitHub Desktop.
A vectorized method for wellbore TVD interpolation
"""
Prototyping a non-iterative solution to interpolate TVDs of a survey.
"""
import logging
import requests
import numpy as np
from scipy.spatial.transform import Rotation as R
from datetime import datetime
import welleng as we
from typing import Optional
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Constants
ERROR_THRESHOLD = 1e-6
# Utility functions
def fetch_test_wells(url: str) -> Optional[dict]:
"""
Fetch JSON data from a given URL.
Parameters
----------
url : str
The URL of the JSON file.
Returns
-------
dict or None
A dictionary containing the JSON data if the request is successful,
or None if the request fails or an error occurs.
"""
try:
response = requests.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching JSON data: {e}")
return None
def check_for_inflection_points(survey: we.survey.Survey) -> we.survey.Survey:
"""
Ensure inflection points are included in the survey to capture all TVDs.
Parameters
----------
survey : we.survey.Survey
The original survey.
Returns
-------
we.survey.Survey
A survey with inflection points added if any are detected.
"""
angles = _get_angles(survey)
rotations_inverse = R.from_euler('zyz', angles * -1, degrees=False).inv()
a, _, c = rotations_inverse.as_matrix()[:, -1].T
critical_theta = np.arctan2(-c, a)
critical_points = np.where(np.abs(critical_theta[1:]) < survey.dogleg[:-1])[0]
if len(critical_points) > 0:
radius = survey.curve_radius[critical_points + 1]
theta = critical_theta[critical_points + 1]
md = survey.md[critical_points] + radius * np.abs(theta)
_, local_vec = _get_local_pos_and_vec(radius, np.abs(theta))
global_vec = [
r.apply(lv)
for r, lv in zip(rotations_inverse[critical_points], local_vec)
]
inc, azi = we.utils.get_angles(global_vec, nev=True).T
survey_temp = np.vstack((survey.survey_rad, np.array([md, inc, azi]).T))
survey_temp = survey_temp[np.argsort(survey_temp[:, 0])]
sh = survey.header
sh.azi_reference = 'grid'
survey_new = we.survey.Survey(
md=survey_temp[:, 0],
inc=survey_temp[:, 1],
azi=survey_temp[:, 2],
deg=False,
header=sh,
start_nev=survey.start_nev,
start_xyz=survey.start_xyz,
)
survey_new.interpolated[np.searchsorted(survey_new.md, md)] = 1
return survey_new
return survey
def _get_indices_between(
tvd: np.ndarray, target_tvd: list | np.ndarray, err: float = 0
) -> tuple[np.ndarray, np.ndarray]:
"""
Get indices where the target TVD lies between two consecutive TVDs.
Parameters
----------
tvd : np.ndarray
Array of TVDs in the survey (n,).
target_tvd : list or np.ndarray
The target TVD(s) to check.
err : float, optional
Allowable error for matching boundaries, by default 0.
Returns
-------
tuple[np.ndarray, np.ndarray]
- Indices of the target TVDs in the target array.
- Indices of the survey TVDs bracketing the target values.
"""
target_tvd = np.asarray(target_tvd).reshape(-1, 1) # Ensure target_tvd is 2D
relative_positions = (target_tvd - tvd[:-1]) / (tvd[1:] - tvd[:-1])
# Identify where target TVDs lie between consecutive TVDs
mask = (relative_positions >= 0 + err) & (relative_positions <= 1 - err)
target_indices, survey_indices = np.where(mask)
# Ensure survey_indices does not exceed the bounds of tvd[:-1]
survey_indices = np.clip(survey_indices, 0, len(tvd) - 2)
return target_indices, survey_indices
def _get_angles(survey, idx: Optional[int] = None) -> np.ndarray:
"""
Retrieve azimuth, inclination, and toolface angles from the survey.
Parameters
----------
survey : we.survey.Survey
The well survey.
idx : int or None, optional
Specific index to extract, or None for all.
Returns
-------
np.ndarray
Array of angles (n, 3).
"""
if idx is None:
return np.column_stack((survey.azi_grid_rad, survey.inc_rad, survey.toolface))
if np.isscalar(idx):
return np.array([survey.azi_grid_rad[idx], survey.inc_rad[idx], survey.toolface[idx]])
return np.column_stack((survey.azi_grid_rad[idx], survey.inc_rad[idx], survey.toolface[idx]))
def _get_local_pos_and_vec(radius: np.ndarray, angle: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Compute local positions and vectors along an arc.
Parameters
----------
radius : np.ndarray
Array of radii.
angle : np.ndarray
Array of angles (radians).
Returns
-------
tuple
Local positions and vectors as arrays.
"""
pos = np.column_stack((
radius - radius * np.cos(angle),
np.zeros_like(radius),
radius * np.sin(angle)
))
vec = np.column_stack((
np.sin(angle),
np.zeros_like(radius),
np.cos(angle)
))
return pos, vec
def compute_pos_and_vec(
radius: float, delta_tvd: float, inverse_rotation_matrix: np.ndarray
) -> tuple[np.ndarray, np.ndarray, float]:
"""
Unified computation for position and vector.
Parameters
----------
radius : float
The curve radius.
delta_tvd : float
Change in TVD for the segment.
inverse_rotation_matrix : np.ndarray
The inverse rotation matrix (3 x 3).
Returns
-------
tuple
Global position, global vector, and delta MD.
"""
a, _, c = inverse_rotation_matrix[-1]
if radius == np.inf:
vec = np.array([0, 0, 1])
delta_md = delta_tvd / c
pos = np.array([0, 0, delta_md])
else:
theta = np.arcsin((delta_tvd / radius - a) / np.hypot(a, c)) + np.arctan2(a, c)
delta_md = theta * radius
pos, vec = _get_local_pos_and_vec(np.array([radius]), np.array([theta]))
pos, vec = pos[0], vec[0]
pos_global, vec_global = (
np.stack((pos, vec)) @ inverse_rotation_matrix.T
)
return pos_global, vec_global, delta_md
def _vectorized_solve_for_delta_tvd(
radius: np.ndarray, delta_tvd: np.ndarray, inverse_rotation_matrix: np.ndarray,
pos_0: np.ndarray, md: np.ndarray
) -> np.ndarray:
"""
Vectorized computation for solving for delta TVD, positions, and vectors.
Parameters
----------
radius : np.ndarray
Array of radii (n,). A value of np.inf indicates a straight section.
delta_tvd : np.ndarray
Array of delta TVD values (n,).
inverse_rotation_matrix : np.ndarray
Array of inverse rotation matrices (n, 3, 3), where each matrix corresponds
to the transformation for each interval.
pos_0 : np.ndarray
Array of starting positions (n, 3), representing the initial global
positions for each segment.
md : np.ndarray
Array of measured depths (n,), representing the cumulative MD at the
start of each segment.
Returns
-------
np.ndarray
Array of results with shape (n, 3, 3), where each entry contains:
- pos_1 : ndarray (1 x 3)
Final positions for each segment after applying the transformations.
- vec_1 : ndarray (1 x 3)
Final vectors for each segment after applying the transformations.
- [md_1, radius, delta_tvd] : ndarray (1 x 3)
Array containing:
- `md_1`: The measured depth at the end of the segment
(delta_md + md).
- `radius`: The radius of the curve or np.inf for straight sections.
- `delta_tvd`: The change in true vertical depth for the segment.
"""
a = inverse_rotation_matrix[:, 2, 0]
c = inverse_rotation_matrix[:, 2, 2]
n = radius.shape[0]
pos_1_global = np.zeros((n, 3))
vec_1_global = np.zeros((n, 3))
delta_md_array = np.zeros(n)
# Straight sections
straight_section_mask = radius == np.inf
if np.any(straight_section_mask):
straight_delta_tvd = delta_tvd[straight_section_mask]
straight_c = c[straight_section_mask]
vec_1_straight = np.tile([0.0, 0.0, 1.0], (straight_delta_tvd.shape[0], 1))
delta_md_straight = straight_delta_tvd / straight_c
pos_1_straight = np.zeros_like(vec_1_straight)
pos_1_straight[:, 2] = delta_md_straight
pos_1_global[straight_section_mask] = np.einsum(
'nij,nj->ni', inverse_rotation_matrix[straight_section_mask], pos_1_straight
)
vec_1_global[straight_section_mask] = np.einsum(
'nij,nj->ni', inverse_rotation_matrix[straight_section_mask], vec_1_straight
)
delta_md_array[straight_section_mask] = delta_md_straight
# Curved sections
curved_section_mask = ~straight_section_mask
if np.any(curved_section_mask):
curved_radius = radius[curved_section_mask]
curved_delta_tvd = delta_tvd[curved_section_mask]
curved_a = a[curved_section_mask]
curved_c = c[curved_section_mask]
k = curved_delta_tvd / curved_radius
m = k - curved_a
denominator = np.hypot(curved_a, curved_c)
theta = np.arcsin(m / denominator) + np.arctan2(curved_a, curved_c)
delta_md_curved = theta * curved_radius
pos_1_curved, vec_1_curved = _get_local_pos_and_vec(curved_radius, theta)
pos_1_global[curved_section_mask] = np.einsum(
'nij,nj->ni', inverse_rotation_matrix[curved_section_mask], pos_1_curved
)
vec_1_global[curved_section_mask] = np.einsum(
'nij,nj->ni', inverse_rotation_matrix[curved_section_mask], vec_1_curved
)
delta_md_array[curved_section_mask] = delta_md_curved
# Combine results
result = np.zeros((n, 3, 3))
result[:, 0, :] = pos_1_global + pos_0.reshape(pos_1_global.shape)
result[:, 1, :] = vec_1_global
result[:, 2, :] = np.stack((delta_md_array + md, radius, delta_tvd), axis=-1)
return result
def interpolate_tvd(
survey: we.survey.Survey, tvd: np.ndarray
) -> np.ndarray:
"""
Interpolate TVD values for a survey.
Parameters
----------
survey : we.survey.Survey
The survey to interpolate.
tvd : np.ndarray
Array of target TVD values.
Returns
-------
np.ndarray
Interpolated results.
"""
survey = check_for_inflection_points(survey)
tvd_indices, survey_indices = _get_indices_between(survey.tvd, tvd)
rotation = R.from_euler('zyz', _get_angles(survey, survey_indices) * -1, degrees=False)
inverse_rotation_matrix = rotation.inv().as_matrix()
radius = survey.curve_radius[survey_indices + 1]
return _vectorized_solve_for_delta_tvd(
radius,
tvd[tvd_indices] - survey.tvd[survey_indices],
inverse_rotation_matrix,
pos_0=survey.pos_nev[survey_indices],
md=survey.md[survey_indices],
)
def main():
"""
Main function to fetch and process JSON data.
"""
url = "https://raw.githubusercontent.com/jonnymaserati/welleng/main/tests/test_data/clearance_iscwsa_well_data.json"
well_surveys = fetch_test_wells(url)
if not well_surveys:
logging.error("Failed to fetch JSON data.")
return
logging.info("JSON data loaded successfully.")
well = well_surveys['wells']['11 - well']
keys = ('MD', 'IncDeg', 'AziDeg')
data = {k: well[k] for k in keys}
data['MD'].extend([3550, 4550, 5550])
data['IncDeg'].extend([70, 110, 60])
data['AziDeg'].extend([88, 86, 84])
sh = we.survey.SurveyHeader(**well['header'])
survey = we.survey.Survey(
md=data['MD'],
inc=data['IncDeg'],
azi=data['AziDeg'],
deg=True,
radius=50,
start_nev=[well['N'][0], well['E'][0], well['TVD'][0]],
header=sh,
)
n = 10_000
start = datetime.now()
result = interpolate_tvd(survey, np.linspace(0, survey.tvd[-1], n))
stop = datetime.now()
logging.info(f"Time taken for {n} iterations: {stop - start}")
for r in result:
node = survey.interpolate_md(r[2, 0])
assert np.allclose(node.pos_nev, r[0], rtol=1e-3, atol=1e-3)
assert np.any((
np.allclose(node.vec_nev, r[1], rtol=1e-3, atol=1e-3),
node.inc_deg < 0.7
))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment