Last active
          January 2, 2024 07:56 
        
      - 
      
- 
        Save practice/25e6080e14057cbce7fd2d2cf656892e to your computer and use it in GitHub Desktop. 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | import json | |
| import pathlib | |
| import requests | |
| import airflow | |
| import pandas as pd | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import datetime | |
| from datetime import timedelta | |
| import glob | |
| import os | |
| from pandas.errors import EmptyDataError | |
| from airflow import DAG | |
| from airflow.operators.python import PythonOperator | |
| from airflow.models import Variable | |
# DAG definition: one run every 30 minutes, never more than one run in
# flight at a time.
# NOTE(review): start_date is computed by days_ago() when the file is
# evaluated rather than being a fixed date - confirm this is intended.
dag = DAG(
    dag_id="aeronet_classify",
    description="AERONET classification by UNIST FineParticle web",
    schedule_interval=timedelta(minutes=30),
    start_date=airflow.utils.dates.days_ago(0, second=10),
    max_active_runs=1,
)
def load_df(file):
    """Read one AERONET CSV export and return it indexed by timestamp.

    Args:
        file: Path or URL of the export. The suffix of this string selects
            the layout: a name ending in ``if_no_html=1`` is read with six
            header lines skipped; anything else is read with seven header
            lines skipped and its last row discarded. A name ending in
            ``SDA15=1&AVG=10`` additionally has its ``Date_``/``Time_``
            columns renamed to the spelling used by the other products.

    Returns:
        pandas.DataFrame indexed by a ``times`` DatetimeIndex built from the
        date and time columns and shifted forward by nine hours. Every
        ``-999`` value is replaced with NaN and the original date/time
        columns are removed.

    Raises:
        pandas.errors.EmptyDataError: propagated from ``pd.read_table`` when
            the source has no parsable content.
    """
    print(f"loading file:{file}")

    # --- read the raw table ------------------------------------------------
    if file.endswith('if_no_html=1'):  # AAE / EAE (inversion) files
        df = pd.read_table(file, sep=",", skiprows=6)
    else:
        df = pd.read_table(file, sep=",", skiprows=7)
        # The final row of these files is not a measurement
        # (presumably the closing HTML of the page - TODO confirm).
        df = df.drop(df.index[-1])

    # --- harmonise the FMF column names --------------------------------------
    if file.endswith('SDA15=1&AVG=10'):  # FMF (SDA) files
        df = df.rename(columns={
            'Date_(dd:mm:yyyy)': 'Date(dd:mm:yyyy)',
            'Time_(hh:mm:ss)': 'Time(hh:mm:ss)',
        })

    # --- -999 marks a missing value ------------------------------------------
    df = df.replace(to_replace=-999, value=np.nan)

    # --- merge date and time into one timestamp ------------------------------
    # Vectorised concatenation: avoids a Python-level call per row and also
    # behaves on an empty frame, where a row-wise apply() returns a DataFrame
    # instead of a Series.
    stamps = df['Date(dd:mm:yyyy)'] + ' ' + df['Time(hh:mm:ss)']
    df = df.drop(columns=['Date(dd:mm:yyyy)', 'Time(hh:mm:ss)'])

    # --- build the datetime index --------------------------------------------
    # +9 h offset (presumably UTC -> KST; confirm against the data source).
    df['times'] = (
        pd.to_datetime(stamps, format="%d:%m:%Y %H:%M:%S") + timedelta(hours=9)
    )
    df = df.set_index('times')

    print(f"loading file:{file} done.")
    return df
