021_nouse_get_selected_dalaotransfer_solanafm.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. from base_class import *
  2. from base_class import BaseVariableFunction
  3. import urllib.parse
  4. import shutil
  5. import random
  6. import json
  7. old_print = print
  8. def timestamped_print(*args, **kwargs):
  9. old_print(datetime.datetime.utcnow().replace(
  10. microsecond=0), *args, **kwargs)
  11. print = timestamped_print
  12. baseclass = BaseVariableFunction(__file__)
  13. print('\n'*5)
  14. print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")
  15. baseclass.makedirpath(baseclass.dalao_transfer_solanafm_path)
  16. def request_onetime(req_dict_total, key_idx, write_tokens_path, loop):
  17. global arr_len_too_max_dalaoaddress
  18. requests_dict = req_dict_total.get(key_idx)
  19. if requests_dict is None:
  20. return
  21. elif requests_dict['isrequseting'] == True:
  22. return
  23. elif requests_dict['need_remove'] == True:
  24. print(f"key_idx {key_idx} need_remove\n", end='')
  25. if req_dict_total.get(key_idx) is not None:
  26. print(f"remove {cur_conadd} {key_idx}\n", end='')
  27. del req_dict_total[key_idx]
  28. return
  29. requests_dict['isrequseting'] = True
  30. cur_conadd = requests_dict["req_params"]["address"]
  31. print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='')
  32. # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求,
  33. # 否則保存錯誤 但不删除这个token的csv
  34. if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]:
  35. requests_dict["need_remove"] = True
  36. if req_dict_total.get(key_idx) is not None:
  37. print(f"remove {cur_conadd} {key_idx}\n", end='')
  38. del req_dict_total[key_idx]
  39. print(
  40. f"{cur_conadd} length to max\n", end='')
  41. arr_len_too_max_dalaoaddress.append(cur_conadd)
  42. arr_len_too_max_dalaoaddress.append(
  43. requests_dict['req_params']['endrange'])
  44. return
  45. req_response = None
  46. req_url = f"https://api.solana.fm/v0/accounts/{requests_dict['req_params']['address']}/transfers?utcFrom={requests_dict['req_params']['startrange']}&utcTo={requests_dict['req_params']['endrange']}&limit=100&page=1"
  47. try:
  48. req_response = requests.get(
  49. req_url, timeout=requests_dict['timeout'], headers=baseclass.solana_fm_headers)
  50. time.sleep(0.2)
  51. except Exception as e:
  52. print(f"cur_conadd= {cur_conadd} errmsg={ str(e)}")
  53. if (" Max retries exceeded" in str(e)):
  54. time.sleep(2+random.random())
  55. requests_dict['timeout'] = 15
  56. requests_dict['isrequseting'] = False
  57. return
  58. requests_dict["req_response"] = req_response
  59. write_tokens_path_name = baseclass.dalao_transfer_solanafm_path / \
  60. f"{cur_conadd}.json"
  61. # 请求结果不成功
  62. if requests_dict["req_response"].status_code != 200:
  63. print(
  64. f"cur_conadd= {cur_conadd} response status_code= {requests_dict['req_response'].status_code}")
  65. if requests_dict["req_response"].status_code == 429:
  66. time.sleep(0.4)
  67. requests_dict['isrequseting'] = False
  68. return
  69. # 如果請求結果為成功
  70. tokenresponse = json.loads(
  71. requests_dict["req_response"].text)
  72. # 結果不正常 continue
  73. if tokenresponse["status"] != "success":
  74. baseclass.handle_IF_tokenresponse_NOTOK(tokenresponse=tokenresponse,
  75. cur_conadd=cur_conadd,
  76. key_idx=key_idx,
  77. requests_dict=requests_dict,
  78. req_dict_total=req_dict_total)
  79. requests_dict['isrequseting'] = False
  80. return
  81. # 結果正常 獲取txlist
  82. txlist = None
  83. txlist = tokenresponse["results"]
  84. if (txlist == None or len(txlist) == 0):
  85. print(f"loop={loop} add={cur_conadd} txlisst={txlist}")
  86. requests_dict["need_remove"] = True
  87. if req_dict_total.get(key_idx) is not None:
  88. print(f"remove {cur_conadd} {key_idx}\n", end='')
  89. del req_dict_total[key_idx]
  90. requests_dict['isrequseting'] = False
  91. return
  92. requests_dict["offsettimes"] += 1
  93. if requests_dict["offsettimes"] == 1:
  94. requests_dict["isfirstreq"] = True
  95. else:
  96. requests_dict["isfirstreq"] = False
  97. # 因为只运行一次可能不全 ,会多运行几次 所以不能删除
  98. # if requests_dict["isfirstreq"] == True:
  99. # 如果是第一次请求 删除过去的csv文件
  100. # 如果有这个csv文件 删除
  101. # if os.path.exists(write_tokens_path_name):
  102. # write_tokens_path_name.unlink()
  103. cur_zero_txhash = txlist[0]['transactionHash']
  104. cur_last_txhash = txlist[len(txlist)-1]['transactionHash']
  105. print(
  106. f"loop={loop} add={cur_conadd} range={requests_dict['req_params']['startrange'] } {requests_dict['req_params']['endrange']} len={len(txlist)} zero_txhash={cur_zero_txhash} last_txhash={cur_last_txhash}")
  107. if (len(txlist) < 100):
  108. # 如果此次請求結果不是最大offset,請求了全部結果,保存文本
  109. requests_dict["need_remove"] = True
  110. if req_dict_total.get(key_idx) is not None:
  111. print(f"remove {cur_conadd} {key_idx}\n", end='')
  112. del req_dict_total[key_idx]
  113. if (len(txlist) == 0):
  114. requests_dict['isrequseting'] = False
  115. return
  116. cur_minrange = txlist[len(txlist)-1]["data"][0]["timestamp"]
  117. cur_maxrange = txlist[0]["data"][0]["timestamp"]
  118. # 如果没有这个csv文件 直接写入
  119. cur_file = []
  120. if not write_tokens_path_name.exists():
  121. print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
  122. else:
  123. with write_tokens_path_name.open(mode="r") as f:
  124. cur_file = json.load(f)
  125. cur_file = cur_file + txlist
  126. print(f"{cur_conadd} 全部写入\n", end='')
  127. with write_tokens_path_name.open(mode='w') as f:
  128. json.dump(cur_file, f)
  129. requests_dict['isrequseting'] = False
  130. return
  131. elif (len(txlist) >= 100):
  132. # # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求,
  133. # # 否則保存錯誤 但不删除这个token的csv
  134. requests_dict["need_remove"] = False
  135. # 更換 start
  136. cur_minrange = txlist[len(txlist)-1]["data"][0]["timestamp"]
  137. cur_maxrange = txlist[0]["data"][0]["timestamp"]
  138. if (cur_maxrange == cur_minrange):
  139. cur_minrange -= 1
  140. print("cur_minrange", cur_minrange, cur_maxrange)
  141. # while (True):
  142. # if (txlist[len(txlist)-1]["data"][0]["timestamp"] < cur_minrange+1):
  143. # txlist.pop(len(txlist)-1)
  144. # continue
  145. # else:
  146. # break
  147. # 如果没有这个csv文件 直接写入
  148. cur_file = []
  149. if not write_tokens_path_name.exists():
  150. print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
  151. else:
  152. with write_tokens_path_name.open(mode="r", encoding='utf-8') as f:
  153. cur_file = json.load(f)
  154. cur_file = cur_file + txlist
  155. print(f"{cur_conadd} 全部写入\n", end='')
  156. try:
  157. with write_tokens_path_name.open(mode='w', encoding='utf-8') as f:
  158. json.dump(cur_file, f)
  159. except Exception as e:
  160. print("txlist=", txlist)
  161. raise
  162. requests_dict["req_params"]["endrange"] = int(
  163. cur_minrange)
  164. baseclass.judge_req_completed(requests_dict=requests_dict,
  165. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  166. requests_dict['isrequseting'] = False
  167. return
  168. return
  169. def getonetran(address_list, write_tokens_path, offsettimes_max):
  170. global startrange
  171. global endrange
  172. print(f"enter getonetran ")
  173. req_dict_total = {}
  174. for idx, address in enumerate(address_list):
  175. key_idx = str(idx)
  176. req_dict = baseclass.init_req_dict(
  177. startrange=startrange,
  178. endrange=endrange,
  179. address=address,
  180. key_idx=key_idx,
  181. offsettimes_max=offsettimes_max,
  182. remainder_retry_times_max=5,
  183. timeout=10)
  184. req_dict_total[key_idx] = req_dict
  185. loop = 0
  186. while (len(req_dict_total.keys()) > 0):
  187. loop += 1
  188. # 獲取可以同步請求的address長度 ,traders 長都可能小於 requests_dict定義的長度
  189. # grequest_len 為 requests_dict長度和實際tokencontracts長度取最小值
  190. print(
  191. f"remainder_address_len :{len(req_dict_total.keys())}")
  192. # temp_req_dict_total = {}
  193. temp_split_req_dict_total_list = list(req_dict_total.keys())
  194. temp_split_req_dict_total_list = temp_split_req_dict_total_list[0:20]
  195. temp_req_dict_total_keys_list = []
  196. for key in temp_split_req_dict_total_list:
  197. temp_req_dict_total_keys_list.append(key)
  198. # 進行異步請求
  199. i = 0
  200. thread_list = []
  201. for key in temp_req_dict_total_keys_list:
  202. i += 1
  203. request_onetime(req_dict_total, key, write_tokens_path, loop)
  204. # thread = threading.Thread(
  205. # target=request_onetime,
  206. # args=(
  207. # req_dict_total, key, write_tokens_path, loop)
  208. # )
  209. # thread_list.append(thread)
  210. # if (i % 4 == 0):
  211. # # 总数4个,查询一遍
  212. # for thread in thread_list:
  213. # thread.start()
  214. # for thread in thread_list:
  215. # thread.join()
  216. # thread_list = []
  217. # time.sleep(0.8)
  218. # elif i == len(temp_req_dict_total_keys_list):
  219. # # 总数不足10个,把剩下的也查询一遍
  220. # print('remainder len less 5', datetime.datetime.now())
  221. # for thread in thread_list:
  222. # thread.start()
  223. # for thread in thread_list:
  224. # thread.join()
  225. # thread_list = []
  226. time.sleep(0.3)
  227. return
  228. def get_3trans_byScan(address_list, write_tokens_path,):
  229. # 此方法獲取 從bscapi獲取的 的3trans
  230. # 有 requests_3trans_dict 和requests_tokenadd_dict兩種類型對應contractaddress和address
  231. # 使用grequests 進行異步請求獲取
  232. # offsettimes_max 為請求的數據次數,最大總數為 offsettimes_max*offset (offset 為params的offset)
  233. # param_sort 為請求參數params 的sort
  234. print("enter get_3trans_byScan_()")
  235. contractadd_tokentx_list = copy.deepcopy(address_list)
  236. getonetran(address_list=contractadd_tokentx_list,
  237. write_tokens_path=write_tokens_path,
  238. offsettimes_max=5
  239. )
  240. return
  241. now_timestamp = baseclass.get_current_timestamp()
  242. startrange = now_timestamp - 3*24*3600
  243. endrange = now_timestamp+2*24*3600
  244. endrange = 1717323091
  245. print(f"startrang= {startrange} endrange= {endrange}")
  246. df = pd.read_excel(baseclass.dalao_total_ana_fm_path /
  247. "anaing_dalao.xlsx", dtype=object)
  248. arr_str_dalao = df["add"].tolist()
  249. print('token_contracts_list', len(arr_str_dalao))
  250. arr_len_too_max_dalaoaddress = []
  251. get_3trans_byScan(address_list=arr_str_dalao,
  252. write_tokens_path=baseclass.dalao_transfer_solanafm_path
  253. )
  254. print("arr_len_too_max_dalaoaddress=", arr_len_too_max_dalaoaddress)
  255. print(f"startrang= {startrange} endrange= {endrange}")
  256. print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")