@ibrezm1
Last active September 13, 2023 05:53
Vertex AI - Airflow DAGs with a BigQuery source
# Quick import check: parse the DAGs folder and surface any import errors
python -c "from airflow.models import DagBag; d = DagBag();"

# Start the scheduler and webserver in the background
nohup airflow scheduler > airflow_sch.log 2>&1 &
nohup airflow webserver --hostname 0.0.0.0 --port 8080 > airflow_webserver.log 2>&1 &

# Stop them again
kill $(ps aux | grep 'airflow webserver --' | awk '{print $2}')
kill $(ps aux | grep 'bin/airflow scheduler' | awk '{print $2}')

# Unpause and trigger the batch prediction DAG
airflow dags unpause create_batch_prediction_job_dag
airflow dags trigger create_batch_prediction_job_dag
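A slightly more verbose variant of the one-liner above can print the individual import errors instead of only raising them; this only uses the standard DagBag attributes, nothing project-specific:

from airflow.models import DagBag

bag = DagBag()
# import_errors maps each broken DAG file to its exception message
for path, err in bag.import_errors.items():
    print(f"{path}: {err}")
print(f"Parsed {len(bag.dags)} DAG(s)")

The task log below is from an auto_ml_tabular_task run that failed on credentials: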
[2023-06-17T18:42:48.869+0530] {logging_mixin.py:149} INFO - Changing /mnt/d/projects/airflow/logs/dag_id=vi_create_auto_ml_tabular_training_job_dag/run_id=manual__2023-06-17T13:12:33+00:00/task_id=auto_ml_tabular_task permission to 509
[2023-06-17T18:42:48.891+0530] {logging_mixin.py:149} INFO - Changing /mnt/d/projects/airflow/logs/dag_id=vi_create_auto_ml_tabular_training_job_dag/run_id=manual__2023-06-17T13:12:33+00:00/task_id=auto_ml_tabular_task permission to 509
[2023-06-17T18:42:48.906+0530] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: vi_create_auto_ml_tabular_training_job_dag.auto_ml_tabular_task manual__2023-06-17T13:12:33+00:00 [queued]>
[2023-06-17T18:42:48.914+0530] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: vi_create_auto_ml_tabular_training_job_dag.auto_ml_tabular_task manual__2023-06-17T13:12:33+00:00 [queued]>
[2023-06-17T18:42:48.916+0530] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-17T18:42:48.931+0530] {taskinstance.py:1327} INFO - Executing <Task(CreateAutoMLTabularTrainingJobOperator): auto_ml_tabular_task> on 2023-06-17 13:12:33+00:00
[2023-06-17T18:42:48.964+0530] {standard_task_runner.py:57} INFO - Started process 12974 to run task
[2023-06-17T18:42:48.971+0530] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'vi_create_auto_ml_tabular_training_job_dag', 'auto_ml_tabular_task', 'manual__2023-06-17T13:12:33+00:00', '--job-id', '175', '--raw', '--subdir', 'DAGS_FOLDER/vi_create_model_train.py', '--cfg-path', '/tmp/tmprijpfzql']
[2023-06-17T18:42:48.974+0530] {standard_task_runner.py:85} INFO - Job 175: Subtask auto_ml_tabular_task
[2023-06-17T18:42:49.043+0530] {logging_mixin.py:149} INFO - Changing /mnt/d/projects/airflow/logs/dag_id=vi_create_auto_ml_tabular_training_job_dag/run_id=manual__2023-06-17T13:12:33+00:00/task_id=auto_ml_tabular_task permission to 509
[2023-06-17T18:42:49.044+0530] {task_command.py:410} INFO - Running <TaskInstance: vi_create_auto_ml_tabular_training_job_dag.auto_ml_tabular_task manual__2023-06-17T13:12:33+00:00 [running]> on host DESKTOP-EIFUHU2.localdomain
[2023-06-17T18:42:49.115+0530] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='vi_create_auto_ml_tabular_training_job_dag' AIRFLOW_CTX_TASK_ID='auto_ml_tabular_task' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T13:12:33+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T13:12:33+00:00'
[2023-06-17T18:42:49.120+0530] {base.py:73} INFO - Using connection ID 'gcp_conn' for task execution.
[2023-06-17T18:42:52.123+0530] {_metadata.py:141} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
[2023-06-17T18:42:55.125+0530] {_metadata.py:141} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
[2023-06-17T18:42:58.128+0530] {_metadata.py:141} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: timed out
[2023-06-17T18:42:58.129+0530] {_default.py:340} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2023-06-17T18:42:58.131+0530] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/initializer.py", line 244, in project
self._set_project_as_env_var_or_google_auth_default()
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/initializer.py", line 81, in _set_project_as_env_var_or_google_auth_default
credentials, project = google.auth.default()
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/auth/_default.py", line 692, in default
raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py", line 359, in execute
dataset=datasets.TabularDataset(dataset_name=self.dataset_id),
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/datasets/dataset.py", line 77, in __init__
super().__init__(
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/base.py", line 925, in __init__
VertexAiResourceNoun.__init__(
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/base.py", line 507, in __init__
self.project = project or initializer.global_config.project
File "/mnt/d/projects/tvenv/lib/python3.8/site-packages/google/cloud/aiplatform/initializer.py", line 247, in project
raise GoogleAuthError(project_not_found_exception_str) from exc
google.auth.exceptions.GoogleAuthError: Unable to find your project. Please provide a project ID by:
- Passing a constructor argument
- Using aiplatform.init()
- Setting project using 'gcloud config set project my-project'
- Setting a GCP environment variable
[2023-06-17T18:42:58.139+0530] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=vi_create_auto_ml_tabular_training_job_dag, task_id=auto_ml_tabular_task, execution_date=20230617T131233, start_date=20230617T131248, end_date=20230617T131258
[2023-06-17T18:42:58.152+0530] {standard_task_runner.py:104} ERROR - Failed to execute job 175 for task auto_ml_tabular_task (Unable to find your project. Please provide a project ID by:
- Passing a constructor argument
- Using aiplatform.init()
- Setting project using 'gcloud config set project my-project'
- Setting a GCP environment variable; 12974)
[2023-06-17T18:42:58.166+0530] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-06-17T18:42:58.183+0530] {taskinstance.py:2651} INFO - 0 downstream tasks scheduled from follow-on schedule check
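The failure above is the usual Application Default Credentials problem when Airflow runs outside GCP (no metadata server): the aiplatform client falls back to google.auth.default() and finds nothing. Following the suggestions printed in the traceback itself, one minimal sketch is to make ADC resolvable in the environment the scheduler/worker runs in; the key path below is a placeholder, and supplying a key file on the 'gcp_conn' connection is an alternative:

# Minimal sketch: point Application Default Credentials at a service-account key
# and set the project, on a machine without a metadata server. Both values are
# placeholders; in practice they would be exported in the shell that launches the
# scheduler/webserver (or configured on the 'gcp_conn' Airflow connection).
import os

os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/service-account-key.json")
os.environ.setdefault("GOOGLE_CLOUD_PROJECT", "plated-ensign-390102")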
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from google.cloud.aiplatform import schema
from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value
import logging
from airflow.providers.google.cloud.operators.vertex_ai.dataset import CreateDatasetOperator, ImportDataOperator

# Project and region (hardcoded here; they could also come from Airflow Variables)
PROJECT_ID = 'plated-ensign-390102'
REGION = 'us-central1'
ENV_ID = '02'

# https://github.com/googleapis/python-aiplatform
# https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.html#TEST_IMPORT_CONFIG
TEST_EXPORT_CONFIG = {
    'bigqueryDestination': {
        'outputUri': f'bq://{PROJECT_ID}.bqDatasetID.bqTableId',
    },
}

TEST_IMPORT_CONFIG = [
    {
        "bigquerySource": {
            "inputUri": f"bq://{PROJECT_ID}.gsod_training_unique.test"
        },
    },
]

# https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.html#TABULAR_DATASET
# https://programtalk.com/python-more-examples/google.cloud.aiplatform.schema.dataset.metadata.tabular/
TABULAR_DATASET = {
    "display_name": f"tabular-dataset-{ENV_ID}",
    "metadata_schema_uri": schema.dataset.metadata.tabular,
    "metadata": ParseDict(
        {
            "input_config": {
                "bigquery_source": {"uri": [f"bq://{PROJECT_ID}.gsod_training_unique.test"]}
            }
        },
        Value(),
    ),
}

default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='vi_create_dataset',
    default_args=default_args,
    schedule_interval=None,  # a DAG argument, not a task-level default_arg
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    create_tabular_dataset = CreateDatasetOperator(
        gcp_conn_id='gcp_conn',
        task_id='tabular_dataset',
        dataset=TABULAR_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )

    # .output["dataset_id"] is an XComArg: it resolves to the real dataset ID only at
    # runtime, so the logging calls below (executed at parse time) print a placeholder.
    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]
    logging.info('tabular_dataset_id -------------')
    logging.info(tabular_dataset_id)
    logging.info('tabular_dataset_id -------------')

    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/vertex_ai/dataset/index.html
    start_task >> create_tabular_dataset >> end_task
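Because the logging calls above run at parse time, the real dataset ID never appears in the scheduler log. A minimal sketch of inspecting it at runtime instead, added inside the same with DAG(...) block; it only assumes the standard PythonOperator and the 'dataset_id' XCom key the gist already relies on via .output["dataset_id"]:

from airflow.operators.python import PythonOperator

def log_dataset_id(ti):
    # Pull the value CreateDatasetOperator pushed under the 'dataset_id' key.
    dataset_id = ti.xcom_pull(task_ids='tabular_dataset', key='dataset_id')
    logging.info('tabular_dataset_id at runtime: %s', dataset_id)

log_dataset_id_task = PythonOperator(
    task_id='log_dataset_id',
    python_callable=log_dataset_id,
)

create_tabular_dataset >> log_dataset_id_task >> end_task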
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from google.cloud.aiplatform import schema
from google.protobuf.json_format import ParseDict
from google.protobuf.struct_pb2 import Value
from airflow.providers.google.cloud.operators.vertex_ai.dataset import CreateDatasetOperator
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
    CreateAutoMLTabularTrainingJobOperator,
    DeleteAutoMLTrainingJobOperator,
)

PROJECT_ID = 'plated-ensign-390102'
REGION = 'us-central1'
ENV_ID = '02'
TABULAR_DISPLAY_NAME = f"auto-ml-tabular-{ENV_ID}"

# Leftover from the PetFinder sample; unused because column_transformations is commented out below
COLUMN_TRANSFORMATIONS = [
    {"categorical": {"column_name": "Type"}},
    {"numeric": {"column_name": "Age"}},
    {"categorical": {"column_name": "Breed1"}},
    {"categorical": {"column_name": "Color1"}},
    {"categorical": {"column_name": "Color2"}},
    {"categorical": {"column_name": "MaturitySize"}},
    {"categorical": {"column_name": "FurLength"}},
    {"categorical": {"column_name": "Vaccinated"}},
    {"categorical": {"column_name": "Sterilized"}},
    {"categorical": {"column_name": "Health"}},
    {"numeric": {"column_name": "Fee"}},
    {"numeric": {"column_name": "PhotoAmt"}},
]

TABULAR_DATASET = {
    "display_name": f"tabular-dataset-{ENV_ID}",
    "metadata_schema_uri": schema.dataset.metadata.tabular,
    "metadata": ParseDict(
        {
            "input_config": {
                "bigquery_source": {"uri": [f"bq://{PROJECT_ID}.gsod_training_unique.test"]}
            }
        },
        Value(),
    ),
}

default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='vi_create_auto_ml_tabular_training_job_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    create_tabular_dataset = CreateDatasetOperator(
        gcp_conn_id='gcp_conn',
        task_id='tabular_dataset',
        dataset=TABULAR_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )

    # XComArg: resolves to the new dataset's ID at runtime
    tabular_dataset_id = create_tabular_dataset.output["dataset_id"]

    # https://programtalk.com/python-more-examples/google.cloud.aiplatform.schema.dataset.metadata.tabular/
    # https://github.com/googleapis/python-aiplatform
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_dataset.html#TEST_IMPORT_CONFIG
    # https://cloud.google.com/vertex-ai/docs/tabular-data/classification-regression/get-batch-predictions#api:-bigquery
    # https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/vertex_ai/auto_ml.py
    # https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/initializer.py
    # https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/example_dags/example_vertex_ai.py
    create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
        gcp_conn_id='gcp_conn',
        task_id="auto_ml_tabular_task",
        display_name=TABULAR_DISPLAY_NAME,
        optimization_prediction_type="regression",
        optimization_objective="minimize-rmse",
        # column_transformations=COLUMN_TRANSFORMATIONS,
        dataset_id=tabular_dataset_id,  # pulled from the upstream tabular_dataset task at runtime
        target_column="mean_temp",
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1,
        model_display_name='your-model-display-name',
        disable_early_stopping=False,
        region=REGION,
        project_id=PROJECT_ID,
    )

    start_task >> create_tabular_dataset >> create_auto_ml_tabular_training_job >> end_task
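DeleteAutoMLTrainingJobOperator is imported above but never used. A sketch of cleaning up the finished training pipeline inside the same with DAG(...) block; the training_pipeline_id parameter and the 'training_id' XCom key are assumptions taken from the provider's example DAGs and should be checked against the installed provider version:

delete_auto_ml_training_job = DeleteAutoMLTrainingJobOperator(
    task_id='delete_auto_ml_training_job',
    gcp_conn_id='gcp_conn',
    # Assumption: CreateAutoMLTabularTrainingJobOperator pushes the pipeline ID
    # under the 'training_id' XCom key, as in the provider's example DAGs.
    training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_tabular_task', key='training_id') }}",
    region=REGION,
    project_id=PROJECT_ID,
)

create_auto_ml_tabular_training_job >> delete_auto_ml_training_job >> end_task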
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
    CreateBatchPredictionJobOperator,
)

PROJECT_ID = 'plated-ensign-390102'
REGION = 'us-central1'
ENV_ID = '02'
JOB_DISPLAY_NAME = 'your-job-display-name'
BIGQUERY_SOURCE = f'bq://{PROJECT_ID}.gsod_prediction_unique.prediction'  # plated-ensign-390102.gsod_prediction_unique.prediction
BIGQUERY_TARGET = f'bq://{PROJECT_ID}.gsod_results_unique'
MODEL_PARAMETERS = {
    # Define your model parameters here
}

default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='create_batch_prediction_job_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    start_task = DummyOperator(task_id='start_task')
    end_task = DummyOperator(task_id='end_task')

    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/vertex_ai/batch_prediction_job/index.html
    create_batch_prediction_job = CreateBatchPredictionJobOperator(
        region=REGION,
        project_id=PROJECT_ID,
        gcp_conn_id='gcp_conn',
        task_id="create_batch_prediction_job",
        job_display_name=JOB_DISPLAY_NAME,
        model_name='2217066241363804160',  # or "{{ti.xcom_pull('auto_ml_forecasting_task')['name']}}"
        predictions_format="bigquery",
        bigquery_source=BIGQUERY_SOURCE,
        bigquery_destination_prefix=BIGQUERY_TARGET,
        model_parameters=MODEL_PARAMETERS,
        # generate_explanation=False,
    )

    start_task >> create_batch_prediction_job >> end_task
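The model ID above is hardcoded. If the batch-prediction task were placed in the same DAG as the training task, the model's resource name could instead be pulled from the training task's return value, as the commented-out template already hints. A sketch under that assumption (same-DAG execution, and the 'name' field of the returned model dict, both taken from the gist's own comment):

    create_batch_prediction_job = CreateBatchPredictionJobOperator(
        region=REGION,
        project_id=PROJECT_ID,
        gcp_conn_id='gcp_conn',
        task_id="create_batch_prediction_job",
        job_display_name=JOB_DISPLAY_NAME,
        # Pull the trained model's resource name from the training task's return value
        # (only works when auto_ml_tabular_task runs in the same DAG).
        model_name="{{ ti.xcom_pull(task_ids='auto_ml_tabular_task')['name'] }}",
        predictions_format="bigquery",
        bigquery_source=BIGQUERY_SOURCE,
        bigquery_destination_prefix=BIGQUERY_TARGET,
        model_parameters=MODEL_PARAMETERS,
    )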
@roy-sub

roy-sub commented Sep 13, 2023

I implemented the vi_create_dataset DAG for an image dataset, and the dataset that gets created inside Vertex AI is empty. Now I want to upload/import images along with their annotations into it.

Hey, can you kindly specify how to upload images along with their annotations into the dataset created by vi_create_dataset_dag.py?
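A possible answer, as a sketch: the tabular dataset in this gist gets its rows from the BigQuery source in its metadata, so no separate import step is needed there, but an image dataset is created empty and then filled with ImportDataOperator (already imported in vi_create_dataset_dag.py, just unused). The GCS JSONL path is a placeholder, create_image_dataset stands in for whatever CreateDatasetOperator task created the image dataset, and the single-label-classification import schema is an assumption; the schema and JSONL layout depend on the annotation type (classification, bounding boxes, etc.):

from airflow.providers.google.cloud.operators.vertex_ai.dataset import ImportDataOperator
from google.cloud.aiplatform import schema

# Assumption: a JSONL file in GCS listing image URIs and their annotations,
# in the format required by the chosen import schema.
IMAGE_IMPORT_CONFIG = [
    {
        "import_schema_uri": schema.dataset.ioformat.image.single_label_classification,
        "gcs_source": {"uris": ["gs://your-bucket/annotations/image_import.jsonl"]},
    },
]

import_image_data = ImportDataOperator(
    task_id='import_image_data',
    gcp_conn_id='gcp_conn',
    project_id=PROJECT_ID,
    region=REGION,
    # dataset_id of the (empty) image dataset created upstream; task name assumed
    dataset_id=create_image_dataset.output["dataset_id"],
    import_configs=IMAGE_IMPORT_CONFIG,
)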