def gen_image(finep_url, year, month, day):
    """Render the classification images for every site, then notify finep.

    Images are produced for the given calendar day and for the day before
    it, for each site in the fixed list below. Afterwards a POST is sent to
    ``{finep_url}/handle-aeronet-classify``.

    Args:
        finep_url: Base URL of the finep web service.
        year, month, day: Calendar date; each value is converted with
            ``int()`` so strings are accepted.

    Returns:
        The response body text of the POST request.

    Raises:
        ValueError: if the POST response status is not 200.
    """
    os.umask(0)

    run_day = datetime.datetime(int(year), int(month), int(day))
    previous_day = run_day - timedelta(days=1)

    station_names = (
        "Socheongcho",
        "Anmyon",
        "Seoul_SNU",
        "Yonsei_University",
        "Gangneung_WNU",
        "KORUS_UNIST_Ulsan",
    )
    for station in station_names:
        for target_day in (run_day, previous_day):
            gen_image_for_site(station, finep_url, target_day)

    # Ask the web service to process the generated files (DB save).
    print(f"calling finep /handle-aeronet-classify file handling for year={year},month={month},day={day}")
    url = f"{finep_url}/handle-aeronet-classify"
    print(f"posting {url}")
    res = requests.post(url)

    if res.status_code == 200:
        return res.text

    print(res.text)
    raise ValueError(f"Error response: {res.status_code}")
_OUTPUT_DIR = '/data/finepart/aeronet-classify/temp/'
_INITIAL_LOOKBACK_DAYS = 20
_LOOKBACK_STEP_DAYS = 10
# Upper bound for the download window, so a site without data cannot keep
# the task retrying forever.
_MAX_LOOKBACK_DAYS = 365


def _aeronet_urls(site, start, end):
    """Return the (AOD, FMF, AAE, EAE) download URLs for *site* over a date span."""
    span = (
        f'site={site}&year={start.year}&month={start.month}&day={start.day}'
        f'&year2={end.year}&month2={end.month}&day2={end.day}'
    )
    direct = 'https://aeronet.gsfc.nasa.gov/cgi-bin/print_web_data_v3?'
    inversion = 'https://aeronet.gsfc.nasa.gov/cgi-bin/print_web_data_inv_v3?'
    # The URL suffixes matter: load_df() inspects them to pick the layout.
    return (
        # 1.5 : Aerosol Optical Depth (AOD)
        f'{direct}{span}&AOD15=1&AVG=10',
        # 1.5 : Spectral Deconvolution Algorithm (SDA) retrievals (FMF)
        f'{direct}{span}&SDA15=1&AVG=10',
        # 1.5 Almucantar : Absorption AOD
        f'{inversion}{span}&product=TAB&AVG=10&ALM15=1&if_no_html=1',
        # 1.5 Almucantar : Extinction AOD
        f'{inversion}{span}&product=AOD&AVG=10&ALM15=1&if_no_html=1',
    )


def _download_products(site, date_today):
    """Download the four products, widening the window until data is found.

    Returns ``((aod, fmf, aae, eae), days)`` where *days* is the look-back
    length that finally succeeded. Re-raises EmptyDataError once the window
    has reached ``_MAX_LOOKBACK_DAYS``.
    """
    days = _INITIAL_LOOKBACK_DAYS
    while True:
        date_before = pd.Timestamp(date_today - timedelta(days=days))
        aod_url, fmf_url, aae_url, eae_url = _aeronet_urls(site, date_before, date_today)
        try:
            frames = (
                load_df(aod_url),
                load_df(fmf_url),
                load_df(aae_url),
                load_df(eae_url),
            )
        except EmptyDataError:
            if days >= _MAX_LOOKBACK_DAYS:
                raise
            days += _LOOKBACK_STEP_DAYS
            print(f"no data for {site}; widening window to {days} days")
        else:
            return frames, days


def _first_present(frame, candidates):
    """Return the first name in *candidates* that is a column of *frame*.

    Falls back to the first candidate so that the subsequent column lookup
    raises a KeyError naming the preferred column.
    """
    for name in candidates:
        if name in frame.columns:
            return name
    return candidates[0]


def _select_columns(site, data_aod, data_fmf, data_aae, data_eae):
    """Pick the needed columns and normalise their names to 440/870 nm."""
    data_FMF = data_fmf[['FineModeFraction_500nm[eta]']]

    if site == 'Socheongcho':
        # This site reports 443 nm instead of 440 nm and, per reviewer
        # feedback, 865 nm instead of 870 nm in the inversion products.
        # 870 nm is kept as a fallback in case the file uses that name.
        abs_nir = _first_present(
            data_aae, ['Absorption_AOD[865nm]', 'Absorption_AOD[870nm]'])
        ext_nir = _first_present(
            data_eae, ['AOD_Extinction-Total[865nm]', 'AOD_Extinction-Total[870nm]'])

        data_AOD = data_aod[['AOD_443nm', 'AOD_500nm']].rename(
            columns={'AOD_443nm': 'AOD_440nm'})
        data_AAE = data_aae[
            ['Absorption_AOD[443nm]', abs_nir, 'Absorption_Angstrom_Exponent_440-870nm']
        ].rename(columns={
            'Absorption_AOD[443nm]': 'Absorption_AOD[440nm]',
            abs_nir: 'Absorption_AOD[870nm]',
        })
        data_EAE = data_eae[['AOD_Extinction-Total[443nm]', ext_nir]].rename(columns={
            'AOD_Extinction-Total[443nm]': 'AOD_Extinction-Total[440nm]',
            ext_nir: 'AOD_Extinction-Total[870nm]',
        })
    else:
        data_AOD = data_aod[['AOD_440nm', 'AOD_500nm']]
        data_AAE = data_aae[['Absorption_AOD[440nm]', 'Absorption_AOD[870nm]',
                             'Absorption_Angstrom_Exponent_440-870nm']]
        data_EAE = data_eae[['AOD_Extinction-Total[440nm]', 'AOD_Extinction-Total[870nm]']]

    return data_AOD, data_FMF, data_AAE, data_EAE


def _classify_day(aod, sae, aae, fmf):
    """Return ``(type, color, bc)`` for one day's mean values.

    The rules are evaluated in order and later matches overwrite earlier
    ones. Any comparison against NaN is False, so a day may end up with
    ``type``/``color`` of None when no rule matches.
    """
    kind = None
    color = None
    bc = np.nan

    if aod <= 0.4:  # classification AOD minimum criteria
        kind, color = 'Low', 'white'

    if aod > 0.4:
        if sae <= 0 and aae >= 2:
            kind, color = 'Dust', 'gold'
        if sae <= 1.5 and aae >= 1.5 and fmf < 0.4:
            kind, color = 'Dust', 'gold'
        if sae <= 1.5 and aae >= 1.5 and fmf >= 0.4:
            kind, color = 'BC+BrC', 'tan'
        if sae < 1 and aae < 1:
            kind, color = 'Uncertain', 'skyblue'
        if sae >= 1 and aae < 1:
            kind, color = 'NA', 'hotpink'
        if sae < 1.5 and 1.0 <= aae < 1.5 and aae > sae and fmf > 0.6:
            bc = 1
        if sae < 1.5 and 1.0 <= aae < 1.5 and aae > sae and fmf <= 0.6:
            kind, color = 'Uncertain', 'skyblue'
        if sae >= 1 and 1.0 <= aae < 1.5 and aae <= sae:
            bc = 1
        if sae >= 1.5 and aae > 1.5:
            bc = 1

        # Black-carbon family: split by AAE.
        if bc == 1 and aae >= 2:
            kind, color = 'BrC', 'brown'
        if bc == 1 and 1.5 <= aae < 2:
            kind, color = 'BC+BrC', 'tan'
        if bc == 1 and aae < 1.5:
            kind, color = 'BC', 'dimgray'

    return kind, color, bc


