123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
# before: fetched all safe tokens for a time range (block range) and saved them to f"{librarydata_path}{biaoshi}tokens.csv" and .xlsx
# now: fetches one token's tokentx over a given time range via bscscan and saves it to contracttokentx_path
- #import pathlib
- import requests
- import threading
- #import json
- import shutil
- #import datetime
- #import os
- #import time
- #import pandas as pd
- #import numpy as np
- #import copy
- import urllib.parse
- from base_class import BaseVariableFunction
- from base_class import *
# Route every print() in this module through a timestamp-prefixing wrapper.
old_print = print


def timestamped_print(*args, **kwargs):
    """Print *args prefixed with the current UTC time (seconds precision)."""
    now_utc = datetime.datetime.utcnow().replace(microsecond=0)
    old_print(now_utc, *args, **kwargs)


# Shadow the builtin for the remainder of this module.
print = timestamped_print
# Project helper bundling paths, URLs, and request utilities for this script.
baseclass = BaseVariableFunction(__file__)
base_library = baseclass.open_base_library()
# biaoshi = base_library["biaoshi"]
# Ensure the directory that stores each token's fetched contracttokentx exists.
baseclass.makedirpath(baseclass.contracttokentx_path)
# errorcontracttokentx generally means a tokentx had too many hashes.
# baseclass.makedirpath(baseclass.errorcontracttokentx_path)

print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")
# Example API failure payloads this script must handle:
# { "status": "0", "message": "No transactions found", "result": [] }
# { "status": "0", "message": "NOTOK", "result": "Max rate limit reached, please use API Key for higher rate limit" }
# {'status': '0', 'message': 'NOTOK', 'result': 'Error! Invalid contract address format'}
-
# SECURITY NOTE(review): API keys are hardcoded in source and rotated
# round-robin across addresses (see getonetran). They should be loaded from an
# environment variable or an untracked config file instead — confirm with the
# team before rotating/removing.
apikeys = [
    "1343VMST2JRK8P39A88KU8E97VGY36WFDI",
    "TCH422YS6KIUFFJM34IQ4JX9V5K1Q1R4GX",
    "GD7FYJH4YJ13WNIEQUCYEGQEIE31A5XVXY",
    "WJG7C1BEVRIX8KTJ57JNB1FPKKP27D3H1Q",
    "IAI2DEISA4W81WDD54HGSK3X9TAC8NTUSB",
    "GIP2AB3M4B91MT396AIX2WGRSG2REGIE8A"
]
-
-
-
def request_onetime(req_dict_total, key_idx, write_tokens_path,
                    # writerrorpath, writeerrorname, errorfile,
                    loop):
    """Issue one paginated API request for the address keyed by key_idx and
    persist the returned transfers to that address's CSV.

    Runs on a worker thread (see getonetran). Mutates the shared request-state
    dict in place: 'isrequseting' guards against double-dispatch, 'offsettimes'
    counts pages, and deleting req_dict_total[key_idx] marks the address done.

    Params:
        req_dict_total: shared dict of per-address request state, keyed by key_idx.
        key_idx: stringified index of the address being fetched.
        write_tokens_path: directory where per-address CSVs are written.
        loop: outer-loop pass number (used only for logging).
    """
    requests_dict = req_dict_total.get(key_idx)
    if requests_dict is None:
        # Another pass already finished and removed this address.
        return
    elif requests_dict['isrequseting'] == True:
        # A previous thread for this key is still in flight; skip this pass.
        print(f"key_idx {key_idx} is isrequseting\n", end='')
        return
    requests_dict['isrequseting'] = True
    action = requests_dict['action']
    actiontype = requests_dict['actiontype']
    cur_conadd = baseclass.get_cur_conadd(
        requests_dict=requests_dict, action=action, actiontype=actiontype)
    print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='')
    # If we already hit the maximum page count, the data is incomplete:
    # stop requesting this address but keep its CSV on disk.
    if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]:
        requests_dict["need_remove"] = True
        if req_dict_total.get(key_idx) is not None:
            print(f"remove {action} {cur_conadd} {key_idx}\n", end='')
            del req_dict_total[key_idx]
        print(
            f"{cur_conadd} length to max\n", end='')
        return
    req_response = None
    try:
        req_response = requests.get(
            baseclass.baseurl, requests_dict['req_params'], timeout=requests_dict['timeout'])
    except Exception as e:
        # Network failure: bump the timeout for the retry and release the guard.
        print(f"{cur_conadd} errmsg={ str(e)}")
        requests_dict['timeout'] = 30
        requests_dict['isrequseting'] = False
        return
    requests_dict["req_response"] = req_response
    write_tokens_path_name = baseclass.get_write_conadd_path_file(cur_conadd=cur_conadd,
                                                                  write_tokens_path=write_tokens_path,
                                                                  action=action,
                                                                  actiontype=actiontype,
                                                                  match_address_index_dic=match_address_index_dic)
    # HTTP-level failure: leave state untouched so the next pass retries.
    if requests_dict["req_response"].status_code != 200:
        print(
            f"cur_conadd {cur_conadd} response is {requests_dict['req_response'].status_code}")
        requests_dict['isrequseting'] = False
        return
    # HTTP success: parse the JSON body.
    tokenresponse = json.loads(
        requests_dict["req_response"].text)
    # API-level failure (message != "OK"): delegate to the shared handler.
    # NOTE(review): this path returns WITHOUT resetting 'isrequseting' —
    # presumably handle_IF_tokenresponse_NOTOK resets it or removes the key;
    # confirm, otherwise this address would be stuck skipping every pass.
    if tokenresponse["message"] != "OK":
        baseclass.handle_IF_tokenresponse_NOTOK(tokenresponse=tokenresponse,
                                                action=action,
                                                cur_conadd=cur_conadd,
                                                key_idx=key_idx,
                                                requests_dict=requests_dict,
                                                req_dict_total=req_dict_total)
        return
    # Normal result: extract the transaction list into a DataFrame.
    txlist = None
    add_df = None
    txlist = tokenresponse["result"]
    add_df = pd.DataFrame(txlist)
    add_df["blockNumber"] = add_df["blockNumber"].astype(int)
    requests_dict["offsettimes"] += 1
    if requests_dict["offsettimes"] == 1:
        requests_dict["isfirstreq"] = True
    else:
        requests_dict["isfirstreq"] = False
    # if requests_dict["isfirstreq"] == True:
    # (previously: on the first request, delete any stale CSV from a past run)
    # if os.path.exists(write_tokens_path_name):
    # write_tokens_path_name.unlink()
    # 10000 appears to be the API's max page size — a shorter page means this
    # was the last page for the current block window (TODO confirm).
    if (len(add_df) < 10000):
        # Full result received for this window: persist and mark the address done.
        requests_dict["need_remove"] = True
        if req_dict_total.get(key_idx) is not None:
            print(f"remove {action} {cur_conadd} {key_idx}\n", end='')
            del req_dict_total[key_idx]
        isdfTrue, add_df = baseclass.fir_handle_decivalue_confirmations_appdropcol_df(
            df=add_df, action=action, actiontype=actiontype)
        if not isdfTrue:
            # Nothing usable left after filtering: push startblock past
            # endblock so the request is considered finished.
            requests_dict["req_params"]["startblock"] = int(
                requests_dict["endblock"]+1)
            baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                          key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
            requests_dict['isrequseting']=False
            return

        add_df = add_df[["blockNumber", "timeStamp", "hash","from","to","value"]]
        # TODO: could first determine the blockNumber value range, then issue
        # the requests accordingly.
        cur_minblock = int(add_df["blockNumber"].min())
        cur_maxblock = int(add_df["blockNumber"].max())
        # No CSV for this address yet: write it fresh.
        if not write_tokens_path_name.exists():
            print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
            add_df.to_csv(
                write_tokens_path_name, mode='w', index=False)
            # Set startblock past endblock to terminate this request.
            requests_dict["req_params"]["startblock"] = int(
                requests_dict["endblock"]+1)
            baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                          key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
            requests_dict['isrequseting']=False
            return
        cur_file_df = pd.read_csv(
            write_tokens_path_name, dtype=object)
        if len(cur_file_df.index) == 0:
            # Existing CSV is empty: append everything.
            print(f"{cur_conadd} 文本行数为0 全部写入\n", end='')
            add_df.to_csv(
                write_tokens_path_name, mode='a', header=False, index=False)
            # Set startblock past endblock to terminate this request.
            requests_dict["req_params"]["startblock"] = int(
                requests_dict["endblock"]+1)
            baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                          key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
            requests_dict['isrequseting']=False
            return
        # Existing non-empty CSV: merge by block range via the shared helper.
        requests_dict = baseclass.savedfblock(cur_conadd, add_df, cur_file_df, requests_dict, write_tokens_path_name,
                                              cur_minblock, cur_maxblock, loop)
        baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                      key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
        requests_dict['isrequseting']=False
        return
    elif (len(add_df) >= 10000):
        # Page was full, so the result is incomplete: keep the address pending
        # and advance startblock for the next page. The max block is dropped
        # (max-1) because its transfers may be only partially included.
        requests_dict["need_remove"] = False
        cur_minblock = int(add_df["blockNumber"].min())
        cur_maxblock = int(add_df["blockNumber"].max()-1)
        add_df = add_df[add_df["blockNumber"] <=
                        cur_maxblock].reset_index(drop=True)
        isdfTrue, add_df = baseclass.fir_handle_decivalue_confirmations_appdropcol_df(
            df=add_df, action=action, actiontype=actiontype)
        if not isdfTrue:
            # Nothing usable in this page: advance past it and re-check completion.
            requests_dict["req_params"]["startblock"] = int(
                cur_maxblock+1)
            baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                          key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
            requests_dict['isrequseting']=False
            return
        add_df = add_df[["blockNumber", "timeStamp", "hash","from","to","value"]]
        # No CSV yet: write this page directly.
        if not write_tokens_path_name.exists():
            print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
            add_df.to_csv(
                write_tokens_path_name, mode='w', index=False)
            # Advance startblock past this page so the next request continues from there.
            requests_dict["req_params"]["startblock"] = int(
                cur_maxblock+1)
            baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                          key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
            requests_dict['isrequseting']=False
            return
        # CSV exists: open it and compare against add_df and the overall
        # tokentx startblock/endblock window.
        cur_file_df = pd.read_csv(
            write_tokens_path_name, dtype=object)
        if len(cur_file_df.index) == 0:
            # Existing CSV is empty: append everything from this page.
            print(f"{cur_conadd} 文本行数为0 全部写入\n", end='')
            add_df.to_csv(
                write_tokens_path_name, mode='a', header=False, index=False)
            requests_dict["req_params"]["startblock"] = int(
                cur_maxblock+1)
            baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                          key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
            requests_dict['isrequseting']=False
            return
        # Existing non-empty CSV: merge by block range via the shared helper.
        requests_dict = baseclass.savedfblock(cur_conadd, add_df, cur_file_df, requests_dict, write_tokens_path_name,
                                              cur_minblock, cur_maxblock, loop)
        baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
                                      key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
        requests_dict['isrequseting']=False
        return
    return
def getonetran(address_list, write_tokens_path,
               action,
               actiontype,
               offsettimes_max):
    """Fetch transactions for every address in address_list using worker threads.

    Builds one request-state dict per address (keyed by its stringified index),
    then loops: each pass snapshots the still-pending keys and dispatches a
    threading.Thread per key running request_onetime, in batches of 5 with a
    sleep between batches for API rate limiting. request_onetime deletes an
    entry from req_dict_total once that address is finished, so the outer
    while-loop exits when every address has completed.

    NOTE(review): threads are never join()ed; the 'isrequseting' flag inside
    each request dict prevents double-dispatch while a request is in flight.

    Params:
        address_list: contract addresses to fetch (keys into obj_match_block).
        write_tokens_path: directory the per-address CSVs are written to.
        action: API action, e.g. "tokentx".
        actiontype: request-parameter name for the address, e.g. "contractaddress".
        offsettimes_max: maximum number of paginated requests per address.
    """
    print(f"enter getonetran action={action} actiontype={actiontype}")
    req_dict_total = {}
    for idx, address in enumerate(address_list):
        key_idx = str(idx)
        # API keys rotate round-robin so each key stays under its rate limit.
        req_dict = baseclass.init_req_dict(
            startblock=obj_match_block[address][0],
            endblock=obj_match_block[address][1],
            address=address,
            key_idx=key_idx,
            offsettimes_max=offsettimes_max,
            action=action,
            actiontype=actiontype,
            apikey=apikeys[idx % len(apikeys)],
            remainder_retry_times_max=5,
            timeout=20)
        req_dict_total[key_idx] = req_dict
    loop = 0
    while len(req_dict_total) > 0:
        loop += 1
        print(
            f"remainder_{action}_address_len :{len(req_dict_total.keys())}")
        # Snapshot the keys still pending for this pass. (Fix: the original
        # deep-copied every request dict here each pass, but only the KEYS of
        # the copy were ever used — the deep copies were pure waste.)
        pending_keys = list(req_dict_total.keys())
        thread_list = []
        for i, key in enumerate(pending_keys, start=1):
            thread = threading.Thread(
                target=request_onetime,
                args=(
                    req_dict_total, key, write_tokens_path,
                    loop)
            )
            thread_list.append(thread)
            if (i % 5 == 0):
                # Full batch of 5 assembled: launch it, then pause to rate-limit.
                for thread in thread_list:
                    thread.start()
                thread_list = []
                time.sleep(1.5)
            elif i == len(pending_keys):
                # Fewer than 5 remain: launch the leftover threads too.
                print('remainder len less 10')
                for thread in thread_list:
                    thread.start()
                thread_list = []
                time.sleep(3)
    return
def get_3trans_byScan(address_list, write_tokens_path,
                      ):
    """Kick off the "tokentx" download for every contract address.

    Thin wrapper around getonetran: copies the address list, then requests
    action="tokentx" with actiontype="contractaddress". offsettimes_max is
    set effectively unbounded so pagination runs until the data is exhausted
    (total rows fetched per address = offsettimes_max * the params' offset).
    """
    print("enter get_3trans_byScan_()")
    # Work on a copy so the caller's list is never mutated downstream.
    tokentx_addresses = copy.deepcopy(address_list)
    request_kwargs = {
        "address_list": tokentx_addresses,
        "write_tokens_path": write_tokens_path,
        "action": "tokentx",
        "actiontype": "contractaddress",
        "offsettimes_max": 9999999,
    }
    getonetran(**request_kwargs)
    return
def gettokens_bytokenAddress():
    """Load token metadata from tokens.csv and prepare the module globals.

    For each row: registers the contract address in match_address_index_dic
    (address maps to itself), resolves a startblock/endblock of 0 (meaning
    "unknown") into a real block number via bscscan using the row's ISO dates,
    and records the window in obj_match_block. The resolved block numbers are
    written back to tokens.csv so later runs skip the lookups.

    Returns:
        list[str]: the contract addresses, in file order.
    """
    print("enter gettokens_bytokenAddress_()")

    df = pd.read_csv(
        baseclass.librarydata_path / "tokens.csv", dtype="object")
    df = df.astype({
        "startblock": int,
        "endblock": int,
    })
    contractaddress_list = df['contractaddress'].tolist()
    arr_startdate = df['startdate'].tolist()
    arr_enddate = df['enddate'].tolist()
    arr_startblock = df['startblock'].tolist()
    arr_endblock = df['endblock'].tolist()

    # contractaddress_list =["0xfeb9d41cc44c1ab22722cd51aee8accaa5885f2a"]
    # contractaddress_list = contractaddress_list[0:6]
    for idx, contractaddress in enumerate(contractaddress_list):
        match_address_index_dic[contractaddress] = contractaddress
        startdate = arr_startdate[idx]
        enddate = arr_enddate[idx]
        startblock = arr_startblock[idx]
        endblock = arr_endblock[idx]
        # A stored block of 0 means "not resolved yet": look it up by date.
        if startblock == 0:
            startblock = baseclass.get_blocknumber_byisostr_bybscscan(isostr=startdate)
        if endblock == 0:
            endblock = baseclass.get_blocknumber_byisostr_bybscscan(isostr=enddate)
        obj_match_block[contractaddress] = [startblock, endblock]
        print(f"contractaddress={contractaddress} startblock={startblock} endblock={endblock}")
        # Cache the resolved blocks for the write-back below.
        arr_startblock[idx] = startblock
        arr_endblock[idx] = endblock
    df["startblock"] = arr_startblock
    df["endblock"] = arr_endblock
    df.to_csv(
        baseclass.librarydata_path / "tokens.csv", index=False)
    return contractaddress_list
# --- module-level driver ---
# Maps contract address -> itself; passed through to
# baseclass.get_write_conadd_path_file when naming per-address output files.
match_address_index_dic = {}
# Maps contract address -> [startblock, endblock] fetch window
# (filled in by gettokens_bytokenAddress).
obj_match_block={}
# NOTE(review): never written or read in this file — possibly vestigial.
delete_address_arr = []

token_contracts_list = gettokens_bytokenAddress()
print('token_contracts_list=', len(token_contracts_list))
# raise
get_3trans_byScan(address_list=token_contracts_list,
                  write_tokens_path=baseclass.contracttokentx_path,
                  )
print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")
-
|