"""Scrape SWAP DeFi activities from solscan.io for a list of addresses.

Addresses come from the ``dalaoAddress`` column of ``filter_dalao.csv``.
Addresses that already have a ``solscan_act_<address>.csv`` file are skipped.
The remaining ones are spread over up to ``MAX_DRIVERS`` headless Chromium
browsers that run concurrently; each address gets at most
``MAX_PAGES_PER_ADDRESS`` pages of ``PAGE_SIZE`` rows written to its own CSV.
"""
import asyncio
import datetime
import random
import traceback

import pandas as pd
from playwright.async_api import async_playwright
from playwright.sync_api import expect

from base_class import BaseVariableFunction
# NOTE(review): the original relied on this star import to provide pd,
# asyncio, datetime, traceback, ...; it is kept last so that whatever it
# exports still takes precedence over the explicit imports above.
from base_class import *

baseclass = BaseVariableFunction(__file__)
baseclass.makedirpath(baseclass.dalao_activities_solscan_path)

old_print = print


def timestamped_print(*args, **kwargs):
    """Print like the builtin ``print`` but prefixed with the UTC time (seconds)."""
    # Aware "now" converted back to naive: same text as the deprecated
    # datetime.utcnow(), without the deprecation warning.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(
        microsecond=0, tzinfo=None)
    old_print(now_utc, *args, **kwargs)


# Every print() below this line is timestamped.
print = timestamped_print

print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")

# Rows requested per table page, pages scraped per address, browsers at once.
PAGE_SIZE = 40
MAX_PAGES_PER_ADDRESS = 3
MAX_DRIVERS = 6

# Hex-encoded form of
# https://statics.solscan.io/ex-img/675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8.png
# A row whose 8th cell contains an <img> whose src ends (after the last '=')
# with this value gets israydium == '1'.
# NOTE(review): presumably Raydium's icon, judging by the column name - confirm.
RAYDIUM_POOL_IMG_HEX = "68747470733a2f2f737461746963732e736f6c7363616e2e696f2f65782d696d672f3637356b5058394d48546a53327a7431716672314e5948757a654c5866514d394832347746535574314d70382e706e67"

# Column order of every row produced by get_tbody_data().
activities_columns = [
    'txhash',
    'date',
    'timestamp',
    'action',
    'from',
    'in_tokenaddress',
    'out_tokenaddress',
    'in_tokenname',
    'out_tokenname',
    'in_tokenamount',
    'out_tokenamount',
    'router',
    'israydium',
]


def _last_path_segment(href):
    """Return the text after the last '/' of a stripped href."""
    return href.strip().split('/')[-1]


async def get_tbody_data(page, str_dalaoAddress):
    """Read the activity table currently shown on ``page``.

    Args:
        page: Playwright page showing the ``table.w-full`` activity table.
        str_dalaoAddress: address being scraped; only used in log output.

    Returns:
        One list per ``<tr>``, each with ``len(activities_columns)`` entries
        in ``activities_columns`` order. Cells that were not filled stay
        ``None``.

    Raises:
        Exception: any parsing / Playwright error is logged with the last
            parsed context and re-raised unchanged.
    """
    # Kept outside the try so the except handler can always log them.
    arr_tokenname_amount = None
    arr_tokenlink = None
    txhash = None
    try:
        print(f"enter {str_dalaoAddress} get_tbody_data")
        tbody_ = page.locator('table.w-full tbody')
        # Fixed pause before reading the rows (same as the original).
        await asyncio.sleep(3)
        arr_trs = await tbody_.locator('tr').all()
        arr_trs_datares = []
        for tr_ in arr_trs:
            row = [None] * len(activities_columns)
            arr_trs_datares.append(row)
            arr_tds = await tr_.locator('td').all()
            for idx_td_, td_ in enumerate(arr_tds):
                if idx_td_ == 0:
                    # First cell carries nothing that is stored.
                    continue
                elif idx_td_ == 1:
                    txhash = (await td_.text_content()).strip()
                    row[0] = txhash
                elif idx_td_ == 2:
                    # Cell text is parsed as "MM-DD-YYYY HH:MM:SS" and
                    # interpreted as UTC for the epoch timestamp.
                    utc_date = datetime.datetime.strptime(
                        (await td_.text_content()).strip(),
                        "%m-%d-%Y %H:%M:%S")
                    timestamp_utc = int(utc_date.replace(
                        tzinfo=datetime.timezone.utc).timestamp())
                    row[1] = utc_date.strftime("%Y-%m-%d %H:%M:%S")
                    row[2] = str(timestamp_utc)
                elif idx_td_ == 3:
                    action_type = (await td_.text_content()).strip()
                    # Only SWAP actions are labelled; for anything else the
                    # 'action' column stays None but the remaining cells of
                    # the row are still parsed (original behaviour).
                    if 'SWAP' in action_type:
                        row[3] = "SWAP"
                elif idx_td_ == 4:
                    row[4] = (await td_.text_content()).strip()
                elif idx_td_ == 5:
                    # Visible text lines: out amount, out name, in amount,
                    # in name. Thousands separators are removed first.
                    arr_tokenname_amount = (await td_.inner_text()).strip()
                    arr_tokenname_amount = arr_tokenname_amount.replace(
                        ',', '').split('\n')
                    # First link is the "out" token, second the "in" token.
                    arr_tokenlink = await td_.locator("a").all()
                    out_tokenaddress = _last_path_segment(
                        await arr_tokenlink[0].get_attribute('href'))
                    in_tokenaddress = _last_path_segment(
                        await arr_tokenlink[1].get_attribute('href'))
                    row[5] = in_tokenaddress
                    row[6] = out_tokenaddress
                    row[7] = arr_tokenname_amount[3]
                    row[8] = arr_tokenname_amount[1]
                    row[9] = arr_tokenname_amount[2]
                    row[10] = arr_tokenname_amount[0]
                elif idx_td_ == 6:
                    router_link = (await td_.locator("a").all())[0]
                    row[11] = _last_path_segment(
                        await router_link.get_attribute('href'))
                elif idx_td_ == 7:
                    pool_img_ids = []
                    for img_ in await td_.locator("img").all():
                        src = await img_.get_attribute('src')
                        pool_img_ids.append(src.strip().split('=')[-1])
                    row[12] = '1' if RAYDIUM_POOL_IMG_HEX in pool_img_ids else '0'
        print(f"end {str_dalaoAddress} get_tbody_data")
        return arr_trs_datares
    except Exception:
        print(str_dalaoAddress, txhash, "arr_tokenname_amount",
              arr_tokenname_amount, "arr_tokenlink=", arr_tokenlink)
        raise


df = pd.read_csv(baseclass.dalao_merge_path / "filter_dalao.csv", dtype=object)
# Keep only the addresses that do not have an output CSV yet.
arr_str_dalaoAddress = [
    str_dalaoAddress
    for str_dalaoAddress in df["dalaoAddress"].tolist()
    if not (baseclass.dalao_activities_solscan_path /
            f"solscan_act_{str_dalaoAddress}.csv").exists()
]
arr_str_dalaoSwapUrl = [
    f"https://solscan.io/account/{str_dalaoAddress}#defiactivities"
    for str_dalaoAddress in arr_str_dalaoAddress]
print('arr_str_dalaoAddress', len(arr_str_dalaoAddress))

# One browser per worker; never more workers than addresses (0 is allowed).
driver_len = min(MAX_DRIVERS, len(arr_str_dalaoAddress))
# Deal the addresses round-robin so every worker gets work and an empty
# address list cannot cause a division by zero.
arr_split_str_dalaoAddress = [
    arr_str_dalaoAddress[idx_driver::driver_len]
    for idx_driver in range(driver_len)]
arr_split_str_dalaoSwapUrl = [
    arr_str_dalaoSwapUrl[idx_driver::driver_len]
    for idx_driver in range(driver_len)]

# Addresses still to be scraped; workers discard entries as they finish.
set_str_dalaoAddress = set(arr_str_dalaoAddress)


async def _scroll_until_stable(page, prev_page_height, max_tries=3):
    """Scroll to the bottom up to ``max_tries`` times; return the last known height.

    Stops early as soon as the document height equals the previous one.
    """
    for _ in range(max_tries):
        await page.evaluate(
            "window.scrollTo(0, document.body.scrollHeight);")
        await asyncio.sleep(0.4)
        cur_page_height = await page.evaluate(
            "document.documentElement.scrollHeight")
        if cur_page_height > prev_page_height:
            prev_page_height = cur_page_height
        elif cur_page_height == prev_page_height:
            break
    return prev_page_height


async def _read_table_with_retry(page, str_dalaoAddress, expect_full_page):
    """Call get_tbody_data() until it yields a usable table.

    Retries forever (as the original did) when parsing raises, when no data
    is returned, or when ``expect_full_page`` is set and the table does not
    hold exactly PAGE_SIZE rows.
    """
    while True:
        try:
            tbody_data = await get_tbody_data(
                page=page, str_dalaoAddress=str_dalaoAddress)
        except Exception:
            print(traceback.format_exc())
            await asyncio.sleep(2)
            continue
        if tbody_data is None:
            await asyncio.sleep(0.8)
            continue
        if expect_full_page and len(tbody_data) != PAGE_SIZE:
            print("tbody_data_len=", len(tbody_data))
            await asyncio.sleep(0.8)
            continue
        return tbody_data


async def _scrape_address(context, str_dalaoAddress, str_dalaoTransferUrl):
    """Scrape one address in a fresh tab and write ``solscan_act_<address>.csv``."""
    print(f"enter str_dalaoAddress={str_dalaoAddress}")
    opened_page = await context.new_page()
    try:
        await opened_page.goto(str_dalaoTransferUrl)
        # Prefer the tab whose URL matches exactly; fall back to the tab just
        # opened so a rewritten URL does not leave us without a page.
        page = next(
            (item_page for item_page in context.pages
             if item_page.url == str_dalaoTransferUrl),
            opened_page)
        await page.bring_to_front()

        th_ = page.locator('table.w-full thead th')
        text_th_ = (await th_.text_content()).strip()
        print("text_th_=", text_th_)
        div_time_click = th_.locator('div.items-center.cursor-pointer')
        text_time_click = (await div_time_click.text_content()).strip()
        print(
            f"str_dalaoAddress={str_dalaoAddress} text_time_click={text_time_click}")
        # Click the header toggle unless it already mentions UTC.
        if "UTC" not in text_time_click:
            await div_time_click.click()
        # NOTE(review): the collapsed source does not show whether this wait
        # was inside the branch above; it returns at once on a loaded page.
        await page.wait_for_load_state('load')

        prev_page_height = await page.evaluate(
            "document.documentElement.scrollHeight")
        await asyncio.sleep(0.4)
        prev_page_height = await _scroll_until_stable(page, prev_page_height)

        [button_num_onepage, number_onepage, arr_text_page_info,
         button_aft_onepage] = await get_pageselect_element(
            page=page, str_dalaoAddress=str_dalaoAddress)
        print(f"str_dalaoAddress={str_dalaoAddress} scrollTo 0")

        # NOTE(review): the original was `while number_onepage != 40: ... break`.
        # The collapsed source is read here as a single attempt - confirm.
        if number_onepage != PAGE_SIZE:
            await button_num_onepage.click(force=True)
            arr_div_data_radix_select = page.locator(
                'div[data-radix-select-viewport][role="presentation"]'
            ).locator('div[role="option"]')
            # Pick the last entry of the page-size dropdown.
            await (await arr_div_data_radix_select.all())[-1].click()
            await asyncio.sleep(0.8)
            # Blocks until the pager can be parsed again after the change.
            await get_pageselect_element(
                page=page, str_dalaoAddress=str_dalaoAddress)
            prev_page_height = await _scroll_until_stable(
                page, prev_page_height)

        [button_num_onepage, number_onepage, arr_text_page_info,
         button_aft_onepage] = await get_pageselect_element(
            page=page, str_dalaoAddress=str_dalaoAddress)
        number_total_page = min(arr_text_page_info[1], MAX_PAGES_PER_ADDRESS)

        arr_page_df = []
        seen_txhash = set()
        for number_cur_page in range(1, number_total_page+1):
            print(
                f"str_dalaoAddress={str_dalaoAddress} number_cur_page={number_cur_page} number_total_page={number_total_page}")
            prev_page_height = await _scroll_until_stable(
                page, prev_page_height)
            [button_num_onepage, number_onepage, arr_text_page_info,
             button_aft_onepage] = await get_pageselect_element(
                page=page, str_dalaoAddress=str_dalaoAddress)

            # Every page except the last one must be completely filled.
            tbody_data = await _read_table_with_retry(
                page, str_dalaoAddress,
                expect_full_page=number_cur_page < number_total_page)
            page_df = pd.DataFrame(tbody_data, columns=activities_columns)

            # Stop when the page is empty or holds nothing new. The original
            # wrote `hash in res_df['txhash']`, which tests the Series index
            # and therefore never matched. Requiring *all* hashes to be known
            # avoids cutting pagination short when one transaction's rows
            # straddle a page boundary.
            if page_df.empty or page_df['txhash'].isin(seen_txhash).all():
                break
            seen_txhash.update(page_df['txhash'])
            arr_page_df.append(page_df)

            if number_cur_page == number_total_page:
                break
            await button_aft_onepage.click()
            await asyncio.sleep(1)

        if arr_page_df:
            res_df = pd.concat(arr_page_df, ignore_index=True)
        else:
            res_df = pd.DataFrame(columns=activities_columns)
        res_df = res_df.drop_duplicates()
        res_df.to_csv(baseclass.dalao_activities_solscan_path /
                      f"solscan_act_{str_dalaoAddress}.csv", index=False)
        set_str_dalaoAddress.discard(str_dalaoAddress)
    finally:
        # Close the tab even if scraping failed.
        await opened_page.close()


