-
-
Save ourway/75e8b7f970fc2a255b7389c6daa715b0 to your computer and use it in GitHub Desktop.
Dialog Flow Memory Leak issue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| For more information regarding DialogFlow SDK see: | |
| * https://github.com/googleapis/dialogflow-python-client-v2 | |
| * https://dialogflow-python-client-v2.readthedocs.io/en/latest/ | |
| * https://cloud.google.com/dialogflow/docs/reference/rest/v2/projects.agent.sessions/detectIntent | |
| """ | |
| import dialogflow_v2 as dialogflow | |
| from base import config | |
| from google.protobuf.json_format import MessageToDict | |
| from bot_clients.bot_client import BotClient | |
| from google.api_core.exceptions import ResourceExhausted, GoogleAPICallError, TooManyRequests | |
class DialogflowClient(BotClient):
    """
    Wrapper around the Dialogflow v2 ``SessionsClient``.

    The session id is supplied on every ``detect_intent`` call, so a single
    instance can serve several sessions. The outcome of the most recent call
    is stored on the instance and read back through ``check_response_success``,
    ``get_response``, ``get_status_code`` and ``get_response_messages``.
    """

    def __init__(self, project_id, logger):
        """
        :param project_id: Google Cloud project id of the Dialogflow agent
        :param logger: logger object exposing ``info`` and ``error``
        """
        super().__init__()
        self.project_id = project_id
        self.session_client = dialogflow.SessionsClient()
        self.logger = logger
        self.is_success_response = False
        self._response = None
        self._messages = None

    def detect_intent(self, session_id, text=None, payload=None, event=None, event_payload=None, context=None,
                      language_code='en-US'):
        """
        Send one text or event query to Dialogflow and store the outcome.

        Nothing is returned; use the accessor methods to read the result.
        Google API errors are logged and swallowed, in which case the instance
        is left in the "no response" state.

        :param session_id: id of the conversation session
        :param text: string, takes precedence over ``event`` when both are given
        :param payload: dict, sent as the custom payload of the query parameters
        :param event: event name (passed as ``EventInput.name``)
        :param event_payload: dict, parameters attached to the event
        :param context: context id, sent with a lifespan count of 1
        :param language_code: string
        :return: None
        """
        # Forget the previous call's outcome first. Without this, a call that
        # fails with an API error would keep reporting the previous call's
        # response (and success flag) through the accessors.
        self._response = None
        self.is_success_response = False

        if payload is None:
            payload = dict()
        if event_payload is None:
            event_payload = dict()
        session = self.session_client.session_path(self.project_id, session_id)
        query_input = self._create_query_input(text, event, event_payload, language_code)
        custom_payload = self._create_custom_payload(payload, context, session_id)
        try:
            self._response = self.session_client.detect_intent(
                session=session, query_input=query_input, query_params=custom_payload)
        # ResourceExhausted derives from TooManyRequests in google.api_core,
        # so it has to be listed first to get its own log message.
        except ResourceExhausted as e:
            self.logger.error("Quota Exceeded from Google %s", str(e))
            return
        except TooManyRequests as e:
            self.logger.error("Too many Requests from Google %s", str(e))
            return
        except GoogleAPICallError as e:
            self.logger.error('Google API error: %s', str(e))
            return

        # The API call itself succeeded; the webhook may still have failed.
        self.is_success_response = True
        if self._response.webhook_status.code > config.DIALOGFLOW_SUCCESS_CODE:
            self.logger.error("Webhook %s", self._response.webhook_status)
        self.logger.info("Dialogflow intent %s", self._response.query_result.intent)
        self.logger.info("Webhook %s", self._response.webhook_status)

    def get_response_messages(self):
        """Return the fulfillment messages of the last response, or None if there is none."""
        return self._get_response_messages()

    def check_response_success(self):
        """
        :return: True only if the last call produced a response and its
                 webhook status code equals ``config.DIALOGFLOW_SUCCESS_CODE``
        """
        if self._response is not None and self.is_success_response:
            return self._response.webhook_status.code == config.DIALOGFLOW_SUCCESS_CODE
        return False

    def _create_query_input(self, text=None, event=None, event_payload=None, language_code='en-US'):
        """
        Build the ``QueryInput`` for a text query or, failing that, an event query.

        :return: a ``QueryInput``, or None when neither ``text`` nor ``event`` is given
        """
        if event_payload is None:
            event_payload = dict()
        if text is not None:
            text_input = dialogflow.types.TextInput(
                text=text, language_code=language_code)
            return dialogflow.types.QueryInput(
                text=text_input)
        if event is not None:
            parameters = dialogflow.types.struct_pb2.Struct()
            for key, value in event_payload.items():
                parameters[key] = value
            event_input = dialogflow.types.EventInput(
                name=event, parameters=parameters, language_code=language_code)
            return dialogflow.types.QueryInput(
                event=event_input)
        return None

    def _create_custom_payload(self, params, context=None, session_id=None):
        """
        Build the ``QueryParameters`` carrying ``params`` as payload and,
        when ``context`` is given, a single context for ``session_id``.
        """
        parameters = dialogflow.types.struct_pb2.Struct()
        for key, value in params.items():
            parameters[key] = value
        if context is not None:
            custom_context = [self._make_query_context(context, session_id)]
            return dialogflow.types.QueryParameters(
                payload=parameters, contexts=custom_context
            )
        return dialogflow.types.QueryParameters(
            payload=parameters
        )

    def _make_query_context(self, context, session_id, lifespan_count=1):
        """Build a ``Context`` message named after ``context`` within ``session_id``."""
        return dialogflow.types.Context(
            name=self._generate_context(context, session_id), lifespan_count=lifespan_count)

    def get_response(self):
        """Return the last response converted to a dict, or an empty dict if there is none."""
        if self._response is not None:
            return MessageToDict(self._response)
        return dict()

    def get_status_code(self):
        """Return the webhook status code of the last response, or 1 if there is none."""
        if self._response is not None:
            return self._response.webhook_status.code
        return 1

    def _get_response_messages(self):
        if self._response is not None:
            return self._response.query_result.fulfillment_messages
        return None

    def _get_intent_name(self):
        """Return the display name of the matched intent, or None if there is no response."""
        if self._response is None:
            # Previously this raised AttributeError on the None response.
            return None
        return self._response.query_result.intent.display_name

    def _generate_context(self, context, session_id):
        """Return the fully qualified resource name of ``context`` in ``session_id``."""
        return "projects/{}/agent/sessions/{}/contexts/{}".format(self.project_id, session_id, context)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import dialogflow_v2 as dialogflow | |
| from base import config | |
| from google.protobuf.json_format import MessageToDict | |
| from bot_clients.bot_client import BotClient | |
| from google.api_core.exceptions import ResourceExhausted, GoogleAPICallError, TooManyRequests | |
class DialogflowClient(BotClient):
    """
    Wrapper around the Dialogflow v2 ``SessionsClient`` bound to one session.

    The project id and session id are fixed at construction time. The outcome
    of the most recent ``detect_intent`` call is stored on the instance and
    read back through ``check_response_success``, ``get_response``,
    ``get_status_code`` and ``get_response_messages``.
    """

    def __init__(self, project_id, session_id, logger):
        """
        :param project_id: Google Cloud project id of the Dialogflow agent
        :param session_id: id of the conversation session this instance serves
        :param logger: logger object exposing ``info`` and ``error``
        """
        super().__init__()
        self.project_id = project_id
        self.session_id = session_id
        # NOTE(review): every instance constructs its own SessionsClient and an
        # instance is tied to a single session, so presumably one client is
        # created per chat session - confirm that is affordable, or share one.
        self.session_client = dialogflow.SessionsClient()
        self.session = self.session_client.session_path(project_id, session_id)
        self.logger = logger
        self.is_success_response = False
        self._response = None
        self._messages = None

    def detect_intent(self, text=None, payload=None, event=None, event_payload=None, context=None,
                      language_code='en-US'):
        """
        Send one text or event query to Dialogflow and store the outcome.

        Nothing is returned; use the accessor methods to read the result.
        Google API errors are logged and swallowed, in which case the instance
        is left in the "no response" state.

        :param text: string, takes precedence over ``event`` when both are given
        :param payload: dict, sent as the custom payload of the query parameters
        :param event: event name (passed as ``EventInput.name``)
        :param event_payload: dict, parameters attached to the event
        :param context: context id, sent with a lifespan count of 1
        :param language_code: string
        :return: None
        """
        # Forget the previous call's outcome first. Without this, a call that
        # fails with an API error would keep reporting the previous call's
        # response (and success flag) through the accessors.
        self._response = None
        self.is_success_response = False

        if payload is None:
            payload = dict()
        if event_payload is None:
            event_payload = dict()
        query_input = self._create_query_input(text, event, event_payload, language_code)
        custom_payload = self._create_custom_payload(payload, context)
        try:
            self._response = self.session_client.detect_intent(
                session=self.session, query_input=query_input, query_params=custom_payload)
        # ResourceExhausted derives from TooManyRequests in google.api_core,
        # so it has to be listed first to get its own log message.
        except ResourceExhausted as e:
            self.logger.error("Quota Exceeded from Google %s", str(e))
            return
        except TooManyRequests as e:
            self.logger.error("Too many Requests from Google %s", str(e))
            return
        except GoogleAPICallError as e:
            self.logger.error('Google API error: %s', str(e))
            return

        # The API call itself succeeded; the webhook may still have failed.
        self.is_success_response = True
        if self._response.webhook_status.code > config.DIALOGFLOW_SUCCESS_CODE:
            self.logger.error("Webhook %s", self._response.webhook_status)
        self.logger.info("Dialogflow intent %s", self._response.query_result.intent)
        self.logger.info("Webhook %s", self._response.webhook_status)

    def get_response_messages(self):
        """Return the fulfillment messages of the last response, or None if there is none."""
        return self._get_response_messages()

    def check_response_success(self):
        """
        :return: True only if the last call produced a response and its
                 webhook status code equals ``config.DIALOGFLOW_SUCCESS_CODE``
        """
        if self._response is not None and self.is_success_response:
            return self._response.webhook_status.code == config.DIALOGFLOW_SUCCESS_CODE
        return False

    def _create_query_input(self, text=None, event=None, event_payload=None, language_code='en-US'):
        """
        Build the ``QueryInput`` for a text query or, failing that, an event query.

        :return: a ``QueryInput``, or None when neither ``text`` nor ``event`` is given
        """
        if event_payload is None:
            event_payload = dict()
        if text is not None:
            text_input = dialogflow.types.TextInput(
                text=text, language_code=language_code)
            return dialogflow.types.QueryInput(
                text=text_input)
        if event is not None:
            parameters = dialogflow.types.struct_pb2.Struct()
            for key, value in event_payload.items():
                parameters[key] = value
            event_input = dialogflow.types.EventInput(
                name=event, parameters=parameters, language_code=language_code)
            return dialogflow.types.QueryInput(
                event=event_input)
        return None

    def _create_custom_payload(self, params, context=None):
        """
        Build the ``QueryParameters`` carrying ``params`` as payload and,
        when ``context`` is given, a single context for this session.
        """
        parameters = dialogflow.types.struct_pb2.Struct()
        for key, value in params.items():
            parameters[key] = value
        if context is not None:
            custom_context = [self._make_query_context(context)]
            return dialogflow.types.QueryParameters(
                payload=parameters, contexts=custom_context
            )
        return dialogflow.types.QueryParameters(
            payload=parameters
        )

    def _make_query_context(self, context, lifespan_count=1):
        """Build a ``Context`` message named after ``context`` within this session."""
        return dialogflow.types.Context(
            name=self._generate_context(context), lifespan_count=lifespan_count)

    def get_response(self):
        """Return the last response converted to a dict, or an empty dict if there is none."""
        if self._response is not None:
            return MessageToDict(self._response)
        return dict()

    def get_status_code(self):
        """Return the webhook status code of the last response, or 1 if there is none."""
        if self._response is not None:
            return self._response.webhook_status.code
        return 1

    def _get_response_messages(self):
        if self._response is not None:
            return self._response.query_result.fulfillment_messages
        return None

    def _get_intent_name(self):
        """Return the display name of the matched intent, or None if there is no response."""
        if self._response is None:
            # Previously this raised AttributeError on the None response.
            return None
        return self._response.query_result.intent.display_name

    def _generate_context(self, context):
        """Return the fully qualified resource name of ``context`` in this session."""
        return "projects/{}/agent/sessions/{}/contexts/{}".format(self.project_id,
                                                                  self.session_id, context)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Q: What is your country of residence? | |
| A: Cyprus | |
| Q: Please select your city from the options below. | |
| A: Nicosia | |
| Q: Please type your telephone number | |
| A: 99123456 | |
| Q: Please choose from the below, your highest level of education. | |
| A: Associate Degree | |
| Q: Which from the below levels would you like to study towards? | |
| A: Bachelor | |
| Q: Which from the below programmes would you like to study towards? | |
| A: BA (Hons) - Business Studies (Top-Up Degree) V2 | |
| Q: What is your gender? | |
| A: Male | |
| Q: your age? | |
| A: 23 | |
| Q: Please select how many years of work experience do you have? | |
| A: 1 - 3 years | |
| Q: How much do you currently earn | |
| A: 1800 plus | |
| Q: Are you currently employed? | |
| A: Yes | |
| Q: Would you like to be contacted now by one of our advisors in order to assist you further with your application? | |
| A: Yes | |
The run is successful when the reply "been nice hearing from you." appears.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LiveChatEventHandler(ChatEventHandler):
    """
    Handles one incoming chat event: forwards its text to Dialogflow and
    sends the resulting messages (or an error message) back through the agent.
    """

    def __init__(self, event_data, agent, redis_client, logger, dialog_flow):
        """
        :param event_data: the incoming event; read with ``.get`` and passed to ``trace``
        :param agent: object used to send typing indicators and messages
        :param redis_client: object providing ``get_session_id(sender_id)``
        :param logger: logger object exposing ``debug`` and ``error``
        :param dialog_flow: Dialogflow client whose ``detect_intent`` takes the session id first
        """
        super().__init__()
        self.dialog_flow = dialog_flow
        self.data = event_data
        self.agent_id = agent.agent_id
        self.agent = agent
        self.redis = redis_client
        self.logger = logger

    def typing(self):
        """Trace step 4 and ask the agent to show a typing indicator to the sender."""
        trace(self.logger, self.data, step=4)
        self.agent.send_typing_indicator(self.get_sender_id())

    def handle(self):
        """
        Process the event end to end.

        The typing indicator is best-effort: a failure there is logged and
        the event is still processed.
        """
        try:
            self.typing()
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            self.logger.error("Typing indicator Failed")
        # NOTE: the matching start_processing_chat_thread_seq() call is
        # currently disabled; only the stop call below is made.
        payload = self._create_custom_payload()
        session_id = self.redis.get_session_id(self.get_sender_id())
        trace(self.logger, self.data, step=5)
        text = self._get_text_from_incoming_event()
        self.dialog_flow.detect_intent(session_id, text=text, payload=payload)
        action = self.data.get('action', "UNKNOWN")
        self.logger.debug(
            "livechat_debug sender:%s action:%s step:dialogflow request: %s payload: %s code: %d response: %s",
            self.get_sender_id(),
            action,
            text, json.dumps(payload),
            self.dialog_flow.get_status_code(), json.dumps(self.dialog_flow.get_response()))
        if self.dialog_flow.check_response_success():
            self._send_dialogflow_reply()
        else:
            self._send_failure_notice()

    def _send_dialogflow_reply(self):
        """Convert the Dialogflow response into chat messages and send them through the agent."""
        trace(self.logger, self.data, step=6, payload=self.dialog_flow.get_response())
        all_messages = self.dialog_flow.get_response_messages()
        livechat_messages = self.create_messages_from_dialogflow_response(all_messages)
        msgs = [m.get_message_payload() for m in livechat_messages]
        trace(self.logger, self.data, step=7, payload=msgs)
        ok, resps = self.agent.send_messages(livechat_messages)
        if not ok:
            trace(self.logger, self.data, step=8, label="failed", payload=resps)
            self.provider_failed()
            return
        for r in resps:
            if "response" in r:
                event_id = r.get("response", {}).get("event_id", "no_event")
                self.stop_processing_chat_response(event_id)
        # NOTE(review): the source this was recovered from had lost its
        # indentation; the three calls below are assumed to run once after
        # the loop rather than once per response - confirm.
        self.stop_processing_chat_thread_seq()
        trace(self.logger, self.data, step=8, payload=resps)
        self.provider_success()

    def _send_failure_notice(self):
        """Record the Dialogflow failure and send the generic error message to the sender."""
        trace(self.logger, self.data, step=6)
        self.dialogflow_failure()
        trace(self.logger, self.data, step=7)
        status, response = self.agent.send_message(
            TextMessage(config.SERVER_ERROR_MESSAGE_USER, self.get_sender_id(), self.agent_id)
        )
        # NOTE(review): the first element is compared against 400, so it is
        # presumably an HTTP status code, unlike send_messages() whose first
        # element is used as a boolean - confirm.
        if status < 400:
            trace(self.logger, self.data, step=8, payload=response)
            self.provider_success()
        else:
            trace(self.logger, self.data, step=8, label="failed", payload=response)
            self.provider_failed()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment