#!/usr/bin/env -S /Users/rmondal/.local/bin/uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
#     "pandas",
#     "colored",
#     "path",
# ]
# ///
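# The block above is PEP 723 inline script metadata: `uv run --script` reads it
# and provisions Python >= 3.10 plus the listed dependencies automatically.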
#
import sys
import requests
import pandas as pd
import tempfile
import colored
from datetime import datetime
from path import Path
import os
import re
import json

# Make the current working directory importable before pulling in the local
# `workflow` module (originally this insert ran after the import, where it had no effect).
sys.path.insert(0, '.')
from workflow import Workflow, ICON_WEB, ICON_ERROR
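# The local `workflow` module supplies the Alfred Script Filter helpers used in
# main() below: Workflow, wf.add_item(), wf.send_feedback(), and the ICON_* constants.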
# Set to True to use cached static data, False to make API calls
USE_CACHED_DATA = True

# Get the system temp directory
temp_dir = tempfile.gettempdir()

TODAY = datetime.today().strftime('%Y-%m-%d')
HOUR_MIN = datetime.today().strftime('%H_%M')
TODAY_HOUR_MIN = TODAY + "__" + HOUR_MIN

SCRIPT_NAME = Path(__file__).stem
SCRIPT_DIR = Path(__file__).dirname()
APP_LOG = os.path.abspath(os.path.expanduser(SCRIPT_DIR + f'/{SCRIPT_NAME}_{TODAY}.log'))
CACHE_FILE = os.path.join(SCRIPT_DIR, 'bugs_cache.json')

### pandas default options
pd.set_option('display.max_colwidth', None)
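# (max_colwidth=None turns off pandas' column truncation so full synopsis text
# survives in any logged or printed frames.)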
################ logger ################################# {{{
import logging
import logging.config


class ColorStrippingFormatter(logging.Formatter):
    """Formatter that strips ANSI color codes"""
    # ANSI color code regex pattern
    ANSI_COLOR_PATTERN = re.compile(r'\x1b\[[0-9;]*m')

    def format(self, record):
        # Get the original formatted message
        formatted = super().format(record)
        # Strip ANSI color codes
        return self.ANSI_COLOR_PATTERN.sub('', formatted)


logging_config = {  # {{{
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'default': {
            'format':
                # '[%(levelname)s] %(name)s: %(message)s'
                '%(asctime)s %(name)-12s %(levelname)-8s [%(pathname)s:%(lineno)d] %(message)s',
            'datefmt': '%m/%d/%Y %I:%M:%S %p'
        },
        'file_formatter': {
            '()': ColorStrippingFormatter,
            'format':
                # '[%(levelname)s] %(name)s: %(message)s'
                '%(asctime)s %(name)-12s %(levelname)-8s [%(pathname)s:%(lineno)d] %(message)s',
            'datefmt': '%m/%d/%Y %I:%M:%S %p'
        }
    },
    'handlers': {
        'stdout': {
            'class': 'logging.StreamHandler',
            'formatter': 'default',
            'stream': 'ext://sys.stdout',
            'level': logging.INFO,
        },
        'file': {
            'class': 'logging.FileHandler',
            # 'class': 'logging.handlers.RotatingFileHandler',
            'filename': APP_LOG,
            'formatter': 'file_formatter',
            'level': logging.DEBUG,
            'mode': 'w',
            # 'maxBytes': 1024*1024,
            # 'backupCount': 5,
        },
    },
    'loggers': {
        '': {
            'handlers': ['stdout', 'file'],
            # 'level': 'DEBUG',
            'level': logging.INFO,
            'propagate': False
        },
    }
}  # }}}
logging.config.dictConfig(logging_config)
logger = logging.getLogger(__name__)
################## }}}
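# dictConfig wiring: the root logger (INFO) fans out to stdout with colors intact
# and to APP_LOG, where ColorStrippingFormatter removes ANSI codes; mode 'w'
# starts a fresh log file on every run.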
def save_bugs_cache(df):  # {{{
    """Save bugs dataframe to JSON cache file"""
    try:
        cache_data = df.to_dict('records')
        with open(CACHE_FILE, 'w') as f:
            json.dump(cache_data, f, indent=2)
        logger.info(f"Saved {len(cache_data)} bugs to cache: {CACHE_FILE}")
    except Exception as e:
        logger.error(f"Failed to save cache: {e}")
# }}}


def load_bugs_cache():  # {{{
    """Load bugs dataframe from JSON cache file"""
    try:
        if not os.path.exists(CACHE_FILE):
            logger.warning(f"Cache file not found: {CACHE_FILE}")
            return None
        with open(CACHE_FILE, 'r') as f:
            cache_data = json.load(f)
        df = pd.DataFrame(cache_data)
        logger.info(f"Loaded {len(df)} bugs from cache: {CACHE_FILE}")
        return df
    except Exception as e:
        logger.error(f"Failed to load cache: {e}")
        return None
# }}}
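# NOTE: get_request_df() below references module-level `_server` and `_headers`
# globals that are not defined in this gist; they must be set elsewhere before
# the live-API path is used (the script currently runs from cache only).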
def get_request_df(_api, _session, _limit=None, _return_df=True):  # {{{
    _result_entries = []
    _fetched_all = False
    _page = 1
    '''
    page 0 returns the details of the data
    {'EndIndex': -1,
     'ErrorMessage': '',
     'IsSuccess': True,
     'ReturnValue': [],
     'StartIndex': -50,
     'TotalCount': 118}
    '''
    while not _fetched_all:
        _url = '%s/%s' % (_server, _api)
        # pprint(_url)
        _add_these_to_url = []
        _add_these_to_url.append('&page=%s' % _page)
        if _limit:
            _add_these_to_url.append('&limit=%s' % _limit)
        if _add_these_to_url:
            _url += ''.join(_add_these_to_url)
        logger.info("request %s" % _url)
        try:
            _response = _session.get(_url, headers=_headers)
        except requests.RequestException as e:
            raise Exception("bugs api call failed") from e
        # pprint("v"*80)
        # pprint(f"Page: {_page}")
        # pprint(_response.json())
        # pprint("^"*80)
        # if not _response.ok:
        #     pprint(_response.text)
        #     raise Exception
        _json = _response.json()
        assert _json.get('IsSuccess'), f"'IsSuccess' value is {_json.get('IsSuccess')}"
        _values = _json['ReturnValue']
        if isinstance(_values, list):
            _result_entries += _values
        else:
            _result_entries += [_values]
        _start = _json['StartIndex']
        _end = _json['EndIndex']
        _total = _json['TotalCount']
        # carp.carp(f"EndIndex: {_end}")
        # carp.carp(f"TotalCount: {_total}")
        _page = _page + 1
        ## FIXME
        # if _page > 2:
        #     break
        # Keep paging until the 0-based EndIndex catches up with TotalCount.
        if _total > (_end + 1):
            _fetched_all = False
        else:
            _fetched_all = True

    ### parse data
    if _return_df:
        _results = pd.DataFrame(_result_entries).dropna(axis=1)
        if _results.empty:
            logger.error(f"no data in {_url}")
            sys.exit(1)
    else:
        _results = _result_entries
    return _results
# }}}
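# main() is executed via Workflow.run() in the entry point below; it emits one
# Alfred Script Filter item per cached bug and returns them with wf.send_feedback().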
def main(wf):
    # Get query from Alfred
    if len(wf.args):
        query = wf.args[0]
    else:
        query = None

    logger.warning("-" * 120)
    logger.warning(colored.stylize('logFile:', colored.attr('bold') + colored.fg('cyan')))
    logger.warning(colored.stylize(APP_LOG, colored.fg('red')))

    global sessions  # unused here; presumably set up when the live API path is enabled

    # Check if we should use cached data
    if USE_CACHED_DATA:
        logger.info("Using cached data mode...")
        df_bugs = load_bugs_cache()
        if df_bugs is None:
            logger.error("No cache available and USE_CACHED_DATA=True. Set USE_CACHED_DATA=False to fetch from API.")
            return
    else:
        # The live-API branch is not included in this gist; bail out instead of
        # falling through with df_bugs undefined.
        logger.error("USE_CACHED_DATA=False, but the API fetch path is not implemented here.")
        return

    # Print available columns for debugging
    logger.info(f"Available columns: {df_bugs.columns.tolist()}")

    # Loop through all bugs and emit an item with BugId, Synopsis, and ActionReqByFullName
    logger.info("\n" + "=" * 120)
    for index, row in df_bugs.iterrows():
        try:
            logger.info(f"Adding item: BugId={row['BugId']}, Synopsis={row['Synopsis']}, ARB={row['ActionReqByFullName']}")
            wf.add_item(
                title=f"{row['BugId']} - {row['Synopsis']}",
                subtitle=f"ARB: {row['ActionReqByFullName']}",
                arg=f"{row['BugId']}",
                valid=True,
                icon=ICON_WEB
            )
        except Exception:
            logger.error("Failed to add item for row", exc_info=True)
            wf.add_item(title='Error adding item',
                        subtitle='Please try again.',
                        icon=ICON_ERROR)

    wf.send_feedback()
if __name__ == "__main__":
    wf = Workflow()
    sys.exit(wf.run(main))
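# Rough usage (assuming the uv path in the shebang is valid on this machine):
#   ./<this_script>.py "<optional query>"         # run directly
#   uv run --script <this_script>.py "<query>"    # or explicitly through uv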
# vim: filetype=python:syntax=python:ts=4:tw=0:sw=4:sts=4:expandtab:norl:foldmethod=marker: