"""Telegram relay that resolves new Solana DEX pairs and reports them to Feishu.

Pipeline (all state lives in module-level dicts, driven by a 1 s scheduler):

1. ``handle_pairaddress_file`` reads CSV files dropped into ``/root/Downloads``
   and queues every new pair address in ``obj_queueing_pairaddress``.
2. ``handle_obj_queueing_pairaddress_timed`` sends the oldest queued pair
   address to the Trojan bot; its reply (``handle_incoming_gate_Trojan_NewMessage``)
   yields the token address, which is queued in ``obj_queueing_tokenaddress``.
3. ``handle_obj_queueing_tokenaddress_timed`` sends the oldest queued token
   address to the GMGN bot; its reply (``handle_incoming_GMGN_bot_NewMessage``)
   is parsed and filtered, and accepted tokens are queued in
   ``obj_queueing_send_feishu``.
4. ``ready_send_feishu_timed`` and the two digest jobs post to the Feishu webhook.
"""
import asyncio
import datetime
import json
import math
import os
import pathlib
import re
import shutil
import signal
import sys
import time
import traceback

import pandas as pd
import requests
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from telethon import TelegramClient, events
from telethon.errors.rpcerrorlist import AuthKeyError
# NOTE: importing telethon.sync has a side effect the script relies on: it lets
# client_gate.start() / run_until_disconnected() be called without ``await``.
from telethon.sync import TelegramClient as SyncTelegramClient
from telethon.tl.types import MessageEntityTextUrl
from telethon.tl.types import PeerUser, PeerChat, PeerChannel

old_print = print


def timestamped_print(*args, **kwargs):
    """Print like the builtin, prefixed with the current UTC time (second precision)."""
    # Timezone-aware replacement for the deprecated datetime.utcnow(); tzinfo is
    # stripped so the printed prefix keeps its original "YYYY-MM-DD HH:MM:SS" form.
    utc_now = datetime.datetime.now(datetime.timezone.utc).replace(
        microsecond=0, tzinfo=None)
    old_print(utc_now, *args, **kwargs)


print = timestamped_print
print('\n'*5)


def global_exception_hook(exc_type, exc_value, exc_traceback):
    """``sys.excepthook`` replacement: log the uncaught exception and its traceback.

    The original ended with a bare ``raise``; with no active exception that
    itself raises ``RuntimeError`` inside the hook, so it has been removed.
    """
    print(f"Global exception: {exc_type.__name__} - {exc_value}")
    traceback.print_tb(exc_traceback)


def keep_alnum_forgmgn(string):
    """Replace every character except letters, digits, ``.``, whitespace, ``+``, ``-`` with a space."""
    return re.sub(r'[^a-zA-Z0-9\.\s\+\-]', ' ', string)


def keep_alnum(string):
    """Replace every character except letters, digits, ``.``, whitespace, ``:``, ``+``, ``-`` with a space."""
    return re.sub(r'[^a-zA-Z0-9\.\s:+-]', ' ', string)


def term_sig_handler(signum, frame):
    """Signal handler for SIGTERM/SIGINT: log the signal number and exit."""
    print(f'catched singal: {signum}')
    sys.exit()


int_pattern = r"^\d+$"

# NOTE(security): credentials and the webhook URL are hard-coded in source.
# Values are kept unchanged so behaviour is identical; consider moving them
# to environment variables or a config file outside version control.
api_id_gate = 29760088
api_hash_gate = "7cd2222e0629c770789f9c6cf280d051"
# ID: 6541198030, Name: Fluxbot
api_id_bill = 26338039
api_hash_bill = "09bcd8c34c696df21466303aa13c8913"
GMGN_bot_id = 6344329830
Trojan_on_Solana_Odysseus_id = 6997957200
# Trojan_on_Solana_Achilles_id = 7164284518
feishu_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/a68cffbf-9104-427b-8e25-62dfaf363a47'
utc_plus_8 = datetime.timezone(datetime.timedelta(hours=8))

# Seconds to wait for the Feishu webhook; the call runs on the event loop
# thread, so it must not be allowed to block forever.
FEISHU_TIMEOUT_SECONDS = 10

DOWNLOADS_DIR = "/root/Downloads/"
TOTAL_DF_PATH = "/root/Desktop/gendan/total_df.csv"

client_gate = TelegramClient('anon_gate', api_id_gate, api_hash_gate)
client_gate.start()

# --------------------------------------------------------------------------
# Shared state
# --------------------------------------------------------------------------
set_has_read_file = set()

# {str_pairaddress: {"str_tokenaddress": str, "timestamp": int(ms)}}
obj_receive_pair = {}

# {key(ms): {"str_pairaddress", "is_sending", "sending_time", "send_counts", "pair_timestamp"}}
obj_queueing_pairaddress = {}

# {key(ms): {"str_tokenaddress", "str_pairaddress", "is_sending", "sending_time",
#            "send_counts", "token_timestamp"}}
obj_queueing_tokenaddress = {}

# {str_tokenaddress: str_pairaddress}
obj_tokenaddress_2_pairaddress = {}

# {str_pairaddress: {"send_feishu_timestamp": int, "number_open": number,
#                    "arr_history_timestamp": [int, ...]}}
obj_history_pariaddress = {}

# {str_pairaddress: {"str_pairaddress", "timestamp", "str_tokenaddress",
#                    "number_open", "number_holders"}}
obj_last_20m_tokens = {}

# {feishu_timestamp(ms): {"is_sending", "send_counts", "sending_time", "str_tokenaddress",
#                         "str_pairaddress", "str_now_price", "number_open", "number_liq_usd",
#                         "number_holders", "number_top_10_holding", "last_5m_change",
#                         "last_1h_change", "feishu_timestamp"}}
obj_queueing_send_feishu = {}

is_handleing_pairaddress_file = False

obj_global_info = {
    "timedTasks_idx": 0
}


def _next_free_key(queue, start, step):
    """Return the first key ``start + k*step`` (k >= 1) that is not yet in ``queue``."""
    key = start + step
    while key in queue:
        key += step
    return key


# --------------------------------------------------------------------------
# Outgoing Telegram messages
# --------------------------------------------------------------------------
async def send_tokenaddress_message(GMGN_id, str_tokenaddress):
    """Send ``str_tokenaddress`` as a message to the chat ``GMGN_id``."""
    print(f"enter send_token_address_message str_tokenaddress={str_tokenaddress}")
    await client_gate.send_message(GMGN_id, str_tokenaddress)
    return


async def send_pairaddress_message(chat_Trojan_on_Solana_Odysseus_id, str_pairaddress):
    """Send ``str_pairaddress`` as a message to the given Trojan bot chat."""
    print(f"enter send_pairaddress_message str_pairaddress={str_pairaddress}")
    await client_gate.send_message(chat_Trojan_on_Solana_Odysseus_id, str_pairaddress)
    return


# Original design note (translated): flux fetches the swap info and stores it in
# obj_queueing_pairaddress; a timer function walks obj_queueing_pairaddress and
# checks each element, trading when the pair is tradable.
def send_token_info_to_feishu(token_info):
    """Log ``token_info``; nothing is actually sent."""
    print("send_token_info_to_feishu token_info=", token_info)
    return


# --------------------------------------------------------------------------
# Feishu webhook
# --------------------------------------------------------------------------
def _text(text):
    """Build one Feishu rich-text ``text`` element."""
    return {"tag": "text", "text": text}


def _link_row(obj_sendmsg):
    """Build the link row: DexScreener when the token is unknown, GMGN otherwise."""
    if obj_sendmsg["str_tokenaddress"] == "unknown":
        return [{
            "tag": "a",
            "href": "https://dexscreener.com/solana/" + obj_sendmsg["str_pairaddress"],
            "text": "DEX " + obj_sendmsg["str_pairaddress"][-4:]
        }]
    return [{
        "tag": "a",
        "href": "https://gmgn.ai/sol/token/" + obj_sendmsg["str_tokenaddress"],
        "text": "GMGN " + obj_sendmsg["str_tokenaddress"][-4:]
    }]


def _post_feishu(title, content):
    """POST a Feishu ``post`` message with the given title and content rows.

    The ``content`` field is serialised to a JSON string before the outer
    payload is serialised, exactly as the original code did. Network errors
    propagate to the caller.
    """
    payload_message = {
        "msg_type": "post",
        "content": json.dumps({
            "post": {
                "zh_cn": {
                    "title": title,
                    "content": content
                }
            }
        })
    }
    headers = {
        "Content-Type": "application/json; charset=utf-8",
    }
    requests.post(url=feishu_url, headers=headers,
                  data=json.dumps(payload_message),
                  timeout=FEISHU_TIMEOUT_SECONDS)


def _now_iso8601_utc8():
    """Return the current time in UTC+8 as an ISO 8601 string without fractional seconds."""
    return datetime.datetime.fromtimestamp(
        time.time(), tz=utc_plus_8).isoformat().split('.')[0]


def send_feishu_last_20m(arr_obj_sendmsg):
    """Post the "LastToken20Min" digest.

    Each item needs the keys ``str_pairaddress``, ``str_tokenaddress``,
    ``str_number_holders`` and ``str_number_open`` (all strings).
    """
    content = []
    for obj_sendmsg in arr_obj_sendmsg:
        content.append([
            _text(obj_sendmsg['str_pairaddress']),
            _text(' '),
            _text("ho:" + obj_sendmsg["str_number_holders"]),
            _text(' '),
            _text("op:" + obj_sendmsg["str_number_open"]),
        ])
        content.append(_link_row(obj_sendmsg))
        content.append([_text("---------------")])
    content.append([{"tag": "at", "user_id": "all", "user_name": "allman"}])
    _post_feishu("LastToken20Min " + _now_iso8601_utc8(), content)
    return


def send_feishu_counts_pairaddress(arr_obj_sendmsg):
    """Post the "Count" digest.

    Each item needs the keys ``str_pairaddress``, ``str_tokenaddress``,
    ``counts`` and ``str_number_open`` (all strings).
    """
    content = []
    for obj_sendmsg in arr_obj_sendmsg:
        content.append([
            _text(obj_sendmsg['str_pairaddress']),
            _text(' '),
            _text('co:' + obj_sendmsg["counts"]),
            _text(' '),
            _text('op:' + obj_sendmsg["str_number_open"]),
        ])
        content.append(_link_row(obj_sendmsg))
        content.append([_text("---------------")])
    content.append([{"tag": "at", "user_id": "all", "user_name": "allman"}])
    _post_feishu("Count " + _now_iso8601_utc8(), content)
    return


def send_feishu(arr_obj_sendmsg):
    """Post per-token detail cards; the first item's date is used as the title.

    Does nothing for an empty list (the original raised ``IndexError``).
    ``str_now_price`` is coerced with ``str()`` because the parser leaves it
    ``None`` when no price line was found.
    """
    if not arr_obj_sendmsg:
        return
    content = []
    for obj_sendmsg in arr_obj_sendmsg:
        content.append([_text(obj_sendmsg['str_pairaddress'])])
        content.append([
            _text(obj_sendmsg['str_iso8601_date']),
            _text(' '),
            _text('op:' + obj_sendmsg['str_number_open']),
        ])
        content.append([
            _text("price:" + str(obj_sendmsg['str_now_price'])),
            _text(' '),
            _text('ho:' + obj_sendmsg["str_number_holders"]),
        ])
        content.append([
            _text("5m: " + obj_sendmsg['str_last_5m_change'] + "%"),
            _text(' '),
            _text('1h: ' + obj_sendmsg["str_last_1h_change"] + "%"),
        ])
        content.append([{
            "tag": "a",
            "href": "https://gmgn.ai/sol/token/" + obj_sendmsg["str_tokenaddress"],
            "text": "GMGN " + obj_sendmsg["str_tokenaddress"][-4:]
        }])
        content.append([_text("*****************")])
    _post_feishu(arr_obj_sendmsg[0]["str_iso8601_date"], content)
    return


# --------------------------------------------------------------------------
# Periodic digests
# --------------------------------------------------------------------------
async def ready_send_feishu_last_20m_tokens_timed():
    """Report the pairs seen in the last 20 minutes to Feishu and prune older ones."""
    if not obj_last_20m_tokens:
        return
    cur_now_timestamp = int(time.time()*1000)
    cutoff = cur_now_timestamp - 20 * 60 * 1000
    arr_queueing_send_feishu_last_20min = []
    # Iterate over a snapshot of the keys because stale entries are deleted.
    for str_pairaddress in list(obj_last_20m_tokens):
        token_entry = obj_last_20m_tokens[str_pairaddress]
        if token_entry["timestamp"] < cutoff:
            del obj_last_20m_tokens[str_pairaddress]
            continue
        received = obj_receive_pair.get(str_pairaddress)
        str_tokenaddress = "unknown" if received is None else received["str_tokenaddress"]
        arr_queueing_send_feishu_last_20min.append({
            "str_pairaddress": str_pairaddress,
            "str_tokenaddress": str_tokenaddress,
            "str_number_open": str(token_entry["number_open"]),
            "str_number_holders": str(token_entry["number_holders"]),
        })
    if not arr_queueing_send_feishu_last_20min:
        return
    send_feishu_last_20m(arr_queueing_send_feishu_last_20min)
    return


async def count_recent_tokenaddress_And_send_feishu():
    """Report pairs that were seen more than once in the last 30 minutes.

    A pair is reported when its newest sighting is newer than the last report
    for that pair and its second-newest sighting is less than 15 minutes old
    (the original comment said 30 minutes, but the code uses 0.25 h).
    """
    if not obj_history_pariaddress:
        return
    cur_now_timestamp = int(time.time()*1000)
    window_start = cur_now_timestamp - 0.5 * 3600 * 1000
    recent_cutoff = cur_now_timestamp - 0.25 * 3600 * 1000
    arr_queueing_send_feishu_cout_pairaddress = []
    for str_pairaddress, history in obj_history_pariaddress.items():
        arr_recent = sorted(
            ts for ts in history["arr_history_timestamp"] if ts > window_start)
        history["arr_history_timestamp"] = arr_recent
        if len(arr_recent) <= 1:
            continue
        cur_max_timestamp = arr_recent[-1]
        cur_secmax_timestamp = arr_recent[-2]
        if cur_max_timestamp <= history["send_feishu_timestamp"]:
            continue  # nothing new since the last report for this pair
        history["send_feishu_timestamp"] = cur_now_timestamp
        if cur_secmax_timestamp <= recent_cutoff:
            continue
        received = obj_receive_pair.get(str_pairaddress)
        str_tokenaddress = "unknown" if received is None else received["str_tokenaddress"]
        arr_queueing_send_feishu_cout_pairaddress.append({
            "str_pairaddress": str_pairaddress,
            "counts": str(len(arr_recent)),
            "str_number_open": str(history["number_open"]),
            "str_tokenaddress": str_tokenaddress,
        })
    if not arr_queueing_send_feishu_cout_pairaddress:
        return
    send_feishu_counts_pairaddress(arr_queueing_send_feishu_cout_pairaddress)
    return


def save_pairaddress_2_obj_last_20m_tokens(str_pairaddress):
    """Create the ``obj_last_20m_tokens`` entry if missing and refresh its timestamp."""
    if obj_last_20m_tokens.get(str_pairaddress) is None:
        obj_last_20m_tokens[str_pairaddress] = {
            "str_pairaddress": str_pairaddress,
            "timestamp": 0,
            "str_tokenaddress": "unknown",
            "number_open": 0,
            "number_holders": 0,
        }
    obj_last_20m_tokens[str_pairaddress]["timestamp"] = int(time.time()*1000)
    return


def save_pairaddress_2_obj_history_pariaddress(str_pairaddress):
    """Create the ``obj_history_pariaddress`` entry if missing and append the current time."""
    if obj_history_pariaddress.get(str_pairaddress) is None:
        obj_history_pariaddress[str_pairaddress] = {
            "send_feishu_timestamp": 0,
            "number_open": 0,
            "arr_history_timestamp": []
        }
    obj_history_pariaddress[str_pairaddress]["arr_history_timestamp"].append(
        int(time.time()*1000))
    return


# --------------------------------------------------------------------------
# CSV intake
# --------------------------------------------------------------------------
def _list_csv_files():
    """Return the names of all ``.csv`` files in the downloads directory."""
    return [f for f in os.listdir(DOWNLOADS_DIR) if f.endswith(".csv")]


def _remove_download(csv_file):
    """Delete a file from the downloads directory, ignoring an already-missing file."""
    try:
        os.remove(f"{DOWNLOADS_DIR}{csv_file}")
    except FileNotFoundError:
        pass


def _purge_stale_csv_files(cur_now_timestamp):
    """Delete CSV files that are malformed-named or older than two minutes.

    Valid names are ``<ms-timestamp>_short.csv`` / ``<ms-timestamp>_long.csv``.
    """
    for csv_file in _list_csv_files():
        handle_csv_file = csv_file.replace(".csv", "")
        if "short" in handle_csv_file:
            str_timestamp = handle_csv_file.replace("_short", "")
        elif "long" in handle_csv_file:
            str_timestamp = handle_csv_file.replace("_long", "")
        else:
            _remove_download(csv_file)
            continue
        if not re.match(int_pattern, str_timestamp):
            _remove_download(csv_file)
            continue
        # Files older than two minutes are deleted outright.
        if cur_now_timestamp - int(str_timestamp) >= 2 * 60 * 1000:
            _remove_download(csv_file)


def _enqueue_pairaddresses(arr_pairaddress, cur_now_timestamp):
    """Record every pair address and queue it for the Trojan or GMGN lookup."""
    for idx, str_pairaddress in enumerate(arr_pairaddress):
        save_pairaddress_2_obj_history_pariaddress(str_pairaddress)
        save_pairaddress_2_obj_last_20m_tokens(str_pairaddress)
        received = obj_receive_pair.get(str_pairaddress)
        if received is None:
            # First sighting: ask the Trojan bot for the token address.
            obj_receive_pair[str_pairaddress] = {
                "str_tokenaddress": "unknown",
                "timestamp": cur_now_timestamp
            }
            cur_key = _next_free_key(
                obj_queueing_pairaddress, cur_now_timestamp, -(idx+2)*2)
            obj_queueing_pairaddress[cur_key] = {
                "str_pairaddress": str_pairaddress,
                "is_sending": False,
                "sending_time": 0,
                "send_counts": 0,
                "pair_timestamp": cur_key,
            }
        elif (received["str_tokenaddress"] != "unknown"
              and cur_now_timestamp - received["timestamp"] >= 4 * 60 * 1000):
            # More than 4 minutes since this pair was last processed: fetch its
            # info again. Why 4: the Tampermonkey userscript re-fetches existing
            # pairs every 5 minutes, and this script's own record has to be
            # handled before the pair from the userscript is processed.
            received["timestamp"] = cur_now_timestamp
            cur_key = _next_free_key(
                obj_queueing_tokenaddress, cur_now_timestamp, (idx+11)*2)
            obj_queueing_tokenaddress[cur_key] = {
                "str_tokenaddress": received["str_tokenaddress"],
                "str_pairaddress": str_pairaddress,
                "is_sending": False,
                "sending_time": 0,
                "send_counts": 0,
                "token_timestamp": cur_key,
            }


async def handle_pairaddress_file():
    """Ingest new CSV files from the downloads directory and queue their pair addresses.

    The re-entrancy flag is now released in a ``finally`` block; previously an
    exception outside the inner ``try`` (e.g. from ``os.remove`` or ``to_csv``)
    left it set and disabled this job for the rest of the process lifetime.
    """
    global set_has_read_file, is_handleing_pairaddress_file
    if is_handleing_pairaddress_file:
        return
    is_handleing_pairaddress_file = True
    try:
        cur_now_timestamp = int(time.time()*1000)
        _purge_stale_csv_files(cur_now_timestamp)

        # List again: the purge above may have removed files.
        arr_csv_file = _list_csv_files()
        arr_cur_no_read_file = list(set(arr_csv_file) - set_has_read_file)
        if not arr_cur_no_read_file:
            return

        try:
            df = pd.concat([
                pd.read_csv(f"{DOWNLOADS_DIR}{csv_file}", dtype=object)
                for csv_file in arr_cur_no_read_file
            ])
            arr_pairaddress = df["pairaddress"].dropna().tolist()
        except Exception as e:
            # Best effort: a half-written or malformed file is retried next tick.
            print(e)
            print("arr_cur_no_read_file=", arr_cur_no_read_file)
            return

        if os.path.exists(TOTAL_DF_PATH):
            df.to_csv(TOTAL_DF_PATH, mode='a', header=False, index=False)
        else:
            df.to_csv(TOTAL_DF_PATH, index=False)

        _enqueue_pairaddresses(arr_pairaddress, cur_now_timestamp)
        set_has_read_file = set(arr_csv_file)
    finally:
        is_handleing_pairaddress_file = False
    return


