Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ourway/75e8b7f970fc2a255b7389c6daa715b0 to your computer and use it in GitHub Desktop.
Save ourway/75e8b7f970fc2a255b7389c6daa715b0 to your computer and use it in GitHub Desktop.
Dialog Flow Memory Leak issue
"""
For more information regarding DialogFlow SDK see:
* https://github.com/googleapis/dialogflow-python-client-v2
* https://dialogflow-python-client-v2.readthedocs.io/en/latest/
* https://cloud.google.com/dialogflow/docs/reference/rest/v2/projects.agent.sessions/detectIntent
"""
import dialogflow_v2 as dialogflow
from base import config
from google.protobuf.json_format import MessageToDict
from bot_clients.bot_client import BotClient
from google.api_core.exceptions import ResourceExhausted, GoogleAPICallError, TooManyRequests
class DialogflowClient(BotClient):
def __init__(self, project_id, logger):
super().__init__()
self.project_id = project_id
self.session_client = dialogflow.SessionsClient()
self.logger = logger
self.is_success_response = False
self._response = None
self._messages = None
def detect_intent(self, session_id, text=None, payload=None, event=None, event_payload=None, context=None,
language_code='en-US'):
"""
:param session_id:
:param text: string
:param payload: dict
:param event: dict
:param event_payload: dict
:param context: context id
:param language_code: string
:return:
"""
if payload is None:
payload = dict()
if event_payload is None:
event_payload = dict()
session = self.session_client.session_path(self.project_id, session_id)
query_input = self._create_query_input(text, event, event_payload, language_code)
custom_payload = self._create_custom_payload(payload, context, session_id)
try:
self._response = self.session_client.detect_intent(
session=session, query_input=query_input, query_params=custom_payload)
self.is_success_response = True
if self._response.webhook_status.code > config.DIALOGFLOW_SUCCESS_CODE:
self.logger.error("Webhook %s", self._response.webhook_status)
self.logger.info("Dialogflow intent %s", self._response.query_result.intent)
self.logger.info("Webhook %s", self._response.webhook_status)
except ResourceExhausted as e:
self.logger.error("Quota Exceeded from Google %s", str(e))
except TooManyRequests as e:
self.logger.error("Too many Requests from Google %s", str(e))
except GoogleAPICallError as e:
self.logger.error('Google API error: %s', str(e))
def get_response_messages(self):
return self._get_response_messages()
def check_response_success(self):
if self._response is not None and self.is_success_response:
return self._response.webhook_status.code == config.DIALOGFLOW_SUCCESS_CODE
return False
def _create_query_input(self, text=None, event=None, event_payload=None, language_code='en-US'):
if event_payload is None:
event_payload = dict()
if text is not None:
text_input = dialogflow.types.TextInput(
text=text, language_code=language_code)
return dialogflow.types.QueryInput(
text=text_input)
elif event is not None:
parameters = dialogflow.types.struct_pb2.Struct()
for key, value in event_payload.items():
parameters[key] = value
event_input = dialogflow.types.EventInput(
name=event, parameters=parameters, language_code=language_code)
return dialogflow.types.QueryInput(
event=event_input)
def _create_custom_payload(self, params, context=None, session_id=None):
parameters = dialogflow.types.struct_pb2.Struct()
for key, value in params.items():
parameters[key] = value
if context is not None:
custom_context = [self._make_query_context(context, session_id)]
return dialogflow.types.QueryParameters(
payload=parameters, contexts=custom_context
)
return dialogflow.types.QueryParameters(
payload=parameters
)
def _make_query_context(self, context, session_id, lifespan_count=1):
return dialogflow.types.Context(
name=self._generate_context(context, session_id), lifespan_count=lifespan_count)
def get_response(self):
if self._response is not None:
return MessageToDict(self._response)
return dict()
def get_status_code(self):
if self._response is not None:
return self._response.webhook_status.code
return 1
def _get_response_messages(self):
if self._response is not None:
return self._response.query_result.fulfillment_messages
def _get_intent_name(self):
return self._response.query_result.intent.display_name
def _generate_context(self, context, session_id):
return "projects/{}/agent/sessions/{}/contexts/{}".format(self.project_id, session_id, context)
import dialogflow_v2 as dialogflow
from base import config
from google.protobuf.json_format import MessageToDict
from bot_clients.bot_client import BotClient
from google.api_core.exceptions import ResourceExhausted, GoogleAPICallError, TooManyRequests
class DialogflowClient(BotClient):
def __init__(self, project_id, session_id, logger):
super().__init__()
self.project_id = project_id
self.session_id = session_id
self.session_client = dialogflow.SessionsClient()
self.session = self.session_client.session_path(project_id, session_id)
self.logger = logger
self.is_success_response = False
self._response = None
self._messages = None
def detect_intent(self, text=None, payload=None, event=None, event_payload=None, context=None,
language_code='en-US'):
"""
:param text: string
:param payload: dict
:param event: dict
:param event_payload: dict
:param context: context id
:param language_code: string
:return:
"""
if payload is None:
payload = dict()
if event_payload is None:
event_payload = dict()
query_input = self._create_query_input(text, event, event_payload, language_code)
custom_payload = self._create_custom_payload(payload, context)
try:
self._response = self.session_client.detect_intent(
session=self.session, query_input=query_input, query_params=custom_payload)
self.is_success_response = True
if self._response.webhook_status.code > config.DIALOGFLOW_SUCCESS_CODE:
self.logger.error("Webhook %s", self._response.webhook_status)
self.logger.info("Dialogflow intent %s", self._response.query_result.intent)
self.logger.info("Webhook %s", self._response.webhook_status)
except ResourceExhausted as e:
self.logger.error("Quota Exceeded from Google %s", str(e))
except TooManyRequests as e:
self.logger.error("Too many Requests from Google %s", str(e))
except GoogleAPICallError as e:
self.logger.error('Google API error: %s', str(e))
def get_response_messages(self):
return self._get_response_messages()
def check_response_success(self):
if self._response is not None and self.is_success_response:
return self._response.webhook_status.code == config.DIALOGFLOW_SUCCESS_CODE
return False
def _create_query_input(self, text=None, event=None, event_payload=None, language_code='en-US'):
if event_payload is None:
event_payload = dict()
if text is not None:
text_input = dialogflow.types.TextInput(
text=text, language_code=language_code)
return dialogflow.types.QueryInput(
text=text_input)
elif event is not None:
parameters = dialogflow.types.struct_pb2.Struct()
for key, value in event_payload.items():
parameters[key] = value
event_input = dialogflow.types.EventInput(
name=event, parameters=parameters, language_code=language_code)
return dialogflow.types.QueryInput(
event=event_input)
def _create_custom_payload(self, params, context=None):
parameters = dialogflow.types.struct_pb2.Struct()
for key, value in params.items():
parameters[key] = value
if context is not None:
custom_context = [self._make_query_context(context)]
return dialogflow.types.QueryParameters(
payload=parameters, contexts=custom_context
)
return dialogflow.types.QueryParameters(
payload=parameters
)
def _make_query_context(self, context, lifespan_count=1):
return dialogflow.types.Context(
name=self._generate_context(context), lifespan_count=lifespan_count)
def get_response(self):
if self._response is not None:
return MessageToDict(self._response)
return dict()
def get_status_code(self):
if self._response is not None:
return self._response.webhook_status.code
return 1
def _get_response_messages(self):
if self._response is not None:
return self._response.query_result.fulfillment_messages
def _get_intent_name(self):
return self._response.query_result.intent.display_name
def _generate_context(self, context):
return "projects/{}/agent/sessions/{}/contexts/{}".format(self.project_id,
self.session_id, context)
Q: What is your country of residence?
A: Cyprus
Q: Please select your city from the options below.
A: Nicosia
Q: Please type your telephone number
A: 99123456
Q: Please choose from the below, your highest level of education.
A: Associate Degre
Q: Which from the below levels would you like to study towards?
A: Bachelor
Q: Which from the below programmes would you like to study towards?
A: BA (Hons) - Business Studies (Top-Up Degree) V2
Q: What is your gender?
A: Male
Q: your age?
A: 23
Q: Please select how many years of work experience do you have?
A: 1 - 3 years
Q: How much do you currently earn
A: 1800 plus
Q: Are you currently employed?
A: Yes
Q: Would you like to be contacted now by one of our advisors in order to assist you further with your application?
A: Yes
All is good when we see "been nice hearing from you."
class LiveChatEventHandler(ChatEventHandler):
def __init__(self, event_data, agent, redis_client, logger, dialog_flow):
super().__init__()
self.dialog_flow = dialog_flow
self.data = event_data
self.agent_id = agent.agent_id
self.agent = agent
self.redis = redis_client
self.logger = logger
def typing(self):
trace(self.logger, self.data, step=4)
self.agent.send_typing_indicator(self.get_sender_id())
def handle(self):
try:
self.typing()
except:
self.logger.error("Typing indicator Failed")
# self.start_processing_chat_thread_seq()
payload = self._create_custom_payload()
session_id = self.redis.get_session_id(self.get_sender_id())
trace(self.logger, self.data, step=5)
self.dialog_flow.detect_intent(session_id, text=self._get_text_from_incoming_event(), payload=payload)
action = self.data.get('action', "UNKNOWN")
self.logger.debug(
"livechat_debug sender:%s action:%s step:dialogflow request: %s payload: %s code: %d response: %s" % (
self.get_sender_id(),
action,
self._get_text_from_incoming_event(), json.dumps(payload),
self.dialog_flow.get_status_code(), json.dumps(self.dialog_flow.get_response())))
if self.dialog_flow.check_response_success():
trace(self.logger, self.data, step=6, payload=self.dialog_flow.get_response())
all_messages = self.dialog_flow.get_response_messages()
livechat_messages = self.create_messages_from_dialogflow_response(all_messages)
msgs = []
for m in livechat_messages:
msgs.append(m.get_message_payload())
trace(self.logger, self.data, step=7, payload=msgs)
ok, resps = self.agent.send_messages(livechat_messages)
if ok:
for r in resps:
if "response" in r:
event_id = r.get("response", {}).get("event_id", "no_event")
self.stop_processing_chat_response(event_id)
self.stop_processing_chat_thread_seq()
trace(self.logger, self.data, step=8, payload=resps)
self.provider_success()
else:
trace(self.logger, self.data, step=8, label="failed", payload=resps)
self.provider_failed()
else:
trace(self.logger, self.data, step=6)
self.dialogflow_failure()
trace(self.logger, self.data, step=7)
ok, response = self.agent.send_message(
TextMessage(config.SERVER_ERROR_MESSAGE_USER, self.get_sender_id(), self.agent_id)
)
if ok < 400:
trace(self.logger, self.data, step=8, payload=response)
self.provider_success()
else:
trace(self.logger, self.data, step=8, label="failed", payload=response)
self.provider_failed()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment