{"id":841,"date":"2019-08-13T19:44:55","date_gmt":"2019-08-13T18:44:55","guid":{"rendered":"https:\/\/www.thesixsides.com\/blog\/?p=841"},"modified":"2019-08-13T19:57:58","modified_gmt":"2019-08-13T18:57:58","slug":"processing-many-small-files-with-apache-spark","status":"publish","type":"post","link":"https:\/\/www.thesixsides.com\/blog\/2019\/08\/13\/processing-many-small-files-with-apache-spark\/","title":{"rendered":"Processing Many Small Files with Apache Spark"},"content":{"rendered":"<p><a href=\"https:\/\/spark.apache.org\/\">Apache Spark<\/a> is straight-forward to use when there are large files that need to be chunked up and read, but what about when you have many small files? In this post we\u2019ll look at a way to address many small files and some parameter tuning to speed that up.<\/p>\n<p>The sample data we\u2019ll be looking at is the <a href=\"http:\/\/millionsongdataset.com\/pages\/getting-dataset\/\">Million Song Dataset<\/a>(MSD), the entire dataset is about 273 GB, more than enough for Spark to take a bite out of. (If you wish to try this at home the site offers a sample of 10,000 songs at a more digestible size of 1.8GB.) The entire dataset can be downloaded in 26 A-Z partitions, each partition contains between 38,109 and 39,100 songs. Each of these songs is stored as a file in the HDF5 format and is between 180kb to 900kb in size, much too small to split up for a Spark job. The small file size is the crux of our problem.<\/p>\n<p>So we need to make large files, this will be as lists of all the files. It is these lists that will be partitioned among Spark jobs. A job will read the file list and extract important song information in the HDF5 files and aggregate it into larger CSV files which Spark will consume. 
To create a CSV for every partition:<\/p>\n<pre><code class=\"sh\">#!\/bin\/bash\nfor X in {A..Z}\ndo\n    ls -1 ${X}\/*\/*\/*.h5 &gt; ${X}_file_list.csv\ndone<\/code><\/pre>\n<p>From each file in the file list, we want to aggregate the relevant information. A script that extracts the title, artist name, and year follows. For reading the MSD HDF5 files there is a library that makes the reading easier: <a href=\"https:\/\/github.com\/tb2332\/MSongsDB\/blob\/master\/PythonSrc\/hdf5_getters.py\"><code>hdf5_getters.py<\/code><\/a>. It\u2019s referred to as <code>mill<\/code> below.<\/p>\n<pre><code class=\"py\">import glob\nimport re\nimport sys\nfrom functools import partial\nfrom pyspark import SparkConf, SparkContext\nimport hdf5_getters as mill\n\n# simple wrapper for the hdf5_getters; 'func' must match an existing 'get_' + func getter\ndef call_mill(m, func, fi):\n    return getattr(m, 'get_' + func)(fi)\n\n# gather 'fcns' attributes and return them as a tab-separated string.\ndef parse_h5(file_path, fcns):\n    # read the file\n    fi = mill.open_h5_file_read('\/home\/ubuntu\/' + file_path)\n    vals    = []\n    str_format = []\n    sep     = '\\t'\n    # for each getter in fcns\n    for f in fcns.value:\n        format = f['format']\n        str_format.append(format)\n        val = call_mill(mill, f['func'], fi)\n        # if bytes, decode to a utf-8 string\n        if format == '%s':\n            # isinstance would be cleaner, but requires importing numpy\n            if str(type(val)) == \"&lt;class 'numpy.ndarray'&gt;\":\n                val = ','.join([x.decode('utf-8') for x in val])\n            else:\n                val = val.decode('utf-8')\n            # strip tabs so they cannot break the field separator\n            val = re.sub(r'\\t', ' ', val)\n        vals.append(val)\n    # close the file\n    fi.close()\n    return (sep.join(str_format) % tuple(vals))\n\ndef get_headers(fcns, sep='\\t'):\n    headers = []\n    for f in fcns:\n        headers.append(f['func'])\n    return sep.join(headers)\n\nif __name__ == 
'__main__':\n    if len(sys.argv) &lt; 4:\n        print('Expected arguments: start_letter, length, workers')\n        sys.exit(1)\n    start_letter = sys.argv[1]\n    length = int(sys.argv[2])\n\n    file_prefixes = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'\n    start = file_prefixes.index(start_letter)\n    end_letter = file_prefixes[start+length-1] # -1 for inclusive\n\n    conf = SparkConf() \\\n            .setMaster('spark:\/\/spark_ip') \\\n            .set('spark.hadoop.validateOutputSpecs', False) \\\n            .setAppName('h5_to_csv:%s to %s' % (start_letter, end_letter))\n    sc = SparkContext(conf=conf)\n\n    # the fields to get from the hdf5 files\n    get_fcns = [\n        {'func': 'title',           'format': '%s'},\n        {'func': 'artist_name',     'format': '%s'},\n        {'func': 'year',            'format': '%d'},\n    ]\n    # let the workers know about them\n    fcns = sc.broadcast(get_fcns)\n\n    # get all file listings\n    file_list = glob.glob('file_lists\/*.csv')\n    file_list.sort() # alphabetize\n    job_list = file_list[start:start+length]\n\n    # concatenate the file lists\n    files = sc.textFile(','.join(job_list)).cache()\n    # process and collect the data from all the files\n    rdd_output = files.map(partial(parse_h5, fcns=fcns)).collect()\n\n    # write the aggregated data as csv.\n    outname = 'entries-%s-%s.csv' % (start_letter, end_letter)\n    with open('results\/' + outname, 'w') as fi:\n        fi.write(get_headers(get_fcns) + '\\n')\n        fi.write('\\n'.join(rdd_output))\n<\/code><\/pre>\n<p>If we zip our dependencies and run this on a cluster with something like:<\/p>\n<pre><code class=\"sh\">spark-submit --py-files dep.zip h5_to_csv.py A 2 4 # start_letter, length, workers<\/code><\/pre>\n<p>The results will be underwhelming. If we go to the Spark dashboard and watch the jobs run, we\u2019ll see a performance bottleneck: the data is spread across only one or two partitions. This is because the file lists being read are themselves small. 
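<\/p>\n<p>To see why so few partitions appear, note that <code>sc.textFile<\/code> splits its input by byte size: Hadoop\u2019s <code>FileInputFormat<\/code> picks a split size of roughly <code>max(minSize, min(totalSize \/ minPartitions, blockSize))<\/code>, and <code>minPartitions<\/code> defaults to just 2. A file list is only a few megabytes of text, so it yields only a couple of splits. A back-of-the-envelope sketch (the ~2 MB list size is an assumption for illustration):<\/p>\n<pre><code class=\"py\">def estimated_splits(total_bytes, min_partitions=2,\n                     block_size=128 * 1024 * 1024, min_size=1):\n    # rough sketch of FileInputFormat split sizing\n    goal = total_bytes \/\/ max(min_partitions, 1)   # even share per partition\n    split = max(min_size, min(goal, block_size))\n    return -(-total_bytes \/\/ split)                # ceil division\n\n# a ~2 MB file list with the defaults -&gt; only 2 splits\nprint(estimated_splits(2 * 1024 * 1024))\n# asking for 256 minimum partitions spreads the same list much wider\nprint(estimated_splits(2 * 1024 * 1024, 256))<\/code><\/pre>\n<p>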
Per the Spark <a href=\"https:\/\/spark.apache.org\/docs\/2.4.0\/rdd-programming-guide.html\">RDD Programming Guide<\/a>:<\/p>\n<blockquote><p>\u201ctypically you want 2\u20134 partitions for each CPU in your cluster.\u201d<\/p><\/blockquote>\n<p>If the cluster has 4 cores, we should want about 8\u201316 partitions; however, we\u2019re not seeing that because of the small size of the file lists being read. We can force a minimum number of partitions with the <code>minPartitions<\/code> argument, changing the <code>files = sc.textFile(...)<\/code> line to be:<\/p>\n<pre><code>files = sc.textFile(','.join(job_list), minPartitions=8).cache()<\/code><\/pre>\n<p>When working with this many small files, 8 partitions still present a bit of a bottleneck. A better <code>minPartitions<\/code> can be found by doing some timed runs with a multiplier variable <code>m<\/code>:<\/p>\n<pre><code>minPartitions = workers * cores * m<\/code><\/pre>\n<p>As the graph below shows, a multiplier of 8 seems reasonable, with diminishing returns thereafter. Unless there are other bottlenecks, this gives noticeably better performance. With six workers and <code>m = 8<\/code>, the entire 273 GB is processed in roughly 24 minutes! On the cluster I was using this came to <code>minPartitions=256<\/code>.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-843 size-full\" src=\"https:\/\/www.thesixsides.com\/blog\/wp-content\/uploads\/2019\/08\/chart.png\" alt=\"\" width=\"570\" height=\"371\" srcset=\"https:\/\/www.thesixsides.com\/blog\/wp-content\/uploads\/2019\/08\/chart.png 570w, https:\/\/www.thesixsides.com\/blog\/wp-content\/uploads\/2019\/08\/chart-300x195.png 300w\" sizes=\"auto, (max-width: 570px) 100vw, 570px\" \/><\/p>\n<h2 id=\"cluster\">Cluster<\/h2>\n<p>The cluster itself was set up and run on the Swedish National Infrastructure for Computing (SNIC). 
In total the cluster consisted of seven nodes, each with eight CPUs and 16 GB of RAM. One of these was the dedicated master node, responsible for resource allocation, while the remaining six were pure worker nodes, using all their CPUs for data processing.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Apache Spark is straightforward to use when there are large files that need to be chunked up and read, but what about when you have many small files? In this post we\u2019ll look at a way to address many small files and some parameter tuning to speed that up. The sample data we\u2019ll be looking [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[51],"tags":[53],"class_list":["post-841","post","type-post","status-publish","format-standard","hentry","category-code","tag-python"],"_links":{"self":[{"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/posts\/841","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/comments?post=841"}],"version-history":[{"count":3,"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/posts\/841\/revisions"}],"predecessor-version":[{"id":849,"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/posts\/841\/revisions\/849"}],"wp:attachment":[{"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/media?parent=841"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.thesixsides.com\/blog\/wp-json\/wp\/v2\/categories?post=841"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/
www.thesixsides.com\/blog\/wp-json\/wp\/v2\/tags?post=841"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}