# bef 获取一段时间(区块)的所有安全token 保存到 f"{librarydata_path}{biaoshi}tokens.csv" 和.xlsx # now 通过bacscan 获取一个token的在一定时间的tokentx 保存到 contracttokentx_path #import pathlib import requests import threading #import json import shutil #import datetime #import os #import time #import pandas as pd #import numpy as np #import copy import urllib.parse from base_class import BaseVariableFunction from base_class import * old_print = print def timestamped_print(*args, **kwargs): old_print(datetime.datetime.utcnow().replace( microsecond=0), *args, **kwargs) print = timestamped_print baseclass = BaseVariableFunction(__file__) base_library = baseclass.open_base_library() # biaoshi = base_library["biaoshi"] # 记录一个token获取的contracttokentx baseclass.makedirpath(baseclass.contracttokentx_path) # errorcontracttokentx一般指tokentx的hash过多 # baseclass.makedirpath(baseclass.errorcontracttokentx_path) print('\n'*5) print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------") # { "status": "0", "message": "No transactions found", "result": [] } # { "status": "0", "message": "NOTOK", "result": "Max rate limit reached, please use API Key for higher rate limit" } # {'status': '0', 'message': 'NOTOK', 'result': 'Error! Invalid contract address format'} apikeys = [ "1343VMST2JRK8P39A88KU8E97VGY36WFDI", "TCH422YS6KIUFFJM34IQ4JX9V5K1Q1R4GX", "GD7FYJH4YJ13WNIEQUCYEGQEIE31A5XVXY", "WJG7C1BEVRIX8KTJ57JNB1FPKKP27D3H1Q", "IAI2DEISA4W81WDD54HGSK3X9TAC8NTUSB", "GIP2AB3M4B91MT396AIX2WGRSG2REGIE8A" ] def request_onetime(req_dict_total, key_idx, write_tokens_path, # writerrorpath, writeerrorname, errorfile, loop): requests_dict = req_dict_total.get(key_idx) if requests_dict is None: return elif requests_dict['isrequseting'] == True: print(f"key_idx {key_idx} is isrequseting\n", end='') return requests_dict['isrequseting'] = True action = requests_dict['action'] actiontype = requests_dict['actiontype'] cur_conadd = baseclass.get_cur_conadd( requests_dict=requests_dict, action=action, actiontype=actiontype) print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='') # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求, # 否則保存錯誤 但不删除这个token的csv if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]: requests_dict["need_remove"] = True if req_dict_total.get(key_idx) is not None: print(f"remove {action} {cur_conadd} {key_idx}\n", end='') del req_dict_total[key_idx] print( f"{cur_conadd} length to max\n",end='') return req_response = None try: req_response = requests.get( baseclass.baseurl, requests_dict['req_params'], timeout=requests_dict['timeout']) except Exception as e: print(f"{cur_conadd} errmsg={ str(e)}") requests_dict['timeout'] = 30 requests_dict['isrequseting'] = False return requests_dict["req_response"] = req_response write_tokens_path_name = baseclass.get_write_conadd_path_file(cur_conadd=cur_conadd, write_tokens_path=write_tokens_path, action=action, actiontype=actiontype, match_address_index_dic=match_address_index_dic) # 请求结果不成功 if requests_dict["req_response"].status_code != 200: print( f"cur_conadd {cur_conadd} response is {requests_dict['req_response'].status_code}") requests_dict['isrequseting'] = False return # 如果請求結果為成功 tokenresponse = json.loads( requests_dict["req_response"].text) # 結果不正常 continue if tokenresponse["message"] != "OK": baseclass.handle_IF_tokenresponse_NOTOK(tokenresponse=tokenresponse, action=action, cur_conadd=cur_conadd, key_idx=key_idx, requests_dict=requests_dict, req_dict_total=req_dict_total) return # 結果正常 獲取txlist txlist = None add_df = None txlist = tokenresponse["result"] add_df = pd.DataFrame(txlist) add_df["blockNumber"] = add_df["blockNumber"].astype(int) requests_dict["offsettimes"] += 1 if requests_dict["offsettimes"] == 1: requests_dict["isfirstreq"] = True else: requests_dict["isfirstreq"] = False # if requests_dict["isfirstreq"] == True: # 如果是第一次请求 删除过去的csv文件 # 如果有这个csv文件 删除 # if os.path.exists(write_tokens_path_name): # write_tokens_path_name.unlink() if (len(add_df) < 10000): # 如果此次請求結果不是最大offset,請求了全部結果,保存文本 requests_dict["need_remove"] = True if req_dict_total.get(key_idx) is not None: print(f"remove {action} {cur_conadd} {key_idx}\n", end='') del req_dict_total[key_idx] isdfTrue, add_df = baseclass.fir_handle_decivalue_confirmations_appdropcol_df( df=add_df, action=action, actiontype=actiontype) if not isdfTrue: requests_dict["req_params"]["startblock"] = int( requests_dict["endblock"]+1) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return add_df = add_df[["blockNumber", "timeStamp", "hash","from","to","value"]] # ,,,,,,,,,,,, # 未来可以改成 先判断 blockNumber 的取值范围 再发起请求 cur_minblock = int(add_df["blockNumber"].min()) cur_maxblock = int(add_df["blockNumber"].max()) # 如果没有这个csv文件 if not write_tokens_path_name.exists(): print(f"loop={loop} {cur_conadd} 第一次文件\n", end='') add_df.to_csv( write_tokens_path_name, mode='w', index=False) # 请求的startblock 置为比endblok大 结束请求 requests_dict["req_params"]["startblock"] = int( requests_dict["endblock"]+1) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return cur_file_df = pd.read_csv( write_tokens_path_name, dtype=object) if len(cur_file_df.index) == 0: print(f"{cur_conadd} 文本行数为0 全部写入\n", end='') add_df.to_csv( write_tokens_path_name, mode='a', header=False, index=False) # 请求的startblock 置为比endblok大 结束请求 requests_dict["req_params"]["startblock"] = int( requests_dict["endblock"]+1) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return requests_dict = baseclass.savedfblock(cur_conadd, add_df, cur_file_df, requests_dict, write_tokens_path_name, cur_minblock, cur_maxblock,loop) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return elif (len(add_df) >= 10000): # # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求, # # 否則保存錯誤 但不删除这个token的csv requests_dict["need_remove"] = False # 更換 startblock cur_minblock = int(add_df["blockNumber"].min()) cur_maxblock = int(add_df["blockNumber"].max()-1) add_df = add_df[add_df["blockNumber"] <= cur_maxblock].reset_index(drop=True) isdfTrue, add_df = baseclass.fir_handle_decivalue_confirmations_appdropcol_df( df=add_df, action=action, actiontype=actiontype) if not isdfTrue: requests_dict["req_params"]["startblock"] = int( cur_maxblock+1) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return add_df = add_df[["blockNumber", "timeStamp", "hash","from","to","value"]] # 如果没有这个csv文件 直接写入 if not write_tokens_path_name.exists(): print(f"loop={loop} {cur_conadd} 第一次文件\n", end='') add_df.to_csv( write_tokens_path_name, mode='w', index=False) # 请求的startblock 置为比endblok大 结束请求 requests_dict["req_params"]["startblock"] = int( cur_maxblock+1) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return # 如果已经存在 打开已有的csv 文件 和当前的add_df 以及 整个tokentx 的startblock 和 endblock 进行比较 cur_file_df = pd.read_csv( write_tokens_path_name, dtype=object) if len(cur_file_df.index) == 0: print(f"{cur_conadd} 文本行数为0 全部写入\n", end='') add_df.to_csv( write_tokens_path_name, mode='a', header=False, index=False) requests_dict["req_params"]["startblock"] = int( cur_maxblock+1) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return requests_dict = baseclass.savedfblock(cur_conadd, add_df, cur_file_df, requests_dict, write_tokens_path_name, cur_minblock, cur_maxblock,loop) baseclass.judge_req_completed(requests_dict=requests_dict, action=action, key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total) requests_dict['isrequseting']=False return return def getonetran(address_list, write_tokens_path, action, actiontype, offsettimes_max): print(f"enter getonetran action={action} actiontype={actiontype}") req_dict_total = {} for idx, address in enumerate(address_list): key_idx = str(idx) req_dict = baseclass.init_req_dict( startblock=obj_match_block[address][0], endblock=obj_match_block[address][1], address=address, key_idx=key_idx, offsettimes_max=offsettimes_max, action=action, actiontype=actiontype, apikey=apikeys[idx % len( apikeys)], remainder_retry_times_max=5, timeout=20) req_dict_total[key_idx] = req_dict loop=0 while (len(req_dict_total.keys())>0): loop +=1 # 獲取可以同步請求的address長度 ,traders 長都可能小於 requests_dict定義的長度 # grequest_len 為 requests_dict長度和實際tokencontracts長度取最小值 print( f"remainder_{action}_address_len :{len(req_dict_total.keys())}") temp_req_dict_total = {} for key in list(req_dict_total.keys()): temp_req_dict_total[key] = copy.deepcopy(req_dict_total[key]) # 進行異步請求 i = 0 thread_list = [] for key in list(temp_req_dict_total.keys()): i += 1 thread = threading.Thread( target=request_onetime, args=( req_dict_total, key, write_tokens_path, loop) ) thread_list.append(thread) if (i % 5 == 0): # 总数10个,查询一遍 for thread in thread_list: thread.start() thread_list = [] time.sleep(1.5) elif i == len(temp_req_dict_total.keys()): # 总数不足10个,把剩下的也查询一遍 print('remainder len less 10' ) for thread in thread_list: thread.start() thread_list = [] time.sleep(3) return def get_3trans_byScan(address_list, write_tokens_path, ): # 此方法獲取 從bscapi獲取的 的3trans # 有 requests_3trans_dict 和requests_tokenadd_dict兩種類型對應contractaddress和address # 使用grequests 進行異步請求獲取 # offsettimes_max 為請求的數據次數,最大總數為 offsettimes_max*offset (offset 為params的offset) # param_sort 為請求參數params 的sort print("enter get_3trans_byScan_()") contractadd_tokentx_list = copy.deepcopy(address_list) getonetran(address_list=contractadd_tokentx_list, write_tokens_path=write_tokens_path, action="tokentx", actiontype="contractaddress", offsettimes_max=9999999) return def gettokens_bytokenAddress(): print("enter gettokens_bytokenAddress_()") df = pd.read_csv( baseclass.librarydata_path/f"tokens.csv", dtype="object") df=df.astype({ "startblock":int, "endblock":int }) contractaddress_list = df['contractaddress'].tolist() arr_startdate = df['startdate'].tolist() arr_enddate = df['enddate'].tolist() arr_startblock = df['startblock'].tolist() arr_endblock = df['endblock'].tolist() # contractaddress_list =["0xfeb9d41cc44c1ab22722cd51aee8accaa5885f2a"] # contractaddress_list = contractaddress_list[0:6] for idx in range(0,len(contractaddress_list)): startblock = 0 endblock =0 contractaddress = contractaddress_list[idx] match_address_index_dic[contractaddress] = contractaddress startdate = arr_startdate[idx] enddate = arr_enddate[idx] startblock = arr_startblock[idx] endblock = arr_endblock[idx] if startblock==0: startblock = baseclass.get_blocknumber_byisostr_bybscscan(isostr=startdate ) if endblock==0: endblock = baseclass.get_blocknumber_byisostr_bybscscan(isostr= enddate ) obj_match_block[contractaddress]=[startblock , endblock] print(f"contractaddress={contractaddress} startblock={startblock} endblock={endblock}") arr_startblock[idx] = startblock arr_endblock[idx] = endblock df["startblock"]=arr_startblock df["endblock"]=arr_endblock df.to_csv( baseclass.librarydata_path/f"tokens.csv", index=False) return contractaddress_list match_address_index_dic = {} obj_match_block={} delete_address_arr = [] token_contracts_list = gettokens_bytokenAddress() print('token_contracts_list=', len(token_contracts_list)) # raise get_3trans_byScan(address_list=token_contracts_list, write_tokens_path=baseclass.contracttokentx_path, ) print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")