Dr. Pucketlove – Or, How I Learned to Stop Worrying and Love Parquet (partitioning)
Pucket is a Partitioning System For Parquet
» Parquet + Bucket = Pucket.
Pucket is a Scala library which provides a simple partitioning system for Parquet. But what is Parquet and why does it need partitioning when it already supports filtering? In this post I will attempt to explain Parquet, partitioning in Hadoop, and the motivation and design of Pucket. If you’re not interested in the background, you can skip straight to some simple code examples or go to the GitHub repository.
BACKGROUND
What is Parquet and Why Should I Care?
Parquet is a binary columnar storage format for Hadoop created by Cloudera and Twitter designed to store large volumes of analytics data. Here’s a description plucked straight from the website:
» Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.1
This is a little vague, so let me try to answer the important question: why would we want to use Parquet to store data in Hadoop? Parquet stores data as columnar records in files. Columns in this case are essentially nested keys with strongly typed primitive values. Parquet also encourages compression of the dataset, which reduces your storage footprint and increases your IO throughput at the cost of CPU cycles2. Parquet files also contain metadata which promotes efficient seeking within the data3. Combined, these properties give Parquet a small storage footprint and quick access times compared to simple sequence files.
» Essentially Parquet is a NoSQL column store without the need for a query engine runtime.
WHY MIGHT DATA NEED PARTITIONING?
The point of data partitioning is to be able to quickly isolate a portion of your dataset so that you don’t have to seek through the whole lot to find the part you are interested in. Partitioning generally falls into two types: horizontal and vertical. Horizontal partitioning splits your data by values, and vertical partitioning splits your data by field structure. The pattern for doing this is known as a partitioning scheme.
For example, a partitioning scheme could partition the data by date and data type, combining both horizontal and vertical partitioning.
Given that Parquet provides filtering natively, why should it be necessary to use partitions in the filesystem itself? First, let me explain what a partition means in the Hadoop filesystem world:
WHAT DO PARTITIONS LOOK LIKE ON A HADOOP FILE SYSTEM?
Very simply, partitions are directories in the filesystem under which data is stored. That’s it. The structure of these directories should follow a consistent pattern based on the partitioning scheme.
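For example, a Pucket partitioned by date and event type might produce a directory structure along these lines (the paths and file names here are purely illustrative):

/data/events/2015/10/21/view/part-00000.parquet
/data/events/2015/10/21/click/part-00001.parquet
/data/events/2015/10/22/view/part-00002.parquet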
DOES PARQUET ACTUALLY NEED PARTITIONING?
As with a lot of questions, the answer to this is “it depends”. If you want to perform operations involving every piece of data in your entire dataset, you do not need partitioning, or indeed filtering. If you want to perform lots of complex ad-hoc filtering operations, there’s little point partitioning your data. However, if your operations follow a similar pattern, such as a certain set of operations which always runs on a certain portion of the data, it makes sense to partition your data accordingly.
So if you think partitioning is for you then read on…
PUCKET DESIGN
Pucket has been designed to be a simple wrapper around Parquet, following the design principles below:
Simple programming interface for writing custom partitioning schemes
Functionally orientated – controlled side effects and no mutable state
Limited set of core functionality
Limited abstraction from underlying frameworks – don’t attempt to hide necessary complexity
With Pucket we aim to provide the following high-level features, broken down into single responsibility modules:
Simple set of functionality for bucketing data and maintaining schemas
Filesystem level partitioning of Parquet files
Incremental writing of Parquet files with checkpoints
Integration with MapReduce and Spark
PUCKET HIGH-LEVEL CONCEPTS AND USAGE CONSIDERATIONS
Pucket’s implementation is centered around a few key concepts, described below. Each is either implemented once in the core functionality, or partially implemented and requiring a specific implementation per data format. Pucket currently supports Avro and Thrift, but can easily be extended to support Protocol Buffers.
PUCKET
(Implementation per format)
This is a partially implemented trait which contains information on the data in a certain directory (or bucket) on the filesystem. It has a few simple operations for creating a new instance:
Create – create a new Pucket by writing a descriptor to the filesystem: if a Pucket already exists at the location, an error will be returned
Find (apply function) – return a Pucket which is known to be at a certain location on the filesystem, if it does not exist an error will be returned
Find or create – return an existing Pucket or create a new one: will return an error if the existing Pucket’s descriptor does not match the one provided
Once an instance is created the following operations can be performed on it:
Reader – obtain a reader for the Pucket
Writer – obtain a simple writer for the Pucket
Absorb – move another Pucket’s data into this one (provided they have the same structure)
It also holds configuration for the default Parquet block size: i.e. the amount of data the underlying Parquet writer will hold in memory before flushing it to disk.
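As a rough illustration of how these operations compose, the sketch below finds two existing Thrift Puckets and merges one into the other. This is a minimal sketch only: it assumes an existing Hadoop FileSystem instance fs, it assumes absorb returns a disjunction like the other filesystem operations, and the paths and the ThriftData schema class are illustrative (they match the examples later in this post).

// Hedged sketch: merge one Pucket's data into another with absorb
// (assumes absorb returns a Scalaz disjunction; paths are illustrative)
val absorbed: Throwable \/ Unit =
  for {
    source <- ThriftPucket[ThriftData](new Path("/path/to/source"), fs, classOf[ThriftData])
    target <- ThriftPucket[ThriftData](new Path("/path/to/target"), fs, classOf[ThriftData])
    _ <- target.absorb(source)
  } yield ()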
DESCRIPTOR
The Pucket descriptor is a class which describes the structure of the partitions and the data within the Pucket. It is written to the filesystem as JSON on creation and read when the Pucket is located. The descriptor on disk contains the following information:
Schema format
Compression used
Partitioner (optional)
PARTITIONER
The partitioner is a trait which describes the partitioning scheme, to be implemented by the user according to their requirements for partitioning their data. The class name of the partitioner is stored in the descriptor, so the implementation can change. While changing the partitioner on an existing Pucket is not recommended, data already written under the old scheme will still be readable.
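To make this concrete, here is a rough sketch of what a custom partitioning scheme might look like. The Partitioner trait’s actual method name and signature live in the Pucket repository; the partition method, the pucket.path accessor and the LogEvent fields below are assumptions for illustration only.

// Hedged sketch of a date-and-type partitioning scheme; the method name,
// pucket.path and the LogEvent accessors are illustrative assumptions
object DateAndTypePartitioner extends Partitioner[LogEvent] {
  // Map each record to a sub-directory of the Pucket,
  // e.g. /path/to/Pucket/2015/10/21/click
  def partition(data: LogEvent, pucket: Pucket[LogEvent]): Path =
    new Path(pucket.path, s"${data.getYear}/${data.getMonth}/${data.getDay}/${data.getEventType}")
}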
WRITER
There are a few implementations of writer for Pucket, each performing a different type of writing functionality. Each type is described below and code examples can be seen in the TL;DR section.
Simple Writer (Implementation per format) – a functional wrapper around the standard implementations of Parquet writers
Incremental Writer – a wrapper around the simple writer which rolls a file once a configured number of objects has been written. It keeps a checkpoint of the point at which a file was last finalised. It is important to tune the roll limit based on the expected size of each data object: balance the number of objects you are prepared to lose per checkpoint against the number of small files on the filesystem, given Hadoop’s default block size. For example, if each object serialises to roughly 1KB, a roll limit of 100,000 objects produces files of roughly 100MB, close to a typical 128MB HDFS block.
Partitioned Writer – a wrapper around the simple writer which uses the partitioner implementation to write data out to subdirectories of the Pucket. The writer instances are kept in a cache of configurable size; when the cache is full the least recently used writer is removed and closed. It is important to balance the writer cache size against your memory constraints and the number of partitions you expect to write to concurrently, as opening and closing writers is an expensive operation. You should also be aware of the Parquet block size configured in the Pucket: by default each writer will hold 50MB in memory before flushing to disk.
Incremental Partitioned Writer – a wrapper around the incremental writer which provides partitioning. The same tuning constraints apply to this as with the incremental and partitioned writers.
READER
On setting out to implement Pucket there were no plans to implement a reader. However, the standard Parquet reader cannot read files in subdirectories and assumes that all files in a given directory are in Parquet format, so we had to clone the functionality of the main Parquet reader and change it to allow reading files in subdirectories. Note that the Parquet input format for MapReduce can cope with Parquet files in subdirectories, so it does not need to use this reader.
ALTERNATIVES TO PUCKET
There’s always a wheel to be redesigned in the world, so why not Hadoop storage partitioning? There are surprisingly few projects which do this, probably because it is a relatively simple problem. Two major alternatives to Pucket are discussed below, with reasons why we are not using, or contributing to, them.
Pail
We have been heavy users of Pail in the past for partitioning our data. However, the project is rarely updated and appears to be abandonware, and it uses version 1 of the MapReduce API. So why not modify it to work with Parquet?
Old library dependencies make it difficult to upgrade
Complex implementation of simple functionality: the distributed copy functionality, while useful, requires a job to be submitted to MapReduce to operate
Lots of side-effecting code which needs cleaning up
Attempts to modify Pail resulted in frustration as it is strongly tied to sequence files
KiteSDK
KiteSDK, developed and maintained by Cloudera, is the major player when it comes to working with Parquet. It provides quite a large set of functionality for maintaining Parquet data in Hadoop and Hive. The one major issue we had with KiteSDK was that it only supports Avro for schema definition.
We had been using Thrift for our schemas in Pail, and seriously considered switching to Avro just so that we could use KiteSDK. However, we found that one Thrift feature which we love and use heavily, unions, is not supported in the same way in Avro. Looking into the KiteSDK source code, decoupling Avro from the core functionality would be a lot of work, and it would be impossible to do in an abstract way given that the functionality relies on Avro features.
Here are the reasons we chose not to contribute to KiteSDK:
Tied to Avro for schema definition
Complex codebase with many levels of abstraction
Lots of abstraction from underlying frameworks
Large set of core functionality
Given the issues we had with both of these libraries and our own requirements, we opted for our own implementation. This left us free to implement it in Scala and keep to our aforementioned design principles.
TL;DR. SHOW ME SOME CODE
Code examples are given below for the Pucket, PucketDescriptor, Partitioner, Reader and Writer.
The Scalaz implementation of Either, known as disjunction4 (\/), is used heavily within the Pucket code. It is a proper monad, which allows it to be used in a flatMap or for-comprehension. This enables a happy path and a sad path to be accounted for when performing any side-effecting operation. Every operation requiring interaction with the Hadoop filesystem returns a disjunction.
In the examples below, disjunction is used with the implicit either syntax in the Scalaz syntax package, which provides .left and .right operations to lift objects into the appropriate side of the disjunction. To use this, the following imports must be included in implementing classes:
import scalaz.\/
import scalaz.syntax.either._
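For instance, with these imports in scope a value can be lifted into either side of a disjunction:

// Lift a value into the right (happy path) or left (sad path) side
val success: Throwable \/ Int = 42.right[Throwable]
val failure: Throwable \/ Int = new IllegalStateException("boom").left[Int]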
CREATING A PUCKET
The following examples use the imports and values below:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val path = new Path("/path/to/Pucket")
Create or find a Thrift Pucket:
val thriftDescriptor: ThriftPucketDescriptor[ThriftData] =
  ThriftPucketDescriptor[ThriftData](classOf[ThriftData],
    CompressionCodecName.SNAPPY,
    Some(ThriftPartitioner))
// Create a new Pucket at /path/to/Pucket, writing the descriptor in place
// Operation will fail if a Pucket already exists on that path
val newThriftPucket: Throwable \/ Pucket[ThriftData] =
  ThriftPucket.create[ThriftData](path, fs, thriftDescriptor)

// Find an existing Pucket at a certain path
// Operation will fail if no Pucket exists on that path or the schema does not match the one provided
val existingThriftPucket: Throwable \/ Pucket[ThriftData] =
  ThriftPucket[ThriftData](path, fs, classOf[ThriftData])

// Find an existing Pucket, or create a new one, at a certain path
// Operation will fail if the Pucket exists and its descriptor on the filesystem does not match the one provided
val maybeExistingThriftPucket: Throwable \/ Pucket[ThriftData] =
  ThriftPucket.findOrCreate[ThriftData](path, fs, thriftDescriptor)
Create or find an Avro Pucket:
val avroDescriptor: AvroPucketDescriptor[AvroData] =
  AvroPucketDescriptor[AvroData](AvroData.getClassSchema,
    CompressionCodecName.SNAPPY,
    Some(AvroPartitioner))

val newAvroPucket: Throwable \/ Pucket[AvroData] =
  AvroPucket.create[AvroData](path, fs, avroDescriptor)

val existingAvroPucket: Throwable \/ Pucket[AvroData] =
  AvroPucket[AvroData](path, fs, AvroData.getClassSchema)

val maybeExistingAvroPucket: Throwable \/ Pucket[AvroData] =
  AvroPucket.findOrCreate[AvroData](path, fs, avroDescriptor)
WRITING TO A PUCKET
// Write function which folds over the data, failing fast on the first error
def write[T, Ex](data: Seq[T],
    writer: Ex \/ Writer[T, Ex]): Ex \/ Writer[T, Ex] =
  data.foldLeft(writer)((w, i) =>
    w.fold(ex => return ex.left, _.write(i)))
Plain Writer
def writeMeSomeData[T](data: Seq[T],
    pucket: Throwable \/ Pucket[T]): Throwable \/ Unit =
  for {
    p <- pucket
    writer <- p.writer
    finishedWriter <- write[T, Throwable](data, writer)
    _ <- finishedWriter.close
  } yield ()
Incremental Writer
def writeMeSomeDataIncrementally[T](data: Seq[T],
    pucket: Throwable \/ Pucket[T]): (Long, Throwable) \/ Unit =
  for {
    p <- pucket.leftMap((0L, _))
    writer <- IncrementalWriter(p, 100) // 100 is the number of writes before the file is rolled
    finishedWriter <- write[T, (Long, Throwable)](data, writer)
    _ <- finishedWriter.close
  } yield ()
Partitioned Writer
def writeMeSomePartitionedData[T](data: Seq[T],
    pucket: Throwable \/ Pucket[T]): Throwable \/ Unit =
  for {
    p <- pucket
    writer <- PartitionedWriter(p).right
    finishedWriter <- write[T, Throwable](data, writer)
    _ <- finishedWriter.close
  } yield ()
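Incremental Partitioned Writer
The incremental partitioned writer is not shown above. A minimal sketch is given here, assuming it is constructed in the same way as the incremental writer (a Pucket plus a roll limit); check the repository for the actual signature.

// Hedged sketch only: assumes IncrementalPartitionedWriter is constructed like
// IncrementalWriter; the actual signature is in the Pucket repository
def writeMeSomePartitionedDataIncrementally[T](data: Seq[T],
    pucket: Throwable \/ Pucket[T]): (Long, Throwable) \/ Unit =
  for {
    p <- pucket.leftMap((0L, _))
    writer <- IncrementalPartitionedWriter(p, 100)
    finishedWriter <- write[T, (Long, Throwable)](data, writer)
    _ <- finishedWriter.close
  } yield ()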
READING FROM A PUCKET
The reader behaves in a similar way to the writer, in that each read returns a new instance of the reader with an updated state. As well as the new reader instance, an Option of the data item is returned. The example below is an implementation which reads a certain number of records into a Scala Seq or fails with a Throwable. If an error is encountered in the read process then the code fails fast and returns the Throwable in the left side of the disjunction. If the output from the Pucket is exhausted then it closes the reader and returns the result in the right side of the disjunction.
def readData[T](count: Int, pucket: Pucket[T]): Throwable \/ Seq[T] =
  pucket.reader.flatMap(reader =>
    0.to(count).foldLeft((Seq[T](), reader).right[Throwable])((acc, _) =>
      acc.fold(
        ex => return ex.left,
        dataAndReader => dataAndReader._2.read.fold(
          ex => return ex.left,
          optionalDataAndReader => (
            optionalDataAndReader._1.fold(
              // if the output is exhausted, close the reader and return the accumulated data
              return optionalDataAndReader._2.close.map(_ => dataAndReader._1)
            )(
              // if a data item is present, append it to the accumulated data
              // and carry the updated reader state forward
              dataAndReader._1 ++ Seq(_)
            ),
            optionalDataAndReader._2
          ).right[Throwable]
        )
      )
    )
  ).flatMap(dataAndReader => dataAndReader._2.close.map(_ => dataAndReader._1))
More examples
For more detailed examples such as working with MapReduce and Spark please see the Pucket GitHub repository.
References
Cropped image of partition by Thomas Leth-Olsen is licensed under CC BY 2.0