async def do_some_thing(playwright, cur_arr_str_dalaoAddress, cur_arr_str_dalaoSwapUrl):
    """Scrape every address of one worker inside a single headless browser.

    Args:
        playwright: started async Playwright instance.
        cur_arr_str_dalaoAddress: addresses assigned to this worker.
        cur_arr_str_dalaoSwapUrl: matching solscan URLs, same order.
    """
    browser = await playwright.chromium.launch(headless=True)
    try:
        context = await browser.new_context()
        try:
            # Open the solscan home page once before the account pages.
            home_page = await context.new_page()
            await home_page.goto("https://solscan.io/")
            print(context.pages)
            for str_dalaoAddress, str_dalaoTransferUrl in zip(
                    cur_arr_str_dalaoAddress, cur_arr_str_dalaoSwapUrl):
                await _scrape_address(
                    context, str_dalaoAddress, str_dalaoTransferUrl)
                # Pause between addresses.
                await asyncio.sleep(5)
        finally:
            await context.close()
    finally:
        await browser.close()


async def get_pageselect_element(page, str_dalaoAddress):
    """Locate the pager controls and parse the "Page X of Y" information.

    Retries once per second, forever, until the pager text can be parsed.

    Args:
        page: Playwright page showing the activity table.
        str_dalaoAddress: address being scraped; only used in log output.

    Returns:
        ``[button_num_onepage, number_onepage, arr_text_page_info,
        button_aft_onepage]`` where ``number_onepage`` is the integer shown
        on the page-size button, ``arr_text_page_info`` is
        ``[current_page, total_pages]`` and ``button_aft_onepage`` is the
        button with accessible name "right".
    """
    while True:
        try:
            div_select_page = page.locator(
                "div.items-center.justify-end.flex-row")
            button_num_onepage = div_select_page.locator(
                'button[type="button"].border-input')
            number_onepage = int(
                (await button_num_onepage.text_content()).strip())
            # Example text: "Show10per pagePage 1 of 2"
            text_page_info = (await div_select_page.text_content()).strip()
            print(f'{str_dalaoAddress} text_page_info=', text_page_info)
            if "of" not in text_page_info:
                raise ValueError("arr_text_page_info no of")
            parts = text_page_info.split(' ')
            # "... <current> of <total>": third-from-last and last tokens.
            arr_text_page_info = [int(parts[-3]), int(parts[-1])]
            break
        except Exception as e:
            print(str(e))
            await asyncio.sleep(1)
    button_aft_onepage = page.get_by_role("button", name="right")
    return [button_num_onepage, number_onepage, arr_text_page_info, button_aft_onepage]


async def get_onedriver_swapactivities(cur_arr_str_dalaoAddress, cur_arr_str_dalaoSwapUrl):
    """Run one worker: start Playwright and scrape the given addresses."""
    async with async_playwright() as playwright:
        await do_some_thing(playwright, cur_arr_str_dalaoAddress, cur_arr_str_dalaoSwapUrl)


async def main():
    """Run one worker per non-empty address chunk, all concurrently."""
    print("enter main()")
    tasks = [
        get_onedriver_swapactivities(cur_addresses, cur_urls)
        for cur_addresses, cur_urls in zip(
            arr_split_str_dalaoAddress, arr_split_str_dalaoSwapUrl)
        if cur_addresses
    ]
    if not tasks:
        # Nothing left to scrape.
        return
    await asyncio.gather(*tasks)


asyncio.run(main())
print(f"{'{:<6}'.format('END')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")