123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- from base_class import *
- from base_class import BaseVariableFunction
- import urllib.parse
- import shutil
- import random
- import json
# Keep a handle on the builtin before shadowing it below.
old_print = print


def timestamped_print(*args, **kwargs):
    """Print *args* prefixed with the current UTC time (seconds precision)."""
    stamp = datetime.datetime.utcnow().replace(microsecond=0)
    old_print(stamp, *args, **kwargs)


# Every later print() in this script goes through the timestamped wrapper.
print = timestamped_print
# Per-script helper object from base_class: provides paths, request headers
# and shared response-handling helpers used throughout this script.
baseclass = BaseVariableFunction(__file__)
print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")
# Ensure the output directory for the per-address transfer JSON files exists.
baseclass.makedirpath(baseclass.dalao_transfer_solanafm_path)
def request_onetime(req_dict_total, key_idx, write_tokens_path, loop):
    """Perform one solana.fm transfer-page request for the entry at *key_idx*.

    Looks up the per-address request state in *req_dict_total*, fetches one
    page (limit=100) of account transfers for its current time window,
    appends the results to ``<address>.json`` under
    ``baseclass.dalao_transfer_solanafm_path``, and either removes the entry
    (finished or failed) or narrows its ``endrange`` so the next sweep
    fetches the following page.

    Args:
        req_dict_total: dict mapping key_idx -> request-state dict (as built
            by ``baseclass.init_req_dict``); mutated in place.
        key_idx: string key of the entry to service.
        write_tokens_path: unused here — the output path is derived from
            ``baseclass.dalao_transfer_solanafm_path`` instead.
        loop: sweep counter, used only in log messages.

    Side effects: appends to the global ``arr_len_too_max_dalaoaddress``
    when an address exhausts its page budget.
    """
    global arr_len_too_max_dalaoaddress
    requests_dict = req_dict_total.get(key_idx)
    if requests_dict is None:
        return
    elif requests_dict['isrequseting'] == True:
        # Another sweep is already servicing this entry.
        return
    elif requests_dict['need_remove'] == True:
        # BUGFIX: cur_conadd was previously referenced here before it was
        # assigned (it is only set further down), so this branch raised
        # NameError. Derive it from the entry's own request params instead.
        cur_conadd = requests_dict["req_params"]["address"]
        print(f"key_idx {key_idx} need_remove\n", end='')
        if req_dict_total.get(key_idx) is not None:
            print(f"remove {cur_conadd} {key_idx}\n", end='')
            del req_dict_total[key_idx]
        return
    requests_dict['isrequseting'] = True
    cur_conadd = requests_dict["req_params"]["address"]
    print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='')
    # Page budget exhausted: the result set is incomplete. Drop the entry and
    # record the address (and its current endrange) for later inspection;
    # the JSON already written for this address is kept.
    if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]:
        requests_dict["need_remove"] = True
        if req_dict_total.get(key_idx) is not None:
            print(f"remove {cur_conadd} {key_idx}\n", end='')
            del req_dict_total[key_idx]
        print(
            f"{cur_conadd} length to max\n", end='')
        arr_len_too_max_dalaoaddress.append(cur_conadd)
        arr_len_too_max_dalaoaddress.append(
            requests_dict['req_params']['endrange'])
        return
    req_response = None
    req_url = f"https://api.solana.fm/v0/accounts/{requests_dict['req_params']['address']}/transfers?utcFrom={requests_dict['req_params']['startrange']}&utcTo={requests_dict['req_params']['endrange']}&limit=100&page=1"
    try:
        req_response = requests.get(
            req_url, timeout=requests_dict['timeout'], headers=baseclass.solana_fm_headers)
        time.sleep(0.2)
    except Exception as e:
        print(f"cur_conadd= {cur_conadd} errmsg={ str(e)}")
        if (" Max retries exceeded" in str(e)):
            # Back off with jitter and loosen the timeout before retrying.
            time.sleep(2+random.random())
            requests_dict['timeout'] = 15
        requests_dict['isrequseting'] = False
        return
    requests_dict["req_response"] = req_response
    write_tokens_path_name = baseclass.dalao_transfer_solanafm_path / \
        f"{cur_conadd}.json"
    # Non-200 HTTP status: back off briefly on 429 and leave the entry in
    # place so a later sweep retries it.
    if requests_dict["req_response"].status_code != 200:
        print(
            f"cur_conadd= {cur_conadd} response status_code= {requests_dict['req_response'].status_code}")
        if requests_dict["req_response"].status_code == 429:
            time.sleep(0.4)
        requests_dict['isrequseting'] = False
        return
    # HTTP success: parse the JSON body.
    tokenresponse = json.loads(
        requests_dict["req_response"].text)
    # API-level failure: delegate to the shared handler and retry later.
    if tokenresponse["status"] != "success":
        baseclass.handle_IF_tokenresponse_NOTOK(tokenresponse=tokenresponse,
                                                cur_conadd=cur_conadd,
                                                key_idx=key_idx,
                                                requests_dict=requests_dict,
                                                req_dict_total=req_dict_total)
        requests_dict['isrequseting'] = False
        return
    # Normal result: extract the transfer list for this page.
    txlist = tokenresponse["results"]
    if (txlist is None or len(txlist) == 0):
        # Nothing (left) for this address: drop the entry.
        print(f"loop={loop} add={cur_conadd} txlisst={txlist}")
        requests_dict["need_remove"] = True
        if req_dict_total.get(key_idx) is not None:
            print(f"remove {cur_conadd} {key_idx}\n", end='')
            del req_dict_total[key_idx]
        requests_dict['isrequseting'] = False
        return
    requests_dict["offsettimes"] += 1
    if requests_dict["offsettimes"] == 1:
        requests_dict["isfirstreq"] = True
    else:
        requests_dict["isfirstreq"] = False
    # NOTE: the first request deliberately does NOT delete a pre-existing
    # JSON file — the script may be re-run to top up partial results.
    cur_zero_txhash = txlist[0]['transactionHash']
    cur_last_txhash = txlist[len(txlist)-1]['transactionHash']
    print(
        f"loop={loop} add={cur_conadd} range={requests_dict['req_params']['startrange'] } {requests_dict['req_params']['endrange']} len={len(txlist)} zero_txhash={cur_zero_txhash} last_txhash={cur_last_txhash}")
    if (len(txlist) < 100):
        # Less than a full page: this window is complete — persist and drop
        # the entry from the work queue.
        requests_dict["need_remove"] = True
        if req_dict_total.get(key_idx) is not None:
            print(f"remove {cur_conadd} {key_idx}\n", end='')
            del req_dict_total[key_idx]
        if (len(txlist) == 0):
            # Defensive: empty lists were already handled above.
            requests_dict['isrequseting'] = False
            return
        # Append this page to the existing JSON file (create on first write).
        cur_file = []
        if not write_tokens_path_name.exists():
            print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
        else:
            with write_tokens_path_name.open(mode="r") as f:
                cur_file = json.load(f)
        cur_file = cur_file + txlist
        print(f"{cur_conadd} 全部写入\n", end='')
        with write_tokens_path_name.open(mode='w') as f:
            json.dump(cur_file, f)
        requests_dict['isrequseting'] = False
        return
    elif (len(txlist) >= 100):
        # A full page: results are incomplete. Persist this page, then move
        # the window's end down to the oldest timestamp seen so the next
        # request fetches the following (older) page.
        requests_dict["need_remove"] = False
        cur_minrange = txlist[len(txlist)-1]["data"][0]["timestamp"]
        cur_maxrange = txlist[0]["data"][0]["timestamp"]
        if (cur_maxrange == cur_minrange):
            # Whole page shares one timestamp; step past it so the window
            # keeps shrinking instead of looping forever.
            cur_minrange -= 1
        print("cur_minrange", cur_minrange, cur_maxrange)
        # Append this page to the existing JSON file (create on first write).
        cur_file = []
        if not write_tokens_path_name.exists():
            print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
        else:
            with write_tokens_path_name.open(mode="r", encoding='utf-8') as f:
                cur_file = json.load(f)
        cur_file = cur_file + txlist
        print(f"{cur_conadd} 全部写入\n", end='')
        try:
            with write_tokens_path_name.open(mode='w', encoding='utf-8') as f:
                json.dump(cur_file, f)
        except Exception:
            # Dump the offending payload before re-raising for diagnosis.
            print("txlist=", txlist)
            raise
        requests_dict["req_params"]["endrange"] = int(
            cur_minrange)
        baseclass.judge_req_completed(requests_dict=requests_dict,
                                      key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
        requests_dict['isrequseting'] = False
        return
    return
def getonetran(address_list, write_tokens_path, offsettimes_max):
    """Fetch transfer pages for every address until all entries complete.

    Builds one request-state dict per address (keyed by its stringified
    list index, window taken from the module-level ``startrange`` /
    ``endrange``), then sweeps the queue sequentially — at most 20 entries
    per sweep — calling ``request_onetime`` for each until every entry has
    removed itself from the queue.

    Args:
        address_list: iterable of account addresses to fetch.
        write_tokens_path: forwarded to ``request_onetime`` (unused there).
        offsettimes_max: per-address page budget.

    Cleanup: removed unused ``i``/``thread_list`` locals, the redundant
    key-list copy loop, and the dead commented-out threading code; the
    requests were already issued sequentially.
    """
    global startrange
    global endrange
    print(f"enter getonetran ")
    req_dict_total = {}
    for idx, address in enumerate(address_list):
        key_idx = str(idx)
        req_dict = baseclass.init_req_dict(
            startrange=startrange,
            endrange=endrange,
            address=address,
            key_idx=key_idx,
            offsettimes_max=offsettimes_max,
            remainder_retry_times_max=5,
            timeout=10)
        req_dict_total[key_idx] = req_dict
    loop = 0
    # Entries delete themselves from req_dict_total when done or failed.
    while (len(req_dict_total.keys()) > 0):
        loop += 1
        print(
            f"remainder_address_len :{len(req_dict_total.keys())}")
        # Service at most 20 pending addresses per sweep; snapshot the keys
        # because request_onetime mutates the dict while we iterate.
        for key in list(req_dict_total.keys())[0:20]:
            request_onetime(req_dict_total, key, write_tokens_path, loop)
        time.sleep(0.3)
    return
def get_3trans_byScan(address_list, write_tokens_path,):
    """Fetch transfer histories for every address in *address_list*.

    Works on a deep copy of the list so the caller's data is never
    mutated, then delegates the paged fetching to ``getonetran`` with a
    fixed per-address page budget of 5 (maximum total rows per address is
    5 pages x the per-request limit).
    """
    print("enter get_3trans_byScan_()")
    addresses = copy.deepcopy(address_list)
    getonetran(address_list=addresses,
               write_tokens_path=write_tokens_path,
               offsettimes_max=5)
    return
# ---- script driver ----------------------------------------------------
# Query window: from 3 days before now to 2 days after now.
now_timestamp = baseclass.get_current_timestamp()
startrange = now_timestamp - 3*24*3600
endrange = now_timestamp+2*24*3600
# NOTE(review): the computed endrange is immediately overridden with a
# fixed epoch (~2024-06-02) — presumably a debugging pin; confirm whether
# the dynamic value above should be restored.
endrange = 1717323091
print(f"startrang= {startrange} endrange= {endrange}")
# Addresses to fetch come from the "add" column of anaing_dalao.xlsx.
df = pd.read_excel(baseclass.dalao_total_ana_fm_path /
                   "anaing_dalao.xlsx", dtype=object)
arr_str_dalao = df["add"].tolist()

print('token_contracts_list', len(arr_str_dalao))
# Filled by request_onetime with addresses whose page budget ran out.
arr_len_too_max_dalaoaddress = []
get_3trans_byScan(address_list=arr_str_dalao,
                  write_tokens_path=baseclass.dalao_transfer_solanafm_path
                  )
print("arr_len_too_max_dalaoaddress=", arr_len_too_max_dalaoaddress)
print(f"startrang= {startrange} endrange= {endrange}")
print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")
|