# --------------------------------------------------------------------------
# Queue workers
# --------------------------------------------------------------------------
async def handle_obj_queueing_tokenaddress(obj_one_queueing_tokenaddress):
    """Send one queued token address to the GMGN bot.

    Returns:
        True when the entry was sent or is still awaiting a reply; False when
        a timed-out entry was dropped, so the caller can move on to the next
        one (the original never returned a non-True value, which made the
        caller's ``continue`` branch unreachable).
    """
    cur_now_timestamp = int(time.time()*1000)
    str_tokenaddress = obj_one_queueing_tokenaddress["str_tokenaddress"]
    token_timestamp = obj_one_queueing_tokenaddress["token_timestamp"]
    if obj_one_queueing_tokenaddress["is_sending"]:
        waited = cur_now_timestamp - obj_one_queueing_tokenaddress["sending_time"]
        if waited >= obj_one_queueing_tokenaddress["send_counts"] * 40 * 1000:
            print(f"token_timestamp={token_timestamp} str_tokenaddress={str_tokenaddress} is_sending_too_long del")
            obj_queueing_tokenaddress.pop(token_timestamp, None)
            return False
        print(f"token_timestamp={token_timestamp} str_tokenaddress={str_tokenaddress} is_sending")
        return True
    obj_one_queueing_tokenaddress["sending_time"] = cur_now_timestamp
    obj_one_queueing_tokenaddress["send_counts"] += 1
    obj_one_queueing_tokenaddress["is_sending"] = True
    await send_tokenaddress_message(GMGN_id=GMGN_bot_id, str_tokenaddress=str_tokenaddress)
    return True


async def handle_obj_queueing_pairaddress(obj_one_queueing_pairaddress):
    """Send one queued pair address to the Trojan bot.

    Returns:
        True when the entry was sent or is still awaiting a reply; False when
        a timed-out entry was dropped so the caller can try the next one.
    """
    cur_now_timestamp = int(time.time()*1000)
    str_pairaddress = obj_one_queueing_pairaddress["str_pairaddress"]
    pair_timestamp = obj_one_queueing_pairaddress["pair_timestamp"]
    if obj_one_queueing_pairaddress["is_sending"]:
        waited = cur_now_timestamp - obj_one_queueing_pairaddress["sending_time"]
        if waited >= obj_one_queueing_pairaddress["send_counts"] * 40 * 1000:
            print(f"pair_timestamp={pair_timestamp} str_pairaddress={str_pairaddress} is_sending_too_long del")
            obj_queueing_pairaddress.pop(pair_timestamp, None)
            return False
        print(f"pair_timestamp={pair_timestamp} str_pairaddress={str_pairaddress} is_sending")
        return True
    obj_one_queueing_pairaddress["sending_time"] = cur_now_timestamp
    obj_one_queueing_pairaddress["send_counts"] += 1
    obj_one_queueing_pairaddress["is_sending"] = True
    await send_pairaddress_message(
        chat_Trojan_on_Solana_Odysseus_id=Trojan_on_Solana_Odysseus_id,
        str_pairaddress=str_pairaddress)
    return True


async def ready_send_feishu(obj_temp_queueing_send_feishu):
    """Send all pending entries of the given snapshot to Feishu in one message.

    Entries already marked ``is_sending`` are skipped, or dropped from
    ``obj_queueing_send_feishu`` once they have waited ``send_counts * 15`` s.
    Successfully sent entries are removed from the queue afterwards. Unlike
    the original, one in-flight entry no longer aborts the whole batch and a
    dropped entry is not deleted twice.
    """
    cur_now_timestamp = int(time.time()*1000)
    arr_send_feishu_info = []
    arr_sent_keys = []
    for obj_one_queueing_send_feishu in obj_temp_queueing_send_feishu.values():
        str_tokenaddress = obj_one_queueing_send_feishu["str_tokenaddress"]
        feishu_timestamp = obj_one_queueing_send_feishu["feishu_timestamp"]
        str_pairaddress = obj_one_queueing_send_feishu["str_pairaddress"]
        if obj_one_queueing_send_feishu["is_sending"]:
            waited = cur_now_timestamp - obj_one_queueing_send_feishu["sending_time"]
            if waited >= obj_one_queueing_send_feishu["send_counts"] * 15 * 1000:
                print(f"feishu_timestamp={feishu_timestamp} str_tokenaddress={str_tokenaddress} is_sending_too_long del")
                obj_queueing_send_feishu.pop(feishu_timestamp, None)
            else:
                print(f"feishu_timestamp={feishu_timestamp} str_tokenaddress={str_tokenaddress} is_sending")
            continue
        obj_one_queueing_send_feishu["sending_time"] = cur_now_timestamp
        obj_one_queueing_send_feishu["send_counts"] += 1
        obj_one_queueing_send_feishu["is_sending"] = True

        # Convert the ms timestamp to a timezone-aware ISO 8601 string.
        str_iso8601_date = datetime.datetime.fromtimestamp(
            feishu_timestamp/1000, tz=utc_plus_8).isoformat().split('.')[0]
        number_open = obj_one_queueing_send_feishu["number_open"]
        # number_open is in hours; show days once it exceeds one day.
        if number_open > 24:
            str_number_open = str(number_open/24) + "d"
        else:
            str_number_open = str(number_open) + "h"
        arr_send_feishu_info.append({
            "str_iso8601_date": str_iso8601_date,
            "str_tokenaddress": str_tokenaddress,
            "str_pairaddress": str_pairaddress,
            "str_number_holders": str(obj_one_queueing_send_feishu["number_holders"]),
            "str_last_5m_change": str(obj_one_queueing_send_feishu["last_5m_change"]),
            "str_last_1h_change": str(obj_one_queueing_send_feishu["last_1h_change"]),
            "str_now_price": obj_one_queueing_send_feishu["str_now_price"],
            "str_number_open": str_number_open,
        })
        arr_sent_keys.append(feishu_timestamp)

    if not arr_send_feishu_info:
        return
    # Send to Feishu; on a network error the entries stay queued (is_sending=True)
    # and are dropped by the timeout branch above on a later tick.
    send_feishu(arr_send_feishu_info)
    for feishu_timestamp in arr_sent_keys:
        obj_queueing_send_feishu.pop(feishu_timestamp, None)
    return


