I understand how to run an mrjob job programmatically and receive its output; this is clearly explained here. However, I'm struggling to understand how to pass a list of dictionaries, or any variable from another file, into an mrjob job. Instead of having an input file such as "words.txt", I would like to pass said "words" as a variable (a list) containing those words.
To be more specific, assume I have the following list:
mylist = [
    {"name": "Kayer", "Job": "Programmer"},
    {"name": "Angela", "Job": "Designer"},
    {"name": "Eve", "Job": "Programmer"},
    {"name": "Robert", "Job": "Programmer"},
]
I want to run an mrjob job that takes this list and returns (for example) the number of people whose job is "Programmer". How would I go about it?
Lastly, due to the design of the system, I cannot temporarily store the list in a text file or any other file.
As I currently understand it, I cannot run the following code within the same class and/or file as the job I'm trying to run:
mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    runner.run()
    for key, value in mr_job.parse_output(runner.cat_output()):
        ...  # do something with the parsed output
Therefore, I set it up in a different file, but then I could not figure out how to send the data over to my mrjob job. I don't even know whether there is a way to pass the data to mrjob in the first place.
First of all, I would highly recommend switching to Spark, as it is far more capable than mrjob. If, like me, you can't switch, or you just wish to know how, then here is a solution.
Looking over the mrjob documentation, we can see that jobs can be run programmatically (as shown here). This allows us to take an input (in our case a list of dictionaries) and process it as a MapReduce job with mrjob.
To make all this work, let's assume we have a file called main.py from which we will run the job. As per the documentation, it is vital that the file you run the job from is different from the one that contains the job definition.

Example main.py:
from io import BytesIO

from test_job import TestJob

data = [
    {"name": "Kayer", "Job": "Programmer"},
    {"name": "Angela", "Job": "Designer"},
    {"name": "Eve", "Job": "Programmer"},
    {"name": "Robert", "Job": "Programmer"},
]

if __name__ == '__main__':
    # Prepare the data to be passed: one dictionary per line
    modified_data = ''
    for d in data:
        modified_data += f'{str(d)}\n'
    stdin = BytesIO(bytes(modified_data, 'utf8'))

    # Create the job and give it the data via sandboxed stdin
    job = TestJob(['--no-conf', '-'])
    job.sandbox(stdin=stdin)

    # Run the job and print the results
    with job.make_runner() as runner:
        runner.run()
        for key, value in job.parse_output(runner.cat_output()):
            print(f'key: {key} -> value: {value}')
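As a side note (a sketch of my own, not required for the answer): if main.py serializes each dictionary with json.dumps instead of str(), every line is already valid JSON, so the mapper can call json.loads directly without the quote-replacement workaround used in test_job.py below. The round-trip can be verified with the standard library alone:

```python
import json
from io import BytesIO

data = [
    {"name": "Kayer", "Job": "Programmer"},
    {"name": "Angela", "Job": "Designer"},
]

# Serialize each record as one JSON line; JSON uses double quotes,
# so no replace() hack is needed on the mapper side
payload = ''.join(json.dumps(d) + '\n' for d in data)
stdin = BytesIO(payload.encode('utf8'))

# Each line read back from the stream parses cleanly into a dict
records = [json.loads(line) for line in stdin]
print(records[0]['name'])  # Kayer
```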
After having prepared this, we need to make sure the job can read the received input and process it accordingly. For this we make a new file, which in this case is called test_job.py.

Example test_job.py:
from mrjob.job import MRJob
import json

class TestJob(MRJob):

    def mapper(self, _, line):
        # The input lines are Python repr()s of dictionaries, so
        # convert them into valid JSON before loading them
        test = line.replace("'", '"')
        test = test.replace('None', 'null')
        # Turn the received line into a dictionary
        data = json.loads(test)
        if data.get('Job') == 'Programmer':
            yield 'Programmer', 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    TestJob.run()
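If you want to sanity-check the mapper/reducer logic without mrjob installed or a runner configured, the same steps can be reproduced in plain Python (a sketch; the mapper, reducer, and grouping code here are mine, mirroring TestJob, not mrjob API):

```python
import json

# JSON lines as they would arrive on the job's stdin
lines = [
    '{"name": "Kayer", "Job": "Programmer"}',
    '{"name": "Angela", "Job": "Designer"}',
    '{"name": "Eve", "Job": "Programmer"}',
    '{"name": "Robert", "Job": "Programmer"}',
]

def mapper(line):
    # Mirrors TestJob.mapper: emit ('Programmer', 1) per programmer
    data = json.loads(line)
    if data.get('Job') == 'Programmer':
        yield 'Programmer', 1

def reducer(key, values):
    # Mirrors TestJob.reducer: sum the counts per key
    yield key, sum(values)

# Shuffle step: group mapped values by key
grouped = {}
for line in lines:
    for key, value in mapper(line):
        grouped.setdefault(key, []).append(value)

results = dict(kv for key, values in grouped.items()
               for kv in reducer(key, values))
print(results)  # {'Programmer': 3}
```

This confirms the expected output of the job above: the key 'Programmer' with a count of 3.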