Created
April 10, 2023 17:28
-
-
Save zshanabek/cabe2ec953ebd01998cba3cba0b40797 to your computer and use it in GitHub Desktop.
ok.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast | |
import constants | |
from database import SQLite | |
from amocrm.v2.entity.contact import Contact as _Contact | |
import re | |
from amocrm.v2.entity.lead import Lead as _Lead | |
from amocrm.v2.entity.task import Task | |
from amocrm.v2.entity import custom_field | |
from datetime import datetime, timedelta | |
import pytz | |
from time import sleep | |
from amocrm.v2 import tokens | |
# Module-level SQLite handle; get_select_options() and get_material_by_name()
# read the select-field options through it.
db = SQLite(database=constants.database_dir)
class Contact(_Contact):
    """amoCRM contact extended with a phone and an e-mail custom field.

    Both fields are declared with the ``WORK`` enum code.
    """

    phone = custom_field.ContactPhoneField(
        "Раб.тел",
        enum_code="WORK",
    )
    email = custom_field.ContactEmailField(
        "Email раб.",
        enum_code="WORK",
    )
def get_select_options(select_field_id):
    """Return the stored options of a select field as a list of dicts.

    Rows are fetched with ``db.get_select_values(select_field_id)``; for each
    row, element 1 becomes ``'value'`` and element 0 becomes ``'id'``.
    """
    return [
        {'value': row[1], 'id': row[0]}
        for row in db.get_select_values(select_field_id)
    ]
def get_material_by_name(name):
    """Look up a select option by *name* and wrap it in a ``SelectValue``.

    The row returned by ``db.get_select_value_by_name`` supplies the id
    (element 0) and the value (element 1).
    """
    row = db.get_select_value_by_name(name)
    option_id = row[0]
    option_value = row[1]
    return custom_field.SelectValue(id=option_id, value=option_value)
class Lead(_Lead):
    """amoCRM lead extended with the select custom fields used by this worker.

    The enum options of every field are loaded through
    ``get_select_options`` while the class body is executed.
    """

    metal = custom_field.SelectCustomField(
        "Метал",
        field_id=1062127,
        enums=get_select_options(1062127),
    )
    product = custom_field.SelectCustomField(
        "Изделие",
        field_id=1062129,
        enums=get_select_options(1062129),
    )
    bonus = custom_field.SelectCustomField(
        'Какой бонус Вы желаете получить до конца недели?',
        field_id=1071591,
        enums=get_select_options(1071591),
    )
    question_time = custom_field.SelectCustomField(
        'Как срочно стоит вопрос',
        field_id=1071589,
        enums=get_select_options(1071589),
    )
    event = custom_field.SelectCustomField(
        'На какое событие Вам нужны ювелирные изделия?',
        field_id=1071587,
        enums=get_select_options(1071587),
    )
def get_digits(phone_number_string):
    """Return only the digits of *phone_number_string*, in their original order."""
    digit_runs = (match.group() for match in re.finditer(r"\d+", phone_number_string))
    return "".join(digit_runs)
def get_custom_leads(_dict):
    """Extract the UTM and ``custom_lead`` entries from a request payload.

    * A key starting with ``utm`` is kept unchanged and mapped to the first
      element of its value.
    * A key starting with ``custom_lead`` has its first 12 characters dropped
      and is mapped to its value as-is (no ``[0]`` is taken here).

    All other keys are ignored.
    """
    selected = {}
    for key in _dict:
        if key.startswith("utm"):
            selected[key] = _dict[key][0]
        elif key.startswith("custom_lead"):
            # 12 = len("custom_lead") plus one separator character.
            selected[key[12:]] = _dict[key]
    return selected
def set_custom_leads(_dict):
    """Attach a ``TextCustomField`` to the ``Lead`` class for every key of *_dict*.

    Each key is used both as the attribute name on ``Lead`` and as the
    argument passed to ``TextCustomField``.
    """
    for field_name in _dict:
        text_field = custom_field.TextCustomField(field_name)
        setattr(Lead, field_name, text_field)
# Timezone used for the timestamps printed by the worker and for task deadlines.
tz = pytz.timezone("Asia/Almaty")

# Configure the default amoCRM token manager from the values in `constants`;
# the token storage is a FileTokensStorage rooted at
# constants.amocrm_tokens_storage_dir.
tokens.default_token_manager(
    client_id=constants.client_id,
    client_secret=constants.client_secret,
    subdomain=constants.subdomain,
    redirect_url=constants.redirect_url,
    storage=tokens.FileTokensStorage(
        directory_path=constants.amocrm_tokens_storage_dir,
    ),
)
# Initialise with the authorisation code; skip_error=True is passed through as-is.
tokens.default_token_manager.init(code=constants.code, skip_error=True)
def fill_custom_lead(_object, _dict):
    """Set every key/value pair of *_dict* as an attribute on *_object*.

    The dict is printed first. The same object is returned after being
    updated in place.
    """
    print(_dict)
    for attr_name, attr_value in _dict.items():
        setattr(_object, attr_name, attr_value)
    return _object
# Pick the manager id for a new lead: a truthy db.get_night_manager() result is
# used as-is, otherwise db.get_last_manager(group_ids) is consulted. When no
# manager id is obtained, constants.default_manager_id is returned instead.
# NOTE(review): indentation is lost in this listing, so it cannot be told here
# whether db.update_last_manager() runs only on the get_last_manager path or
# also after a night manager was chosen — confirm against the indented original.
def get_new_manager(db, group_ids): | |
""" | |
:param db: | |
:param group_ids: 1,2,3 | |
:return: manager_id | |
""" | |
if night_manager := db.get_night_manager(): | |
manager_id = night_manager | |
else: | |
manager_id = db.get_last_manager(group_ids) | |
if manager_id: | |
db.update_last_manager(manager_id) | |
# A truthy id is coerced to int; a falsy one yields the configured default.
return int(manager_id) if manager_id else constants.default_manager_id | |
def create_new_lead(
    leadname,
    responsible_user_id,
    price,
    tags,
    status_id,
    pipeline_id,
    custom_leads,
    metal_type,
    product_type,
    bonus_type,
    event_type,
    question_time_type,
    contact,
):
    """Create a lead, save it, attach *contact* to it and save it again.

    :param leadname: passed to ``Lead`` as ``name``
    :param responsible_user_id: passed to ``Lead`` as ``responsible_user``
    :param price: passed to ``Lead`` as ``price``
    :param tags: iterable of strings; each one is stripped and added as a tag
    :param status_id: assigned to ``lead.status``
    :param pipeline_id: assigned to ``lead.pipeline``
    :param custom_leads: dict applied to the lead through ``fill_custom_lead``
    :param metal_type: option name for the ``metal`` select field
    :param product_type: option name for the ``product`` select field
    :param bonus_type: option name for the ``bonus`` select field
    :param event_type: option name for the ``event`` select field
    :param question_time_type: option name for the ``question_time`` select field
    :param contact: contact added to ``lead.contacts`` after the first save
    :return: the saved lead

    A select field is only set when its option name is truthy; the option is
    resolved with ``get_material_by_name``.
    """
    lead = Lead(name=leadname, responsible_user=responsible_user_id, price=price)

    for raw_tag in tags:
        lead.tags.add(raw_tag.strip())

    lead.status = status_id
    lead.pipeline = pipeline_id
    lead = fill_custom_lead(lead, custom_leads)

    # (attribute on the lead, option name from the request), in assignment order.
    select_choices = (
        ("metal", metal_type),
        ("product", product_type),
        ("bonus", bonus_type),
        ("event", event_type),
        ("question_time", question_time_type),
    )
    for attr_name, option_name in select_choices:
        if option_name:
            setattr(lead, attr_name, get_material_by_name(option_name))

    # The lead is saved once before the contact is linked and once after.
    lead.save()
    lead.contacts.add(contact)
    lead.save()
    return lead
