"""Download SolanaFM transfer records for a list of wallet addresses.

The addresses are read from the ``add`` column of ``anaing_dalao.xlsx``.  For
each address the script pages backwards through
``https://api.solana.fm/v0/accounts/<address>/transfers`` and appends every
page of results to ``<address>.json`` under
``baseclass.dalao_transfer_solanafm_path``.

NOTE(review): ``datetime``, ``requests``, ``time``, ``copy`` and ``pd`` are not
imported here by name; they are presumably provided by
``from base_class import *`` -- confirm.
"""
from base_class import *
from base_class import BaseVariableFunction
import urllib.parse
import shutil
import random
import json

# Number of transfers requested per call; a page with fewer rows than this is
# treated as the last page for the address.
_PAGE_LIMIT = 100
# Maximum number of pending addresses handled in one pass of the main loop.
_BATCH_SIZE = 20

old_print = print


def timestamped_print(*args, **kwargs):
    """Print like the builtin ``print`` but prefixed with the UTC time (to the second)."""
    old_print(datetime.datetime.utcnow().replace(
        microsecond=0), *args, **kwargs)


# Every print() below this line is timestamped.
print = timestamped_print

baseclass = BaseVariableFunction(__file__)
print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")

baseclass.makedirpath(baseclass.dalao_transfer_solanafm_path)


def _drop_request(req_dict_total, key_idx, cur_conadd):
    """Delete ``key_idx`` from the pending-request table if it is still present."""
    if req_dict_total.get(key_idx) is not None:
        print(f"remove {cur_conadd} {key_idx}\n", end='')
        del req_dict_total[key_idx]


def _append_transfers(json_path, txlist, cur_conadd, loop):
    """Append ``txlist`` to the JSON list stored at ``json_path``.

    The file is created when it does not exist yet.  Existing content is kept
    because the script may be run several times to complete a download.

    Raises:
        Whatever ``open``/``json`` raise; on a write failure the offending
        ``txlist`` is printed first to help debugging.
    """
    cur_file = []
    if not json_path.exists():
        print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
    else:
        with json_path.open(mode="r", encoding='utf-8') as f:
            cur_file = json.load(f)
    cur_file = cur_file + txlist
    print(f"{cur_conadd} 全部写入\n", end='')
    try:
        with json_path.open(mode='w', encoding='utf-8') as f:
            json.dump(cur_file, f)
    except Exception:
        print("txlist=", txlist)
        raise


def _fetch_and_store_page(req_dict_total, key_idx, requests_dict, cur_conadd, loop):
    """Request one page of transfers for ``cur_conadd`` and persist it.

    Mutates ``requests_dict`` (counters, ``need_remove``, ``endrange``) and
    removes the entry from ``req_dict_total`` once the address is finished or
    has used up its page budget.
    """
    req_params = requests_dict["req_params"]

    # Page budget exhausted: the download for this address is incomplete.
    # Record the address and the range end reached so far, but keep the file
    # that has already been written.
    if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]:
        requests_dict["need_remove"] = True
        _drop_request(req_dict_total, key_idx, cur_conadd)
        print(f"{cur_conadd} length to max\n", end='')
        arr_len_too_max_dalaoaddress.append(cur_conadd)
        arr_len_too_max_dalaoaddress.append(req_params['endrange'])
        return

    req_url = (
        f"https://api.solana.fm/v0/accounts/{req_params['address']}/transfers"
        f"?utcFrom={req_params['startrange']}&utcTo={req_params['endrange']}"
        f"&limit={_PAGE_LIMIT}&page=1"
    )
    try:
        req_response = requests.get(
            req_url, timeout=requests_dict['timeout'],
            headers=baseclass.solana_fm_headers)
        time.sleep(0.2)
    except Exception as e:
        # Best effort: log, back off, and let the main loop retry this key.
        print(f"cur_conadd= {cur_conadd} errmsg={str(e)}")
        if " Max retries exceeded" in str(e):
            time.sleep(2+random.random())
        # NOTE(review): the original nesting was lost; the timeout bump is
        # applied after any failed request -- confirm it was not meant to be
        # limited to the "Max retries exceeded" case.
        requests_dict['timeout'] = 15
        return

    requests_dict["req_response"] = req_response
    write_tokens_path_name = baseclass.dalao_transfer_solanafm_path / \
        f"{cur_conadd}.json"

    # HTTP-level failure: leave the entry in place so it is retried.
    if req_response.status_code != 200:
        print(
            f"cur_conadd= {cur_conadd} response status_code= {req_response.status_code}")
        if req_response.status_code == 429:
            time.sleep(0.4)
        return

    tokenresponse = json.loads(req_response.text)

    # API-level failure: delegate to the shared handler.
    if tokenresponse["status"] != "success":
        baseclass.handle_IF_tokenresponse_NOTOK(
            tokenresponse=tokenresponse, cur_conadd=cur_conadd,
            key_idx=key_idx, requests_dict=requests_dict,
            req_dict_total=req_dict_total)
        return

    txlist = tokenresponse["results"]
    if not txlist:
        # Nothing (more) in the requested range: this address is done.
        print(f"loop={loop} add={cur_conadd} txlisst={txlist}")
        requests_dict["need_remove"] = True
        _drop_request(req_dict_total, key_idx, cur_conadd)
        return

    requests_dict["offsettimes"] += 1
    requests_dict["isfirstreq"] = requests_dict["offsettimes"] == 1
    # The existing JSON file is deliberately NOT deleted on the first request:
    # a single run may be incomplete, so the script is run several times.

    cur_zero_txhash = txlist[0]['transactionHash']
    cur_last_txhash = txlist[-1]['transactionHash']
    print(
        f"loop={loop} add={cur_conadd} range={req_params['startrange'] } {req_params['endrange']} len={len(txlist)} zero_txhash={cur_zero_txhash} last_txhash={cur_last_txhash}")

    if len(txlist) < _PAGE_LIMIT:
        # Short page: everything in the range has been returned.
        requests_dict["need_remove"] = True
        _drop_request(req_dict_total, key_idx, cur_conadd)
        _append_transfers(write_tokens_path_name, txlist, cur_conadd, loop)
        return

    # Full page: results are probably truncated, so keep the entry and move
    # the end of the range back to the timestamp of the last row returned.
    requests_dict["need_remove"] = False
    cur_minrange = txlist[-1]["data"][0]["timestamp"]
    cur_maxrange = txlist[0]["data"][0]["timestamp"]
    if cur_maxrange == cur_minrange:
        # The whole page shares one timestamp; step back one second so the
        # next request does not return the same page again.
        cur_minrange -= 1
    # NOTE(review): original nesting was lost; this debug line is emitted for
    # every full page -- confirm it was not inside the `if` above.
    print("cur_minrange", cur_minrange, cur_maxrange)

    _append_transfers(write_tokens_path_name, txlist, cur_conadd, loop)

    req_params["endrange"] = int(cur_minrange)
    baseclass.judge_req_completed(
        requests_dict=requests_dict, key_idx=key_idx,
        cur_conadd=cur_conadd, req_dict_total=req_dict_total)


def request_onetime(req_dict_total, key_idx, write_tokens_path, loop):
    """Perform one request/store step for the pending entry ``key_idx``.

    Args:
        req_dict_total: dict of pending request states keyed by ``key_idx``;
            finished entries are deleted from it.
        key_idx: key of the entry to process.
        write_tokens_path: accepted for interface compatibility; the output
            directory actually used is
            ``baseclass.dalao_transfer_solanafm_path``.
        loop: current pass number of the main loop, used only in log output.

    Returns:
        None.  All results are side effects on ``req_dict_total``, the JSON
        files and ``arr_len_too_max_dalaoaddress``.
    """
    requests_dict = req_dict_total.get(key_idx)
    if requests_dict is None or requests_dict['isrequseting']:
        return

    # Read the address before the guard below; it is needed for its log line.
    cur_conadd = requests_dict["req_params"]["address"]

    if requests_dict['need_remove']:
        print(f"key_idx {key_idx} need_remove\n", end='')
        _drop_request(req_dict_total, key_idx, cur_conadd)
        return

    requests_dict['isrequseting'] = True
    try:
        print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='')
        _fetch_and_store_page(
            req_dict_total, key_idx, requests_dict, cur_conadd, loop)
    finally:
        # Always release the busy flag, whichever path was taken.
        requests_dict['isrequseting'] = False


def getonetran(address_list, write_tokens_path, offsettimes_max):
    """Download transfers for every address until no request is pending.

    Args:
        address_list: addresses to query.
        write_tokens_path: forwarded to ``request_onetime``.
        offsettimes_max: maximum number of pages fetched per address.

    Reads the module-level ``startrange`` and ``endrange`` for the initial
    time window of every address.
    """
    print(f"enter getonetran ")
    req_dict_total = {}
    for idx, address in enumerate(address_list):
        key_idx = str(idx)
        req_dict_total[key_idx] = baseclass.init_req_dict(
            startrange=startrange, endrange=endrange, address=address,
            key_idx=key_idx, offsettimes_max=offsettimes_max,
            remainder_retry_times_max=5, timeout=10)

    loop = 0
    while req_dict_total:
        loop += 1
        print(
            f"remainder_address_len :{len(req_dict_total.keys())}")
        # Work on a snapshot of at most _BATCH_SIZE keys, because
        # request_onetime deletes finished entries from the dict.
        batch_keys = list(req_dict_total)[:_BATCH_SIZE]
        # Requests are issued sequentially, one address at a time.
        for key in batch_keys:
            request_onetime(req_dict_total, key, write_tokens_path, loop)
            # NOTE(review): original nesting was lost; the pause is applied
            # after every request -- confirm it was not once per batch.
            time.sleep(0.3)
    return


def get_3trans_byScan(address_list, write_tokens_path,):
    """Entry point: download transfers for ``address_list`` (5 pages max each).

    Works on a deep copy of ``address_list`` so the caller's list is not
    touched.

    NOTE(review): the original comments here described a bscapi / grequests
    based "3trans" download with ``param_sort``; none of that is visible in
    this function, so they look inherited from another script.
    """
    print("enter get_3trans_byScan_()")
    contractadd_tokentx_list = copy.deepcopy(address_list)

    getonetran(address_list=contractadd_tokentx_list,
               write_tokens_path=write_tokens_path,
               offsettimes_max=5
               )
    return


now_timestamp = baseclass.get_current_timestamp()
startrange = now_timestamp - 3*24*3600
endrange = now_timestamp+2*24*3600
# NOTE(review): this fixed value overrides the computed end of range above;
# confirm it is still intended and not a leftover from a manual run.
endrange = 1717323091
print(f"startrang= {startrange} endrange= {endrange}")

df = pd.read_excel(baseclass.dalao_total_ana_fm_path /
                   "anaing_dalao.xlsx", dtype=object)
arr_str_dalao = df["add"].tolist()
print('token_contracts_list', len(arr_str_dalao))

# Filled by request_onetime with (address, endrange) pairs, flattened, for
# every address that hit its page budget before the range was exhausted.
arr_len_too_max_dalaoaddress = []

get_3trans_byScan(address_list=arr_str_dalao,
                  write_tokens_path=baseclass.dalao_transfer_solanafm_path
                  )
print("arr_len_too_max_dalaoaddress=", arr_len_too_max_dalaoaddress)
print(f"startrang= {startrange} endrange= {endrange}")
print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")