Skip to content

Instantly share code, notes, and snippets.

@zshanabek
Created April 10, 2023 17:28
Show Gist options
  • Save zshanabek/cabe2ec953ebd01998cba3cba0b40797 to your computer and use it in GitHub Desktop.
Save zshanabek/cabe2ec953ebd01998cba3cba0b40797 to your computer and use it in GitHub Desktop.
ok.py
import ast
import constants
from database import SQLite
from amocrm.v2.entity.contact import Contact as _Contact
import re
from amocrm.v2.entity.lead import Lead as _Lead
from amocrm.v2.entity.task import Task
from amocrm.v2.entity import custom_field
from datetime import datetime, timedelta
import pytz
from time import sleep
from amocrm.v2 import tokens
db = SQLite(database=constants.database_dir)
class Contact(_Contact):
phone = custom_field.ContactPhoneField("Раб.тел", enum_code="WORK")
email = custom_field.ContactEmailField("Email раб.", enum_code="WORK")
def get_select_options(select_field_id):
materials = db.get_select_values(select_field_id)
lst = []
for i in range(0, len(materials)):
lst.append({'value': materials[i][1], 'id': materials[i][0]})
return lst
def get_material_by_name(name):
material = db.get_select_value_by_name(name)
return custom_field.SelectValue(id=material[0], value=material[1])
class Lead(_Lead):
metal = custom_field.SelectCustomField("Метал", field_id=1062127, enums=get_select_options(1062127))
product = custom_field.SelectCustomField("Изделие", field_id=1062129, enums=get_select_options(1062129))
bonus = custom_field.SelectCustomField('Какой бонус Вы желаете получить до конца недели?', field_id=1071591,enums=get_select_options(1071591))
question_time = custom_field.SelectCustomField('Как срочно стоит вопрос', field_id=1071589,enums=get_select_options(1071589))
event = custom_field.SelectCustomField('На какое событие Вам нужны ювелирные изделия?', field_id=1071587,enums=get_select_options(1071587))
def get_digits(phone_number_string):
digits = re.findall(r"\d+", phone_number_string)
return "".join(digits)
def get_custom_leads(_dict):
custom_leads_dict = {}
for k in _dict:
# 'utm' and custom_lead
if k[:3] == "utm":
custom_leads_dict[k] = _dict[k][0]
if k[:11] == "custom_lead":
custom_leads_dict[k[12:]] = _dict[k]
return custom_leads_dict
def set_custom_leads(_dict):
for s in _dict:
setattr(Lead, s, custom_field.TextCustomField(s))
tz = pytz.timezone("Asia/Almaty")
tokens.default_token_manager(
client_id=constants.client_id,
client_secret=constants.client_secret,
subdomain=constants.subdomain,
redirect_url=constants.redirect_url,
storage=tokens.FileTokensStorage(
directory_path=constants.amocrm_tokens_storage_dir
),
)
tokens.default_token_manager.init(code=constants.code, skip_error=True)
def fill_custom_lead(_object, _dict):
print(_dict)
for k, v in _dict.items():
setattr(_object, k, v)
return _object
def get_new_manager(db, group_ids):
"""
:param db:
:param group_ids: 1,2,3
:return: manager_id
"""
if night_manager := db.get_night_manager():
manager_id = night_manager
else:
manager_id = db.get_last_manager(group_ids)
if manager_id:
db.update_last_manager(manager_id)
return int(manager_id) if manager_id else constants.default_manager_id
def create_new_lead(
leadname,
responsible_user_id,
price,
tags,
status_id,
pipeline_id,
custom_leads,
metal_type,
product_type,
bonus_type,
event_type,
question_time_type,
contact,
):
new_lead = Lead(
name=leadname,
responsible_user=responsible_user_id,
price=price,
)
for tag in tags:
tag = tag.strip()
new_lead.tags.add(tag)
new_lead.status = status_id
new_lead.pipeline = pipeline_id
new_lead = fill_custom_lead(new_lead, custom_leads)
if metal_type:
new_lead.metal = get_material_by_name(metal_type)
if product_type:
new_lead.product = get_material_by_name(product_type)
if bonus_type:
new_lead.bonus = get_material_by_name(bonus_type)
if event_type:
new_lead.event = get_material_by_name(event_type)
if question_time_type:
new_lead.question_time = get_material_by_name(question_time_type)
new_lead.save()
new_lead.contacts.add(contact)
new_lead.save()
return new_lead
def runner():
while True:
try:
print(datetime.now(tz=tz))
db = SQLite(database=constants.database_dir)
c = None
data = db.get_pending_request()
if not data:
sleep(3)
db.close()
continue
request_id = data[0]
dict_data = ast.literal_eval(data[1])
try:
if "phone_number" not in dict_data:
db.update_error_request(
id=request_id, error="no phone_number", retries=99
)
db.close()
continue
except TypeError:
db.update_error_request(
id=request_id, error="not correct data", retries=99
)
db.close()
continue
# task
task_text = constants.task_text
# get lead fields
leadname = dict_data.get("leadname")[0]
tags = (
dict_data.get("tags")[0].split(",") if dict_data.get("tags")[0] else ""
)
status_id = int(dict_data.get("status_id")[0])
pipeline_id = int(dict_data.get("pipeline_id")[0])
intr_group = dict_data.get("intr_group")[0]
price = (
0
if not (dict_data.get("price") and dict_data.get("price")[0])
else dict_data.get("price")[0]
)
# get contact fields
name = dict_data.get("name")[0]
phone_number = get_digits(dict_data.get("phone_number")[0])
metal_type = "" if not dict_data.get("metal") else dict_data.get("metal")[0]
product_type = "" if not dict_data.get("product") else dict_data.get("product")[0]
bonus_type = "" if not dict_data.get("podarok") else dict_data.get("podarok")[0]
event_type = "" if not dict_data.get("sobytie") else dict_data.get("sobytie")[0]
question_time_type = "" if not dict_data.get("srochno") else dict_data.get("srochno")[0]
email = "" if not dict_data.get("email") else dict_data.get("email")[0]
custom_leads = get_custom_leads(_dict=dict_data)
set_custom_leads(custom_leads)
failed_lead = 143 # Закрыто и не реализовано
success_lead = 142 # Успешно реализовано
contact_id = None
try:
phone_last_10 = phone_number.strip()[-10:]
c = Contact.objects.get(query=phone_last_10)
contact_id = c.id
leads = list(c.leads)
except StopIteration:
leads = None
if leads:
lead = None
for _lead in leads:
temp_id = _lead.pipeline.id
if temp_id == pipeline_id:
lead = _lead
if not lead:
print("No Lead")
responsible_user_id = get_new_manager(db=db, group_ids=intr_group)
new_lead = create_new_lead(
leadname=leadname,
responsible_user_id=responsible_user_id,
price=price,
status_id=status_id,
pipeline_id=pipeline_id,
custom_leads=custom_leads,
contact=c,
tags=tags,
metal_type=metal_type,
product_type=product_type,
bonus_type=bonus_type,
event_type=event_type,
question_time_type=question_time_type,
)
db.update_success_request(
request_id,
"Lead not found in current pipeline, new lead is created "
"lead_id: {} contact_id: {}".format(new_lead.id, c.id),
)
db.close()
sleep(1.5)
continue
lead_status = lead.status.id
if lead_status == failed_lead:
print("failed lead")
responsible_user_id = get_new_manager(db=db, group_ids=intr_group)
new_lead = create_new_lead(
leadname=leadname,
responsible_user_id=responsible_user_id,
price=price,
status_id=status_id,
pipeline_id=pipeline_id,
custom_leads=custom_leads,
contact=c,
tags=tags,
metal_type=metal_type,
product_type=product_type,
bonus_type=bonus_type,
event_type=event_type,
question_time_type=question_time_type,
)
db.update_success_request(
request_id,
"Lead was in Failed state, new lead is created"
"lead_id: {} contact_id: {}".format(new_lead.id, c.id),
)
db.close()
sleep(1)
elif lead_status == success_lead:
print("lead is success realized {}".format(lead.id))
task_deadline_unix = int(
(datetime.now(tz=tz) + timedelta(hours=24)).timestamp()
)
Task.objects.create(
entity_id=lead.id,
entity_type="leads",
responsible_user_id=lead.responsible_user.id,
text=task_text,
complete_till=task_deadline_unix,
)
for tag in tags:
tag = tag.strip()
lead.tags.append(tag)
lead.save()
db.update_success_request(
request_id,
"Success Realized Lead: Task Created "
"lead_id: {} contact_id: {}".format(lead.id, contact_id),
)
db.close()
sleep(1)
else:
print("lead exists: Check for double")
if status_id != lead.status.id:
closest_task = lead.closest_task_at
if closest_task is None or closest_task < datetime.utcnow():
task_deadline_unix = int(
(datetime.now(tz=tz) + timedelta(hours=24)).timestamp()
)
# AmoCRM returns with utc format 0 tz
Task.objects.create(
entity_id=lead.id,
entity_type="leads",
responsible_user_id=lead.responsible_user.id,
text=task_text,
complete_till=task_deadline_unix,
)
db.update_success_request(
request_id,
"Task Created "
"lead_id: {} contact_id: {}".format(
lead.id, contact_id
),
)
db.update_success_request(
request_id,
"Lead exist, ignore "
"lead_id: {} contact_id: {}".format(lead.id, contact_id),
)
else:
db.update_success_request(
request_id,
"Contact Double "
"lead_id: {} contact_id: {}".format(lead.id, contact_id),
)
print("ДУБЛЬ")
lead.save()
db.close()
else: # No lead found
print("lead or contact not found:", phone_number)
# get and update new manager
# create new lead
responsible_user_id = get_new_manager(db=db, group_ids=intr_group)
if not c: # if no contacts
# NO CONTACTS
new_contact = Contact(
name=name, responsible_user=responsible_user_id
)
for tag in tags:
tag = tag.strip()
new_contact.tags.append(tag)
new_contact.phone = phone_number
new_contact.email = email
new_contact.save()
c = new_contact
contact_id = c.id
print("contact created 148")
new_lead = create_new_lead(
leadname=leadname,
responsible_user_id=responsible_user_id,
price=price,
status_id=status_id,
pipeline_id=pipeline_id,
custom_leads=custom_leads,
contact=c,
tags=tags,
metal_type=metal_type,
product_type=product_type,
bonus_type=bonus_type,
event_type=event_type,
question_time_type=question_time_type,
)
print("lead created 160")
db.update_success_request(
request_id,
"Lead Created and attached to contact "
"lead_id: {} contact_id: {}".format(new_lead.id, contact_id),
)
db.close()
sleep(1.5)
except Exception as e:
print(e)
error = str(e).replace("'", '"')
db.update_error_request(
id=request_id, error="Uncaught exception {}".format(error)
)
db.close()
sleep(0.6)
if __name__ == "__main__":
print("Worker started at:", datetime.now(tz=tz))
runner()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment