{"id":68457,"date":"2016-10-04T13:34:17","date_gmt":"2016-10-04T13:34:17","guid":{"rendered":"https:\/\/www.simple-talk.com\/?p=68457"},"modified":"2021-02-23T15:48:30","modified_gmt":"2021-02-23T15:48:30","slug":"ever-need-partition-big-raw-data","status":"publish","type":"post","link":"https:\/\/www.red-gate.com\/simple-talk\/cloud\/big-data\/ever-need-partition-big-raw-data\/","title":{"rendered":"Why Would I Ever Need to Partition My Big \u2018Raw\u2019 Data?"},"content":{"rendered":"<p>Before we jump into the Big Data world and <a href=\"http:\/\/spark.apache.org\/\">Spark<\/a> specifically, let\u2019s take a step back to the relational database sphere and see what all the fuss around partitioning relational data tables is about.<\/p>\n<h2>First things first<\/h2>\n<p>There are a few important reasons why partitioning was introduced in RDBMSs. All of them stem from the fact that, at some point, a database table may grow to hundreds of gigabytes or more, which makes it difficult to:<\/p>\n<ul>\n<li>load new data<\/li>\n<li>remove old data<\/li>\n<li>maintain indexes<\/li>\n<li>query the data<\/li>\n<\/ul>\n<p>Table partitioning makes such operations much more manageable by dividing the table and its indexes into smaller partitions based on selected attributes (columns). As a result, maintenance operations can be applied on a partition-by-partition basis, rather than to the entire table. What\u2019s more, the SQL Server optimizer can generate an execution plan that reads data only from the requested partitions when the query filters on the partitioning column; this can dramatically improve query performance. 
This operation is called \u2018partition pruning\u2019.<\/p>\n<p>There are well-known strategies and best practices for partitioning in the RDBMS world, which I will not cover here (you can read the <a href=\"https:\/\/www.google.pl\/url?sa=t&amp;rct=j&amp;q=&amp;esrc=s&amp;source=web&amp;cd=3&amp;cad=rja&amp;uact=8&amp;ved=0ahUKEwjF5-yqjJHOAhXC_iwKHeSeAMQQFggsMAI&amp;url=http%3A%2F%2Fdownload.microsoft.com%2Fdownload%2Fd%2Fb%2Fd%2Fdbde7972-1eb9-470a-ba18-58849db3eb3b%2Fparttableandindexstrat.docx&amp;usg=AFQjCNHXKM0Ul_ZSx4fk62mRZcOlG8CYAA&amp;sig2=9lvox2oDghg0npTfFwC-HA&amp;bvm=bv.127984354,d.bGg\">Partitioned Table and Index Strategies Using SQL Server 2008<\/a> whitepaper if you\u2019re interested). Instead, I will focus on a generic day-to-day data flow, such as loading new data into a system, to better understand the problem and see whether we can apply the same kind of optimizations in the Spark world. I will assume in this article that you know at least a few things about Spark; if not, I strongly recommend <a href=\"https:\/\/www.infoq.com\/articles\/apache-spark-introduction\">Srini Penchikala \u2013 Big Data Processing with Apache Spark<\/a>, <a href=\"https:\/\/www.youtube.com\/watch?v=2b-0xddTzEU\">Spark Essentials with Adam Breindel<\/a>, the <a href=\"http:\/\/shop.oreilly.com\/product\/0636920028512.do\">Learning Spark<\/a> book or the <a href=\"http:\/\/spark.apache.org\/docs\/latest\/index.html\">Apache Spark Documentation<\/a>.<\/p>\n<h2>Load\u2019em all, but how?<\/h2>\n<p>The most common day-to-day activity in any data pipeline is loading new data into some sort of storage system. 
Based on what I have already said, we could imagine two approaches here:<\/p>\n<ul>\n<li>always load data into the same container, whether it\u2019s a single table file or folder<\/li>\n<li>structure (partition) the data so that it is physically distributed across different table files or folders, while at the same time logically visible as a single entity (table)<\/li>\n<\/ul>\n<p>While the first approach is very tempting, it carries quite a big burden. By this I mean that you\u2019re making a single big pile of data and, the more it grows, the less control you have over it. It quickly turns out that, in order to do any sort of operation, such as a duplicate check, you need to read all of the existing data. I\u2019m sure you don\u2019t want to load the same data twice.<\/p>\n<p>You can introduce indexes to overcome those issues. However, at a very large scale, indexes become a problem as well, in exactly the same way as the table itself. Additionally, you have to rebuild those big indexes for the whole table each time data is loaded.<\/p>\n<p>And what if you want to re-load the data that was loaded two weeks ago because, at that time, the data was of poor quality? You need to locate the old data subset, delete it and finally load the new one. This can take a tremendous amount of time if you are operating on a very large volume of data.<\/p>\n<p>So how does partitioning solve that? Let\u2019s take a look.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-68462\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2016\/09\/diagram.png\" alt=\"\" width=\"561\" height=\"270\" \/><\/p>\n<p>In a nutshell, an empty <em>Partition 3<\/em> is first created in the production table; then new data is loaded into a staging table with only one partition. Finally, a metadata switch exchanges the partitions. The old partition can now be deleted. 
The only operation that really takes time is loading new data into the staging partition. However, keep in mind that this loads data into an empty table with one partition rather than into the entire production table.<\/p>\n<p>Such an approach also enables an <em>immutable<\/em> data pipeline where you can easily reload snapshot data for any partition at any time. Such an immutable data pipeline removes the headache of managing the existing state (merges).<\/p>\n<p>How about Spark then? Do we have the same problems with raw data there? Can we use the same concepts there?<\/p>\n<h2>Let\u2019s partition a table, oops, I meant folder, does it really matter?<\/h2>\n<p>The story about loading data is a bit different in the Big Data world because raw data is stored directly on a distributed file system. However, even though we don\u2019t really have to care about indexes, we would still appreciate partitioning here for exactly the same reasons, especially because we have to deal with data at a very large scale.<\/p>\n<p>First of all, since these are just files on a file system, we need an efficient file format that supports file schema, partitioning, compression and ideally columnar storage. Luckily, there are a few in the Big Data ecosystem, but the most promising one, natively integrated with Spark, is <a href=\"http:\/\/parquet.apache.org\/\">Apache Parquet<\/a>, which was originally developed by <a href=\"https:\/\/about.twitter.com\/pl\/company\">Twitter<\/a> and Cloudera.<\/p>\n<p>Parquet lets us move from having just files on a file system to having files-as-tables with very efficient columnar storage. That brings us closer to the RDBMS world than working with plain files.<\/p>\n<p>In terms of <em>tables<\/em>, columnar storage brings two important advantages:<\/p>\n<ul>\n<li>very efficient compression of homogeneous columnar values over heterogeneous row values. 
For example, if you have a \u2018<span style=\"font-family: consolas;\">Time<\/span>\u2019 column that ticks every 3 seconds, then it\u2019s much more efficient to store the initial value and the fixed interval than to store every value as a \u2018<span style=\"font-family: consolas;\">Time<\/span>\u2019 type. This significantly reduces the resulting file size as well as the I\/O operations on the hard drive.<\/li>\n<li>column pruning \u2013 reading only requested columns<\/li>\n<\/ul>\n<p>Spark leverages Parquet and gives us a mechanism for partition discovery. Let\u2019s take a look at an example of a partitioned folder structure:<\/p>\n<pre>path\r\n\u2514\u2500\u2500 to\r\n    \u2514\u2500\u2500 my_table\r\n        \u251c\u2500\u2500 Year=2016\r\n        \u2502   \u251c\u2500\u2500 ...\r\n        \u2502   \u2502\r\n        \u2502   \u251c\u2500\u2500 Month=4\r\n        \u2502   \u2502   \u251c\u2500\u2500 Day=15\r\n        \u2502   \u2502   \u2502   \u2514\u2500\u2500 data.parquet\r\n        \u2502   \u2502   \u2514\u2500\u2500 ...\r\n        \u2502   \u251c\u2500\u2500 Month=5\r\n        \u2502   \u2502   \u251c\u2500\u2500 Day=12\r\n        \u2502   \u2502   \u2502   \u2514\u2500\u2500 data.parquet\r\n        \u2502   \u2502   \u2514\u2500\u2500 ...\r\n        \u2514\u2500\u2500 Year=2015\r\n            \u251c\u2500\u2500 ...\r\n<\/pre>\n<p>As we can see, data in a partitioned table is distributed across different directories, where the values of the partition columns are encoded in the name of each partition directory, such as <span style=\"font-family: consolas;\">Year=2016, Month=4, Day=15<\/span> etc. In other words, by having that structure we can be sure that data from April 2016 can be found only in the <span style=\"font-family: consolas;\">\/path\/to\/my_table\/Year=2016\/Month=4<\/span> directory. 
A table is just a root folder, <span style=\"font-family: consolas;\">\/path\/to\/my_table<\/span>.<\/p>\n<p>There\u2019s nothing really special about it: there\u2019s no hidden magic or code adding partition schema metadata; it\u2019s only about keeping the file structure right. That means you could prepare such a structure <em>offline<\/em>, outside of Spark, and then load the table with Spark. It\u2019s also in line with the <a href=\"http:\/\/hive.apache.org\/\">Hive<\/a> partition layout, which means you can load existing Hive data. Spark <a href=\"http:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#hive-tables\">integrates<\/a> with the Hive metastore anyway, while it keeps using its own Parquet reader &amp; writer instead of the Hive SerDe.<\/p>\n<h2>Shall we read something?<\/h2>\n<p>Whenever we read such a directory structure using <a href=\"http:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html\">Spark SQL<\/a>, it automatically discovers the partition layout in the given directory by inferring the partition columns together with their values. They are added to the resulting <span style=\"font-family: consolas;\">DataFrame<\/span> as a kind of pseudo-column (similar to what Hive does underneath); see the example below:<\/p>\n<pre>val df = sqlContext.read.parquet(\"\/path\/to\/my_table\") \/\/ load data\r\ndf.printSchema() \/\/ print inferred schema\r\n\r\nroot \r\n|-- column1: string (nullable = true) \r\n|-- column2: string (nullable = true)\r\n|-- ... 
\r\n|-- Day: long (nullable = true) \r\n|-- Month: long (nullable = true) \r\n|-- Year: long (nullable = true)\r\n<\/pre>\n<p>If we now query the underlying data with a filter predicate on any of those partition columns, the Spark SQL engine will optimize the query and read only the data that matches this filter predicate.<\/p>\n<p>For example, if we run the following query:<\/p>\n<pre>df.filter(\"Year = 2016\").show()<\/pre>\n<p>then Spark will only read data from the <span style=\"font-family: consolas;\">\/path\/to\/my_table\/Year=2016<\/span> directory, and the rest will be skipped (pruned).<\/p>\n<h2>Now I wanna load some data<\/h2>\n<p>Once we have such a partitioned directory structure, it is quite easy to load new data on a partition-by-partition basis (immutable data pipeline).<\/p>\n<p>Batch processing is usually done on a schedule, such as daily, weekly or monthly. If our data is also partitioned by one of those attributes, then it\u2019s really only a matter of replacing the proper directory with new data.<\/p>\n<p>Unfortunately, we have to handle this on our own, since Spark, at the time of writing, does not provide an easy way of overwriting only selected partitions. 
If we take a look at the <a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/scala\/index.html#org.apache.spark.sql.DataFrameWriter\">DataFrameWriter Scala API<\/a>, we can see the following methods (there are more, but we want to focus on these):<\/p>\n<ul>\n<li><span style=\"font-family: consolas;\">partitionBy(colNames: String*): DataFrameWriter<\/span> \u2013 partitions the output by the given columns on the file system, in exactly the same way as we already described<\/li>\n<li><span style=\"font-family: consolas;\">mode(saveMode: SaveMode): DataFrameWriter<\/span> \u2013 specifies the saving behavior when data or table already exists<\/li>\n<\/ul>\n<p>and the options we have for SaveMode are:<\/p>\n<ul>\n<li><span style=\"font-family: consolas;\">Overwrite<\/span> \u2013 overwrite the existing data,<\/li>\n<li><span style=\"font-family: consolas;\">Append<\/span> \u2013 append the data,<\/li>\n<li><span style=\"font-family: consolas;\">Ignore<\/span> \u2013 ignore the operation,<\/li>\n<li><span style=\"font-family: consolas;\">ErrorIfExists<\/span> \u2013 default option, throws an exception at runtime if the data already exists<\/li>\n<\/ul>\n<p>At first glance, it may look as if we have all we need, because <span style=\"font-family: consolas;\">partitionBy<\/span> will do the trick and we don\u2019t have to take care of anything. However, it turns out that we can\u2019t rely on it to replace only selected partitions. 
What I mean by that is that, if you load new, not-yet-partitioned data into a <span style=\"font-family: consolas;\">DataFrame<\/span> for a given batch window, you have just two options when you save it into your existing production table using the <span style=\"font-family: consolas;\">partitionBy<\/span> method:<\/p>\n<ul>\n<li>Overwrite your existing table with the <span style=\"font-family: consolas;\">Overwrite<\/span> option &#8211; watch out, it will delete all your existing data permanently!<\/li>\n<li>Append the new data to the existing table with the <span style=\"font-family: consolas;\">Append<\/span> option \u2013 watch out, it will create duplicates if you are trying to re-load an existing partition! To overcome that, you would need to load the existing data for those partitions, merge it with the new data, remove the existing data from the table and finally append the merged data to that table. It\u2019s quite complicated, isn\u2019t it? Still, it more or less follows the way we do it in an RDBMS with partitions in place.<\/li>\n<\/ul>\n<p>Anyway, for our immutable data pipeline, it looks as if neither option does the right job for us. What we really want is a sort of <span style=\"font-family: consolas;\">OverwritePartition<\/span> option that would replace only the selected partition\u2019s data. Spark versions before 2.3 do not provide such an option (later versions add a dynamic mode through the <span style=\"font-family: consolas;\">spark.sql.sources.partitionOverwriteMode<\/span> setting), so here we handle this on our own.<\/p>\n<p>This means we have to write new data on a partition-by-partition basis, directly to the proper partition directory. For example, if we get new data for <span style=\"font-family: consolas;\">2016\/1\/1<\/span>, we need to save it to <span style=\"font-family: consolas;\">\/path\/to\/my_table\/Year=2016\/Month=1\/Day=1<\/span>. 
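<\/p>\n<p>The write described above could be sketched roughly as follows. This is only a minimal sketch under stated assumptions: <span style=\"font-family: consolas;\">partitionPath<\/span> and <span style=\"font-family: consolas;\">overwritePartition<\/span> are hypothetical helper names, not part of the Spark API, and the batch <span style=\"font-family: consolas;\">DataFrame<\/span> is assumed to hold data for exactly one day. Only <span style=\"font-family: consolas;\">drop<\/span>, <span style=\"font-family: consolas;\">write<\/span>, <span style=\"font-family: consolas;\">mode<\/span> and <span style=\"font-family: consolas;\">parquet<\/span> are real Spark calls.<\/p>\n<pre>import org.apache.spark.sql.{DataFrame, SaveMode}\r\n\r\n\/\/ Hypothetical helper: builds the partition directory for one batch day.\r\ndef partitionPath(root: String, year: Int, month: Int, day: Int): String =\r\n  s\"$root\/Year=$year\/Month=$month\/Day=$day\"\r\n\r\n\/\/ Hypothetical helper: replaces a single partition directory only.\r\n\/\/ Partition columns are dropped (if present) because their values\r\n\/\/ are already encoded in the directory names.\r\ndef overwritePartition(df: DataFrame, root: String, year: Int, month: Int, day: Int): Unit =\r\n  df.drop(\"Year\").drop(\"Month\").drop(\"Day\")\r\n    .write\r\n    .mode(SaveMode.Overwrite)\r\n    .parquet(partitionPath(root, year, month, day))\r\n\r\n\/\/ Usage (newData is an assumed DataFrame holding the batch for 2016\/1\/1):\r\n\/\/ overwritePartition(newData, \"\/path\/to\/my_table\", 2016, 1, 1)\r\n<\/pre>\n<p>Because <span style=\"font-family: consolas;\">Overwrite<\/span> is applied to the partition directory rather than to the table root, the other partitions are left untouched. 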
That also requires implementing some sort of utility code that is responsible for building the proper partition paths.<\/p>\n<p>In some systems, it may be the case that your batch window date (technical date) has nothing to do with the actual business date, which describes the real events. These business dates arrive in the ingested file along with the event data. For example, on a daily basis, you may get data for several business dates, such as sales that happened over the past few days.<\/p>\n<p>Luckily, we can still apply the same concept, which is to decouple those two dates and add them individually to our partition schema hierarchy, where the batch window date would be at the very bottom of the hierarchy. With this technical batch window date in place, we can keep our data pipeline immutable and replace the partitions in the same way we described above.<\/p>\n<p>Apart from being able to reload snapshot data on a partition basis very easily, such an approach also allows you to do some analysis on a partition basis as well. For example, if your business requires you to validate daily whether you have any duplicates, then you just have to run your analytics on the daily partition data iteratively, which requires far less memory than processing all the existing data in the table. Can you imagine a hash-based algorithm removing duplicates from a set of petabytes that will grow even more over time? Additionally, just by looking at the directory structure that follows your partition schema, you can immediately say what kind of data you have at the moment. I\u2019m also sure you don\u2019t want to put all your data into a single container.<\/p>\n<h2>Conclusions<\/h2>\n<p>Hopefully, I\u2019ve shed some light on the importance of partitioning your data even in the Big Data ecosystem. Some may disagree, saying that you can scale your cluster &amp; storage to mitigate performance issues, but hey, shouldn\u2019t we solve software issues first? 
Getting your software right has the potential to sky-rocket your performance, which is often not possible by throwing hardware at a performance problem. We\u2019re also obliged to do things right, maybe not perfectly, but definitely wisely, using our craftsmanship. Apart from the performance gains, it also allows you to keep your data in a structured manner that makes some things simpler.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Whether you are running an RDBMS, or a Big Data system, it is important to consider your data-partitioning strategy. As the volume of data grows, so it becomes increasingly important to match the way you partition your data to the way it is queried, to allow &#8216;pruning&#8217; optimisation. When you have huge imports of data to consider, it can get complicated. Bartosz explains how to get things right; not perfect but wisely.&hellip;<\/p>\n","protected":false},"author":243195,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[137094],"tags":[],"coauthors":[26505],"class_list":["post-68457","post","type-post","status-publish","format-standard","hentry","category-big-data"],"acf":[],"_links":{"self":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/68457","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/users\/243195"}],"replies":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/comments?post=68457"}],"version-history":[{"count":6,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/68457\/revisions"}],"predecessor-version":[{"id":90067,"href":"https:\/\/www.red-gate.com\/simple
-talk\/wp-json\/wp\/v2\/posts\/68457\/revisions\/90067"}],"wp:attachment":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/media?parent=68457"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/categories?post=68457"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/tags?post=68457"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/coauthors?post=68457"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}