from telethon import TelegramClient, sync
from telethon.tl.functions.channels import GetParticipantsRequest
from telethon.tl.functions.messages import GetHistoryRequest
from PIL import Image
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
import numpy as np
import argparse
from getpass import getpass
# Telegram API credentials and the login phone number; main() passes these
# to connect_telegram(). The values below are placeholders -- replace them
# with your own before running, and avoid committing real credentials.
API_ID ="xxxx"
API_HASH = "xxxxx"
PHONE_NUM = "xxxxxx"
def connect_telegram(api_id, api_hash, phone_number):
    """Create a Telegram client and make sure the user is signed in.

    Args:
        api_id: Telegram API id forwarded to ``TelegramClient``.
        api_hash: Telegram API hash forwarded to ``TelegramClient``.
        phone_number: Phone number used to request a login code when the
            session is not authorized yet.

    Returns:
        The started ``TelegramClient``, or ``None`` when ``client.start()``
        returns a falsy value.

    Raises:
        Exception: If signing in with the two-step verification password
            fails. The underlying error is chained as the cause.
    """
    # Local import: telethon is already a dependency of this file, and the
    # specific error type lets us tell "2FA required" apart from other
    # sign-in failures (e.g. a mistyped code), which now propagate as-is.
    from telethon.errors import SessionPasswordNeededError

    print("Trying to connect to Telegram...")
    client = TelegramClient("Session", api_id, api_hash)
    # NOTE(review): start() may already run Telethon's own interactive login,
    # in which case the manual sign-in below is skipped -- confirm.
    if not client.start():
        print("Could not connect to Telegram servers.")
        return None

    if client.is_user_authorized():
        return client

    print(
        "Session file not found. This is the first run, sending code request..."
    )
    client.sign_in(phone_number)
    self_user = None
    while self_user is None:
        code = input("Enter the code you just received: ")
        try:
            self_user = client.sign_in(code=code)
        except SessionPasswordNeededError as e:
            print(f"Maybe 2FA error for {str(e)}")
            pw = getpass(
                "Two step verification is enabled. Please enter your password: "
            )
            try:
                self_user = client.sign_in(password=pw)
            except Exception as err:
                # Only a failed password sign-in is an error; the previous
                # code raised here even after a successful 2FA login.
                raise Exception("Please check your password or 2FA") from err
    return client
def get_history_messaes_list(client, chat_info):
    """Collect the text of every message in a chat's history.

    The history is fetched page by page with ``GetHistoryRequest``; each
    following request uses the id of the last message of the previous page
    as its ``offset_id``.

    Args:
        client: Callable Telegram client used to send the request.
        chat_info: Peer (chat/channel entity) whose history is read.

    Returns:
        A list with the non-empty ``message`` text of every fetched message,
        in the order the pages were returned.
    """
    offset_id = 0
    limit = 100
    message_list = []
    while True:
        posts = client(
            GetHistoryRequest(
                peer=chat_info,
                limit=limit,
                offset_date=None,
                offset_id=offset_id,
                max_id=0,
                min_id=0,
                add_offset=0,
                hash=0,
            )
        )
        messages = posts.messages
        if not messages:
            break
        # Collect the page BEFORE deciding whether to stop, so the final
        # (short) page is kept; previously it was silently dropped.
        message_list.extend(
            message.message for message in messages if message.message
        )
        if len(messages) < limit:
            # A short page means there is nothing left to fetch.
            break
        offset_id = messages[-1].id
        print(offset_id)
    return message_list
def generate_word_cloud(text, *, font_path="hei.ttf", output_path="test3.png"):
    """Render *text* as a word-cloud image and write it to a file.

    Args:
        text: The text handed to ``WordCloud.generate``.
        font_path: Font file passed to ``WordCloud``. Defaults to
            ``"hei.ttf"``, the value that used to be hard-coded.
        output_path: Destination of the rendered image. Defaults to
            ``"test3.png"``, the value that used to be hard-coded.
    """
    wc = WordCloud(
        background_color="white",
        max_words=2000,
        height=400,
        width=800,
        font_path=font_path,
        max_font_size=50,
    )
    wc.generate(text)
    wc.to_file(output_path)
def main():
    """Sign in, download one chat's history and render it as a word cloud.

    Returns:
        All collected message texts joined with newlines.

    Raises:
        Exception: If ``connect_telegram`` does not return a client.
    """
    telegram = connect_telegram(API_ID, API_HASH, PHONE_NUM)
    if not telegram:
        raise Exception("Can not auth telegram")

    print("Getting chat basic info...")
    # Walk every dialog once. Enable the print below to list the available
    # channel/group ids.
    # NOTE(review): presumably this pass also lets the client learn the chats
    # before get_entity() is called -- confirm before removing the loop.
    for _dialog in telegram.iter_dialogs():
        # print(_dialog.name, _dialog.id)
        continue

    # id 1001492060815 is pythonhunter
    target_chat = telegram.get_entity(1001492060815)
    corpus = "\n".join(get_history_messaes_list(telegram, target_chat))
    print(len(corpus))
    generate_word_cloud(corpus)
    return corpus
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
Created
November 26, 2021 04:24
-
-
Save yihong0618/a1f904b3fdbdebaabafa04b5cf7551a8 to your computer and use it in GitHub Desktop.
pythonhunter
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment