# !/usr/bin/env python3 import difflib import os import re import time import asyncio import asyncio.subprocess import logging from telethon import TelegramClient, events, errors from telethon.tl.types import MessageMediaWebPage #***********************************************************************************# api_id = 1234567 # your telegram api id api_hash = '1234567890abcdefgh' # your telegram api hash bot_token = '1234567890:ABCDEFGHIJKLMNOPQRST' # your bot_token admin_id = 1234567890 # your chat id save_path = '/root/Help' # file save path upload_file_set = False # set upload file to google drive drive_id = '5FyJClXmsqNw0-Rz19' # google teamdrive id 如果使用OD,删除''内的内容即可。 drive_name = 'gc' # rclone drive name max_num = 5 # 同时下载数量 # filter file name/文件名过滤 filter_list = ['你好,欢迎加入 Quantumu', '\n'] # filter chat id /指定某些频道下载 whitelist = [] donwload_all_chat = False # 监控所有你加入的频道,收到的新消息如果包含媒体都会下载,默认关闭 filter_file_name = ['sh'] # 指定文件后缀,可以填jpg、avi、mkv、rar等。 # pip install telethon cryptg pillow aiohttp hachoir # 所需的依赖模块 #***********************************************************************************# logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.WARNING) logger = logging.getLogger(__name__) queue = asyncio.Queue() # 文件夹/文件名称处理 def validateTitle(title): r_str = r"[\/\\\:\*\?\"\<\>\|\n]" # '/ \ : * ? " < > |' new_title = re.sub(r_str, "_", title) # 替换为下划线 return new_title # 获取相册标题 async def get_group_caption(message): group_caption = "" entity = await client.get_entity(message.to_id) async for msg in client.iter_messages(entity=entity, reverse=True, offset_id=message.id - 9, limit=10): if msg.grouped_id == message.grouped_id: if msg.text != "": group_caption = msg.text return group_caption return group_caption # 获取本地时间 def get_local_time(): return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 判断相似率 def get_equal_rate(str1, str2): return difflib.SequenceMatcher(None, str1, str2).quick_ratio() # 返回文件大小 def bytes_to_string(byte_count): suffix_index = 0 while byte_count >= 1024: byte_count /= 1024 suffix_index += 1 return '{:.2f}{}'.format( byte_count, [' bytes', 'KB', 'MB', 'GB', 'TB'][suffix_index] ) async def worker(name): while True: queue_item = await queue.get() message = queue_item[0] chat_title = queue_item[1] entity = queue_item[2] file_name = queue_item[3] for filter_file in filter_file_name: if not file_name.endswith(filter_file): return dirname = validateTitle(f'{chat_title}({entity.id})') datetime_dir_name = message.date.strftime("%Y年%m月") file_save_path = os.path.join(save_path, dirname, datetime_dir_name) if not os.path.exists(file_save_path): os.makedirs(file_save_path) # 判断文件是否在本地存在 if file_name in os.listdir(file_save_path): os.remove(os.path.join(file_save_path, file_name)) print(f"{get_local_time()} 开始下载: {chat_title} - {file_name}") try: loop = asyncio.get_event_loop() task = loop.create_task(client.download_media( message, os.path.join(file_save_path, file_name))) await asyncio.wait_for(task, timeout=3600) if upload_file_set: proc = await asyncio.create_subprocess_exec('fclone', 'move', os.path.join( file_save_path, file_name), f"{drive_name}:{{{drive_id}}}/{dirname}/{datetime_dir_name}", '--ignore-existing', stdout=asyncio.subprocess.DEVNULL) await proc.wait() if proc.returncode == 0: print(f"{get_local_time()} - {file_name} 下载并上传完成") except (errors.FileReferenceExpiredError, asyncio.TimeoutError): logging.warning(f'{get_local_time()} - {file_name} 出现异常,重新尝试下载!') async for new_message in client.iter_messages(entity=entity, offset_id=message.id - 1, reverse=True, limit=1): await queue.put((new_message, chat_title, entity, file_name)) except Exception as e: print(f"{get_local_time()} - {file_name} {e}") await bot.send_message(admin_id, f'Error!\n\n{e}\n\n{file_name}') finally: queue.task_done() # 无论是否上传成功都删除文件。 if upload_file_set: try: os.remove(os.path.join(file_save_path, file_name)) except: pass @events.register(events.NewMessage(pattern='/start', from_users=admin_id)) async def handler(update): text = update.message.text.split(' ') if len(text) == 1: await bot.send_message(admin_id, '参数错误,请按照参考格式输入:\n\n ' '/start https://t.me/fkdhlg 0 \n\n' 'Tips:如果不输入offset_id,默认从第一条开始下载。', parse_mode='HTML') return elif len(text) == 2: chat_id = text[1] try: entity = await client.get_entity(chat_id) chat_title = entity.title offset_id = 0 await update.reply(f'开始从{chat_title}的第一条消息下载。') except: await update.reply('chat输入错误,请输入频道或群组的链接') return elif len(text) == 3: chat_id = text[1] offset_id = int(text[2]) try: entity = await client.get_entity(chat_id) chat_title = entity.title await update.reply(f'开始从{chat_title}的第{offset_id}条消息下载。') except: await update.reply('chat输入错误,请输入频道或群组的链接') return else: await bot.send_message(admin_id, '参数错误,请按照参考格式输入:\n\n ' '/start https://t.me/fkdhlg 0 \n\n' 'Tips:如果不输入offset_id,默认从第一条开始下载。', parse_mode='HTML') return if chat_title: print(f'{get_local_time()} - 开始下载:{chat_title}({entity.id}) - {offset_id}') last_msg_id = 0 async for message in client.iter_messages(entity, offset_id=offset_id, reverse=True, limit=None): if message.media: # 如果是一组媒体 caption = await get_group_caption(message) if ( message.grouped_id and message.text == "") else message.text # 过滤文件名称中的广告等词语 if len(filter_list) and caption != "": for filter_keyword in filter_list: caption = caption.replace(filter_keyword, "") # 如果文件文件名不是空字符串,则进行过滤和截取,避免文件名过长导致的错误 caption = "" if caption == "" else f'{validateTitle(caption)} - '[ :50] file_name = '' # 如果是文件 if message.document: if type(message.media) == MessageMediaWebPage: continue if message.media.document.mime_type == "image/webp": continue if message.media.document.mime_type == "application/x-tgsticker": continue for i in message.document.attributes: try: file_name = i.file_name except: continue if file_name == '': file_name = f'{message.id} - {caption}.{message.document.mime_type.split("/")[-1]}' else: # 如果文件名中已经包含了标题,则过滤标题 if get_equal_rate(caption, file_name) > 0.6: caption = "" file_name = f'{message.id} - {caption}{file_name}' elif message.photo: file_name = f'{message.id} - {caption}{message.photo.id}.jpg' else: continue await queue.put((message, chat_title, entity, file_name)) last_msg_id = message.id await bot.send_message(admin_id, f'{chat_title} all message added to task queue, last message is:{last_msg_id}') @events.register(events.NewMessage()) async def all_chat_download(update): message = update.message if message.media: chat_id = update.message.to_id entity = await client.get_entity(chat_id) if not entity.id in whitelist: return chat_title = entity.title # 如果是一组媒体 caption = await get_group_caption(message) if ( message.grouped_id and message.text == "") else message.text if caption != "": for fw in filter_list: caption = caption.replace(fw, '') # 如果文件文件名不是空字符串,则进行过滤和截取,避免文件名过长导致的错误 caption = "" if caption == "" else f'{validateTitle(caption)} - '[:50] file_name = '' # 如果是文件 if message.document: try: if type(message.media) == MessageMediaWebPage: return if message.media.document.mime_type == "image/webp": file_name = f'{message.media.document.id}.webp' if message.media.document.mime_type == "application/x-tgsticker": file_name = f'{message.media.document.id}.tgs' for i in message.document.attributes: try: file_name = i.file_name except: continue if file_name == '': file_name = f'{message.id} - {caption}.{message.document.mime_type.split("/")[-1]}' else: # 如果文件名中已经包含了标题,则过滤标题 if get_equal_rate(caption, file_name) > 0.6: caption = "" file_name = f'{message.id} - {caption}{file_name}' except: print(message.media) elif message.photo: file_name = f'{message.id} - {caption}{message.photo.id}.jpg' else: return # 过滤文件名称中的广告等词语 for filter_keyword in filter_list: file_name = file_name.replace(filter_keyword, "") print(chat_title, file_name) await queue.put((message, chat_title, entity, file_name)) if __name__ == '__main__': bot = TelegramClient('telegram_channel_downloader_bot', api_id, api_hash).start(bot_token=str(bot_token)) client = TelegramClient( 'telegram_channel_downloader', api_id, api_hash).start() bot.add_event_handler(handler) if donwload_all_chat: client.add_event_handler(all_chat_download) tasks = [] try: for i in range(max_num): loop = asyncio.get_event_loop() task = loop.create_task(worker(f'worker-{i}')) tasks.append(task) print('Successfully started (Press Ctrl+C to stop)') client.run_until_disconnected() finally: for task in tasks: task.cancel() client.disconnect() print('Stopped!')