clojurereducerspmapcheshire

Why is pmap|reducers/map not using all cpu cores?


I'm trying to parse a file with a million lines, each line is a json string with some information about a book (author, contents etc). I'm using iota to load the file, as my program throws an OutOfMemoryError if I try to use slurp. I'm also using cheshire to parse the strings. The program simply loads a file and counts all the words in all books.

My first attempt included pmap to do the heavy work, I figured this would essentially make use of all my cpu cores.

(ns multicore-parsing.core
  (:require [cheshire.core :as json]
            [iota :as io]
            [clojure.string :as string]
            [clojure.core.reducers :as r]))


(defn words-pmap
  [filename]
  (letfn [(parse-with-keywords [str]
            (json/parse-string str true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
    (->>
     (io/vec filename)
     (pmap parse-with-keywords)
     (pmap words)
     (r/reduce #(apply conj %1 %2) #{})
     (count))))

While it does seem to use all cores, each core rarely uses more than 50% of its capacity, my guess is that it has to do with batch size of pmap and so I stumbled across relatively old question where some comments make reference to the clojure.core.reducers library.

I decided to rewrite the function using reducers/map:

(defn words-reducers
  [filename]
  (letfn [(parse-with-keywords [str]
            (json/parse-string str true))
          (words [book]
            (string/split (:contents book) #"\s+"))]
  (->>
   (io/vec filename)
   (r/map parse-with-keywords)
   (r/map words)
   (r/reduce #(apply conj %1 %2) #{})
   (count))))

But the cpu usage is worse, and it even takes longer to finish compared to the previous implementation:

multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
546
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
546

What am I doing wrong? Is mmap loading + reducers the correct approach when parsing a large file?

EDIT: this is the file I'm using.

EDIT2: Here are the timings with iota/seq instead of iota/vec:

multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
546
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
546

Solution

  • I don't believe that reducers are going to be the right solution for you, as they don't cope with lazy sequences at all well (a reducer will give correct results with a lazy sequence, but won't parallelise well).

    You might want to take a look at this sample code from the book Seven Concurrency Models in Seven Weeks (disclaimer: I am the author) which solves a similar problem (counting the number of times each word appears on Wikipedia).

    Given a list of Wikipedia pages, this function counts the words sequentially (get-words returns a sequence of words from a page):

    (defn count-words-sequential [pages]
      (frequencies (mapcat get-words pages)))
    

    This is a parallel version using pmap which does run faster, but only around 1.5x faster:

    (defn count-words-parallel [pages]
      (reduce (partial merge-with +)
        (pmap #(frequencies (get-words %)) pages)))
    

    The reason it only goes around 1.5x faster is because the reduce becomes a bottleneck—it's calling (partial merge-with +) once for each page. Merging batches of 100 pages improves the performance to around 3.2x on a 4-core machine:

    (defn count-words [pages]
      (reduce (partial merge-with +)
        (pmap count-words-sequential (partition-all 100 pages))))