async def ready_send_feishu_timed():
    """Scheduler entry: flush a snapshot of ``obj_queueing_send_feishu`` to Feishu."""
    if not obj_queueing_send_feishu:
        return
    obj_temp_queueing_send_feishu = dict(obj_queueing_send_feishu)
    await ready_send_feishu(obj_temp_queueing_send_feishu=obj_temp_queueing_send_feishu)
    return


async def handle_obj_queueing_pairaddress_timed():
    """Scheduler entry: process the oldest queued pair address, skipping dropped ones."""
    while obj_queueing_pairaddress:
        pair_timestamp = min(obj_queueing_pairaddress)
        obj_one_queueing_pairaddress = obj_queueing_pairaddress[pair_timestamp]
        handle_res = await handle_obj_queueing_pairaddress(
            obj_one_queueing_pairaddress=obj_one_queueing_pairaddress)
        if handle_res:
            break
    return


async def handle_obj_queueing_tokenaddress_timed():
    """Scheduler entry: process the oldest queued token address, skipping dropped ones."""
    while obj_queueing_tokenaddress:
        token_timestamp = min(obj_queueing_tokenaddress)
        obj_one_queueing_tokenaddress = obj_queueing_tokenaddress[token_timestamp]
        handle_res = await handle_obj_queueing_tokenaddress(
            obj_one_queueing_tokenaddress=obj_one_queueing_tokenaddress)
        if handle_res:
            break
    return


# --------------------------------------------------------------------------
# Incoming Telegram messages
# --------------------------------------------------------------------------
@client_gate.on(events.NewMessage(chats=[Trojan_on_Solana_Odysseus_id], incoming=True))
async def handle_incoming_gate_Trojan_NewMessage(event):
    """Handle a Trojan bot reply: it answers the oldest queued pair address.

    The second line of a reply containing "Share token with your Reflink" is
    taken as the token address. Fixes over the original: an empty queue or a
    one-line reply no longer raises, a pruned ``obj_last_20m_tokens`` entry is
    tolerated, and the free-key search looks at the Feishu queue it inserts into.
    """
    message_text = event.message.message
    print("enter handle_incoming_gate_Trojan_NewMessage client_gate.on(events.NewMessage message_text=", message_text)
    if not obj_queueing_pairaddress:
        return  # unsolicited message; nothing is waiting for a reply
    pair_timestamp = min(obj_queueing_pairaddress)
    str_pairaddress = obj_queueing_pairaddress[pair_timestamp]["str_pairaddress"]
    # Every path below consumes the queue head, so remove it up front; this
    # also keeps the queue moving if parsing fails.
    del obj_queueing_pairaddress[pair_timestamp]

    if "Share token with your Reflink" not in message_text:
        return  # not a usable reply

    arr_message_text = keep_alnum(message_text).split("\n")
    if len(arr_message_text) < 2:
        print(f"unexpected Trojan reply for str_pairaddress={str_pairaddress}")
        return
    str_tokenaddress = arr_message_text[1].strip()

    if str_pairaddress in obj_receive_pair:
        obj_receive_pair[str_pairaddress]["str_tokenaddress"] = str_tokenaddress
    if str_pairaddress in obj_last_20m_tokens:
        obj_last_20m_tokens[str_pairaddress]["str_tokenaddress"] = str_tokenaddress
    obj_tokenaddress_2_pairaddress[str_tokenaddress] = str_pairaddress

    cur_now_timestamp = int(time.time()*1000)
    if str_pairaddress == str_tokenaddress.lower():
        # No separate token address was obtained: queue directly for Feishu
        # with placeholder metrics.
        cur_key = _next_free_key(obj_queueing_send_feishu, cur_now_timestamp, 2*2)
        obj_queueing_send_feishu[cur_key] = {
            "is_sending": False,
            "send_counts": 0,
            "sending_time": 0,
            "str_pairaddress": str_pairaddress,
            "str_tokenaddress": str_tokenaddress,
            "str_now_price": "0",
            "number_open": 0,
            "number_liq_usd": "0",
            "number_holders": "0",
            "number_top_10_holding": "0",
            "last_5m_change": "0",
            "last_1h_change": "0",
            "feishu_timestamp": cur_key,
        }
        return

    # First candidate is cur_now_timestamp itself; bump by 1 ms on collision so
    # an entry queued by handle_pairaddress_file is never overwritten.
    cur_key = _next_free_key(obj_queueing_tokenaddress, cur_now_timestamp - 1, 1)
    obj_queueing_tokenaddress[cur_key] = {
        "str_tokenaddress": str_tokenaddress,
        "str_pairaddress": str_pairaddress,
        "is_sending": False,
        "sending_time": 0,
        "send_counts": 0,
        "token_timestamp": cur_key,
    }
    return


def _gmgn_tokens(line, dash_as_zero=False):
    """Strip symbols from a GMGN line and split it into whitespace-separated tokens."""
    line = keep_alnum_forgmgn(line)
    if dash_as_zero:
        line = line.replace("--", "0")
    return re.sub(r"\s+", " ", line.strip()).split(' ')


def _parse_gmgn_line(line, parsed):
    """Parse one lowercased GMGN line into ``parsed``; return True to stop parsing.

    May raise ``IndexError``/``ValueError`` on a line that only looks like a
    known field (e.g. an address that happens to contain "liq").
    """
    if "$" in line and "price chart" in line and parsed["str_now_price"] is None:
        # Current price, e.g. "$0.0{4}16505" means 0.0 + four zeros + 16505.
        temp_now_price = line.split(" ")[0].replace("$", "")
        arr_temp_now_price = temp_now_price.split(".")
        integer_now_price = arr_temp_now_price[0]
        fractional_now_price = arr_temp_now_price[1]
        if "{" in fractional_now_price:
            arr_fractional = re.sub(r"[{}]", " ", fractional_now_price).split(" ")
            fractional_now_price = "0" * int(arr_fractional[-2]) + arr_fractional[-1]
        parsed["str_now_price"] = integer_now_price + "." + fractional_now_price
    elif "5m" in line:
        # Recent price changes: "5m | 1h | 6h: a% | b% | c%".
        arr_price_change = _gmgn_tokens(line)
        if len(arr_price_change) == 6:
            parsed["last_5m_change"] = arr_price_change[3]
            parsed["last_1h_change"] = arr_price_change[4]
    elif "liq" in line:
        if "usdc" in line or "usdt" in line:
            # The pair is quoted in USD, not SOL: reject and stop parsing.
            parsed["is_usd_pair"] = True
            return True
        arr_split_message = _gmgn_tokens(line.replace("sol", "").replace(",", ""))
        split_message_usd = arr_split_message[2]
        base_usd = 1
        if "m" in split_message_usd:
            split_message_usd = split_message_usd.replace("m", "")
            base_usd = 10**6
        elif "k" in split_message_usd:
            split_message_usd = split_message_usd.replace("k", "")
            base_usd = 10**3
        elif "b" in split_message_usd:
            split_message_usd = split_message_usd.replace("b", "")
            base_usd = 10**9
        parsed["number_liq_usd"] = int(base_usd * float(split_message_usd))
    elif "holders" in line:
        parsed["number_holders"] = int(_gmgn_tokens(line)[1])
    elif "open" in line:
        # Age since open, normalised to hours.
        str_age = _gmgn_tokens(line)[1]
        if "d" in str_age:
            parsed["number_open"] = int(str_age.replace("d", "")) * 24
        elif "h" in str_age:
            parsed["number_open"] = int(str_age.replace("h", ""))
        elif "min" in str_age:
            parsed["number_open"] = round(int(str_age.replace("min", "")) / 60, 2)
    elif "audit" in line:
        line = line.replace("✅", "yes").replace("❌", "no").replace("?", "no")
        arr_split_message = _gmgn_tokens(line)
        parsed["nomint_flag"] = (arr_split_message[2] == "yes")
        parsed["blacklist_flag"] = (arr_split_message[4] == "yes")
        parsed["burnt_flag"] = (arr_split_message[6] == "yes")
    elif "top 10 holdings" in line:
        parsed["number_top_10_holding"] = float(_gmgn_tokens(line, dash_as_zero=True)[3])
    elif "insiders" in line:
        parsed["number_insiders"] = float(_gmgn_tokens(line, dash_as_zero=True)[1])
    return False


def _parse_gmgn_message(message_text):
    """Extract the metrics used for filtering from a GMGN bot reply.

    Relevant lines of a reply look like (abbreviated sample)::

        $0.0{4}16505 ... Price Chart (...) | Snipe with Bot (...)
        📈 5m | 1h | 6h: -0.25% | 0.76% | -3.7%
        💧 Liq: 1,862.71 SOL ($512K 🔥99.98%)
        👥 Holders: 12326
        🕒 Open: 43d ago
        🔔 Audit: NoMint ✅ / Blacklist ✅ / Burnt ✅
        👥 Top 10 holdings: 6.92%<30% ✅
        🐀 Insiders: 2.22%

    The first line is skipped. A line that fails to parse is logged and
    ignored, leaving that field at its default, instead of raising.
    """
    parsed = {
        "str_now_price": None,
        "last_5m_change": None,
        "last_1h_change": None,
        "number_open": 0,
        "number_liq_usd": 0,
        "nomint_flag": False,
        "blacklist_flag": False,
        "burnt_flag": False,
        "number_insiders": 0,
        "number_holders": 0,
        "number_top_10_holding": 0,
        "is_usd_pair": False,
    }
    message_text = message_text.lower().strip()
    arr_total_split_message = re.split('\n+', message_text)
    if ("pump status: trading on raydium" in message_text
            and len(arr_total_split_message) > 1):
        arr_total_split_message.pop(1)
    for split_message in arr_total_split_message[1:]:
        split_message = split_message.lower().strip()
        try:
            if _parse_gmgn_line(split_message, parsed):
                break
        except (IndexError, ValueError) as e:
            print(f"skip unparsable GMGN line: {e!r}")
    return parsed


def _is_normal_token(parsed):
    """Apply the acceptance filter to the metrics from ``_parse_gmgn_message``."""
    if parsed["is_usd_pair"]:
        return False
    if not (parsed["nomint_flag"] and parsed["blacklist_flag"] and parsed["burnt_flag"]):
        return False
    if parsed["number_holders"] <= 300:
        return False
    if parsed["number_top_10_holding"] >= 35:
        return False
    if parsed["number_insiders"] >= 20:
        return False
    return True


@client_gate.on(events.NewMessage(chats=[GMGN_bot_id], incoming=True))
async def handle_incoming_GMGN_bot_NewMessage(event):
    """Handle a GMGN bot reply: it answers the oldest queued token address.

    The reply is parsed and filtered; rejected tokens are dropped from
    ``obj_last_20m_tokens``, accepted ones are queued for Feishu. The queue
    head is removed in both cases.
    """
    message_text = event.message.message
    print("enter handle_incoming_GMGN_bot_NewMessage client_gate.on(events.NewMessage message_text=", message_text)
    if not obj_queueing_tokenaddress:
        return
    token_timestamp = min(obj_queueing_tokenaddress)
    obj_one_queueing_tokenaddress = obj_queueing_tokenaddress[token_timestamp]
    str_tokenaddress = obj_one_queueing_tokenaddress["str_tokenaddress"]
    str_pairaddress = obj_one_queueing_tokenaddress["str_pairaddress"]

    parsed = _parse_gmgn_message(message_text)
    is_normal = _is_normal_token(parsed)

    if not is_normal:
        print(f"{str_tokenaddress} is unnormal")
        obj_one_queueing_tokenaddress["need_remove"] = True
        del obj_queueing_tokenaddress[token_timestamp]
        obj_last_20m_tokens.pop(str_pairaddress, None)
        return

    print(f"{str_tokenaddress} is normal")
    number_open = parsed["number_open"]
    number_holders = parsed["number_holders"]
    # Either entry may be missing (obj_last_20m_tokens is pruned after 20
    # minutes); the original raised KeyError here and left the queue head stuck.
    last_20m_entry = obj_last_20m_tokens.get(str_pairaddress)
    if last_20m_entry is not None:
        last_20m_entry["number_open"] = number_open
        last_20m_entry["number_holders"] = number_holders
    history_entry = obj_history_pariaddress.get(str_pairaddress)
    if history_entry is not None:
        history_entry["number_open"] = number_open

    cur_now_timestamp = int(time.time()*1000)
    obj_queueing_send_feishu[cur_now_timestamp] = {
        "is_sending": False,
        "send_counts": 0,
        "sending_time": 0,
        "str_tokenaddress": str_tokenaddress,
        "str_pairaddress": str_pairaddress,
        "str_now_price": parsed["str_now_price"],
        "number_open": number_open,
        "number_liq_usd": parsed["number_liq_usd"],
        "number_holders": number_holders,
        "number_top_10_holding": parsed["number_top_10_holding"],
        "last_5m_change": parsed["last_5m_change"],
        "last_1h_change": parsed["last_1h_change"],
        "feishu_timestamp": cur_now_timestamp,
    }
    obj_one_queueing_tokenaddress["need_remove"] = True
    del obj_queueing_tokenaddress[token_timestamp]
    return


# --------------------------------------------------------------------------
# Scheduler and startup
# --------------------------------------------------------------------------
sys.excepthook = global_exception_hook
signal.signal(signal.SIGTERM, term_sig_handler)
signal.signal(signal.SIGINT, term_sig_handler)
print(f"Begin Listening for messages containing in chats ichat names...")


async def timedTasks():
    """One-second tick that dispatches the periodic jobs by tick count.

    The first four ticks do nothing. The counter is reset to 10 whenever it
    reaches a multiple of 3609.
    """
    obj_global_info["timedTasks_idx"] += 1
    cur_timedTasks_idx = obj_global_info["timedTasks_idx"]
    if cur_timedTasks_idx < 5:
        return
    if cur_timedTasks_idx % 3609 == 0:
        obj_global_info["timedTasks_idx"] = 10
    if cur_timedTasks_idx % 3 == 0:
        await handle_pairaddress_file()
    if cur_timedTasks_idx % 5 == 0:
        await handle_obj_queueing_pairaddress_timed()
    elif cur_timedTasks_idx % 5 == 4:
        await handle_obj_queueing_tokenaddress_timed()
    if cur_timedTasks_idx % 15 == 0:
        # Feishu flush every 15 s.
        await ready_send_feishu_timed()
    elif cur_timedTasks_idx % 61 == 0:
        await count_recent_tokenaddress_And_send_feishu()
    elif cur_timedTasks_idx % (1201) == 0:
        await ready_send_feishu_last_20m_tokens_timed()
    return


scheduler = AsyncIOScheduler({'apscheduler.timezone': 'UTC'})
scheduler.add_job(timedTasks, "interval", seconds=1)
scheduler.start()
client_gate.run_until_disconnected()
asyncio.get_event_loop().run_forever()