Created
December 9, 2024 10:43
-
-
Save jonnymaserati/daca90a4dc2f04a50d48c477be4a606d to your computer and use it in GitHub Desktop.
A vectorized method for wellbore TVD interpolation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Prototyping a non-iterative solution to interpolate TVDs of a survey. | |
""" | |
import logging | |
import requests | |
import numpy as np | |
from scipy.spatial.transform import Rotation as R | |
from datetime import datetime | |
import welleng as we | |
from typing import Optional | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Constants | |
ERROR_THRESHOLD = 1e-6 | |
# Utility functions | |
def fetch_test_wells(url: str) -> Optional[dict]: | |
""" | |
Fetch JSON data from a given URL. | |
Parameters | |
---------- | |
url : str | |
The URL of the JSON file. | |
Returns | |
------- | |
dict or None | |
A dictionary containing the JSON data if the request is successful, | |
or None if the request fails or an error occurs. | |
""" | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
logging.error(f"Error fetching JSON data: {e}") | |
return None | |
def check_for_inflection_points(survey: we.survey.Survey) -> we.survey.Survey: | |
""" | |
Ensure inflection points are included in the survey to capture all TVDs. | |
Parameters | |
---------- | |
survey : we.survey.Survey | |
The original survey. | |
Returns | |
------- | |
we.survey.Survey | |
A survey with inflection points added if any are detected. | |
""" | |
angles = _get_angles(survey) | |
rotations_inverse = R.from_euler('zyz', angles * -1, degrees=False).inv() | |
a, _, c = rotations_inverse.as_matrix()[:, -1].T | |
critical_theta = np.arctan2(-c, a) | |
critical_points = np.where(np.abs(critical_theta[1:]) < survey.dogleg[:-1])[0] | |
if len(critical_points) > 0: | |
radius = survey.curve_radius[critical_points + 1] | |
theta = critical_theta[critical_points + 1] | |
md = survey.md[critical_points] + radius * np.abs(theta) | |
_, local_vec = _get_local_pos_and_vec(radius, np.abs(theta)) | |
global_vec = [ | |
r.apply(lv) | |
for r, lv in zip(rotations_inverse[critical_points], local_vec) | |
] | |
inc, azi = we.utils.get_angles(global_vec, nev=True).T | |
survey_temp = np.vstack((survey.survey_rad, np.array([md, inc, azi]).T)) | |
survey_temp = survey_temp[np.argsort(survey_temp[:, 0])] | |
sh = survey.header | |
sh.azi_reference = 'grid' | |
survey_new = we.survey.Survey( | |
md=survey_temp[:, 0], | |
inc=survey_temp[:, 1], | |
azi=survey_temp[:, 2], | |
deg=False, | |
header=sh, | |
start_nev=survey.start_nev, | |
start_xyz=survey.start_xyz, | |
) | |
survey_new.interpolated[np.searchsorted(survey_new.md, md)] = 1 | |
return survey_new | |
return survey | |
def _get_indices_between( | |
tvd: np.ndarray, target_tvd: list | np.ndarray, err: float = 0 | |
) -> tuple[np.ndarray, np.ndarray]: | |
""" | |
Get indices where the target TVD lies between two consecutive TVDs. | |
Parameters | |
---------- | |
tvd : np.ndarray | |
Array of TVDs in the survey (n,). | |
target_tvd : list or np.ndarray | |
The target TVD(s) to check. | |
err : float, optional | |
Allowable error for matching boundaries, by default 0. | |
Returns | |
------- | |
tuple[np.ndarray, np.ndarray] | |
- Indices of the target TVDs in the target array. | |
- Indices of the survey TVDs bracketing the target values. | |
""" | |
target_tvd = np.asarray(target_tvd).reshape(-1, 1) # Ensure target_tvd is 2D | |
relative_positions = (target_tvd - tvd[:-1]) / (tvd[1:] - tvd[:-1]) | |
# Identify where target TVDs lie between consecutive TVDs | |
mask = (relative_positions >= 0 + err) & (relative_positions <= 1 - err) | |
target_indices, survey_indices = np.where(mask) | |
# Ensure survey_indices does not exceed the bounds of tvd[:-1] | |
survey_indices = np.clip(survey_indices, 0, len(tvd) - 2) | |
return target_indices, survey_indices | |
def _get_angles(survey, idx: Optional[int] = None) -> np.ndarray: | |
""" | |
Retrieve azimuth, inclination, and toolface angles from the survey. | |
Parameters | |
---------- | |
survey : we.survey.Survey | |
The well survey. | |
idx : int or None, optional | |
Specific index to extract, or None for all. | |
Returns | |
------- | |
np.ndarray | |
Array of angles (n, 3). | |
""" | |
if idx is None: | |
return np.column_stack((survey.azi_grid_rad, survey.inc_rad, survey.toolface)) | |
if np.isscalar(idx): | |
return np.array([survey.azi_grid_rad[idx], survey.inc_rad[idx], survey.toolface[idx]]) | |
return np.column_stack((survey.azi_grid_rad[idx], survey.inc_rad[idx], survey.toolface[idx])) | |
def _get_local_pos_and_vec(radius: np.ndarray, angle: np.ndarray) -> tuple[np.ndarray, np.ndarray]: | |
""" | |
Compute local positions and vectors along an arc. | |
Parameters | |
---------- | |
radius : np.ndarray | |
Array of radii. | |
angle : np.ndarray | |
Array of angles (radians). | |
Returns | |
------- | |
tuple | |
Local positions and vectors as arrays. | |
""" | |
pos = np.column_stack(( | |
radius - radius * np.cos(angle), | |
np.zeros_like(radius), | |
radius * np.sin(angle) | |
)) | |
vec = np.column_stack(( | |
np.sin(angle), | |
np.zeros_like(radius), | |
np.cos(angle) | |
)) | |
return pos, vec | |
def compute_pos_and_vec( | |
radius: float, delta_tvd: float, inverse_rotation_matrix: np.ndarray | |
) -> tuple[np.ndarray, np.ndarray, float]: | |
""" | |
Unified computation for position and vector. | |
Parameters | |
---------- | |
radius : float | |
The curve radius. | |
delta_tvd : float | |
Change in TVD for the segment. | |
inverse_rotation_matrix : np.ndarray | |
The inverse rotation matrix (3 x 3). | |
Returns | |
------- | |
tuple | |
Global position, global vector, and delta MD. | |
""" | |
a, _, c = inverse_rotation_matrix[-1] | |
if radius == np.inf: | |
vec = np.array([0, 0, 1]) | |
delta_md = delta_tvd / c | |
pos = np.array([0, 0, delta_md]) | |
else: | |
theta = np.arcsin((delta_tvd / radius - a) / np.hypot(a, c)) + np.arctan2(a, c) | |
delta_md = theta * radius | |
pos, vec = _get_local_pos_and_vec(np.array([radius]), np.array([theta])) | |
pos, vec = pos[0], vec[0] | |
pos_global, vec_global = ( | |
np.stack((pos, vec)) @ inverse_rotation_matrix.T | |
) | |
return pos_global, vec_global, delta_md | |
def _vectorized_solve_for_delta_tvd( | |
radius: np.ndarray, delta_tvd: np.ndarray, inverse_rotation_matrix: np.ndarray, | |
pos_0: np.ndarray, md: np.ndarray | |
) -> np.ndarray: | |
""" | |
Vectorized computation for solving for delta TVD, positions, and vectors. | |
Parameters | |
---------- | |
radius : np.ndarray | |
Array of radii (n,). A value of np.inf indicates a straight section. | |
delta_tvd : np.ndarray | |
Array of delta TVD values (n,). | |
inverse_rotation_matrix : np.ndarray | |
Array of inverse rotation matrices (n, 3, 3), where each matrix corresponds | |
to the transformation for each interval. | |
pos_0 : np.ndarray | |
Array of starting positions (n, 3), representing the initial global | |
positions for each segment. | |
md : np.ndarray | |
Array of measured depths (n,), representing the cumulative MD at the | |
start of each segment. | |
Returns | |
------- | |
np.ndarray | |
Array of results with shape (n, 3, 3), where each entry contains: | |
- pos_1 : ndarray (1 x 3) | |
Final positions for each segment after applying the transformations. | |
- vec_1 : ndarray (1 x 3) | |
Final vectors for each segment after applying the transformations. | |
- [md_1, radius, delta_tvd] : ndarray (1 x 3) | |
Array containing: | |
- `md_1`: The measured depth at the end of the segment | |
(delta_md + md). | |
- `radius`: The radius of the curve or np.inf for straight sections. | |
- `delta_tvd`: The change in true vertical depth for the segment. | |
""" | |
a = inverse_rotation_matrix[:, 2, 0] | |
c = inverse_rotation_matrix[:, 2, 2] | |
n = radius.shape[0] | |
pos_1_global = np.zeros((n, 3)) | |
vec_1_global = np.zeros((n, 3)) | |
delta_md_array = np.zeros(n) | |
# Straight sections | |
straight_section_mask = radius == np.inf | |
if np.any(straight_section_mask): | |
straight_delta_tvd = delta_tvd[straight_section_mask] | |
straight_c = c[straight_section_mask] | |
vec_1_straight = np.tile([0.0, 0.0, 1.0], (straight_delta_tvd.shape[0], 1)) | |
delta_md_straight = straight_delta_tvd / straight_c | |
pos_1_straight = np.zeros_like(vec_1_straight) | |
pos_1_straight[:, 2] = delta_md_straight | |
pos_1_global[straight_section_mask] = np.einsum( | |
'nij,nj->ni', inverse_rotation_matrix[straight_section_mask], pos_1_straight | |
) | |
vec_1_global[straight_section_mask] = np.einsum( | |
'nij,nj->ni', inverse_rotation_matrix[straight_section_mask], vec_1_straight | |
) | |
delta_md_array[straight_section_mask] = delta_md_straight | |
# Curved sections | |
curved_section_mask = ~straight_section_mask | |
if np.any(curved_section_mask): | |
curved_radius = radius[curved_section_mask] | |
curved_delta_tvd = delta_tvd[curved_section_mask] | |
curved_a = a[curved_section_mask] | |
curved_c = c[curved_section_mask] | |
k = curved_delta_tvd / curved_radius | |
m = k - curved_a | |
denominator = np.hypot(curved_a, curved_c) | |
theta = np.arcsin(m / denominator) + np.arctan2(curved_a, curved_c) | |
delta_md_curved = theta * curved_radius | |
pos_1_curved, vec_1_curved = _get_local_pos_and_vec(curved_radius, theta) | |
pos_1_global[curved_section_mask] = np.einsum( | |
'nij,nj->ni', inverse_rotation_matrix[curved_section_mask], pos_1_curved | |
) | |
vec_1_global[curved_section_mask] = np.einsum( | |
'nij,nj->ni', inverse_rotation_matrix[curved_section_mask], vec_1_curved | |
) | |
delta_md_array[curved_section_mask] = delta_md_curved | |
# Combine results | |
result = np.zeros((n, 3, 3)) | |
result[:, 0, :] = pos_1_global + pos_0.reshape(pos_1_global.shape) | |
result[:, 1, :] = vec_1_global | |
result[:, 2, :] = np.stack((delta_md_array + md, radius, delta_tvd), axis=-1) | |
return result | |
def interpolate_tvd( | |
survey: we.survey.Survey, tvd: np.ndarray | |
) -> np.ndarray: | |
""" | |
Interpolate TVD values for a survey. | |
Parameters | |
---------- | |
survey : we.survey.Survey | |
The survey to interpolate. | |
tvd : np.ndarray | |
Array of target TVD values. | |
Returns | |
------- | |
np.ndarray | |
Interpolated results. | |
""" | |
survey = check_for_inflection_points(survey) | |
tvd_indices, survey_indices = _get_indices_between(survey.tvd, tvd) | |
rotation = R.from_euler('zyz', _get_angles(survey, survey_indices) * -1, degrees=False) | |
inverse_rotation_matrix = rotation.inv().as_matrix() | |
radius = survey.curve_radius[survey_indices + 1] | |
return _vectorized_solve_for_delta_tvd( | |
radius, | |
tvd[tvd_indices] - survey.tvd[survey_indices], | |
inverse_rotation_matrix, | |
pos_0=survey.pos_nev[survey_indices], | |
md=survey.md[survey_indices], | |
) | |
def main(): | |
""" | |
Main function to fetch and process JSON data. | |
""" | |
url = "https://raw.githubusercontent.com/jonnymaserati/welleng/main/tests/test_data/clearance_iscwsa_well_data.json" | |
well_surveys = fetch_test_wells(url) | |
if not well_surveys: | |
logging.error("Failed to fetch JSON data.") | |
return | |
logging.info("JSON data loaded successfully.") | |
well = well_surveys['wells']['11 - well'] | |
keys = ('MD', 'IncDeg', 'AziDeg') | |
data = {k: well[k] for k in keys} | |
data['MD'].extend([3550, 4550, 5550]) | |
data['IncDeg'].extend([70, 110, 60]) | |
data['AziDeg'].extend([88, 86, 84]) | |
sh = we.survey.SurveyHeader(**well['header']) | |
survey = we.survey.Survey( | |
md=data['MD'], | |
inc=data['IncDeg'], | |
azi=data['AziDeg'], | |
deg=True, | |
radius=50, | |
start_nev=[well['N'][0], well['E'][0], well['TVD'][0]], | |
header=sh, | |
) | |
n = 10_000 | |
start = datetime.now() | |
result = interpolate_tvd(survey, np.linspace(0, survey.tvd[-1], n)) | |
stop = datetime.now() | |
logging.info(f"Time taken for {n} iterations: {stop - start}") | |
for r in result: | |
node = survey.interpolate_md(r[2, 0]) | |
assert np.allclose(node.pos_nev, r[0], rtol=1e-3, atol=1e-3) | |
assert np.any(( | |
np.allclose(node.vec_nev, r[1], rtol=1e-3, atol=1e-3), | |
node.inc_deg < 0.7 | |
)) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment