|
import argparse
import asyncio
import csv
import logging
import os
import re
from collections import defaultdict
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from typing import List, Set, Optional

import pandas as pd
from dotenv import load_dotenv
from telethon import TelegramClient
from telethon.errors import FloodWaitError, ChatAdminRequiredError
from telethon.tl.types import Channel, MessageMediaPhoto
|
|
|
# Configure logging: mirror every INFO+ record both to a log file and to
# the console, with a timestamped single-line format.
_handlers = [
    logging.FileHandler('rental_scraper.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_handlers,
)

# Module-level logger used throughout this script.
logger = logging.getLogger(__name__)
|
|
|
# Default search locations and societies.
# Location keywords are matched as whole words, case-insensitively.
DEFAULT_LOCATIONS = ['balewadi', 'baner']
# Messages containing any of these words are discarded outright.
# NOTE(review): 'mahalunge' appears to exclude a locality; the remaining
# words filter out female-only listings — confirm this is intended.
SKIP_KEYWORDS = ['mahalunge', 'female', 'females', 'ladies', 'girls']
# Housing-society names matched in addition to the plain locations.
DEFAULT_SOCIETIES = [
    "Anp Atlantis", "Mantra Monarch", "Malpani Vivanta", "Kunal The Canary",
    "Majestique Signature Towers", "Paranjape Magnolia", "Pride Platinum",
    "Supreme Amadore", "Surnil Impressions", "Amar Eternity", "Tejas Ela",
    "Garden Avenue", "Icon Windsor", "Chordia Solitaire", "Shree Felicita",
    "Deron Tresor", "ABIL Imperial", "Shagun Perfect 10", "Rigved Uptown",
    "Smile Kaizen", "Kunal Aspiree", "Jhamtani Nandan", "Almal The Terraces",
    "Majestique 27", "VTP One Earth", "Kakkad La Vida", "Metropark County",
    "Vistaar Icon", "Bhandari 43 Privet Drive", "Avon Vista", "Namoville",
    "ANP Universe"
]
|
|
|
@dataclass
class Message:
    """A finalized (possibly merged) rental listing, ready for CSV export."""

    group: str                      # source Telegram group title
    sender_id: int                  # Telegram sender id of the batch
    text: str                       # batch fragments joined with blank lines
    date_time: str                  # formatted "%Y-%m-%d %H:%M:%S"
    photo_path: Optional[str]       # local photo file path, or None
    contact: str                    # comma-separated normalized phone numbers
    matched_keywords: List[str]     # location/society keywords that matched
|
|
|
class MessageBatch:
    """Accumulates consecutive messages from one sender in one group.

    Posters often split a single listing across several messages (text,
    photo, contact); batching merges them back into one record.
    """

    def __init__(self):
        self.sender_id: Optional[int] = None     # sender of this batch
        self.text: List[str] = []                # message fragments, in order
        self.date: Optional[datetime] = None     # timestamp of the batch's first message
        self.photo_path: Optional[str] = None    # most recent photo path, if any
        self.contact: Set[str] = set()           # normalized phone numbers
        self.group: Optional[str] = None         # source group title
        self.matched_keywords: Set[str] = set()  # keywords matched so far

    def is_same_batch(self, sender_id: int, date: datetime, group: str) -> bool:
        """Return True if a message belongs to this batch.

        Same sender and group, within 5 minutes (in either direction) of
        the batch's start time. Fix: the original short-circuit expression
        returned None (not False) for an empty batch despite the declared
        ``-> bool``; this version always returns a real bool.
        """
        if self.date is None:
            return False
        return (
            self.sender_id == sender_id
            and self.group == group
            and abs(date - self.date) < timedelta(minutes=5)
        )
|
|
|
class MessageProcessor:
    """Filters, deduplicates, and batches incoming rental messages.

    A message is kept when it mentions a known location or society and
    contains no skip keyword. Kept messages are deduplicated via a
    whitespace/case-insensitive fingerprint and merged into per-sender
    batches (see MessageBatch) before being finalized as Message records.
    """

    # Indian mobile numbers: optional +91/091/91 prefix, leading digit 7-9,
    # either 10 contiguous digits or a 5+6 split by a space/hyphen.
    # Compiled once at class creation instead of re.findall per message.
    _PHONE_PATTERNS = [
        re.compile(r'(?:(?:\+|0{0,2})91[\s-]?)?[789]\d{9}'),
        re.compile(r'(?:(?:\+|0{0,2})91[\s-]?)?[789]\d{4}[\s-]?\d{6}'),
    ]

    def __init__(self, location_keywords: Optional[List[str]] = None,
                 society_keywords: Optional[List[str]] = None):
        """Create a processor.

        Args:
            location_keywords: whole-word location filters
                (defaults to DEFAULT_LOCATIONS).
            society_keywords: whole-word society-name filters
                (defaults to DEFAULT_SOCIETIES).
        """
        self.messages: List[Message] = []       # finalized batches
        self.seen_messages: Set[str] = set()    # fingerprints already accepted
        self.location_keywords = location_keywords or DEFAULT_LOCATIONS
        self.society_keywords = society_keywords or DEFAULT_SOCIETIES
        self.skip_keywords = SKIP_KEYWORDS
        self.current_batch = MessageBatch()
        self.stats = defaultdict(int)           # processing counters
        # Pre-compile keyword patterns once: _is_relevant_message runs per
        # message, so building each regex on every call was wasted work.
        self._skip_patterns = [
            re.compile(rf'\b{re.escape(word.lower())}\b')
            for word in self.skip_keywords
        ]
        self._location_patterns = [
            (loc, re.compile(rf'\b{re.escape(loc.lower())}\b'))
            for loc in self.location_keywords
        ]
        self._society_patterns = [
            (society, re.compile(rf'\b{re.escape(society.lower())}\b'))
            for society in self.society_keywords
        ]

    def _get_message_fingerprint(self, text: str) -> str:
        """Whitespace- and case-insensitive key used to detect duplicates."""
        return ''.join(text.lower().split())

    def _is_relevant_message(self, text: str) -> List[str]:
        """Return matched location/society keywords, or [] if skipped/irrelevant."""
        text_lower = text.lower()

        # Skip-word check runs first: a skipped message returns [] no matter
        # what else it matches, so scanning the (much longer) location and
        # society lists beforehand, as the original did, was wasted work.
        # The returned value is unchanged for every input.
        for pattern in self._skip_patterns:
            if pattern.search(text_lower):
                self.stats['skipped'] += 1
                return []

        # Locations first, then societies — preserves original match order.
        matches = [loc for loc, pattern in self._location_patterns
                   if pattern.search(text_lower)]
        matches.extend(society for society, pattern in self._society_patterns
                       if pattern.search(text_lower))
        return matches

    def _extract_contacts(self, text: str) -> List[str]:
        """Extract phone numbers from *text*, normalized to digits only."""
        contacts = set()
        for pattern in self._PHONE_PATTERNS:
            for match in pattern.findall(text):
                contacts.add(re.sub(r'\D', '', match))
        return list(contacts)

    def process_message(self, sender_id: int, message_text: str,
                        message_date: datetime, group_name: str,
                        photo_path: Optional[str] = None) -> bool:
        """Filter, dedupe, and batch one message.

        Returns:
            True when the message was accepted into a batch, False when it
            was empty, irrelevant, skipped, a duplicate, or errored.
        """
        if not message_text:
            return False

        self.stats['total_processed'] += 1

        try:
            matching_keywords = self._is_relevant_message(message_text)
            if not matching_keywords:
                return False

            fingerprint = self._get_message_fingerprint(message_text)
            if fingerprint in self.seen_messages:
                self.stats['duplicates'] += 1
                return False

            self.seen_messages.add(fingerprint)
            self.stats['relevant_found'] += 1

            contacts = self._extract_contacts(message_text)

            self._handle_message_batch(sender_id, message_text, message_date,
                                       group_name, photo_path, contacts,
                                       matching_keywords)
            return True

        except Exception as e:
            # Best-effort pipeline: log and drop the message rather than
            # aborting the whole scrape.
            logger.error(f"Error processing message: {e}")
            return False

    def _handle_message_batch(self, sender_id: int, message_text: str,
                              message_date: datetime, group_name: str,
                              photo_path: Optional[str], contacts: List[str],
                              matching_keywords: List[str]) -> None:
        """Append to the current batch, or finalize it and start a new one."""
        if self.current_batch.is_same_batch(sender_id, message_date, group_name):
            self.current_batch.text.append(message_text)
            if photo_path:
                self.current_batch.photo_path = photo_path
            if contacts:
                self.current_batch.contact.update(contacts)
            self.current_batch.matched_keywords.update(matching_keywords)
        else:
            self._save_current_batch()
            self.current_batch = MessageBatch()
            self.current_batch.sender_id = sender_id
            self.current_batch.text = [message_text]
            self.current_batch.date = message_date
            self.current_batch.photo_path = photo_path
            self.current_batch.contact = set(contacts)
            self.current_batch.group = group_name
            self.current_batch.matched_keywords = set(matching_keywords)

    def _save_current_batch(self) -> None:
        """Finalize the current batch into a Message, if it has any text."""
        if self.current_batch.text:
            self.messages.append(Message(
                group=self.current_batch.group,
                sender_id=self.current_batch.sender_id,
                text='\n\n'.join(self.current_batch.text),
                date_time=self.current_batch.date.strftime("%Y-%m-%d %H:%M:%S"),
                photo_path=self.current_batch.photo_path,
                contact=', '.join(self.current_batch.contact),
                matched_keywords=list(self.current_batch.matched_keywords),
            ))
|
|
|
class TelegramScraper:
    """Pulls messages from rental-related Telegram groups via Telethon."""

    def __init__(self, client: TelegramClient, download_photos: bool = False):
        """Store the client and make sure the output directories exist."""
        self.client = client
        self.download_photos = download_photos
        os.makedirs('photos', exist_ok=True)
        os.makedirs('csv_files', exist_ok=True)

    async def download_photo(self, message, post_id: str) -> Optional[str]:
        """Download the photo attached to *message*, if any.

        Returns:
            The saved file path, or None when the message carries no photo
            or the download fails.
        """
        try:
            if message.media and isinstance(message.media, MessageMediaPhoto):
                path = f'photos/{post_id}.jpg'
                await self.client.download_media(message, path)
                return path
        except Exception as e:
            logger.error(f"Error downloading photo: {e}")
        return None

    async def fetch_messages(self, groups: List[Channel], processor: MessageProcessor,
                             days: int = 1) -> None:
        """Feed the last *days* days of messages from *groups* to *processor*."""
        # Fix: Telethon message dates are timezone-aware UTC. The original
        # stripped tzinfo and compared against naive *local* datetime.now(),
        # shifting the cutoff by the machine's UTC offset. Compare aware
        # UTC values directly instead.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

        for group in groups:
            try:
                logger.info(f"Processing: {group.title}")
                # iter_messages yields newest-first, so the first message
                # older than the cutoff ends this group's scan.
                async for message in self.client.iter_messages(group):
                    if message.date < cutoff_date:
                        logger.info(f"Reached messages older than {days} days in {group.title}")
                        break

                    if not message.text:
                        continue

                    photo_path = None
                    if self.download_photos:
                        post_id = f"{group.id}_{message.id}"
                        photo_path = await self.download_photo(message, post_id)

                    processor.process_message(
                        sender_id=message.sender_id,
                        message_text=message.text,
                        message_date=message.date,
                        group_name=group.title,
                        photo_path=photo_path
                    )

                    # Periodic progress report.
                    if processor.stats['total_processed'] % 100 == 0:
                        logger.info(
                            f"Processed: {processor.stats['total_processed']} | "
                            f"Relevant: {processor.stats['relevant_found']} | "
                            f"Duplicates: {processor.stats['duplicates']}"
                        )

            except FloodWaitError as e:
                # Telegram rate limit (the import was previously unused):
                # honor the requested wait, then continue with the next
                # group; this group stays partially scraped.
                logger.warning(f"Flood wait in {group.title}: sleeping {e.seconds}s")
                await asyncio.sleep(e.seconds)
                continue
            except Exception as e:
                logger.error(f"Error in {group.title}: {e}")
                continue

    async def list_groups(self) -> List[Channel]:
        """Return channels whose titles look rental-related."""
        groups = []
        logger.info("Scanning groups...")
        try:
            async for dialog in self.client.iter_dialogs():
                if isinstance(dialog.entity, Channel):
                    title = dialog.entity.title.lower()
                    if any(keyword in title
                           for keyword in ('flat', 'room', 'rental', 'property')):
                        groups.append(dialog.entity)
                        logger.info(f"Found group: {dialog.entity.title}")
        except Exception as e:
            logger.error(f"Error listing groups: {e}")
        return groups
|
|
|
def save_to_csv(messages: List[Message], filename: str, days: int) -> None:
    """Save processed messages to a timestamped CSV under csv_files/.

    Args:
        messages: Finalized rental messages to persist; no-op when empty.
        filename: Base name; its extension (if any) is replaced by a
            timestamp + look-back suffix and '.csv'.
        days: Look-back window, embedded in the file name for traceability.
    """
    if not messages:
        logger.warning("No messages to save")
        return

    # Fix: the original used filename[:-4], which silently chops the last
    # four characters of any name that doesn't end in '.csv'. splitext
    # strips only a real extension.
    stem, _ = os.path.splitext(filename)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filepath = os.path.join("csv_files", f"{stem}_{timestamp}_last_{days}days.csv")

    try:
        df = pd.DataFrame([asdict(msg) for msg in messages])
        # Drop batches that ended up with identical text from one sender.
        df = df.drop_duplicates(subset=['text', 'sender_id'], keep='first')
        # utf-8-sig so Excel detects the encoding of non-ASCII content.
        df.to_csv(filepath, index=False, encoding='utf-8-sig')
        logger.info(f"Saved {len(df)} messages to {filepath}")
    except Exception as e:
        logger.error(f"Error saving to CSV: {e}")
|
|
|
async def main():
    """CLI entry point: parse args, connect to Telegram, scrape, export CSV."""
    parser = argparse.ArgumentParser(description='Telegram Rental Listing Scraper')
    parser.add_argument('--download-photos', action='store_true',
                        help='Download photos from messages')
    parser.add_argument('--days', type=int, default=1,
                        help='Number of days to look back (default: 1)')
    parser.add_argument('--locations', nargs='+',
                        default=DEFAULT_LOCATIONS,
                        help='Location keywords to filter messages')
    args = parser.parse_args()

    load_dotenv()

    # Fail fast with a clear message when credentials are missing; the
    # original passed os.getenv results straight through, so a missing
    # .env surfaced as an obscure Telethon error. Telethon also expects
    # api_id as an int, not the raw env string.
    api_id = os.getenv('api_id')
    api_hash = os.getenv('api_hash')
    if not api_id or not api_hash:
        logger.error("api_id / api_hash not set (check your .env file)")
        return

    try:
        async with TelegramClient('rental_scraper', int(api_id), api_hash) as client:

            scraper = TelegramScraper(client, args.download_photos)
            groups = await scraper.list_groups()

            if not groups:
                logger.warning("No relevant groups found")
                return

            processor = MessageProcessor(args.locations, DEFAULT_SOCIETIES)
            await scraper.fetch_messages(groups, processor, args.days)
            # Flush the trailing batch that fetch_messages leaves open.
            processor._save_current_batch()
            save_to_csv(processor.messages, 'rental_listings.csv', args.days)

    except Exception as e:
        # Top-level boundary: log anything unexpected instead of crashing.
        logger.error(f"Application error: {e}")
|
|
|
# Script entry point: run the async scraper pipeline to completion.
if __name__ == "__main__":
    asyncio.run(main())