def _draw_criteria_figure(data_recent, site, path):
    """Scatter the daily (SAE, AAE) points over the classification grid."""
    plt.subplots(figsize=(12, 12))

    # Data plotting
    plt.scatter(data_recent['SAE'], data_recent['AAE'], c=data_recent['color'],
                s=300, edgecolors='red', marker='o', zorder=2)
    plt.errorbar(data_recent['SAE'], data_recent['AAE'],
                 xerr=data_recent['SAE_std'], yerr=data_recent['AAE_std'],
                 color='None', ecolor='red', elinewidth=1.5, capsize=8, zorder=1)

    # Write the date next to every point that falls inside the axes.
    # Iterating values directly avoids positional Series[int] lookups, which
    # are deprecated on a label-indexed Series; NaN fails the range test.
    for stamp, sae, aae in zip(data_recent.index, data_recent['SAE'], data_recent['AAE']):
        if -0.5 < sae < 2.5 and 0 < aae < 3:
            plt.text(sae + 0.03, aae + 0.03, stamp.strftime("%m%d"),
                     fontsize=15, fontweight='bold')

    plt.xlim(-0.5, 2.51)
    plt.xticks(np.arange(-0.5, 2.501, 0.5), fontsize=15)
    plt.ylim(0, 3)
    plt.yticks(np.arange(0, 3.01, 1), fontsize=15)
    plt.xlabel('SAE 440-870nm', fontsize=25)
    plt.ylabel('AAE 440-870nm', fontsize=25)
    plt.title(f'< Aerosol Type Classification [ {site} ] >', fontsize=20, fontweight='bold')

    # Region boundaries
    boundaries = (
        ([1, 1], [0, 1]),
        ([-0.5, 2.5], [1, 1]),
        ([-0.5, 2.5], [1.5, 1.5]),
        ([1.5, 1.5], [1.5, 3]),
        ([1.5, 2.5], [1.5, 1.5]),
        ([1.5, 2.5], [2, 2]),
        ([-0.5, 0], [2, 2]),
        ([0, 0], [2, 3]),
        ([1, 1.5], [1, 1.5]),
    )
    for xs, ys in boundaries:
        plt.plot(xs, ys, '-', color='grey', linewidth=1, alpha=0.8)

    # Region names (x, y, text, extra text kwargs)
    big = {'fontsize': 30}
    sub = {'fontsize': 18, 'color': 'grey'}
    fmf_label = {'fontsize': 12, 'color': 'blue'}
    fmf_rule = {'fontsize': 16, 'color': 'blue'}
    annotations = (
        (-0.1, 0.45, 'Uncertain', big),
        (-0.28, 0.33, 'large and low absorbing', sub),
        (-0.4, 2.43, 'Dust', big),
        (1.63, 0.45, ' NA', big),
        (1.13, 0.33, ' small and low (non) absorbing', sub),
        (1.88, 1.18, 'BC', big),
        (1.7, 1.06, 'black carbon', sub),
        (1.7, 1.68, 'BC + BrC', big),
        (1.88, 2.43, 'BrC', big),
        (1.72, 2.31, 'brown carbon', sub),
        (-0.1, 1.2, 'Uncertain / BC', big),
        (0.3, 2.2, 'Dust / BC + BrC', big),
        # FMF criteria
        (-0.1, 1.06, 'FMF<0.6', fmf_label),
        (0.68, 1.06, 'FMF>0.6', fmf_label),
        (-0.1, 1.15, '-------------------------- --------', fmf_rule),
        (0.3, 2.06, 'FMF<0.4', fmf_label),
        (0.75, 2.06, 'FMF>0.4', fmf_label),
        (0.3, 2.15, '------------ -------------------------', fmf_rule),
    )
    for x, y, text, style in annotations:
        plt.text(x, y, text, **style)

    plt.savefig(path, dpi=300)


def _draw_legend_figure(path):
    """Draw the stand-alone legend image."""
    plt.subplots(figsize=(6, 12))
    plt.xlim(0, 2.8)
    plt.ylim(0, 4)
    plt.text(1.2, 2.8, '< Aerosol Types >', fontsize=20, fontweight='bold')

    # (marker y, text y, color, label)
    entries = (
        (2.5, 2.47, 'gold', 'Dust'),
        (2.3, 2.27, 'dimgray', 'Black Carbon'),
        (2.1, 2.07, 'tan', 'Black + Brown Carbon'),
        (1.9, 1.87, 'brown', 'Brown Carbon'),
        (1.7, 1.67, 'hotpink', 'Non-Absorbing'),
        (1.5, 1.47, 'skyblue', 'Uncertain'),
        (1.2, 1.17, 'white', 'Low (AOD440 < 0.4)'),
    )
    for marker_y, text_y, color, label in entries:
        plt.scatter(0.8, marker_y, c=color, s=300, edgecolor='red')
        plt.text(1.1, text_y, label, fontsize=20)

    plt.scatter(0.73, 1.0, c='red', s=300, marker=1)
    plt.text(1.1, 0.97, 'Daily Std', fontsize=20)
    plt.axis(False)
    plt.savefig(path, dpi=300)


