I want to find the pairs of people who have contacted each other, i.e. where goods went in both directions. Here is the data:
Input:
K-> M, H
M-> K, E
H-> F
B-> T, H
E-> K, H
F-> K, H, E
A-> Z
The expected output is:
K, M //(this means K has supplied goods to M and M has also supplied goods to K)
H, F
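To pin down the expected semantics before involving Spark, here is a minimal plain-Python sketch (no Spark; the function name `mutual_pairs` is hypothetical). A pair is reported when both directed edges x→y and y→x exist, and each pair is reported only once. It sorts the names inside each pair, so `H, F` comes out as `('F', 'H')`.

```python
# Input lines from the question, in "sender-> recipient, recipient" form
LINES = [
    "K-> M, H",
    "M-> K, E",
    "H-> F",
    "B-> T, H",
    "E-> K, H",
    "F-> K, H, E",
    "A-> Z",
]


def mutual_pairs(lines):
    """Return sorted pairs (a, b) where a supplied b AND b supplied a."""
    edges = set()
    for line in lines:
        sender, recipients = line.split("->")
        for r in recipients.split(","):
            edges.add((sender.strip(), r.strip()))
    # Keep an edge only if its reverse also exists; sorting the two names
    # collapses (K, M) and (M, K) into a single entry.
    return sorted({tuple(sorted(e)) for e in edges if (e[1], e[0]) in edges})


print(mutual_pairs(LINES))  # [('F', 'H'), ('K', 'M')]
```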
Here is the code I have written:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.regression import LinearRegression
import re
from itertools import combinations
spark = SparkContext("local", "DoubleRDD")
def findpairs(ls):
    lst = []
    for i in range(0,len(ls)-1):
        for j in range(i+1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])
    return(lst)
text = spark.textFile("path to the .txt")
text = text.map(lambda s: s.replace("->",","))
text = text.map(lambda s: s.replace(",",""))
text = text.map(lambda s: s.replace(" ",""))
pairs = text.flatMap(lambda x: [(x[0],y) for y in x[1:]])
commonpairs = pairs.filter(lambda x: findpairs(x))
pairs.collect()
The output is: []
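The empty result comes from how `filter` calls `findpairs`: the lambda receives one element at a time, e.g. the tuple `('K', 'M')`, not the whole list of pairs. Inside `findpairs`, `ls[0]` is then the string `'K'` and `tuple(reversed(ls[1]))` is `('M',)`, so nothing ever matches, the function returns an empty list, and `filter` treats that falsy value as "drop this element". (The snippet ends with `pairs.collect()`; the `[]` is presumably from `commonpairs.collect()`, since `pairs` itself is non-empty.) The plain-Python sketch below reproduces both calls without Spark:

```python
def findpairs(ls):
    # Same logic as in the question, with indentation restored
    lst = []
    for i in range(0, len(ls) - 1):
        for j in range(i + 1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])
    return lst


# RDD.filter passes ONE element per call, e.g. the tuple ('K', 'M')
single_element = ("K", "M")
print(findpairs(single_element))        # []  (falsy, so filter drops it)
print(bool(findpairs(single_element)))  # False

# Given the whole list of pairs instead, the comparison works as intended
all_pairs = [("K", "M"), ("K", "H"), ("M", "K")]
print(findpairs(all_pairs))  # [('K', 'M')]
```

Passing the whole list only works after collecting everything to the driver, and the nested loop is quadratic, which is why a join is the better fit for Spark.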
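You don't need RDDs here; the problem can be solved with native Spark DataFrame functions. First, read the text file as a Spark DataFrame, using `-> ` as the separator (multi-character separators need Spark 3.0 or later):
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('data.txt', header=False, sep='-> ').toDF('x', 'y')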
# +---+-------+
# | x| y|
# +---+-------+
# | K| M, H|
# | M| K, E|
# | H| F|
# | B| T, H|
# | E| K, H|
# | F|K, H, E|
# | A| Z|
# +---+-------+
Split the recipients column (y) on commas and explode it into one row per recipient:
df1 = df.withColumn('y', F.explode(F.split('y', r',\s+')))
# +---+---+
# | x| y|
# +---+---+
# | K| M|
# | K| H|
# | M| K|
# | M| E|
# | H| F|
# | B| T|
# | B| H|
# | E| K|
# | E| H|
# | F| K|
# | F| H|
# | F| E|
# | A| Z|
# +---+---+
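For readers without a Spark session at hand, the plain-Python sketch below mirrors the two steps so far (it is not Spark code): split each line once on `-> ` into `(x, y)`, as `spark.read.csv(sep='-> ')` does, then split `y` on a comma plus whitespace and emit one row per recipient, as `F.explode(F.split('y', r',\s+'))` does.

```python
import re

LINES = [
    "K-> M, H",
    "M-> K, E",
    "H-> F",
    "B-> T, H",
    "E-> K, H",
    "F-> K, H, E",
    "A-> Z",
]

# Step 1: one (x, y) row per line, split on the first "-> "
rows = [line.split("-> ", 1) for line in LINES]

# Step 2: split y on ",\s+" and emit one (sender, recipient) row per name
exploded = [(x, r) for x, y in rows for r in re.split(r",\s+", y)]

print(len(exploded))  # 13
print(exploded[:3])   # [('K', 'M'), ('K', 'H'), ('M', 'K')]
```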
Self-join the DataFrame, matching each recipient (y) on the left with the same person as sender (x) on the right. Then filter the DataFrame so that the sender on the left is also the recipient on the right, i.e. the goods went both ways:
df1 = df1.alias('left').join(df1.alias('right'), on=F.expr("left.y == right.x"))
df1 = df1.filter("left.x == right.y")
# +---+---+---+---+
# | x| y| x| y|
# +---+---+---+---+
# | K| M| M| K|
# | M| K| K| M|
# | H| F| F| H|
# | F| H| H| F|
# +---+---+---+---+
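The join-then-filter step is easier to see on plain tuples. This sketch (plain Python, not Spark) uses a nested loop as a stand-in for the equi-join on `left.y == right.x`, then keeps the rows where `left.x == right.y`; with this iteration order it yields the same four rows as the table above.

```python
# Exploded (sender, recipient) rows from the previous step
EDGES = [
    ("K", "M"), ("K", "H"), ("M", "K"), ("M", "E"), ("H", "F"),
    ("B", "T"), ("B", "H"), ("E", "K"), ("E", "H"), ("F", "K"),
    ("F", "H"), ("F", "E"), ("A", "Z"),
]

# Self join: pair each left row with every right row whose sender
# equals the left row's recipient (left.y == right.x)
joined = [(lx, ly, rx, ry) for lx, ly in EDGES for rx, ry in EDGES if ly == rx]

# Filter: keep rows where the right recipient points back to the left sender
mutual = [row for row in joined if row[0] == row[3]]

for row in mutual:
    print(row)
```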
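Each mutual pair now appears twice, once in each direction. Drop the duplicates by sorting each sender/recipient combination into an array and deduplicating on that array: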
df1 = df1.select('left.*').withColumn('pairs', F.array_sort(F.array('x', 'y')))
df1 = df1.dropDuplicates(['pairs']).drop('pairs')
# +---+---+
# | x| y|
# +---+---+
# | H| F|
# | K| M|
# +---+---+
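The deduplication trick is to build an order-independent key for each pair. The plain-Python sketch below uses `tuple(sorted(...))` in place of `F.array_sort(F.array('x', 'y'))` and keeps the first row seen per key. Spark's `dropDuplicates` gives no such first-row or ordering guarantee, which is why the table above can list `H, F` before `K, M`. An alternative with the same effect would be to filter on `left.x < left.y` instead of deduplicating.

```python
# Mutual rows (left.x, left.y) from the join step
MUTUAL = [("K", "M"), ("M", "K"), ("H", "F"), ("F", "H")]

seen = set()
result = []
for x, y in MUTUAL:
    key = tuple(sorted((x, y)))  # order-independent key, like array_sort(array(x, y))
    if key not in seen:          # keep only the first row per key
        seen.add(key)
        result.append((x, y))

print(result)  # [('K', 'M'), ('H', 'F')]
```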