from base_class import BaseVariableFunction
from base_class import *

baseclass = BaseVariableFunction(__file__)

old_print = print


def timestamped_print(*args, **kwargs):
    """Print like the builtin ``print``, prefixed with the current UTC time.

    The timestamp is truncated to whole seconds. All positional and keyword
    arguments are forwarded unchanged to the original ``print``.
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # "now" instead and strip tzinfo so the printed prefix keeps the exact
    # same ``YYYY-MM-DD HH:MM:SS`` format as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(
        microsecond=0, tzinfo=None)
    old_print(now_utc, *args, **kwargs)


# From here on every print() in this script is timestamped.
print = timestamped_print

print('\n'*5)
print(f"{'{:<6}'.format('ENTER')} {baseclass.scriptfilename} ----------------NOTE-----------NOTE---------------")


def get_connected_edges(df, str_dalaoaddress, max_connections=6):
    """Return the edges of the sub-graph reachable from ``str_dalaoaddress``.

    Each row of ``df`` is treated as one edge from ``from_owner`` to
    ``to_owner``. Starting at ``str_dalaoaddress`` the graph is walked in both
    directions. A node whose number of incident rows (outgoing plus incoming)
    exceeds ``max_connections`` is not expanded: none of its edges are
    collected through it and its neighbours are not queued from it.

    Args:
        df: DataFrame with at least the columns ``from_owner`` and
            ``to_owner``.
        str_dalaoaddress: Address of the node the walk starts from.
        max_connections: Largest number of incident rows a node may have and
            still be expanded.

    Returns:
        A new DataFrame holding the collected rows, with duplicate rows
        dropped and the index reset. It is empty when the start address does
        not occur in ``df`` or is itself over the connection limit.
    """
    edge_labels = []
    pending = [str_dalaoaddress]
    # Everything that has ever been queued, so each node is expanded once.
    seen = {str_dalaoaddress}

    while pending:
        node = pending.pop()

        outgoing = df[df['from_owner'] == node]
        incoming = df[df['to_owner'] == node]

        # Nodes with too many connections are skipped entirely.
        # A node that is absent from df simply has zero incident rows, so an
        # unknown start address yields an empty result instead of a KeyError.
        if len(outgoing) + len(incoming) > max_connections:
            continue

        edge_labels.extend(outgoing.index.tolist())
        edge_labels.extend(incoming.index.tolist())

        # Queue every neighbour that has not been queued before.
        neighbours = outgoing['to_owner'].tolist() + incoming['from_owner'].tolist()
        for neighbour in neighbours:
            if neighbour not in seen:
                seen.add(neighbour)
                pending.append(neighbour)

    # An edge between two expanded nodes is collected once from each end;
    # keep only the first occurrence of every label (order preserved).
    unique_labels = list(dict.fromkeys(edge_labels))
    connected_df = df.loc[unique_labels]
    connected_df = connected_df.drop_duplicates().reset_index(drop=True)
    return connected_df


def _gmgn_link(owner):
    """Build the gmgn.ai spreadsheet link for ``owner`` on the current token."""
    return baseclass.makeurl(
        f"https://gmgn.ai/sol/token/{str_tokenaddress}?maker={owner}",
        f"gmgn_{owner[0:6]}",
    )


def _solscan_link(token_account):
    """Build the solscan.io spreadsheet link for ``token_account``."""
    return baseclass.makeurl(
        f"https://solscan.io/account/{token_account}",
        f"sol_{token_account[0:6]}",
    )


excutionid = '01JBXRQNBWHRQ0RT66W4AC1QFE'
file_name = f"one_token_relation_graph_{excutionid}.csv"

# Look up the token that belongs to this execution id.
with open(baseclass.dune_excution_path/"obj_excutionid_2_tokeninfo.json", mode='r', encoding='utf8') as f:
    obj_excutionid_2_tokeninfo = json.load(f)

obj_tokeninfo = obj_excutionid_2_tokeninfo[excutionid]
str_tokenaddress = obj_tokeninfo["tokenaddress"]
str_tokenname = obj_tokeninfo["tokenname"]

# Alternative input kept from the original script:
# df = pd.read_excel(baseclass.librarydata_path/"test.xlsx", dtype=object)
df = pd.read_csv(baseclass.dune_excution_path/file_name, dtype=object)

str_dalaoaddress = 'EeS7DBots7zF7JA4iCPun1uA779aXCPCBhe4kGkuCuRA'
df = get_connected_edges(df, str_dalaoaddress)

# Everything was read as object; owner_count is converted to int.
df = df.astype(
    {
        "owner_count": int,
    }
)

# Helper signature, as noted in the original script:
# def makeurl( self,url,url_show):
#     return '=HYPERLINK("{}","{}")'.format(url, "url_"+url_show)
#
# The link columns are built with plain comprehensions so that an empty
# frame simply produces empty columns.
df["gmgn_from"] = [_gmgn_link(owner) for owner in df["from_owner"]]
df["gmgn_to"] = [_gmgn_link(owner) for owner in df["to_owner"]]
df["url_from_ata"] = [_solscan_link(account) for account in df["from_token_account"]]
df["url_to_ata"] = [_solscan_link(account) for account in df["to_token_account"]]

df = df.sort_values(by=['from_owner']).reset_index(drop=True)

baseclass.makedirpath(baseclass.dune_token_graph_path / str_tokenaddress)
df.to_excel(
    baseclass.dune_token_graph_path / str_tokenaddress / f"{str_dalaoaddress}.xlsx",
    index=False,
)