I'm having trouble figuring out how to update the networkx dag_find_longest_path() algorithm to return "N" for ties instead of returning the first max edge found, or returning a list of all edges that are tied for max weight.
I first created a DAG from pandas dataframe that contains an edgelist like the following subset:
edge1 edge2 weight
115252161:T 115252162:A 1.0
115252162:A 115252163:G 1.0
115252163:G 115252164:C 3.0
115252164:C 115252165:A 5.5
115252165:A 115252166:C 5.5
115252162:T 115252163:G 1.0
115252166:C 115252167:A 7.5
115252167:A 115252168:A 7.5
115252168:A 115252169:A 6.5
115252165:A 115252166:G 0.5
Then I use the following code to topologically sort the graph and then find the longest path according to the weights of the edges:
G = nx.from_pandas_edgelist(edge_df, source="edge1",
target="edge2",
edge_attr=['weight'],
create_using=nx.OrderedDiGraph())
longest_path = pd.DataFrame(nx.dag_longest_path(G))
This works great, except when there are ties for max weighted edge, it returns the first max edge found, and instead I would like it to just return an "N" representing "Null". So currently, the output is:
115252161 T
115252162 A
115252163 G
115252164 C
115252165 A
115252166 C
But what I actually need is:
115252161 T
115252162 N (or [A,T] )
115252163 G
115252164 C
115252165 A
115252166 C
The algorithm for finding the longest path is:
def dag_longest_path(G):
dist = {} # stores [node, distance] pair
for node in nx.topological_sort(G):
# pairs of dist,node for all incoming edges
pairs = [(dist[v][0] + 1, v) for v in G.pred[node]]
if pairs:
dist[node] = max(pairs)
else:
dist[node] = (0, node)
node, (length, _) = max(dist.items(), key=lambda x: x[1])
path = []
while length > 0:
path.append(node)
length, node = dist[node]
return list(reversed(path))
Copy-pastable definition of G
.
import pandas as pd
import networkx as nx
import numpy as np
edge_df = pd.read_csv(
pd.compat.StringIO(
"""edge1 edge2 weight
115252161:T 115252162:A 1.0
115252162:A 115252163:G 1.0
115252163:G 115252164:C 3.0
115252164:C 115252165:A 5.5
115252165:A 115252166:C 5.5
115252162:T 115252163:G 1.0
115252166:C 115252167:A 7.5
115252167:A 115252168:A 7.5
115252168:A 115252169:A 6.5
115252165:A 115252166:G 0.5"""
),
sep=r" +",
)
G = nx.from_pandas_edgelist(
edge_df,
source="edge1",
target="edge2",
edge_attr=["weight"],
create_using=nx.OrderedDiGraph(),
)
longest_path = pd.DataFrame(nx.dag_longest_path(G))
I ended up just modeling the behavior in a defaultdict counter object.
from collections import defaultdict, Counter
I modified my edgelist to a tuple of (position, nucleotide, weight):
test = [(112,"A",23.0), (113, "T", 27), (112, "T", 12.0), (113, "A", 27), (112,"A", 1.0)]
Then used defaultdict(counter) to quickly sum occurences of each nucleotide at each position:
nucs = defaultdict(Counter)
for key, nuc, weight in test:
nucs[key][nuc] += weight
And then looped through the dictionary to pull out all nucleotides that equal the max value:
for key, nuc in nucs.items():
seq_list = []
max_nuc = []
max_val = max(nuc.values())
for x, y in nuc.items():
if y == max_val:
max_nuc.append(x)
if len(max_nuc) != 1:
max_nuc = "N"
else:
max_nuc = ''.join(max_nuc)
seq_list.append(max_nuc)
sequence = ''.join(seq_list)
This returns the final sequence for the nucleotide with the max value found, and returns N in the position of a tie:
TNGCACAAATGCTGAAAGCTGTACCATANCTGTCTGGTCTTGGCTGAGGTTTCAATGAATGGAATCCCGTAACTCTTGGCCAGTTCGTGGGCTTGTTTTGTATCAACTGTCCTTGTTGGCAAATCACACTTGTTTCCCACTAGCACCAT
However, the question bothered me, so I ended up using node attributes in networkx as a means to flag each node as being a tie or not. Now, when a node is returned in the longest path, I can then check the "tie" attribute and replace the node name with "N" if it has been flagged:
def get_path(self, edge_df):
G = nx.from_pandas_edgelist(
edge_df,
source="edge1",
target="edge2",
edge_attr=["weight"],
create_using=nx.OrderedDiGraph()
)
# flag all nodes as not having a tie
nx.set_node_attributes(G, "tie", False)
# check nodes for ties
for node in G.nodes:
# create list of all edges for each node
edges = G.in_edges(node, data=True)
# if there are multiple edges
if len(edges) > 1:
# find max weight
max_weight = max([edge[2]['weight'] for edge in edges])
tie_check = []
for edge in edges:
# pull out all edges that match max weight
if edge[2]["weight"] == max_weight:
tie_check.append(edge)
# check if there are ties
if len(tie_check) > 1:
for x in tie_check:
# flag node as being a tie
G.node[x[0]]["tie"] = True
# return longest path
longest_path = nx.dag_longest_path(G)
return longest_path