Vertex AI - Airflow DAGs with a BigQuery source
# Check that all DAG files import cleanly
python -c "from airflow.models import DagBag; d = DagBag();"

# Start the scheduler and webserver in the background
nohup airflow scheduler > airflow_sch.log 2>&1 &
nohup airflow webserver --hostname 0.0.0.0 --port 8080 > airflow_webserver.log 2>&1 &

# Stop both processes
kill $(ps aux | grep 'airflow webserver --' | awk '{print $2}')
kill $(ps aux | grep 'bin/airflow scheduler' | awk '{print $2}')

# Enable and trigger the batch prediction DAG
airflow dags unpause create_batch_prediction_job_dag
airflow dags trigger create_batch_prediction_job_dag
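The DagBag one-liner above loads every file in the DAG folder; import failures are easier to read when printed explicitly. A small helper along the same lines (not part of the original gist):

# Print every DAG file that failed to import, with its traceback.
from airflow.models import DagBag

dag_bag = DagBag()
for dag_file, stacktrace in dag_bag.import_errors.items():
    print(f"Import error in {dag_file}:\n{stacktrace}")
print(f"Loaded {len(dag_bag.dags)} DAGs successfully.")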
[2023-06-17T18:42:48.869+0530] {logging_mixin.py:149} INFO - Changing /mnt/d/projects/airflow/logs/dag_id=vi_create_auto_ml_tabular_training_job_dag/run_id=manual__2023-06-17T13:12:33+00:00/task_id=auto_ml_tabular_task permission to 509
[2023-06-17T18:42:48.891+0530] {logging_mixin.py:149} INFO - Changing /mnt/d/projects/airflow/logs/dag_id=vi_create_auto_ml_tabular_training_job_dag/run_id=manual__2023-06-17T13:12:33+00:00/task_id=auto_ml_tabular_task permission to 509
[2023-06-17T18:42:48.906+0530] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: vi_create_auto_ml_tabular_training_job_dag.auto_ml_tabular_task manual__2023-06-17T13:12:33+00:00 [queued]>
[2023-06-17T18:42:48.914+0530] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: vi_create_auto_ml_tabular_training_job_dag.auto_ml_tabular_task manual__2023-06-17T13:12:33+00:00 [queued]>
[2023-06-17T18:42:48.916+0530] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-17T18:42:48.931+0530] {taskinstance.py:1327} INFO - Executing <Task(CreateAutoMLTabularTrainingJobOperator): auto_ml_tabular_task> on 2023-06-17 13:12:33+00:00
[2023-06-17T18:42:48.964+0530] {standard_task_runner.py:57} INFO - Started process 12974 to run task
[2023-06-17T18:42:48.971+0530] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'vi_create_auto_ml_tabular_training_job_dag', 'auto_ml_tabular_task', 'manual__2023-06-17T13:12:33+00:00', '--job-id', '175', '--raw', '--subdir', 'DAGS_FOLDER/vi_create_model_train.py', '--cfg-path', '/tmp/tmprijpfzql']
[2023-06-17T18:42:48.974+0530] {standard_task_runner.py:85} INFO - Job 175: Subtask auto_ml_tabular_task
[2023-06-17T18:42:49.043+0530] {logging_mixin.py:149} INFO - Changing /mnt/d/projects/airflow/logs/dag_id=vi_create_auto_ml_tabular_training_job_dag/run_id=manual__2023-06-17T13:12:33+00:00/task_id=auto_ml_tabular_task permission to 509
[2023-06-17T18:42:49.044+0530] {task_command.py:410} INFO - Running <TaskInstance: vi_create_auto_ml_tabular_training_job_dag.auto_ml_tabular_task manual__2023-06-17T13:12:33+00:00 [running]> on host DESKTOP-EIFUHU2.localdomain
[2023-06-17T18:42:49.115+0530] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='vi_create_auto_ml_tabular_training_job_dag' AIRFLOW_CTX_TASK_ID='auto_ml_tabular_task' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T13:12:33+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T13:12:33+00:00'
[2023-06-17T18:42:49.120+0530] {base.py:73} INFO - Using connection ID 'gcp_conn' for task execution.
[2023-06-17T18:42:52.123+0530] {_metadata.py:141} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
[2023-06-17T18:42:55.125+0530] {_metadata.py:141} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
[2023-06-17T18:42:58.128+0530] {_metadata.py:141} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: timed out
[2023-06-17T18:42:58.129+0530] {_default.py:340} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2023-06-17T18:42:58.131+0530] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/initializer.py", line 244, in project
    self._set_project_as_env_var_or_google_auth_default()
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/initializer.py", line 81, in _set_project_as_env_var_or_google_auth_default
    credentials, project = google.auth.default()
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/auth/_default.py", line 692, in default
    raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py", line 359, in execute
    dataset=datasets.TabularDataset(dataset_name=self.dataset_id),
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/datasets/dataset.py", line 77, in __init__
    super().__init__(
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/base.py", line 925, in __init__
    VertexAiResourceNoun.__init__(
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/base.py", line 507, in __init__
    self.project = project or initializer.global_config.project
  File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/initializer.py", line 247, in project
    raise GoogleAuthError(project_not_found_exception_str) from exc
google.auth.exceptions.GoogleAuthError: Unable to find your project. Please provide a project ID by:
- Passing a constructor argument
- Using aiplatform.init()
- Setting project using 'gcloud config set project my-project'
- Setting a GCP environment variable
[2023-06-17T18:42:58.139+0530] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=vi_create_auto_ml_tabular_training_job_dag, task_id=auto_ml_tabular_task, execution_date=20230617T131233, start_date=20230617T131248, end_date=20230617T131258
[2023-06-17T18:42:58.152+0530] {standard_task_runner.py:104} ERROR - Failed to execute job 175 for task auto_ml_tabular_task (Unable to find your project. Please provide a project ID by:
- Passing a constructor argument
- Using aiplatform.init()
- Setting project using 'gcloud config set project my-project'
- Setting a GCP environment variable; 12974)
[2023-06-17T18:42:58.166+0530] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-06-17T18:42:58.183+0530] {taskinstance.py:2651} INFO - 0 downstream tasks scheduled from follow-on schedule check
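The traceback above is the classic symptom of running outside GCP with no Application Default Credentials: google-auth falls back to the Compute Engine metadata server, which times out, so the 'gcp_conn' connection needs a service-account key file (or local ADC via `gcloud auth application-default login`). A quick local check, with a placeholder key path:

import os
import google.auth

# Hypothetical path: point this at a real service-account key file,
# or run `gcloud auth application-default login` instead.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/sa-key.json")

# Raises DefaultCredentialsError if no credentials can be resolved.
credentials, project = google.auth.default()
print(f"ADC resolved for project: {project}")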
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # deprecated alias; EmptyOperator in newer Airflow
from airflow.utils.dates import days_ago
from google.cloud.aiplatform import schema
from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value
import logging
# ImportDataOperator is unused here; see the import sketch in the comment thread below.
from airflow.providers.google.cloud.operators.vertex_ai.dataset import CreateDatasetOperator, ImportDataOperator

# Project and region settings (hardcoded here rather than pulled from Airflow Variables)
PROJECT_ID = 'plated-ensign-390102'
REGION = 'us-central1'
ENV_ID = '02'

# https://github.com/googleapis/python-aiplatform
# https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.html#TEST_IMPORT_CONFIG
TEST_EXPORT_CONFIG = {
    'bigqueryDestination': {
        'outputUri': f'bq://{PROJECT_ID}.bqDatasetID.bqTableId',
    },
}
TEST_IMPORT_CONFIG = [
    {
        "bigquerySource": {
            "inputUri": f"bq://{PROJECT_ID}.gsod_training_unique.test"
        },
    },
]

# https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.html#TABULAR_DATASET
# https://programtalk.com/python-more-examples/google.cloud.aiplatform.schema.dataset.metadata.tabular/
TABULAR_DATASET = {
    "display_name": f"tabular-dataset-{ENV_ID}",
    "metadata_schema_uri": schema.dataset.metadata.tabular,
    "metadata": ParseDict(
        {
            "input_config": {
                "bigquery_source": {"uri": [f"bq://{PROJECT_ID}.gsod_training_unique.test"]}
            }
        },
        Value(),
    ),
}

default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='vi_create_dataset',
    default_args=default_args,
    schedule_interval=None,  # a DAG-level argument, not a default_arg
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    create_tabular_dataset = CreateDatasetOperator(
        gcp_conn_id='gcp_conn',
        task_id='tabular_dataset',
        dataset=TABULAR_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )

    # .output is a lazy XComArg; these log lines run at parse time and print
    # the XCom reference, not the runtime dataset ID.
    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
    logging.info('tabular_dataset_id -------------')
    logging.info(tabular_dataset_id)
    logging.info('tabular_dataset_id -------------')

    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/vertex_ai/dataset/index.html
    start_task >> create_tabular_dataset >> end_task
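Because `.output["dataset_id"]` is a lazy XComArg, the `logging.info` calls above print a placeholder when the file is parsed. A sketch of logging the real value at runtime instead, assuming the task below is added inside the same `with DAG` block (it is not part of the original gist):

from airflow.operators.python import PythonOperator

def _log_dataset_id(ti):
    # CreateDatasetOperator pushes the new dataset's ID to XCom under "dataset_id".
    dataset_id = ti.xcom_pull(task_ids="tabular_dataset", key="dataset_id")
    logging.info("tabular dataset_id at runtime: %s", dataset_id)

log_dataset_id = PythonOperator(
    task_id="log_dataset_id",
    python_callable=_log_dataset_id,
)

start_task >> create_tabular_dataset >> log_dataset_id >> end_task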
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from google.cloud.aiplatform import schema
from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value
from airflow.providers.google.cloud.operators.vertex_ai.dataset import CreateDatasetOperator
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
    CreateAutoMLTabularTrainingJobOperator,
    DeleteAutoMLTrainingJobOperator,
)

PROJECT_ID = 'plated-ensign-390102'
REGION = 'us-central1'
ENV_ID = '02'
TABULAR_DISPLAY_NAME = f"auto-ml-tabular-{ENV_ID}"

# Left over from the PetFinder sample; these columns do not exist in the
# GSOD table used below, which is why the list stays commented out in the
# training operator.
COLUMN_TRANSFORMATIONS = [
    {"categorical": {"column_name": "Type"}},
    {"numeric": {"column_name": "Age"}},
    {"categorical": {"column_name": "Breed1"}},
    {"categorical": {"column_name": "Color1"}},
    {"categorical": {"column_name": "Color2"}},
    {"categorical": {"column_name": "MaturitySize"}},
    {"categorical": {"column_name": "FurLength"}},
    {"categorical": {"column_name": "Vaccinated"}},
    {"categorical": {"column_name": "Sterilized"}},
    {"categorical": {"column_name": "Health"}},
    {"numeric": {"column_name": "Fee"}},
    {"numeric": {"column_name": "PhotoAmt"}},
]

TABULAR_DATASET = {
    "display_name": f"tabular-dataset-{ENV_ID}",
    "metadata_schema_uri": schema.dataset.metadata.tabular,
    "metadata": ParseDict(
        {
            "input_config": {
                "bigquery_source": {"uri": [f"bq://{PROJECT_ID}.gsod_training_unique.test"]}
            }
        },
        Value(),
    ),
}

default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='vi_create_auto_ml_tabular_training_job_dag',
    default_args=default_args,
    schedule_interval=None,  # a DAG-level argument, not a default_arg
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    create_tabular_dataset = CreateDatasetOperator(
        gcp_conn_id='gcp_conn',
        task_id='tabular_dataset',
        dataset=TABULAR_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )

    # Lazy XComArg, resolved to the created dataset's ID at runtime.
    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]

    # https://programtalk.com/python-more-examples/google.cloud.aiplatform.schema.dataset.metadata.tabular/
    # https://github.com/googleapis/python-aiplatform
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.html#TEST_IMPORT_CONFIG
    # https://cloud.google.com/vertex-ai/docs/tabular-data/classification-regression/get-batch-predictions#api:-bigquery
    # https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
    # https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/initializer.py
    # https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/example_dags/example_vertex_ai.py
    create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
        gcp_conn_id='gcp_conn',
        task_id="auto_ml_tabular_task",
        display_name=TABULAR_DISPLAY_NAME,
        optimization_prediction_type="regression",
        optimization_objective="minimize-rmse",
        # column_transformations=COLUMN_TRANSFORMATIONS,
        dataset_id=tabular_dataset_id,
        target_column="mean_temp",
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1,
        model_display_name='your-model-display-name',
        disable_early_stopping=False,
        region=REGION,
        project_id=PROJECT_ID,
    )

    start_task >> create_tabular_dataset >> create_auto_ml_tabular_training_job >> end_task
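`DeleteAutoMLTrainingJobOperator` is imported above but never used. A sketch of a cleanup task that could follow the training step, assuming (as in the provider's example DAGs) that the training operator pushes its pipeline ID to XCom under the key `training_id`; the parameter name `training_pipeline_id` should be checked against the installed provider version:

delete_auto_ml_training_job = DeleteAutoMLTrainingJobOperator(
    task_id="delete_auto_ml_training_job",
    gcp_conn_id='gcp_conn',
    # Assumed XCom key, per the provider's example DAGs.
    training_pipeline_id="{{ ti.xcom_pull(task_ids='auto_ml_tabular_task', key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)

create_auto_ml_tabular_training_job >> delete_auto_ml_training_job >> end_task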
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
    CreateBatchPredictionJobOperator,
)

PROJECT_ID = 'plated-ensign-390102'
REGION = 'us-central1'
ENV_ID = '02'
JOB_DISPLAY_NAME = 'your-job-display-name'
BIGQUERY_SOURCE = f'bq://{PROJECT_ID}.gsod_prediction_unique.prediction'  # plated-ensign-390102.gsod_prediction_unique.prediction
BIGQUERY_TARGET = f'bq://{PROJECT_ID}.gsod_results_unique'
MODEL_PARAMETERS = {
    # Define your model parameters here
}

default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='create_batch_prediction_job_dag',
    default_args=default_args,
    schedule_interval=None,  # a DAG-level argument, not a default_arg
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job/index.html
    create_batch_prediction_job = CreateBatchPredictionJobOperator(
        region=REGION,
        project_id=PROJECT_ID,
        gcp_conn_id='gcp_conn',
        task_id="create_batch_prediction_job",
        job_display_name=JOB_DISPLAY_NAME,
        # Hardcoded model ID; see the templated alternative after this file.
        model_name='2217066241363804160',  # "{{ ti.xcom_pull('auto_ml_forecasting_task')['name'] }}"
        predictions_format="bigquery",
        bigquery_source=BIGQUERY_SOURCE,
        bigquery_destination_prefix=BIGQUERY_TARGET,
        model_parameters=MODEL_PARAMETERS,
        # generate_explanation=False,
    )

    start_task >> create_batch_prediction_job >> end_task
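The commented-out template above would only resolve if the training task ran in the same DAG run; across two separate DAGs, a plain `xcom_pull` in the template context finds nothing. A sketch of the same operator with the model ID pulled from XCom, assuming the training and prediction tasks are merged into one DAG and that the AutoML training operator pushes the created model's ID under the XCom key `model_id` (an assumption worth verifying against the installed provider version):

create_batch_prediction_job = CreateBatchPredictionJobOperator(
    region=REGION,
    project_id=PROJECT_ID,
    gcp_conn_id='gcp_conn',
    task_id="create_batch_prediction_job",
    job_display_name=JOB_DISPLAY_NAME,
    # Pull the model ID pushed by the training task in the same DAG run.
    model_name="{{ ti.xcom_pull(task_ids='auto_ml_tabular_task', key='model_id') }}",
    predictions_format="bigquery",
    bigquery_source=BIGQUERY_SOURCE,
    bigquery_destination_prefix=BIGQUERY_TARGET,
)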
I implemented vi_create_dataset_dag for an image dataset, and the dataset created inside Vertex AI is empty. Now I want to upload/import images along with their annotations into it. Can you specify how to upload images with their annotations into the dataset created by vi_create_dataset_dag.py?
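A possible answer, sketched with the provider's `ImportDataOperator` (imported in the dataset DAG above but never used): images plus their annotations can be imported from a JSONL import file in GCS. The bucket path and the `image_dataset` task ID are hypothetical, the constants reuse those from the DAGs above, and the `import_schema_uri` must match the annotation type (single-label classification shown here):

from airflow.providers.google.cloud.operators.vertex_ai.dataset import ImportDataOperator
from google.cloud.aiplatform import schema

import_image_data = ImportDataOperator(
    task_id="import_image_data",
    gcp_conn_id="gcp_conn",
    project_id=PROJECT_ID,
    region=REGION,
    # ID of the image dataset created by an upstream CreateDatasetOperator
    # task (here assumed to have task_id "image_dataset").
    dataset_id="{{ ti.xcom_pull(task_ids='image_dataset', key='dataset_id') }}",
    import_configs=[
        {
            # Hypothetical GCS path to a JSONL file listing image URIs and annotations.
            "gcs_source": {"uris": ["gs://your-bucket/image_annotations.jsonl"]},
            "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
        }
    ],
)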