# Worker loop: repeatedly takes one pending request from SQLite, mirrors it into
# amoCRM (contact lookup, then lead creation or follow-up task creation) and
# records the outcome back in SQLite.
# NOTE(review): this listing has lost its indentation, so the nesting described
# in the comments below is inferred from statement order — confirm against the
# properly indented original before relying on it.
def runner(): | |
while True: | |
try: | |
# Heartbeat: print the current time in the configured timezone.
print(datetime.now(tz=tz)) | |
# A SQLite handle is opened here and closed on the exit paths below; this
# local name shadows the module-level db inside runner().
db = SQLite(database=constants.database_dir) | |
c = None | |
data = db.get_pending_request() | |
# Nothing pending: wait 3 seconds, close the handle and poll again.
if not data: | |
sleep(3) | |
db.close() | |
continue | |
request_id = data[0] | |
# data[1] is parsed as a Python literal (literal_eval accepts literals only).
dict_data = ast.literal_eval(data[1]) | |
# A payload without "phone_number", or one that does not support the "in"
# test (TypeError), is recorded as an error with retries=99 and skipped.
try: | |
if "phone_number" not in dict_data: | |
db.update_error_request( | |
id=request_id, error="no phone_number", retries=99 | |
) | |
db.close() | |
continue | |
except TypeError: | |
db.update_error_request( | |
id=request_id, error="not correct data", retries=99 | |
) | |
db.close() | |
continue | |
# Text used for the follow-up tasks created below.
task_text = constants.task_text | |
# Lead fields. Values are read with [0], i.e. every payload entry is expected
# to be a sequence — presumably query-string style lists; verify against the
# producer of these requests.
leadname = dict_data.get("leadname")[0] | |
# tags is the comma-split list, or "" when the first tags entry is empty
# (iterating "" yields nothing, so the tag loops below become no-ops).
tags = ( | |
dict_data.get("tags")[0].split(",") if dict_data.get("tags")[0] else "" | |
) | |
status_id = int(dict_data.get("status_id")[0]) | |
pipeline_id = int(dict_data.get("pipeline_id")[0]) | |
intr_group = dict_data.get("intr_group")[0] | |
# price falls back to 0 when the key is missing or its first entry is empty.
price = ( | |
0 | |
if not (dict_data.get("price") and dict_data.get("price")[0]) | |
else dict_data.get("price")[0] | |
) | |
# Contact fields; the optional ones default to "" when missing or empty.
name = dict_data.get("name")[0] | |
phone_number = get_digits(dict_data.get("phone_number")[0]) | |
metal_type = "" if not dict_data.get("metal") else dict_data.get("metal")[0] | |
product_type = "" if not dict_data.get("product") else dict_data.get("product")[0] | |
bonus_type = "" if not dict_data.get("podarok") else dict_data.get("podarok")[0] | |
event_type = "" if not dict_data.get("sobytie") else dict_data.get("sobytie")[0] | |
question_time_type = "" if not dict_data.get("srochno") else dict_data.get("srochno")[0] | |
email = "" if not dict_data.get("email") else dict_data.get("email")[0] | |
# Collect the utm/custom_lead entries and register them as text fields on Lead
# before any lead object is built.
custom_leads = get_custom_leads(_dict=dict_data) | |
set_custom_leads(custom_leads) | |
failed_lead = 143 # Closed and not realized
success_lead = 142 # Successfully realized
contact_id = None | |
# Look the contact up by the last 10 digits of the phone number; StopIteration
# is handled as "nothing found" (c stays None, leads becomes None).
try: | |
phone_last_10 = phone_number.strip()[-10:] | |
c = Contact.objects.get(query=phone_last_10) | |
contact_id = c.id | |
leads = list(c.leads) | |
except StopIteration: | |
leads = None | |
if leads: | |
# Pick a lead of this contact that sits in the requested pipeline; there is
# no break, so a later match replaces an earlier one.
lead = None | |
for _lead in leads: | |
temp_id = _lead.pipeline.id | |
if temp_id == pipeline_id: | |
lead = _lead | |
# The contact has leads, but none in the requested pipeline: create one.
if not lead: | |
print("No Lead") | |
responsible_user_id = get_new_manager(db=db, group_ids=intr_group) | |
new_lead = create_new_lead( | |
leadname=leadname, | |
responsible_user_id=responsible_user_id, | |
price=price, | |
status_id=status_id, | |
pipeline_id=pipeline_id, | |
custom_leads=custom_leads, | |
contact=c, | |
tags=tags, | |
metal_type=metal_type, | |
product_type=product_type, | |
bonus_type=bonus_type, | |
event_type=event_type, | |
question_time_type=question_time_type, | |
) | |
db.update_success_request( | |
request_id, | |
"Lead not found in current pipeline, new lead is created " | |
"lead_id: {} contact_id: {}".format(new_lead.id, c.id), | |
) | |
db.close() | |
sleep(1.5) | |
continue | |
lead_status = lead.status.id | |
# Existing lead is in the failed status: open a fresh lead for the contact.
if lead_status == failed_lead: | |
print("failed lead") | |
responsible_user_id = get_new_manager(db=db, group_ids=intr_group) | |
new_lead = create_new_lead( | |
leadname=leadname, | |
responsible_user_id=responsible_user_id, | |
price=price, | |
status_id=status_id, | |
pipeline_id=pipeline_id, | |
custom_leads=custom_leads, | |
contact=c, | |
tags=tags, | |
metal_type=metal_type, | |
product_type=product_type, | |
bonus_type=bonus_type, | |
event_type=event_type, | |
question_time_type=question_time_type, | |
) | |
# NOTE(review): the two adjacent literals below are joined without a space,
# so the stored message reads "...new lead is createdlead_id: ...".
db.update_success_request( | |
request_id, | |
"Lead was in Failed state, new lead is created" | |
"lead_id: {} contact_id: {}".format(new_lead.id, c.id), | |
) | |
db.close() | |
sleep(1) | |
# Existing lead is in the success status: create a task due in 24 hours for
# the lead's responsible user and add the request tags to the lead.
elif lead_status == success_lead: | |
print("lead is success realized {}".format(lead.id)) | |
task_deadline_unix = int( | |
(datetime.now(tz=tz) + timedelta(hours=24)).timestamp() | |
) | |
Task.objects.create( | |
entity_id=lead.id, | |
entity_type="leads", | |
responsible_user_id=lead.responsible_user.id, | |
text=task_text, | |
complete_till=task_deadline_unix, | |
) | |
for tag in tags: | |
tag = tag.strip() | |
lead.tags.append(tag) | |
lead.save() | |
db.update_success_request( | |
request_id, | |
"Success Realized Lead: Task Created " | |
"lead_id: {} contact_id: {}".format(lead.id, contact_id), | |
) | |
db.close() | |
sleep(1) | |
# Any other status: the lead is still open, so this request is a repeat.
else: | |
print("lead exists: Check for double") | |
if status_id != lead.status.id: | |
closest_task = lead.closest_task_at | |
# A task is created when none is scheduled or the closest one is already in
# the past; the comparison uses a naive datetime.utcnow().
if closest_task is None or closest_task < datetime.utcnow(): | |
task_deadline_unix = int( | |
(datetime.now(tz=tz) + timedelta(hours=24)).timestamp() | |
) | |
# AmoCRM returns the time in UTC (offset 0)
Task.objects.create( | |
entity_id=lead.id, | |
entity_type="leads", | |
responsible_user_id=lead.responsible_user.id, | |
text=task_text, | |
complete_till=task_deadline_unix, | |
) | |
db.update_success_request( | |
request_id, | |
"Task Created " | |
"lead_id: {} contact_id: {}".format( | |
lead.id, contact_id | |
), | |
) | |
# NOTE(review): this update follows the "Task Created" one directly; with the
# indentation lost it is unclear whether it always runs after it (replacing
# that message) or belongs to a different branch — confirm.
db.update_success_request( | |
request_id, | |
"Lead exist, ignore " | |
"lead_id: {} contact_id: {}".format(lead.id, contact_id), | |
) | |
else: | |
db.update_success_request( | |
request_id, | |
"Contact Double " | |
"lead_id: {} contact_id: {}".format(lead.id, contact_id), | |
) | |
print("ДУБЛЬ") | |
lead.save() | |
db.close() | |
else: # No lead found
print("lead or contact not found:", phone_number) | |
# Pick the responsible manager, then create the lead (and the contact if needed).
responsible_user_id = get_new_manager(db=db, group_ids=intr_group) | |
if not c: # no contact was found above
# Create the contact first, with the request tags, phone and e-mail.
new_contact = Contact( | |
name=name, responsible_user=responsible_user_id | |
) | |
for tag in tags: | |
tag = tag.strip() | |
new_contact.tags.append(tag) | |
new_contact.phone = phone_number | |
new_contact.email = email | |
new_contact.save() | |
c = new_contact | |
contact_id = c.id | |
print("contact created 148") | |
new_lead = create_new_lead( | |
leadname=leadname, | |
responsible_user_id=responsible_user_id, | |
price=price, | |
status_id=status_id, | |
pipeline_id=pipeline_id, | |
custom_leads=custom_leads, | |
contact=c, | |
tags=tags, | |
metal_type=metal_type, | |
product_type=product_type, | |
bonus_type=bonus_type, | |
event_type=event_type, | |
question_time_type=question_time_type, | |
) | |
print("lead created 160") | |
db.update_success_request( | |
request_id, | |
"Lead Created and attached to contact " | |
"lead_id: {} contact_id: {}".format(new_lead.id, contact_id), | |
) | |
db.close() | |
sleep(1.5) | |
# Catch-all boundary: any failure is printed and stored on the request.
# NOTE(review): request_id (and db) are assigned inside the try body, so an
# exception raised before those assignments leaves them unbound on the first
# pass, or pointing at the previous request on later passes — confirm and guard.
except Exception as e: | |
print(e) | |
# Single quotes are replaced with double quotes before the message is stored
# — presumably to keep the SQL string quoting intact; verify in update_error_request.
error = str(e).replace("'", '"') | |
db.update_error_request( | |
id=request_id, error="Uncaught exception {}".format(error) | |
) | |
db.close() | |
sleep(0.6) | |
if __name__ == "__main__":
    # Announce the start time in the configured timezone, then enter the
    # polling loop (runner() loops forever).
    print("Worker started at:", datetime.now(tz=tz))
    runner()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment