I'm trying to parse a file with a million lines, where each line is a JSON string with some information about a book (author, contents, etc.). I'm using iota to load the file, because my program throws an OutOfMemoryError if I try to use slurp. I'm also using cheshire to parse the strings. The program simply loads a file and counts all the words in all books.
My first attempt used pmap to do the heavy work; I figured this would make use of all my CPU cores.
(ns multicore-parsing.core
  (:require [cheshire.core :as json]
            [iota :as io]
            [clojure.string :as string]
            [clojure.core.reducers :as r]))
(defn words-pmap
  [filename]
  (letfn [(parse-with-keywords [line]
            ;; true => keywordize the JSON keys, so (:contents book) works
            (json/parse-string line true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
    (->> (io/vec filename)
         (pmap parse-with-keywords)
         (pmap words)
         ;; collect the distinct words of all books into one set
         (r/reduce #(apply conj %1 %2) #{}))))
While it does seem to use all cores, each core rarely goes above 50% of its capacity. My guess is that this has to do with pmap's batch size, and while looking into it I came across a relatively old question where some comments refer to clojure.core.reducers.
I decided to rewrite the function using r/map:
(defn words-reducers
  [filename]
  (letfn [(parse-with-keywords [line]
            (json/parse-string line true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
    (->> (io/vec filename)
         (r/map parse-with-keywords)
         (r/map words)
         ;; collect the distinct words of all books into one set
         (r/reduce #(apply conj %1 %2) #{}))))
But the CPU usage is worse, and it even takes longer to finish than the previous implementation:
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
What am I doing wrong? Is memory-mapped loading plus reducers the correct approach for parsing a large file?
EDIT: this is the file I'm using.
EDIT2: Here are the timings with iota/seq instead of iota/vec:
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
I don't believe that reducers are going to be the right solution for you, as they don't cope with lazy sequences at all well (a reducer will give correct results with a lazy sequence, but won't parallelise well).
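The lazy-sequence point can be checked directly. In clojure.core.reducers, r/reduce is always sequential; parallelism comes from r/fold, which only splits collections that support it (among the built-ins, vectors and hash maps) and otherwise falls back to a single sequential reduce. A minimal sketch, with a hypothetical helper fold-with-merge-count, that counts how often fold's combine step runs for the same numbers held in a lazy seq and in a vector:

```clojure
(ns fold-lazy-vs-vector-sketch
  (:require [clojure.core.reducers :as r]))

(defn fold-with-merge-count
  "Hypothetical helper: sums coll with r/fold and returns [sum merges],
  where merges is how many times the two-argument combine step ran.
  Zero merges means r/fold never split coll and did one sequential reduce."
  [coll]
  (let [merges   (atom 0)
        combinef (fn
                   ([] 0)                     ; seed for each chunk
                   ([a b]
                    (swap! merges inc)        ; record one merge of partial sums
                    (+ a b)))
        sum      (r/fold combinef + coll)]
    [sum @merges]))

(def lazy-input (map inc (range 10000)))  ; a lazy seq of 1..10000
(def vector-input (vec lazy-input))       ; the same values in a vector
```

On the vector, fold splits the input into chunks of at most 512 elements (its default) and merges the partial sums, so the merge count is positive; on the lazy seq it stays at zero, even though both sums are correct.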
You might want to take a look at this sample code from the book Seven Concurrency Models in Seven Weeks (disclaimer: I am the author) which solves a similar problem (counting the number of times each word appears on Wikipedia).
Given a list of Wikipedia pages, this function counts the words sequentially (get-words returns a sequence of words from a page):
(defn count-words-sequential [pages]
  (frequencies (mapcat get-words pages)))
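get-words comes from the book's sample code and isn't shown here. To try the sequential version on its own, here is a minimal runnable sketch that assumes a hypothetical get-words treating each page as a plain string split on whitespace:

```clojure
(ns count-words-sequential-sketch
  (:require [clojure.string :as string]))

;; Hypothetical stand-in for the book's get-words: a "page" is a plain
;; string, split on runs of whitespace, with empty tokens dropped.
(defn get-words [page]
  (remove string/blank? (string/split page #"\s+")))

(defn count-words-sequential [pages]
  ;; mapcat concatenates every page's words into one lazy seq;
  ;; frequencies builds a map of word -> number of occurrences.
  (frequencies (mapcat get-words pages)))
```

For example, (count-words-sequential ["the cat" " the dog"]) returns {"the" 2, "cat" 1, "dog" 1}.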
This is a parallel version using pmap, which does run faster, but only around 1.5x faster:
(defn count-words-parallel [pages]
  (reduce (partial merge-with +)
          (pmap #(frequencies (get-words %)) pages)))
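A self-contained sketch of the pmap version, again assuming the same hypothetical string-splitting get-words. It shows where the work happens: the per-page frequency maps are built in parallel, but every merge runs in the single reduce on the calling thread.

```clojure
(ns count-words-parallel-sketch
  (:require [clojure.string :as string]))

;; Hypothetical get-words (not the book's): split a page string on whitespace.
(defn get-words [page]
  (remove string/blank? (string/split page #"\s+")))

(defn count-words-parallel [pages]
  ;; pmap builds one frequency map per page on a pool of threads;
  ;; reduce then merges those maps on the calling thread, calling
  ;; (merge-with + acc m) once for every page after the first.
  (reduce (partial merge-with +)
          (pmap #(frequencies (get-words %)) pages)))
```

In a standalone script, pmap's worker threads can keep the JVM alive for about a minute after the work finishes unless (shutdown-agents) is called at the end.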
The reason it is only around 1.5x faster is that the reduce becomes a bottleneck: it calls (partial merge-with +) once for each page. Merging batches of 100 pages improves the performance to around 3.2x on a 4-core machine:
(defn count-words [pages]
  (reduce (partial merge-with +)
          (pmap count-words-sequential (partition-all 100 pages))))
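To check that batching changes only the cost and not the result, here is a sketch with the batch size pulled out as a parameter (count-words-batched is a hypothetical generalisation of count-words above, and get-words is again an assumed string-splitting stand-in):

```clojure
(ns count-words-batched-sketch
  (:require [clojure.string :as string]))

;; Hypothetical get-words (not the book's): split a page string on whitespace.
(defn get-words [page]
  (remove string/blank? (string/split page #"\s+")))

(defn count-words-sequential [pages]
  (frequencies (mapcat get-words pages)))

(defn count-words-batched
  "Like count-words, but with the batch size as a parameter
  (the original hard-codes 100)."
  [batch-size pages]
  ;; Each pmap task counts a whole batch sequentially, so the final
  ;; reduce performs one merge per batch instead of one per page.
  (reduce (partial merge-with +)
          (pmap count-words-sequential (partition-all batch-size pages))))
```

With 250 pages and a batch size of 100, the reduce merges three partial maps (two merge calls) instead of 250 (249 merge calls), while returning the same map as the sequential version.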