def _draw_pie_figure(data_recent, path):
    """Draw the share of each aerosol type, or a 'no data' placeholder."""
    plt.subplots(figsize=(12, 12))

    if data_recent.empty:  # if data is absent, show 'No Data Available'
        plt.xlim(0, 2)
        plt.ylim(0, 2)
        plt.axis(False)
        plt.text(0.5, 1, 'No Data Available', fontsize=20)
        plt.savefig(path, dpi=300)
        return

    # 'BrC' is included so that brown-carbon days are counted too.
    labels = ['Dust', 'BC', 'BC+BrC', 'BrC', 'Low', 'NA', 'Uncertain']
    colors = ['gold', 'dimgray', 'tan', 'brown', 'white', 'hotpink', 'skyblue']
    counts = data_recent['type'].value_counts()
    ratio = [int(counts.get(label, 0)) for label in labels]
    # Types that did not occur get a blank label.
    shown = [label if n else '' for label, n in zip(labels, ratio)]

    plt.pie(ratio, labels=shown, colors=colors, autopct='%.1f%%',
            wedgeprops={'width': 1, 'edgecolor': 'grey', 'linewidth': 3},
            textprops={'size': 30, 'color': 'black'})
    first_day = data_recent.index[0].strftime("%Y%m%d")
    last_day = data_recent.index[-1].strftime("%Y%m%d")
    plt.title(f'< Recent Aerosol Types : {first_day} ~ {last_day} >',
              fontsize=20, fontweight='bold')
    plt.savefig(path, dpi=300)


def gen_image_for_site(site, finep_url, day):
    """Download AERONET data for one site and write its three PNG images.

    The images (classification scatter, legend, pie chart) are written to
    ``/data/finepart/aeronet-classify/temp/`` as
    ``<site>_-_criteria_<YYYYMMDD>.png``, ``<site>_-_legend_<YYYYMMDD>.png``
    and ``<site>_-_piechart_<YYYYMMDD>.png``.

    Args:
        site: AERONET site name used in the download URLs.
        finep_url: Only logged here; not otherwise used by this function.
        day: End date of the download window (anything pd.Timestamp accepts).

    Raises:
        pandas.errors.EmptyDataError: if no data could be downloaded even
            with a look-back window of ``_MAX_LOOKBACK_DAYS`` days.
    """
    print(f"gen_image called with: {site},{finep_url},{day}")

    output_dir = _OUTPUT_DIR
    os.makedirs(output_dir, exist_ok=True)
    date_today = pd.Timestamp(day)
    stamp = date_today.strftime("%Y%m%d")

    # ---- Download ------------------------------------------------------------
    (data_aod, data_fmf, data_aae, data_eae), days = _download_products(site, date_today)

    # ---- Arrange -------------------------------------------------------------
    data_AOD, data_FMF, data_AAE, data_EAE = _select_columns(
        site, data_aod, data_fmf, data_aae, data_eae)

    # merge AOD-FMF, AAE-EAE (aligned on the time index)
    data_AOD_FMF = data_AOD.join(data_FMF, how='left')
    data_AAE_EAE = data_AAE.join(data_EAE, how='left')

    # ---- Calculate SAE -------------------------------------------------------
    data_AAE_EAE = data_AAE_EAE.rename(
        columns={'Absorption_Angstrom_Exponent_440-870nm': 'AAE'})
    data_AAE_EAE['scattering440'] = (
        data_AAE_EAE['AOD_Extinction-Total[440nm]'] - data_AAE_EAE['Absorption_AOD[440nm]'])
    data_AAE_EAE['scattering870'] = (
        data_AAE_EAE['AOD_Extinction-Total[870nm]'] - data_AAE_EAE['Absorption_AOD[870nm]'])
    data_AAE_EAE['SAE'] = (
        -np.log(data_AAE_EAE['scattering440'] / data_AAE_EAE['scattering870'])
        / np.log(440 / 870))

    # ---- Daily aggregation ---------------------------------------------------
    data_AOD_FMF = data_AOD_FMF.resample('1D').mean()
    data_AAE_std = data_AAE_EAE['AAE'].resample('1D').std()
    data_SAE_std = data_AAE_EAE['SAE'].resample('1D').std()
    data_AAE_SAE = data_AAE_EAE.resample('1D').mean()
    data_AAE_SAE['SAE_std'] = data_SAE_std
    data_AAE_SAE['AAE_std'] = data_AAE_std
    data_AAE_SAE = data_AAE_SAE[['AAE', 'SAE', 'AAE_std', 'SAE_std']]

    # total 7 products
    data = data_AOD_FMF.join(data_AAE_SAE, how='left')

    # ---- Classification ------------------------------------------------------
    # Building whole columns guarantees 'type' and 'color' exist even when no
    # day matched any rule.
    verdicts = [
        _classify_day(aod, sae, aae, fmf)
        for aod, sae, aae, fmf in zip(
            data['AOD_440nm'], data['SAE'], data['AAE'],
            data['FineModeFraction_500nm[eta]'])
    ]
    data['type'] = pd.Series([v[0] for v in verdicts], index=data.index, dtype=object)
    data['color'] = pd.Series([v[1] for v in verdicts], index=data.index, dtype=object)
    data['bc'] = pd.Series([v[2] for v in verdicts], index=data.index, dtype=float)

    # ---- Recent window -------------------------------------------------------
    data_recent = data.loc[date_today - timedelta(days=days):date_today]
    # Drop days without AAE/AOD, and days no rule classified: a missing
    # colour would make plt.scatter() fail.
    data_recent = data_recent.dropna(axis=0, subset=['AAE', 'AOD_440nm', 'color'])

    # ---- Figures -------------------------------------------------------------
    # "_-_" is the separator between the site and the rest of the filename.
    try:
        _draw_criteria_figure(data_recent, site, f'{output_dir}{site}_-_criteria_{stamp}.png')
        _draw_legend_figure(f'{output_dir}{site}_-_legend_{stamp}.png')
        _draw_pie_figure(data_recent, f'{output_dir}{site}_-_piechart_{stamp}.png')
    finally:
        # Release the figures even when drawing or saving fails.
        plt.close("all")
