Last active
April 28, 2021 12:52
-
-
Save helena-intel/f862a27e29398f5961fd663d43bd7e6c to your computer and use it in GitHub Desktop.
204-vision-worker-safety
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"cells": [{"cell_type": "markdown", "metadata": {}, "source": "<a id=\"top\"></a>\n# Safety Gear Detection Sample Application"}, {"id": "0369b1cb", "cell_type": "markdown", "source": "## Preparation\n\nInstall the requirements and download the files that are necessary for running this notebook.\n\n**NOTE:** installation may take a while. It is recommended to restart the Jupyter kernel after installing the packages. Choose *Kernel->Restart Kernel* in Jupyter Notebook or Lab, or *Runtime->Restart runtime* in Google Colab.", "metadata": {}}, {"id": "2696281f", "cell_type": "code", "metadata": {}, "execution_count": null, "source": "# Install or upgrade required Python packages. Install specific versions of some packages to ensure compatibility.\n!pip install openvino-dev matplotlib opencv-python-headless==4.2.0.32 numpy==1.17.3 ipython", "outputs": []}, {"id": "3a79c6ad", "cell_type": "code", "metadata": {}, "execution_count": null, "source": "# Download image and model files\nimport os\nimport pip\nimport urllib.parse\nimport urllib.request\nfrom pathlib import Path\n\nurls = ['https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/labels.txt', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/models/mobilenet-ssd.bin', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/models/mobilenet-ssd.xml', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/media/worker_zone_detection_small.mp4', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/media/safety-gear-image.jpg']\n\nfor url in urls:\n save_path = Path(url).relative_to(fr\"https:/raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety\")\n os.makedirs(save_path.parent, exist_ok=True)\n safe_url = urllib.parse.quote(url, safe=\":/\")\n\n urllib.request.urlretrieve(safe_url, save_path.as_posix())", "outputs": []}, {"cell_type": "markdown", "metadata": {}, "source": "## Introduction\n\nThis sample application demonstrates how a smart video IoT solution may be created using Intel\u00ae hardware and software tools to perform safety gear detection. This solution detects any number of objects within a video frame looking specifically for people, safety vests, and hardhats. "}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "import colorsys\nimport os\nimport random\nimport time\nimport urllib\nfrom pathlib import Path\n\nimport cv2\nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom IPython.display import (\n HTML,\n FileLink,\n Pretty,\n ProgressBar,\n Video,\n clear_output,\n display,\n)\nfrom openvino.inference_engine import IECore"}, {"cell_type": "markdown", "metadata": {"id": "contained-office"}, "source": "### Settings"}, {"cell_type": "code", "execution_count": null, "metadata": {"id": "amber-lithuania", "tags": []}, "outputs": [], "source": "DEVICE = \"CPU\"\nMODEL_FILE = \"models/mobilenet-ssd.xml\"\nMODEL_FILE_PERSON = \"models/person-detection-retail-0013.xml\"\nLABELS_FILE = \"labels.txt\"\nmodel_name = os.path.basename(MODEL_FILE)\nmodel_name_person = os.path.basename(MODEL_FILE_PERSON)\nmodel_xml_path = Path(MODEL_FILE).with_suffix(\".xml\")\nmodel_xml_path_person = Path(MODEL_FILE_PERSON).with_suffix(\".xml\")"}, {"cell_type": "markdown", "metadata": {}, "source": "### Functions"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "def load_image(path: str):\n \"\"\"\n Loads an image from `path` and returns it as BGR numpy array. `path`\n should point to an image file, either a local filename or an url.\n \"\"\"\n if path.startswith(\"http\"):\n # Set User-Agent to Mozilla because some websites block\n # requests with User-Agent Python\n request = urllib.request.Request(\n path, headers={\"User-Agent\": \"Mozilla/5.0\"}\n )\n response = urllib.request.urlopen(request)\n array = np.asarray(bytearray(response.read()), dtype=\"uint8\")\n image = cv2.imdecode(array, -1) # Loads the image as BGR\n else:\n image = cv2.imread(path)\n return image"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "class ColorPalette:\n def __init__(self, n, rng=None):\n assert n > 0\n\n if rng is None:\n rng = random.Random(0xACE)\n\n candidates_num = 100\n hsv_colors = [(1.0, 1.0, 1.0)]\n for _ in range(1, n):\n colors_candidates = [\n (rng.random(), rng.uniform(0.8, 1.0), rng.uniform(0.5, 1.0))\n for _ in range(candidates_num)\n ]\n min_distances = [\n self.min_distance(hsv_colors, c) for c in colors_candidates\n ]\n arg_max = np.argmax(min_distances)\n hsv_colors.append(colors_candidates[arg_max])\n\n self.palette = [self.hsv2rgb(*hsv) for hsv in hsv_colors]\n\n @staticmethod\n def dist(c1, c2):\n dh = min(abs(c1[0] - c2[0]), 1 - abs(c1[0] - c2[0])) * 2\n ds = abs(c1[1] - c2[1])\n dv = abs(c1[2] - c2[2])\n return dh * dh + ds * ds + dv * dv\n\n @classmethod\n def min_distance(cls, colors_set, color_candidate):\n distances = [cls.dist(o, color_candidate) for o in colors_set]\n return np.min(distances)\n\n @staticmethod\n def hsv2rgb(h, s, v):\n return tuple(round(c * 255) for c in colorsys.hsv_to_rgb(h, s, v))\n\n def __getitem__(self, n):\n return self.palette[n % len(self.palette)]\n\n def __len__(self):\n return len(self.palette)"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "def convert_result_to_image(resized_image, result, labeldict):\n inf_results = result[0][0]\n colors = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (0, 0, 255))\n\n resized_image_rgb = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)\n\n for number, proposal in enumerate(inf_results):\n if proposal[2] > 0.5:\n ih, iw = resized_image.shape[:-1]\n label = np.int(proposal[1])\n labelname = labeldict[label]\n\n xmin = np.int(iw * proposal[3])\n ymin = max(10, np.int(ih * proposal[4]))\n xmax = np.int(iw * proposal[5])\n ymax = np.int(ih * proposal[6])\n\n resized_image_rgb = cv2.rectangle(\n resized_image_rgb,\n (xmin, ymin),\n (xmax, ymax),\n colors[label - 1],\n 3,\n )\n cv2.putText(\n resized_image_rgb,\n f\"{labelname} {proposal[2]:.2f}\",\n (xmin, ymin - 10),\n cv2.FONT_HERSHEY_SIMPLEX,\n 0.8,\n colors[label - 1],\n 1,\n cv2.LINE_AA,\n )\n\n result_image_rgb = cv2.resize(resized_image_rgb, (image.shape[:2][::-1]))\n return result_image_rgb"}, {"cell_type": "markdown", "metadata": {"id": "sensitive-wagner"}, "source": "## Load model and get model information\n\nLoad the model in Inference Engine with `ie.read_network` and load it to the specified device with `ie.load_network`"}, {"cell_type": "code", "execution_count": null, "metadata": {"id": "complete-brother", "tags": []}, "outputs": [], "source": "ie = IECore()\nnet = ie.read_network(\n str(model_xml_path),\n str(model_xml_path.with_suffix(\".bin\")),\n)\n\nexec_net = ie.load_network(network=net, device_name=DEVICE)\n\ninput_key = list(exec_net.input_info)[0]\noutput_key = list(exec_net.outputs.keys())[0]\n\nnetwork_input_shape = exec_net.input_info[input_key].tensor_desc.dims\n(network_image_height, network_image_width) = network_input_shape[2:]"}, {"cell_type": "markdown", "metadata": {}, "source": "## Safety Gear Detection on a Single Image"}, {"cell_type": "code", "execution_count": null, "metadata": {"colab": {"base_uri": "https://localhost:8080/"}, "id": "central-psychology", "outputId": "d864ee96-3fbd-488d-da1a-88e730f34aad", "tags": []}, "outputs": [], "source": "image = load_image(\"media/safety-gear-image.jpg\")\n# resize to input shape for network\nresized_image = cv2.resize(image, (network_image_width, network_image_height))\n\n# reshape image to network input shape NCHW\ninput_image = np.expand_dims(np.transpose(resized_image, (2, 0, 1)), 0)\nplt.imshow(image[:, :, (2, 1, 0)])"}, {"cell_type": "markdown", "metadata": {"id": "taken-spanking"}, "source": "### Do inference on image\n\nDo the inference, convert the result to an image, and resize it to the original image shape"}, {"cell_type": "code", "execution_count": null, "metadata": {"id": "banner-kruger", "tags": []}, "outputs": [], "source": "result = exec_net.infer(inputs={input_key: input_image})[output_key]"}, {"cell_type": "markdown", "metadata": {}, "source": "### Display result"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "labels = open(LABELS_FILE).read().splitlines()\nlabeldict = {i + 1: labelname for i, labelname in enumerate(labels)}\n\nresult_image_rgb = convert_result_to_image(image, result, labeldict)\nplt.figure(figsize=(12, 6))\nplt.imshow(result_image_rgb)"}, {"cell_type": "markdown", "metadata": {}, "source": "## Safety Gear Detection on Video"}, {"cell_type": "code", "execution_count": null, "metadata": {"colab": {"base_uri": "https://localhost:8080/"}, "id": "terminal-dividend", "outputId": "87f5ada0-8caf-49c3-fe54-626e2b1967f3", "tags": []}, "outputs": [], "source": "VIDEO_FILE = \"media/Safety_Full_Hat_and_Vest.mp4\"\n# worker_zone video source: https://github.com/intel-iot-devkit/sample-videos\nVIDEO_FILE = \"media/worker_zone_detection_small.mp4\"\n# Number of video frames to process. Set to 0 to process all frames.\nNUM_FRAMES = 240\n# Scale the output video sides with a factor of SCALE_OUTPUT\n# If the original video has a resolution of 1920x1080, a factor\n# of 0.5 results in an output video of 960x540\n# Set to 1 to keep the original resolution\nSCALE_OUTPUT = 0.5\n# Create Path objects for the input video and the resulting video\nvideo_path = Path(VIDEO_FILE)\nresult_video_path = video_path.with_name(f\"{video_path.stem}_result.mp4\")"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "cap = cv2.VideoCapture(str(video_path))\nret, image = cap.read()\nif not ret:\n raise ValueError(f\"The video at {video_path} cannot be read.\")\nFPS = cap.get(cv2.CAP_PROP_FPS)\ninput_frame_height, input_frame_width = image.shape[:2]\n# The format to use for video encoding. VP90 is slow,\n# but it works on most systems.\n# Try the THEO encoding if you have FFMPEG installed.\n# FOURCC = cv2.VideoWriter_fourcc(*\"VP90\")\nFOURCC = cv2.VideoWriter_fourcc(*\"vp09\")\n\ncap.release()\nprint(\n f\"The input video has a frame width of {input_frame_width}, \"\n f\"frame height of {input_frame_height} and runs at {FPS} fps\"\n)"}, {"cell_type": "markdown", "metadata": {}, "source": "### Inference loop"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "frame_nr = 1\ntarget_frame_width = int(input_frame_width * SCALE_OUTPUT)\ntarget_frame_height = int(input_frame_height * SCALE_OUTPUT)\nstart_time = time.perf_counter()\ntotal_inference_duration = 0\n\ncap = cv2.VideoCapture(str(video_path))\nout_video = cv2.VideoWriter(\n str(result_video_path),\n FOURCC,\n FPS,\n (target_frame_width, target_frame_height),\n)\n\ntotal_frames = (\n cap.get(cv2.CAP_PROP_FRAME_COUNT) if NUM_FRAMES == 0 else NUM_FRAMES\n)\nprogress_bar = ProgressBar(total=total_frames)\nprogress_bar.display()\n\ntry:\n while cap.isOpened():\n ret, image = cap.read()\n if not ret:\n cap.release()\n break\n\n if frame_nr == total_frames:\n break\n\n # Prepare frame for inference\n # resize to input shape for network\n resized_image = cv2.resize(\n image, (network_image_height, network_image_width)\n )\n # reshape image to network input shape NCHW\n input_image = np.expand_dims(np.transpose(resized_image, (2, 0, 1)), 0)\n\n # Do inference\n inference_start_time = time.perf_counter()\n result = exec_net.infer(inputs={input_key: input_image})[output_key]\n inference_stop_time = time.perf_counter()\n inference_duration = inference_stop_time - inference_start_time\n total_inference_duration += inference_duration\n\n if frame_nr % 10 == 0:\n clear_output(wait=True)\n progress_bar.display()\n display(\n Pretty(\n f\"Processed frame {frame_nr}. \"\n f\"Inference time: {inference_duration:.2f} seconds \"\n f\"({1/inference_duration:.2f} FPS)\"\n )\n )\n\n # Transform network result to RGB image\n result_frame = convert_result_to_image(image, result, labeldict)[\n :, :, (2, 1, 0)\n ]\n # Resize to original image shape\n result_frame = cv2.resize(\n result_frame, (target_frame_width, target_frame_height)\n )\n # Save frame to video\n out_video.write(result_frame)\n\n frame_nr = frame_nr + 1\n progress_bar.progress = frame_nr\n progress_bar.update()\n\nexcept KeyboardInterrupt:\n print(\"Processing interrupted.\")\nfinally:\n out_video.release()\n cap.release()\n end_time = time.perf_counter()\n duration = end_time - start_time\n clear_output()\n print(f\"Safety Gear Detection Video saved to '{str(result_video_path)}'.\")\n print(\n f\"Processed {frame_nr} frames in {duration:.2f} seconds. \"\n f\"Total FPS (including video processing): {frame_nr/duration:.2f}.\"\n f\"Inference FPS: {frame_nr/total_inference_duration:.2f} \"\n )"}, {"cell_type": "markdown", "metadata": {"execution": {"iopub.execute_input": "2021-04-16T13:38:56.065237Z", "iopub.status.busy": "2021-04-16T13:38:56.065237Z", "iopub.status.idle": "2021-04-16T13:38:56.085468Z", "shell.execute_reply": "2021-04-16T13:38:56.085468Z", "shell.execute_reply.started": "2021-04-16T13:38:56.065237Z"}}, "source": "### Display or download video with results"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "# TODO: embed=True doesn't work well for large videos\nvideo = Video(result_video_path, width=800, embed=True)\nif not result_video_path.exists():\n plt.imshow(result_frame)\n raise ValueError(\n \"OpenCV was unable to write the video file. Showing one video frame.\"\n )\nelse:\n print(\n \"Showing Safety Gear Detection video saved at\\n\"\n f\"{result_video_path.resolve()}\"\n )\n print(\n \"If you cannot see the video in your browser, please click on the \"\n \"following link to download the video \"\n )\n video_link = FileLink(result_video_path)\n video_link.html_link_str = \"<a href='%s' download>%s</a>\"\n display(HTML(video_link._repr_html_()))\n display(video)"}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ""}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8"}, "nbTranslate": {"displayLangs": ["*"], "hotkey": "alt-t", "langInMainMenu": true, "sourceLang": "en", "targetLang": "fr", "useGoogleTranslate": true}, "toc": {"base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": true, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {"height": "calc(100% - 180px)", "left": "10px", "top": "150px", "width": "251.4px"}, "toc_section_display": true, "toc_window_display": true}}, "nbformat": 4, "nbformat_minor": 5} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
libpython3.7-dev |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment