# bef: fetched all safe tokens for a block range and saved them to
#      f"{librarydata_path}{biaoshi}tokens.csv" and .xlsx
# now: fetches one token's tokentx over a time window via bscscan and saves
#      it under contracttokentx_path
import requests
import threading
import shutil
import urllib.parse

from base_class import BaseVariableFunction
# NOTE(review): datetime / json / time / copy / pandas-as-pd / os are not
# imported directly; they appear to be re-exported by this star import —
# confirm in base_class.
from base_class import *

# Wrap builtin print so every line carries a UTC timestamp (whole-second).
old_print = print


def timestamped_print(*args, **kwargs):
    old_print(datetime.datetime.utcnow().replace(
        microsecond=0), *args, **kwargs)


print = timestamped_print

baseclass = BaseVariableFunction(__file__)
base_library = baseclass.open_base_library()
# biaoshi = base_library["biaoshi"]

# Directory recording the contracttokentx fetched for each token.
baseclass.makedirpath(baseclass.contracttokentx_path)
# errorcontracttokentx generally means the tokentx had too many hashes
# baseclass.makedirpath(baseclass.errorcontracttokentx_path)

print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")

# Sample bscscan failure payloads this script must cope with:
# { "status": "0", "message": "No transactions found", "result": [] }
# { "status": "0", "message": "NOTOK", "result": "Max rate limit reached, please use API Key for higher rate limit" }
# {'status': '0', 'message': 'NOTOK', 'result': 'Error! Invalid contract address format'}

