004_notuse_get_dalaotransfer_solanafm_byaccount.py 11 KB


  1. from base_class import *
  2. from base_class import BaseVariableFunction
  3. import urllib.parse
  4. import shutil
  5. import random
  6. import json
  7. old_print = print
  8. def timestamped_print(*args, **kwargs):
  9. old_print(datetime.datetime.utcnow().replace(
  10. microsecond=0), *args, **kwargs)
  11. print = timestamped_print
# Project helper object (paths, headers, request bookkeeping), constructed
# from this script's own file path.
baseclass = BaseVariableFunction(__file__)
# Blank lines as a visual separator, then the start-of-run banner.
print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")
# NOTE(review): presumably creates the output directory when it is missing —
# confirm against base_class.makedirpath.
baseclass.makedirpath(baseclass.dalao_transfer_solanafm_path)
  16. def request_onetime(req_dict_total, key_idx, write_tokens_path, loop):
  17. requests_dict = req_dict_total.get(key_idx)
  18. if requests_dict is None:
  19. return
  20. elif requests_dict['isrequseting'] == True:
  21. return
  22. elif requests_dict['need_remove'] == True:
  23. print(f"key_idx {key_idx} need_remove\n", end='')
  24. if req_dict_total.get(key_idx) is not None:
  25. print(f"remove {cur_conadd} {key_idx}\n", end='')
  26. del req_dict_total[key_idx]
  27. return
  28. requests_dict['isrequseting'] = True
  29. cur_conadd = requests_dict["req_params"]["address"]
  30. print(f"enter key_idx={key_idx} cur_conadd={cur_conadd}\n", end='')
  31. # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求,
  32. # 否則保存錯誤 但不删除这个token的csv
  33. if requests_dict["offsettimes"] >= requests_dict["offsettimes_max"]:
  34. requests_dict["need_remove"] = True
  35. if req_dict_total.get(key_idx) is not None:
  36. print(f"remove {cur_conadd} {key_idx}\n", end='')
  37. del req_dict_total[key_idx]
  38. print(
  39. f"{cur_conadd} length to max\n", end='')
  40. return
  41. req_response = None
  42. req_url = f"https://api.solana.fm/v0/accounts/{requests_dict['req_params']['address']}/transfers?utcFrom={requests_dict['req_params']['startrange']}&utcTo={requests_dict['req_params']['endrange']}&limit=100&page=1"
  43. try:
  44. req_response = requests.get(
  45. req_url, timeout=requests_dict['timeout'], headers=baseclass.solana_fm_headers)
  46. time.sleep(0.2)
  47. except Exception as e:
  48. print(f"cur_conadd= {cur_conadd} errmsg={ str(e)}")
  49. if (" Max retries exceeded" in str(e)):
  50. time.sleep(2+random.random())
  51. requests_dict['timeout'] = 15
  52. requests_dict['isrequseting'] = False
  53. return
  54. requests_dict["req_response"] = req_response
  55. write_tokens_path_name = baseclass.dalao_transfer_solanafm_path / \
  56. f"{cur_conadd}.json"
  57. # 请求结果不成功
  58. if requests_dict["req_response"].status_code != 200:
  59. print(
  60. f"cur_conadd= {cur_conadd} response status_code= {requests_dict['req_response'].status_code}")
  61. if requests_dict["req_response"].status_code == 429:
  62. time.sleep(0.4)
  63. requests_dict['isrequseting'] = False
  64. return
  65. # 如果請求結果為成功
  66. tokenresponse = json.loads(
  67. requests_dict["req_response"].text)
  68. # 結果不正常 continue
  69. if tokenresponse["status"] != "success":
  70. baseclass.handle_IF_tokenresponse_NOTOK(tokenresponse=tokenresponse,
  71. cur_conadd=cur_conadd,
  72. key_idx=key_idx,
  73. requests_dict=requests_dict,
  74. req_dict_total=req_dict_total)
  75. requests_dict['isrequseting'] = False
  76. return
  77. # 結果正常 獲取txlist
  78. txlist = None
  79. txlist = tokenresponse["results"]
  80. if (txlist == None or len(txlist) == 0):
  81. print(f"loop={loop} add={cur_conadd} txlisst={txlist}")
  82. requests_dict["need_remove"] = True
  83. if req_dict_total.get(key_idx) is not None:
  84. print(f"remove {cur_conadd} {key_idx}\n", end='')
  85. del req_dict_total[key_idx]
  86. requests_dict['isrequseting'] = False
  87. return
  88. requests_dict["offsettimes"] += 1
  89. if requests_dict["offsettimes"] == 1:
  90. requests_dict["isfirstreq"] = True
  91. else:
  92. requests_dict["isfirstreq"] = False
  93. if requests_dict["isfirstreq"] == True:
  94. # 如果是第一次请求 删除过去的csv文件
  95. # 如果有这个csv文件 删除
  96. if os.path.exists(write_tokens_path_name):
  97. write_tokens_path_name.unlink()
  98. cur_zero_txhash = txlist[0]['transactionHash']
  99. cur_last_txhash = txlist[len(txlist)-1]['transactionHash']
  100. print(
  101. f"loop={loop} add={cur_conadd} range={requests_dict['req_params']['startrange'] } {requests_dict['req_params']['endrange']} len={len(txlist)} zero_txhash={cur_zero_txhash} last_txhash={cur_last_txhash}")
  102. if (len(txlist) < 100):
  103. # 如果此次請求結果不是最大offset,請求了全部結果,保存文本
  104. requests_dict["need_remove"] = True
  105. if req_dict_total.get(key_idx) is not None:
  106. print(f"remove {cur_conadd} {key_idx}\n", end='')
  107. del req_dict_total[key_idx]
  108. if (len(txlist) == 0):
  109. requests_dict['isrequseting'] = False
  110. return
  111. cur_minrange = txlist[len(txlist)-1]["data"][0]["timestamp"]
  112. cur_maxrange = txlist[0]["data"][0]["timestamp"]
  113. # 如果没有这个csv文件 直接写入
  114. cur_file = []
  115. if not write_tokens_path_name.exists():
  116. print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
  117. else:
  118. with write_tokens_path_name.open(mode="r") as f:
  119. cur_file = json.load(f)
  120. cur_file = cur_file + txlist
  121. print(f"{cur_conadd} 全部写入\n", end='')
  122. with write_tokens_path_name.open(mode='w') as f:
  123. json.dump(cur_file, f)
  124. requests_dict['isrequseting'] = False
  125. return
  126. elif (len(txlist) >= 100):
  127. # # 如果此次請求結果為最大offset,請求結果不全,如果沒到最大請求次數,繼續請求,
  128. # # 否則保存錯誤 但不删除这个token的csv
  129. requests_dict["need_remove"] = False
  130. # 更換 start
  131. cur_minrange = txlist[len(txlist)-1]["data"][0]["timestamp"]
  132. cur_maxrange = txlist[0]["data"][0]["timestamp"]
  133. while (True):
  134. if (txlist[len(txlist)-1]["data"][0]["timestamp"] < cur_minrange+1):
  135. txlist.pop(len(txlist)-1)
  136. continue
  137. else:
  138. break
  139. # 如果没有这个csv文件 直接写入
  140. cur_file = []
  141. if not write_tokens_path_name.exists():
  142. print(f"loop={loop} {cur_conadd} 第一次文件\n", end='')
  143. else:
  144. with write_tokens_path_name.open(mode="r") as f:
  145. cur_file = json.load(f)
  146. cur_file = cur_file + txlist
  147. print(f"{cur_conadd} 全部写入\n", end='')
  148. with write_tokens_path_name.open(mode='w') as f:
  149. json.dump(cur_file, f)
  150. requests_dict["req_params"]["endrange"] = int(
  151. cur_minrange+1)
  152. baseclass.judge_req_completed(requests_dict=requests_dict,
  153. key_idx=key_idx, cur_conadd=cur_conadd, req_dict_total=req_dict_total)
  154. requests_dict['isrequseting'] = False
  155. return
  156. return
  157. def getonetran(address_list, write_tokens_path, offsettimes_max):
  158. global startrange
  159. global endrange
  160. print(f"enter getonetran ")
  161. req_dict_total = {}
  162. for idx, address in enumerate(address_list):
  163. key_idx = str(idx)
  164. req_dict = baseclass.init_req_dict(
  165. startrange=startrange,
  166. endrange=endrange,
  167. address=address,
  168. key_idx=key_idx,
  169. offsettimes_max=offsettimes_max,
  170. remainder_retry_times_max=5,
  171. timeout=20)
  172. req_dict_total[key_idx] = req_dict
  173. loop = 0
  174. while (len(req_dict_total.keys()) > 0):
  175. loop += 1
  176. # 獲取可以同步請求的address長度 ,traders 長都可能小於 requests_dict定義的長度
  177. # grequest_len 為 requests_dict長度和實際tokencontracts長度取最小值
  178. print(
  179. f"remainder_address_len :{len(req_dict_total.keys())}")
  180. # temp_req_dict_total = {}
  181. temp_split_req_dict_total_list = list(req_dict_total.keys())
  182. temp_split_req_dict_total_list = temp_split_req_dict_total_list[0:20]
  183. temp_req_dict_total_keys_list = []
  184. for key in temp_split_req_dict_total_list:
  185. temp_req_dict_total_keys_list.append(key)
  186. # 進行異步請求
  187. i = 0
  188. thread_list = []
  189. for key in temp_req_dict_total_keys_list:
  190. i += 1
  191. request_onetime(req_dict_total, key, write_tokens_path, loop)
  192. time.sleep(0.1)
  193. # thread = threading.Thread(
  194. # target=request_onetime,
  195. # args=(
  196. # req_dict_total, key, write_tokens_path, loop)
  197. # )
  198. # thread_list.append(thread)
  199. # if (i % 4 == 0):
  200. # # 总数4个,查询一遍
  201. # for thread in thread_list:
  202. # thread.start()
  203. # for thread in thread_list:
  204. # thread.join()
  205. # thread_list = []
  206. # time.sleep(0.8)
  207. # elif i == len(temp_req_dict_total_keys_list):
  208. # # 总数不足10个,把剩下的也查询一遍
  209. # print('remainder len less 5', datetime.datetime.now())
  210. # for thread in thread_list:
  211. # thread.start()
  212. # for thread in thread_list:
  213. # thread.join()
  214. # thread_list = []
  215. time.sleep(0.3)
  216. return
  217. def get_3trans_byScan(address_list, write_tokens_path,):
  218. # 此方法獲取 從bscapi獲取的 的3trans
  219. # 有 requests_3trans_dict 和requests_tokenadd_dict兩種類型對應contractaddress和address
  220. # 使用grequests 進行異步請求獲取
  221. # offsettimes_max 為請求的數據次數,最大總數為 offsettimes_max*offset (offset 為params的offset)
  222. # param_sort 為請求參數params 的sort
  223. print("enter get_3trans_byScan_()")
  224. contractadd_tokentx_list = copy.deepcopy(address_list)
  225. getonetran(address_list=contractadd_tokentx_list,
  226. write_tokens_path=write_tokens_path,
  227. offsettimes_max=6
  228. )
  229. return
  230. now_timestamp = baseclass.get_current_timestamp()
  231. startrange = now_timestamp - 17*24*3600
  232. # startrange = now_timestamp - 2*24*3600
  233. endrange = now_timestamp+2*24*3600
  234. print(f"startrang= {startrange} endrange= {endrange}")
  235. df = pd.read_csv(baseclass.dalao_merge_path /
  236. "filter_dalao.csv", dtype=object)
  237. arr_str_dalao = df["dalaoAddress"].tolist()
  238. print('token_contracts_list', len(arr_str_dalao))
  239. get_3trans_byScan(address_list=arr_str_dalao,
  240. write_tokens_path=baseclass.dalao_transfer_solanafm_path
  241. )
  242. print(f"startrang= {startrange} endrange= {endrange}")
  243. print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")