| # %% | |
# The DAG's only task: calls gen_image() with the finep base URL and the
# run's execution date, passed as templated year/month/day strings.
# Note that this assignment rebinds the module-level name ``gen_image`` from
# the function to the operator; the operator keeps the function reference.
gen_image = PythonOperator(
    dag=dag,
    task_id="gen_image",
    python_callable=gen_image,
    op_kwargs=dict(
        finep_url=Variable.get("finep_url"),
        year="{{ execution_date.year }}",
        month="{{ execution_date.month }}",
        day="{{ execution_date.day }}",
    ),
    retries=5,  # maximum number of retry attempts
    retry_delay=timedelta(minutes=1),  # wait between retries
    execution_timeout=timedelta(minutes=30),
)

gen_image
125~133 라인 전체를 다음으로 대체 부탁드립니다.
data_FMF = data_fmf[['FineModeFraction_500nm[eta]']] 
if site == 'Socheongcho':
    data_AOD = data_aod[['AOD_443nm','AOD_500nm']] 
    data_AAE = data_aae[['Absorption_AOD[443nm]','Absorption_AOD[865nm]','Absorption_Angstrom_Exponent_440-870nm']]
    data_EAE = data_eae[['AOD_Extinction-Total[443nm]','AOD_Extinction-Total[865nm]']]
    data_AOD = data_AOD.rename(columns={'AOD_443nm':'AOD_440nm'})
    data_AAE = data_AAE.rename(columns={'Absorption_AOD[443nm]':'Absorption_AOD[440nm]'})
    data_AAE = data_AAE.rename(columns={'Absorption_AOD[865nm]':'Absorption_AOD[870nm]'})
    data_EAE = data_EAE.rename(columns={'AOD_Extinction-Total[443nm]':'AOD_Extinction-Total[440nm]'})
    data_EAE = data_EAE.rename(columns={'AOD_Extinction-Total[865nm]':'AOD_Extinction-Total[870nm]'})
else:
    data_AOD = data_aod[['AOD_440nm','AOD_500nm']] 
    data_AAE = data_aae[['Absorption_AOD[440nm]','Absorption_AOD[870nm]','Absorption_Angstrom_Exponent_440-870nm']]
    data_EAE = data_eae[['AOD_Extinction-Total[440nm]','AOD_Extinction-Total[870nm]']]
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment
  
            
위 코드에서 125 라인입니다.