# API keys are rotated across request entries (idx % len(apikeys)).
apikeys = [
    "1343VMST2JRK8P39A88KU8E97VGY36WFDI",
    "TCH422YS6KIUFFJM34IQ4JX9V5K1Q1R4GX",
    "GD7FYJH4YJ13WNIEQUCYEGQEIE31A5XVXY",
    "WJG7C1BEVRIX8KTJ57JNB1FPKKP27D3H1Q",
    "IAI2DEISA4W81WDD54HGSK3X9TAC8NTUSB",
    "GIP2AB3M4B91MT396AIX2WGRSG2REGIE8A"
]
  42. def request_onetime(req_dict_total, key_idx, write_tokens_path,
  43. # writerrorpath, writeerrorname, errorfile,
  44. loop):
  45. requests_dict = req_dict_total.get(key_idx)
  46. if requests_dict is None:
  47. return
  48. elif requests_dict['isrequseting'] == True:
  49. print(f"key_idx {key_idx} is isrequseting\n", end='')
  50. return
  51. requests_dict['isrequseting'] = True
  52. action = requests_dict['action']
  53. actiontype = requests_dict['actiontype']
  54. cur_conadd = baseclass.get_cur_conadd(
  55. requests_dict=requests_dict, action=action, actiontype=actiontype)
  56. print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='')
  57. # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求,
  58. # 否則保存錯誤 但不删除这个token的csv
  59. if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]:
  60. requests_dict["need_remove"] = True
  61. if req_dict_total.get(key_idx) is not None:
  62. print(f"remove {action} {cur_conadd} {key_idx}\n", end='')
  63. del req_dict_total[key_idx]
  64. print(
  65. f"{cur_conadd} length to max\n",end='')
  66. return
  67. req_response = None
  68. try:
  69. req_response = requests.get(
  70. baseclass.baseurl, requests_dict['req_params'], timeout=requests_dict['timeout'])
  71. except Exception as e:
  72. print(f"{cur_conadd} errmsg={ str(e)}")
  73. requests_dict['timeout'] = 30
  74. requests_dict['isrequseting'] = False
  75. return
  76. requests_dict["req_response"] = req_response
  77. write_tokens_path_name = baseclass.get_write_conadd_path_file(cur_conadd=cur_conadd,
  78. write_tokens_path=write_tokens_path,
  79. action=action,
  80. actiontype=actiontype,
  81. match_address_index_dic=match_address_index_dic)
  82. # 请求结果不成功
  83. if requests_dict["req_response"].status_code != 200:
  84. print(
  85. f"cur_conadd {cur_conadd} response is {requests_dict['req_response'].status_code}")
  86. requests_dict['isrequseting'] = False
  87. return
  88. # 如果請求結果為成功
  89. tokenresponse = json.loads(
  90. requests_dict["req_response"].text)
  91. # 結果不正常 continue
  92. if tokenresponse["message"] != "OK":
  93. baseclass.handle_IF_tokenresponse_NOTOK(tokenresponse=tokenresponse,
  94. action=action,
  95. cur_conadd=cur_conadd,
  96. key_idx=key_idx,
  97. requests_dict=requests_dict,
  98. req_dict_total=req_dict_total)
  99. return
  100. # 結果正常 獲取txlist
  101. txlist = None
  102. add_df = None
  103. txlist = tokenresponse["result"]
  104. add_df = pd.DataFrame(txlist)
  105. add_df["blockNumber"] = add_df["blockNumber"].astype(int)
  106. requests_dict["offsettimes"] += 1
  107. if requests_dict["offsettimes"] == 1:
  108. requests_dict["isfirstreq"] = True
  109. else:
  110. requests_dict["isfirstreq"] = False
  111. # if requests_dict["isfirstreq"] == True:
  112. # 如果是第一次请求 删除过去的csv文件
  113. # 如果有这个csv文件 删除
  114. # if os.path.exists(write_tokens_path_name):
  115. # write_tokens_path_name.unlink()
  116. if (len(add_df) < 10000):
  117. # 如果此次請求結果不是最大offset,請求了全部結果,保存文本
  118. requests_dict["need_remove"] = True
  119. if req_dict_total.get(key_idx) is not None:
  120. print(f"remove {action} {cur_conadd} {key_idx}\n", end='')
  121. del req_dict_total[key_idx]
  122. isdfTrue, add_df = baseclass.fir_handle_decivalue_confirmations_appdropcol_df(
  123. df=add_df, action=action, actiontype=actiontype)
  124. if not isdfTrue:
  125. requests_dict["req_params"]["startblock"] = int(
  126. requests_dict["endblock"]+1)
  127. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  128. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  129. requests_dict['isrequseting']=False
  130. return
  131. add_df = add_df[["blockNumber", "timeStamp", "hash","from","to","value"]]
  132. # ,,,,,,,,,,,,
  133. # 未来可以改成 先判断 blockNumber 的取值范围 再发起请求
  134. cur_minblock = int(add_df["blockNumber"].min())
  135. cur_maxblock = int(add_df["blockNumber"].max())
  136. # 如果没有这个csv文件
  137. if not write_tokens_path_name.exists():
  138. print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
  139. add_df.to_csv(
  140. write_tokens_path_name, mode='w', index=False)
  141. # 请求的startblock 置为比endblok大 结束请求
  142. requests_dict["req_params"]["startblock"] = int(
  143. requests_dict["endblock"]+1)
  144. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  145. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  146. requests_dict['isrequseting']=False
  147. return
  148. cur_file_df = pd.read_csv(
  149. write_tokens_path_name, dtype=object)
  150. if len(cur_file_df.index) == 0:
  151. print(f"{cur_conadd} 文本行数为0 全部写入\n", end='')
  152. add_df.to_csv(
  153. write_tokens_path_name, mode='a', header=False, index=False)
  154. # 请求的startblock 置为比endblok大 结束请求
  155. requests_dict["req_params"]["startblock"] = int(
  156. requests_dict["endblock"]+1)
  157. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  158. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  159. requests_dict['isrequseting']=False
  160. return
  161. requests_dict = baseclass.savedfblock(cur_conadd, add_df, cur_file_df, requests_dict, write_tokens_path_name,
  162. cur_minblock, cur_maxblock,loop)
  163. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  164. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  165. requests_dict['isrequseting']=False
  166. return
  167. elif (len(add_df) >= 10000):
  168. # # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求,
  169. # # 否則保存錯誤 但不删除这个token的csv
  170. requests_dict["need_remove"] = False
  171. # 更換 startblock
  172. cur_minblock = int(add_df["blockNumber"].min())
  173. cur_maxblock = int(add_df["blockNumber"].max()-1)
  174. add_df = add_df[add_df["blockNumber"] <=
  175. cur_maxblock].reset_index(drop=True)
  176. isdfTrue, add_df = baseclass.fir_handle_decivalue_confirmations_appdropcol_df(
  177. df=add_df, action=action, actiontype=actiontype)
  178. if not isdfTrue:
  179. requests_dict["req_params"]["startblock"] = int(
  180. cur_maxblock+1)
  181. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  182. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  183. requests_dict['isrequseting']=False
  184. return
  185. add_df = add_df[["blockNumber", "timeStamp", "hash","from","to","value"]]
  186. # 如果没有这个csv文件 直接写入
  187. if not write_tokens_path_name.exists():
  188. print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
  189. add_df.to_csv(
  190. write_tokens_path_name, mode='w', index=False)
  191. # 请求的startblock 置为比endblok大 结束请求
  192. requests_dict["req_params"]["startblock"] = int(
  193. cur_maxblock+1)
  194. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  195. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  196. requests_dict['isrequseting']=False
  197. return
  198. # 如果已经存在 打开已有的csv 文件 和当前的add_df 以及 整个tokentx 的startblock 和 endblock 进行比较
  199. cur_file_df = pd.read_csv(
  200. write_tokens_path_name, dtype=object)
  201. if len(cur_file_df.index) == 0:
  202. print(f"{cur_conadd} 文本行数为0 全部写入\n", end='')
  203. add_df.to_csv(
  204. write_tokens_path_name, mode='a', header=False, index=False)
  205. requests_dict["req_params"]["startblock"] = int(
  206. cur_maxblock+1)
  207. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  208. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  209. requests_dict['isrequseting']=False
  210. return
  211. requests_dict = baseclass.savedfblock(cur_conadd, add_df, cur_file_df, requests_dict, write_tokens_path_name,
  212. cur_minblock, cur_maxblock,loop)
  213. baseclass.judge_req_completed(requests_dict=requests_dict, action=action,
  214. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  215. requests_dict['isrequseting']=False
  216. return
  217. return
  218. def getonetran(address_list, write_tokens_path,
  219. action,
  220. actiontype,
  221. offsettimes_max):
  222. print(f"enter getonetran action={action} actiontype={actiontype}")
  223. req_dict_total = {}
  224. for idx, address in enumerate(address_list):
  225. key_idx = str(idx)
  226. req_dict = baseclass.init_req_dict(
  227. startblock=obj_match_block[address][0],
  228. endblock=obj_match_block[address][1],
  229. address=address,
  230. key_idx=key_idx,
  231. offsettimes_max=offsettimes_max,
  232. action=action,
  233. actiontype=actiontype,
  234. apikey=apikeys[idx % len(
  235. apikeys)],
  236. remainder_retry_times_max=5,
  237. timeout=20)
  238. req_dict_total[key_idx] = req_dict
  239. loop=0
  240. while (len(req_dict_total.keys())>0):
  241. loop +=1
  242. # 獲取可以同步請求的address長度 ,traders 長都可能小於 requests_dict定義的長度
  243. # grequest_len 為 requests_dict長度和實際tokencontracts長度取最小值
  244. print(
  245. f"remainder_{action}_address_len :{len(req_dict_total.keys())}")
  246. temp_req_dict_total = {}
  247. for key in list(req_dict_total.keys()):
  248. temp_req_dict_total[key] = copy.deepcopy(req_dict_total[key])
  249. # 進行異步請求
  250. i = 0
  251. thread_list = []
  252. for key in list(temp_req_dict_total.keys()):
  253. i += 1
  254. thread = threading.Thread(
  255. target=request_onetime,
  256. args=(
  257. req_dict_total, key, write_tokens_path,
  258. loop)
  259. )
  260. thread_list.append(thread)
  261. if (i % 5 == 0):
  262. # 总数10个,查询一遍
  263. for thread in thread_list:
  264. thread.start()
  265. thread_list = []
  266. time.sleep(1.5)
  267. elif i == len(temp_req_dict_total.keys()):
  268. # 总数不足10个,把剩下的也查询一遍
  269. print('remainder len less 10' )
  270. for thread in thread_list:
  271. thread.start()
  272. thread_list = []
  273. time.sleep(3)
  274. return
  275. def get_3trans_byScan(address_list, write_tokens_path,
  276. ):
  277. # 此方法獲取 從bscapi獲取的 的3trans
  278. # 有 requests_3trans_dict 和requests_tokenadd_dict兩種類型對應contractaddress和address
  279. # 使用grequests 進行異步請求獲取
  280. # offsettimes_max 為請求的數據次數,最大總數為 offsettimes_max*offset (offset 為params的offset)
  281. # param_sort 為請求參數params 的sort
  282. print("enter get_3trans_byScan_()")
  283. contractadd_tokentx_list = copy.deepcopy(address_list)
  284. getonetran(address_list=contractadd_tokentx_list,
  285. write_tokens_path=write_tokens_path,
  286. action="tokentx",
  287. actiontype="contractaddress",
  288. offsettimes_max=9999999)
  289. return
  290. def gettokens_bytokenAddress():
  291. print("enter gettokens_bytokenAddress_()")
  292. df = pd.read_csv(
  293. baseclass.librarydata_path/f"tokens.csv", dtype="object")
  294. df=df.astype({
  295. "startblock":int,
  296. "endblock":int
  297. })
  298. contractaddress_list = df['contractaddress'].tolist()
  299. arr_startdate = df['startdate'].tolist()
  300. arr_enddate = df['enddate'].tolist()
  301. arr_startblock = df['startblock'].tolist()
  302. arr_endblock = df['endblock'].tolist()
  303. # contractaddress_list =["0xfeb9d41cc44c1ab22722cd51aee8accaa5885f2a"]
  304. # contractaddress_list = contractaddress_list[0:6]
  305. for idx in range(0,len(contractaddress_list)):
  306. startblock = 0
  307. endblock =0
  308. contractaddress = contractaddress_list[idx]
  309. match_address_index_dic[contractaddress] = contractaddress
  310. startdate = arr_startdate[idx]
  311. enddate = arr_enddate[idx]
  312. startblock = arr_startblock[idx]
  313. endblock = arr_endblock[idx]
  314. if startblock==0:
  315. startblock = baseclass.get_blocknumber_byisostr_bybscscan(isostr=startdate )
  316. if endblock==0:
  317. endblock = baseclass.get_blocknumber_byisostr_bybscscan(isostr= enddate )
  318. obj_match_block[contractaddress]=[startblock , endblock]
  319. print(f"contractaddress={contractaddress} startblock={startblock} endblock={endblock}")
  320. arr_startblock[idx] = startblock
  321. arr_endblock[idx] = endblock
  322. df["startblock"]=arr_startblock
  323. df["endblock"]=arr_endblock
  324. df.to_csv(
  325. baseclass.librarydata_path/f"tokens.csv", index=False)
  326. return contractaddress_list
  327. match_address_index_dic = {}
  328. obj_match_block={}
  329. delete_address_arr = []
  330. token_contracts_list = gettokens_bytokenAddress()
  331. print('token_contracts_list=', len(token_contracts_list))
  332. # raise
  333. get_3trans_byScan(address_list=token_contracts_list,
  334. write_tokens_path=baseclass.contracttokentx_path,
  335. )
  336. print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")