Last summer I had the pleasure of reviewing a really interesting book about Spark written by Holden Karau for PacktPub. She is a really smart woman, currently a software development engineer at Google and active in Spark's developer community. In the past she worked for Microsoft, Amazon and Foursquare.

Spark is a framework for writing fast, distributed programs. It's similar to Hadoop MapReduce but uses a fast in-memory approach. The Spark ecosystem incorporates built-in tools for interactive query analysis (Shark), a large-scale graph processing and analysis framework (Bagel), and a real-time analysis framework (Spark Streaming). I discovered them a few months ago while exploring the extended Hadoop ecosystem.

The book covers how to write distributed MapReduce-style programs. You can find everything you need: setting up your Spark cluster, using the interactive shell, and writing and deploying distributed jobs in Scala, Java and Python. The last chapters look at how to use Hive with Spark to get a SQL-like query syntax through Shark, and at how to manipulate resilient distributed datasets (RDDs).

Have fun reading it! 😀

Fast data processing with Spark
by Holden Karau


The title is also listed in the Research Areas & Publications section of the Google Research portal: http://research.google.com/pubs/pub41431.html


According to Wikipedia, Klout is “a website and mobile app that uses social media analytics to rank its users according to online social influence via the ‘Klout Score’, which is a numerical value between 1 and 100”.

This is not so different from what I try to do every day. They get signals from social networks, process them to extract relevant data, and show some diagrams and a synthetic index of user influence. It's really interesting for me to observe how their data is stored and processed.

At Hadoop Summit 2012, Dave Mariani (from Klout) and Denny Lee (from Microsoft) presented the Klout architecture and showed the following diagram:

[Diagram: Klout architecture]

It shows many different technologies, a great example of polyglot persistence 🙂

Klout uses a lot of Hadoop. It's used to collect signals coming from the different Signal Collectors (one for each social network, I suppose). Procedures to enhance the data are written in Pig, and Hive is also used for the data warehouse.

Currently MySQL is used only to collect user registrations, which are ingested into the data warehouse system. In the past they used it as a bridge between the data warehouse and their “Cube”, a Microsoft SQL Server Analysis Services (SSAS) instance used for Business Intelligence with Excel and other custom apps. In 2011 the data was migrated using Sqoop. Now they can rely on Microsoft's Hive ODBC driver, and MySQL isn't used anymore.

The website and mobile app are based on the Klout API. Data is collected from the data warehouse and stored in HBase (user profiles and scores) and MongoDB (interactions between users). ElasticSearch is used as the search index.

Most of the custom components are written in Scala. The only exception is the website, written in JavaScript/Node.js.

In the end, Klout is probably the biggest company working with both open source tools from the Hadoop ecosystem and Microsoft tools. The Hadoop version for Windows Azure, developed together with Hortonworks, is probably the first product of this collaboration.

Sources

Last month, while I was exploring the Hadoop ecosystem, I found many other software projects related to big data. Below is the (again incomplete) list.

N.B. Information and text are taken from the official websites or from the sources referenced at the end of the article.

Facebook Scribe (Github)

Scribe is a server for aggregating log data streamed in real time from a large number of servers. It is designed to be scalable, extensible without client-side modification, and robust to failure of the network or any specific machine.”

Scribe servers are arranged in a directed graph, with each server knowing only about the next server in the graph. This network topology allows for adding extra layers of fan-in as a system grows, and batching messages before sending them between datacenters, without having any code that explicitly needs to understand datacenter topology, only a simple configuration.
Scribe was designed with reliability in mind, but without requiring heavyweight protocols or expansive disk usage.

Facebook McDipper

McDipper is a highly performant flash-based cache server that is Memcache protocol compatible.”

Facebook used Memcached heavily in its early stages, and when the cost of RAM became too high it had to find a different solution. This is why McDipper was born. It is not so different from other projects like Fatcache or Edis: as many DBMSs do, they mimic an in-memory caching engine on top of flash in order to extend the available space. SSDs are fast enough that the speed loss is not a problem.

Facebook Haystack

“The new photo infrastructure merges the photo serving tier and storage tier into one physical tier. It implements a HTTP based photo server which stores photos in a generic object store called Haystack.”

A system specifically designed to serve static content using HTTP as fast as possible. Haystack can be broken down into these functional layers:

  • HTTP server
  • Photo Store
  • Haystack Object Store
  • Filesystem
  • Storage

I talked about it in a previous post.

Facebook Memcached (Github)

“Here at Facebook, we’re likely the world’s largest user of memcached. We use memcached to alleviate database load. memcached is already fast, but we need it to be faster and more efficient than most installations.”
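
Just to give an idea of what “alleviate database load” means in practice, here is a minimal sketch of the classic cache-aside pattern using the third-party python-memcached client; the key naming, the 60-second TTL and the db_lookup callback are illustrative assumptions, not Facebook's actual code:

import memcache

mc = memcache.Client(["127.0.0.1:11211"])

def get_user(user_id, db_lookup):
    key = "user:%s" % user_id
    user = mc.get(key)              # try the cache first
    if user is None:
        user = db_lookup(user_id)   # cache miss: hit the database
        mc.set(key, user, time=60)  # keep it cached for 60 seconds
    return user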

Twitter Storm (Github)

Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation.”

There are just three abstractions in Storm: spouts, bolts, and topologies.

A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere.

A bolt processes any number of input streams and produces any number of new output streams. Most of the logic of a computation goes into bolts, such as functions, filters, streaming joins, streaming aggregations, talking to databases, and so on.

A topology is a network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt.

Twitter Snowflake (Github)

Snowflake is a network service for generating unique ID numbers at high scale with some simple guarantees.”

Twitter needed something that could generate tens of thousands of ids per second in a highly available manner. This naturally led us to choose an uncoordinated approach.

These ids need to be roughly sortable, meaning that if tweets A and B are posted around the same time, they should have ids in close proximity to one another since this is how we and most Twitter clients sort tweets. Additionally, these numbers have to fit into 64 bits.

To generate them in an uncoordinated manner, we settled on a composition of: timestamp, worker number and sequence number. Sequence numbers are per-thread and worker numbers are chosen at startup via zookeeper (though that’s overridable via a config file).
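
A toy Python sketch of that composition may help; the 41/10/12 bit split and the epoch constant below follow the commonly published Snowflake layout, but treat the exact values as assumptions rather than Twitter's production code:

import time

EPOCH_MS = 1288834974657  # commonly cited Snowflake epoch; an assumption here

def make_id(worker_id, sequence, now_ms=None):
    # 41 bits of milliseconds since the epoch, 10 bits of worker id,
    # 12 bits of per-worker sequence: roughly time-sortable and fits in 64 bits.
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    timestamp = now_ms - EPOCH_MS
    return (timestamp << 22) | ((worker_id & 0x3FF) << 12) | (sequence & 0xFFF)

print(make_id(worker_id=7, sequence=0))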

Twitter Fatcache (Github)

Fatcache is memcache on SSD. Think of fatcache as a cache for your big data.”

SSD-backed memory presents a viable alternative for applications with large workloads that need to maintain high hit rate for high performance.

Like Facebook McDipper, Fatcache tries to overcome memory limitations. It maintains an in-memory index for all data stored on disk. An in-memory index serves two purposes: cheap object existence checks and on-disk object address storage.

To minimize the number of small, random writes, fatcache treats the SSD as a log-structured object store. All writes are aggregated in memory and written to the end of the circular log in batches – usually multiples of 1 MB.

Twitter FlockDB (Github)

FlockDB is a database that stores graph data, but it isn’t a database optimized for graph-traversal operations. Instead, it’s optimized for very large adjacency lists, fast reads and writes, and page-able set arithmetic queries.”

It is a distributed graph database for storing adjacency lists, with goals of supporting:

  • a high rate of add/update/remove operations
  • potentially complex set arithmetic queries
  • paging through query result sets containing millions of entries
  • ability to “archive” and later restore archived edges
  • horizontal scaling including replication
  • online data migration

Non-goals include multi-hop queries (or graph-walking queries) and automatic shard migrations.

FlockDB is much simpler than other graph databases such as Neo4j because it tries to solve fewer problems. It scales horizontally and is designed for on-line, low-latency, high throughput environments such as web-sites.

Twitter Twemcache (Github)

“We built Twemcache because we needed a more robust and manageable version of Memcached, suitable for our large-scale production environment.”

Apache Kafka (Github)

Kafka is publish-subscribe messaging rethought as a distributed commit log.”

It is designed to support the following:

  • Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.
  • High-throughput: even with very modest hardware Kafka can support hundreds of thousands of messages per second.
  • Explicit support for partitioning messages over Kafka servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.
  • Support for parallel data load into Hadoop.

Kafka is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing on a consumer-scale web site.
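
To show the publish-subscribe model in practice, here is a minimal produce/consume sketch using the third-party kafka-python client; the broker address, topic name and payload are assumptions for illustration:

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("activity-stream", b'{"user": 42, "action": "click"}')
producer.flush()  # make sure the message actually left the client

consumer = KafkaConsumer("activity-stream",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for message in consumer:
    # each message carries its partition and offset in the commit log
    print(message.partition, message.offset, message.value)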

Apache Gora (Github)

“Gora is an open source framework that provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key value stores, document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce support.”

Apache Mesos (Github)

“Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark and other applications.”

Main features are:

  • Fault-tolerant replicated master using ZooKeeper.
  • Scalability to 10,000s of nodes using fast, event-driven C++ implementation.
  • Isolation between tasks with Linux Containers.
  • Multi-resource scheduling (memory and CPU aware).
  • Efficient application-controlled scheduling mechanism.
  • Java, Python and C++ APIs for developing new parallel applications.
  • Web UI for viewing cluster state.

Apache Spark (Github)

“Spark is an open source cluster computing system that aims to make data analytics fast: both fast to run and fast to write. To run programs faster, Spark provides primitives for in-memory cluster computing: your job can load data into memory and query it repeatedly, much more quickly than with disk-based systems like Hadoop MapReduce.”

Spark was initially developed for two applications where keeping data in memory helps: iterative algorithms, which are common in machine learning, and interactive data mining. In both cases, Spark can run up to 100x faster than Hadoop MapReduce. However, you can use Spark for general data processing too.
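
A small PySpark sketch of the in-memory model: the filtered RDD is cached, so the second action reuses data already in memory instead of rereading it from disk. The HDFS path is a placeholder:

from pyspark import SparkContext

sc = SparkContext("local[*]", "log-mining")
lines = sc.textFile("hdfs:///logs/app.log")
errors = lines.filter(lambda line: "ERROR" in line).cache()

print(errors.count())                                    # first pass reads the file and caches it
print(errors.filter(lambda l: "timeout" in l).count())   # second pass is served from memory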

Shark: Hive on Spark (Github)

Shark is a large-scale data warehouse system for Spark designed to be compatible with Apache Hive. It can execute Hive QL queries up to 100 times faster than Hive without any modification to the existing data or queries. Shark supports Hive’s query language, metastore, serialization formats, and user-defined functions, providing seamless integration with existing Hive deployments and a familiar, more powerful option for new ones.”

Shark is built on top of Spark, a data-parallel execution engine that is fast and fault-tolerant. Even when data is on disk, Shark can be noticeably faster than Hive because of its fast execution engine: it avoids the high task-launching overhead of Hadoop MapReduce and does not require materializing intermediate data between stages on disk. Thanks to this fast engine, Shark can answer queries with sub-second latency.

Fluentd (Github)

Fluentd is an open-source tool to collect events and logs. 150+ plugins instantly enables you to store the massive data for Log Search, Big Data Analytics, and Archiving (MongoDB, S3, Hadoop)”

[Diagram: Fluentd architecture]

The fundamental problem with logs is that they are usually stored in files, although they are best represented as streams (as Adam Wiggins, CTO at Heroku, put it). Traditionally, they have been dumped into text-based files and collected via rsync on an hourly or daily basis. With today's web/mobile applications, this creates two problems.

  1. Need for ad-hoc parsing: text-based logs have their own format, and analytics engineers need to write a dedicated parser for each format. But that's probably not the best use of your time. You should be analyzing data to make better business decisions instead of writing one parser after another.
  2. Lack of freshness: the logs lag. Real-time analysis of user behavior makes feature iterations a lot faster. A nimbler A/B testing will help you differentiate your service from competitors.

This is where Fluentd comes in. We believe Fluentd solves all issues of scalable log collection by getting rid of files and turning logs into true semi-structured data streams.
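
A minimal emission sketch with the third-party fluent-logger package shows the “structured stream instead of text line” idea; the tag, host and port (Fluentd's usual forward port) are assumptions:

from fluent import sender

logger = sender.FluentSender("myapp", host="localhost", port=24224)
# the event is shipped as structured data, not as a line to be parsed later
logger.emit("login", {"user_id": 42, "ip": "10.0.0.1"})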


Kestrel (Github)

“Kestrel is a very simple message queue that runs on the JVM. It supports multiple protocols: memcache: the memcache protocol, with some extensions, thrift: Apache Thrift-based RPC, text: a simple text-based protocol”

A cluster of kestrel servers is like a memcache cluster: the servers don’t know about each other, and don’t do any cross-communication, so you can add as many as you like. The simplest clients have a list of all servers in the cluster, and pick one at random for each operation. In this way, each queue appears to be spread out across every server, with items in a loose ordering. More advanced clients can find kestrel servers via ZooKeeper.
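
Since Kestrel speaks the memcache protocol, a plain Memcache client can act as a queue client: set() enqueues and get() dequeues. The sketch below also mirrors the “pick one server at random per operation” strategy; the hostnames and the 22133 port are assumptions, and a real client would of course reuse connections instead of creating one per call:

import random
import memcache

SERVERS = ["kestrel1:22133", "kestrel2:22133", "kestrel3:22133"]

def enqueue(queue, payload):
    memcache.Client([random.choice(SERVERS)]).set(queue, payload)

def dequeue(queue):
    # returns None if the chosen server has no item for this queue
    return memcache.Client([random.choice(SERVERS)]).get(queue)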

Cloudera Impala (Github)

Impala is an open source Massively Parallel Processing (MPP) query engine that runs natively on Apache Hadoop”

With Impala, you can query data, whether stored in HDFS or Apache HBase – including SELECT, JOIN, and aggregate functions – in real time.

To avoid latency, Impala circumvents MapReduce to directly access the data through a specialized distributed query engine that is very similar to those found in commercial parallel RDBMSs. The result is order-of-magnitude faster performance than Hive, depending on the type of query and configuration. Note that this performance improvement has been confirmed by several large companies that have tested Impala on real-world workloads for several months.

Structure diagram:

[Diagram: Cloudera Impala structure]

HadoopDB

HadoopDB is an Architectural Hybrid of MapReduce and DBMS Technologies for Analytical Workloads”

HadoopDB is:

  • A hybrid of DBMS and MapReduce technologies that targets analytical workloads
  • Designed to run on a shared-nothing cluster of commodity machines, or in the cloud
  • An attempt to fill the gap in the market for a free and open source parallel DBMS
  • Much more scalable than currently available parallel database systems and DBMS/MapReduce hybrid systems.
  • As scalable as Hadoop, while achieving superior performance on structured data analysis workloads

Hypertable

Hypertable is an open source database system inspired by publications on the design of Google’s BigTable. The project is based on experience of engineers who were solving large-scale data-intensive tasks for many years.”

This project is for the design and implementation of a high performance, scalable, distributed storage and processing system for structured and unstructured data. It is designed to manage the storage and processing of information on a large cluster of commodity servers, providing resilience to machine and component failures. Data is represented in the system as a multi-dimensional table of information. The data in a table can be transformed and organized at high speed by performing computations in parallel, pushing them to where the data is physically stored.

Sources:

[Diagram: database landscape Venn diagram]

Last week I found this diagram on @al3xandru's MyNoSQL blog and I was surprised by how many pieces of software I had never heard of before.

The diagram is missing many other products, such as NuoDB (NewSQL), Aerospike (Key-Value), Titan (Graph), FoundationDB (Key-Value), Apache Accumulo (Key-Value), Apache Giraph (Graph) and more, and it includes some companies (like Cloudera, MapR and Xeround) even if they didn't develop a custom version but just fork and maintain the main one.

Anyway, it seems one of the best visual representations of the current database world and I'm going to use it as a base for an updated and more detailed version 😉

Sources: 

Soon after Google released its first papers in 2003, many developers started implementing them. Apache Hadoop is the biggest result of that effort. Around Hadoop many other technologies were born, and the Apache Software Foundation helped the most promising ones grow. Below is an (incomplete) list of Hadoop-related software.

Apache Hadoop (HDFS, MapReduce)

Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

The Hadoop Distributed File System (HDFS) offers a way to store large files across multiple machines. Hadoop and HDFS were derived from Google's MapReduce and Google File System (GFS) papers.

Apache Hive (Github)

“Hive is a data warehouse system for Hadoop […] Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL.”

The MapReduce paradigm is extremely powerful, but programmers have been using SQL to query data for years. HiveQL is a SQL-like language to query data stored on the Hadoop filesystem.

An example of HiveQL:

CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;

Apache Pig (Github)

“Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […] Pig's infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs. […] Pig's language layer currently consists of a textual language called Pig Latin.”

If you don't like SQL, maybe you prefer a more procedural language. Pig Latin is different from HiveQL but has the same purpose: querying data.

An example of Pig Latin:

set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;

Apache Avro (GitHub)

Avro is a data serialization system.

It’s a framework for performing remote procedure calls and data serialization. It can be used to pass data from one program or language to another (e.g. from C to Pig). It is particularly suited for use with scripting languages such as Pig, because data is always stored with its schema in Avro, and therefore the data is self-describing.
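
A small write/read sketch of the “data travels with its schema” idea, using the third-party fastavro package (the reference avro library works in a similar way); the schema and file name are made up for illustration:

from fastavro import parse_schema, reader, writer

schema = parse_schema({
    "name": "User",
    "type": "record",
    "fields": [{"name": "name", "type": "string"},
               {"name": "age", "type": "int"}],
})

with open("users.avro", "wb") as out:
    writer(out, schema, [{"name": "ada", "age": 36}])

with open("users.avro", "rb") as fo:
    for record in reader(fo):   # no external schema needed: the file is self-describing
        print(record)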

Apache Chukwa (Github)

Chukwa is an open source data collection system for monitoring large distributed systems. It's built on top of the Hadoop Distributed File System (HDFS) and Map/Reduce framework and inherits Hadoop's scalability and robustness.

It’s used to process and analyze generated logs and has different components:
  • Agents that run on each machine to collect the logs generated from various applications.
  • Collectors that receive data from the agent and write it to stable storage
  • MapReduce jobs for parsing and archiving the data.

[Diagram: Chukwa structure]

Apache Drill (Github)

Drill is a distributed system for interactive analysis of large-scale datasets, based on Google's Dremel. Its goal is to efficiently process nested data. It is a design goal to scale to 10,000 servers or more and to be able to process petabytes of data and trillions of records in seconds.

The idea behind Drill is to build a low-latency execution engine, enabling interactive queries across billions of records instead of using a batch MapReduce process.

Apache Flume (Github)

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

It is a distributed service that makes it very easy to collect and aggregate your data into a persistent store such as HDFS. Flume can read data from almost any source – log files, Syslog packets, the standard output of any Unix process – and can deliver it to a batch processing system like Hadoop or a real-time data store like HBase.

Apache HBase (Github)

HBase is the Hadoop database, a distributed, scalable, big data store.

It is an open source, non-relational, distributed database modeled after Google's BigTable. It is written in Java and provides a fault-tolerant way of storing large quantities of sparse data. HBase features compression, in-memory operation, and Bloom filters on a per-column basis, as outlined in the original BigTable paper. Tables in HBase can serve as the input and output for MapReduce jobs run on Hadoop.
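
A minimal sketch with the third-party happybase client (which talks to HBase through its Thrift gateway); the host, table and column family names are assumptions:

import happybase

connection = happybase.Connection("hbase-thrift-host")
table = connection.table("users")

# columns are addressed as family:qualifier and values are plain bytes
table.put(b"user:42", {b"profile:name": b"ada", b"profile:score": b"87"})
row = table.row(b"user:42")
print(row[b"profile:score"])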

Apache HCatalog (Github)

HCatalog is a table and storage management service for data created using Hadoop

Hadoop needs a better abstraction for data storage, and it needs a metadata service. HCatalog addresses both of these issues. It presents users with a table abstraction. This frees them from knowing where or how their data is stored. It allows data producers to change how they write data while still supporting existing data in the old format so that data consumers do not have to change their processes. It provides a shared schema and data model for Pig, Hive, and MapReduce. It will enable notifications of data availability. And it will provide a place to store state information about the data so that data cleaning and archiving tools can know which data sets are eligible for their services.

Apache Mahout (Github)

Mahout is a machine learning library whose “goal is to build scalable machine learning libraries”.

It's an implementation of distributed machine learning algorithms on the Hadoop platform. While Mahout's core algorithms for clustering, classification and batch-based collaborative filtering are implemented on top of Apache Hadoop using the map/reduce paradigm, the project does not restrict contributions to Hadoop-based implementations.

Apache Oozie (Github)

“Oozie is a workflow scheduler system to manage Apache Hadoop jobs.”

Tasks performed in Hadoop sometimes require multiple Map/Reduce jobs to be chained together to reach their goal.

Oozie is a Java Web-Application that runs in a Java servlet-container and uses a database to store:

  • Workflow definitions
  • Currently running workflow instances, including instance states and variables

An Oozie workflow is a collection of actions (e.g. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control dependency DAG (Directed Acyclic Graph) specifying the sequence of action execution. This graph is written in hPDL (an XML Process Definition Language).

Apache Sqoop (Github)

“Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.”

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse

After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Apache ZooKeeper (Github)

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.”

It is an open source, in-memory, distributed datastore, typically used for storing configuration variables.
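
A minimal configuration-storage sketch using the third-party kazoo client; the host and znode paths are assumptions:

from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

zk.ensure_path("/config/feature_flags")
zk.create("/config/feature_flags/new_ui", b"enabled")  # fails if the znode already exists

value, stat = zk.get("/config/feature_flags/new_ui")
print(value, stat.version)  # every update bumps the version number

zk.stop()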

Apache Giraph (Github)

“Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google.”

While it is possible to do processing on graphs with MapReduce, this approach is suboptimal for two reasons:

  1. MapReduce’s view of the world as keys and values is not the greatest way to think of graphs and often requires a significant effort to pound graph-shaped problems into MapReduce-shaped solutions.
  2. Most graph algorithms involve repeatedly iterating over the graph states, which in a MapReduce world requires multiple chained jobs. This, in turn, requires the state to be loaded and saved between each iteration, operations that can easily dominate the runtime of the computation overall.

Giraph attempts to alleviate these limitations by providing a more natural way to model graph problems:

  1. Think like a vertex!
  2. Keep the graph state in memory during the whole of the algorithm, only writing out the final state (and possibly some optional checkpointing to save progress as we go).

Rather than implementing mapper and reducer classes, one implements a Vertex, which has a value and edges and is able to send and receive messages to other vertices in the graph as the computation iterates.

Apache Accumulo (Github)

“The Accumulo sorted, distributed key/value store is a robust, scalable, high performance data storage and retrieval system.”

It is a sorted, distributed key/value store based on Google's BigTable design. Written in Java, Accumulo has cell-level access labels (useful for security purposes) and a server-side programming mechanism called Iterators that allows users to perform additional processing at the Tablet Server.

Apache S4 (Github)

“S4 is a general-purpose, distributed, scalable, fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”

It was developed by Yahoo (which released the S4 paper) and then open sourced through Apache. It is inspired by MapReduce and the Actor model of computation. Basic components are:

  • Processing Element (PE): Basic computational unit which can send and receive messages called Events.
  • Processing Node (PN): The logical hosts to PEs
  • Adapter: injects events into the S4 cluster and receives from it via the Communication Layer.

Apache Thrift (Github)

“The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services.”

It is an interface definition language that is used to define and create services for numerous languages. It is used as a remote procedure call (RPC) framework and was developed at Facebook for “scalable cross-language services development”. It combines a software stack with a code generation engine to build services that work efficiently together.

To put it simply, Apache Thrift is a binary communication protocol.

Sources


I always underestimated Google's contribution to the evolution of big-data processing. I used to think that Google only managed and showed some search results. Not so much data, and not as much as Facebook or Twitter at least…

Obviously I was wrong. Google has to manage a HUGE amount of data, and big-data processing was already a problem back in 2002! Its contribution to current processing technologies such as Hadoop, its filesystem HDFS, and HBase was fundamental.

We can split Google's contribution into two periods. The first (from 2003 to 2008) influenced the technologies we are using today. The second (from 2009 until today) is influencing the products we are going to use in the near future.

The first period gave us:

  • GFS Google FileSystem (PDF paper), a scalable distributed file system for large distributed data-intensive applications, which later inspired HDFS
  • BigTable (PDF paper), a column-oriented database designed to store petabytes of data across large clusters, which later inspired HBase
  • the concept of MapReduce (PDF paper), a programming model to process large datasets distributed across large clusters. Hadoop implements this programming model over HDFS or similar filesystems (see the toy sketch below).
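
As a toy, single-process illustration of the programming model (not of Hadoop's actual API): a map function emits key/value pairs, the framework groups them by key, and a reduce function folds each group. Hadoop runs the same three steps distributed across a cluster:

from collections import defaultdict

def map_fn(line):
    for word in line.split():
        yield word, 1

def reduce_fn(word, counts):
    return word, sum(counts)

def mapreduce(lines):
    groups = defaultdict(list)
    for line in lines:                  # map + shuffle: group emitted values by key
        for key, value in map_fn(line):
            groups[key].append(value)
    return [reduce_fn(k, v) for k, v in groups.items()]  # reduce each group

print(mapreduce(["the quick fox", "the lazy dog"]))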

This series of papers revolutionized data warehouse strategies, and now all the largest companies use products, inspired by these papers, that we all know.

The second period is less popular at the moment. Google faced many limits in its previous infrastructure and tried to fix them and move ahead. This effort gave us many other technologies, some of them not yet completely public:

  • Caffeine, a new search infrastructure that uses GFS2, a next-generation MapReduce and a next-generation BigTable.
  • Colossus, formerly known as Google FileSystem 2, the next-generation GFS.
  • Spanner (PDF paper), a scalable, multi-version, globally-distributed and synchronously-replicated database, the NewSQL evolution of BigTable.
  • Dremel (PDF paper), a scalable, near-real-time ad-hoc query system for the analysis of read-only nested data, and its implementation for GAE, BigQuery.
  • Percolator (PDF paper), a platform for incremental processing which continually updates the search index.
  • Pregel (PDF paper), a system for large-scale graph processing, similar to what MapReduce is for columnar data.

Now the market is different than in 2002. Many companies such as Cloudera and MapR are working hard on big data, and so is the Apache Software Foundation. Anyway, Google has a 10-year head start and its technologies are still stunning.

Probably many of these papers are going to influence the next 10 years. The first results are already here: Apache Drill and Cloudera Impala implement the Dremel paper specification, Apache Giraph implements the Pregel one, and HBase Coprocessors the Percolator one.

And they are just some examples, a Google search can show you more 😉

Insights:

In a previous post I analyzed Facebook's main infrastructure. Now I'm going deeper into its services.

Facebook Images

Facebook is the biggest photo sharing service in the world and grows by several million images every week. The pre-2009 infrastructure used three NFS tiers. Even with some optimization, that solution couldn't easily scale beyond a few billion images.

So in 2009 Facebook developed Haystack, an HTTP-based photo server. It is composed of 5 layers: HTTP server, Photo Store, Haystack Object Store, Filesystem and Storage.

Storage is made of storage blades using a RAID-6 configuration, which provides adequate redundancy and excellent read performance. The poor write performance is partially mitigated by the RAID controller's NVRAM write-back cache. The filesystem used is XFS, and it manages only storage-blade-local files; no NFS is used.

The Haystack Object Store is a simple log-structured (append-only) object store containing needles that represent the stored objects. A haystack consists of two files:

the actual haystack store file containing the needles

[Diagram: haystack store file layout]

plus an index file

[Diagram: haystack index file layout]

The Photo Store server is responsible for accepting HTTP requests and translating them to the corresponding Haystack store operations. It keeps an in-memory index of all photo offsets in the haystack store file. The HTTP framework used is the simple evhttp server provided with the open source libevent library.
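
A toy sketch of that idea: objects (“needles”) are appended to one big store file, and an in-memory index maps each photo id to its offset and size, so a read costs a single seek. The class and file names are made up, and the real needle format carries much more metadata:

import os

class TinyHaystack:
    def __init__(self, path="haystack.store"):
        self.path = path
        self.index = {}                  # photo_id -> (offset, size), kept in memory
        open(self.path, "ab").close()    # make sure the store file exists

    def put(self, photo_id, data):
        with open(self.path, "ab") as f:
            f.seek(0, os.SEEK_END)
            offset = f.tell()
            f.write(data)                # append-only: existing needles are never rewritten
        self.index[photo_id] = (offset, len(data))

    def get(self, photo_id):
        offset, size = self.index[photo_id]
        with open(self.path, "rb") as f:
            f.seek(offset)               # one seek, one read
            return f.read(size)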

Insights and Sources

Facebook Messages and Chat

The Facebook messaging system is powered by a system called Cell. The entire messaging system (email, SMS, Facebook Chat, and the Facebook Inbox) is divided into cells, and each cell contains only a subset of users. Every Cell is composed of a cluster of application servers (where business logic is defined) monitored by different ZooKeeper instances.

Application servers use a data access layer to communicate with the metadata storage, an HBase-based system (the old messaging infrastructure relied on Cassandra) which contains all the information related to messages and users.

Cells are the “core” of the system. To connect them to the frontend there are different “entry points”. An MTA proxy parses mail and redirects the data to the correct application. Emails are stored in the same structure as photos: Haystack. There are also discovery services to map user-to-Cell (based on hashing, see the sketch below) and service-to-Cell (based on ZooKeeper notifications), and everything exposes an API.
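
A deliberately simplified sketch of that hash-based user-to-Cell mapping; the cell list and the plain modulo scheme are assumptions, and the real discovery service is certainly more sophisticated:

import hashlib

CELLS = ["cell-01", "cell-02", "cell-03", "cell-04"]

def cell_for_user(user_id):
    # every entry point computes the same cell for a given user, with no coordination
    digest = hashlib.md5(str(user_id).encode()).hexdigest()
    return CELLS[int(digest, 16) % len(CELLS)]

print(cell_for_user(42))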

There is a “dirty” cache based on Memcached to serve messages (from a datacenter-local cache) and social information about the users (like social indexes).

[Diagram: Facebook Messages architecture]

The search engine for messages is built using an inverted index stored in HBase.

Chat is based on an Epoll server developed in Erlang and accessed using Thrift, and there is also a subsystem for logging chat messages (written in C++). Both subsystems are clustered and partitioned for reliability and efficient failover.

Real-time presence notification is the most resource-intensive operation performed (not sending messages): keeping each online user aware of the online-idle-offline states of their friends. Real-time messaging is done using a variation of Comet, specifically XHR long polling, and/or BOSH.

Insights and Sources

Facebook Search

The original Facebook search engine simply searched cached user information: friends lists, like lists and so on. Typeahead search (the search box at the top of the Facebook frontend) came in 2009. It tries to suggest the most interesting results. Performance is really important and results must be available within 100ms. It has to be fast and scalable, and the system is structured as follows:

[Diagram: Typeahead search architecture]

The first attempt is in the browser cache, where information about the user (friends, likes, pages) is stored. If the cache misses, an AJAX request starts.

Many leaf services search for results inside their indexes (stored in an inverted index called Unicorn). When result references are fetched from the indexes, they are merged and loaded from the global datastore. An aggregator provides a single channel to send data to the client. Obviously, queries are cached.

In 2012 Facebook started from the core of typeahead search to build a new search tool. Unicorn is the core of the new Graph Search. Formally it is an in-memory inverted index which maps Facebook content as a graph database would, and you can query it as a graph-traversal tool. To be used for Graph Search, Unicorn was updated to be more than a traversal tool: it now supports nested queries, scoring and different kinds of resources. Results are aggregated at different levels.
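
A toy in-memory inverted index in the spirit of Unicorn: each term maps to a sorted posting list of ids, and a query intersects posting lists. The terms and ids are invented for illustration:

index = {
    "friend:alice": [1, 3, 7, 9],
    "likes:cycling": [3, 4, 7, 12],
}

def search(*terms):
    postings = [set(index.get(term, [])) for term in terms]
    return sorted(set.intersection(*postings)) if postings else []

# "friends of alice who like cycling"
print(search("friend:alice", "likes:cycling"))   # -> [3, 7]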

[Diagram: Unicorn architecture]

The query lifecycle is usually made of 2 steps: the Suggestion Phase and the Search Phase.

The Suggestion Phase works like “autocomplete” and is powered by a Natural Language Processing (NLP) module that attempts to parse text based on a grammar. It identifies parts of the query as potential entities and passes these parts down to Unicorn to search for them.

The Search Phase begins when the searcher has made a selection from the suggestions. The parse tree, along with the fbids of the matched entities, is sent back to the Top Aggregator. A user readable version of this query is displayed as part of the URL.

Currently Graph Search is still in beta.

Insights and Sources

Resources and insights

This post and the previous one are based on and inspired by Michaël Figuière's answer to the following question on Quora: What is Facebook's architecture?

Additional stuff:

Facebook has one of the biggest hardware infrastructures in the world, with more than 60,000 servers. But servers are worthless if you can't use them efficiently. So how does Facebook work?

Facebook was born in 2004 on a LAMP stack: a PHP website with a MySQL datastore. Now it is powered by a complex set of interconnected systems grown on top of the original stack. The main components are:

  1. Frontend
  2. Main persistence layer
  3. Data warehouse layer
  4. Facebook Images
  5. Facebook Messages and Chat
  6. Facebook Search

1. Frontend

The frontend is written in PHP and compiled using HipHop (open sourced on GitHub) and g++. HipHop converts PHP into C++ that can then be compiled. The Facebook frontend is a single 1.5 GB binary deployed using BitTorrent. It provides the logic and the template system. Some parts are external and use the HipHop Interpreter (for development and debugging) and the HipHop Virtual Machine to run HipHop bytecode.

It's not clear which web server they are using. Apache was used in the beginning and is probably still used for the main frontend. In 2009 they acquired (and open sourced) Tornado from FriendFeed and use it for the real-time update features of the frontend. Static content (such as frontend images) is served through Akamai.

Everything not in the main binary (or in the bytecode source) is served using the Thrift protocol. Java services are served without Tomcat or Jetty: the overhead required by these application servers is not worth it for Facebook's architecture. Varnish is used to accelerate responses to HTTP requests.

To speed up frontend rendering they use BigPipe, a sort of mod_pagespeed. After an HTTP request, servers fetch data and build the HTML structure. The HTML is sent before data retrieval and is rendered by the browser; after rendering, JavaScript retrieves and displays the data (already available) asynchronously.

The Memcached infrastructure is based on more than 800 servers with 28 TB of memory. They use a modified version of Memcached which reimplements the connection pool buffer (and relies on some modifications to the Linux network layer) to better manage hundreds of thousands of connections.

Insights and Sources:

2. Main Persistence layer

Facebook relies on MySQL as its main datastore. I know, I couldn't believe it either…

They have one of the largest MySQL cluster deployments in the world and use the standard InnoDB engine. Like many others, they designed the system to easily handle MySQL failures by simply enforcing backups. Recently Facebook engineers published a post about their 3-stage backup stack:

  • Stage 1: each node of the cluster (the whole replica set) has 2 rack backups: one holds the binary log (to speed up slave promotion) and one holds a mysqldump snapshot taken every night (to handle node failure).
  • Stage 2: each binlog and backup is copied to a larger Hadoop cluster mapped using HDFS. This layer provides a fast, distributed source of data, useful to restore nodes even in different locations.
  • Stage 3: provides long-term storage, copying data from the Hadoop cluster to discrete storage in a separate region.

Facebook collects several statistics about failures, traffic and backups. These statistics contribute to a “score” for each node. If a node fails or its score is too low, an automatic system restores it from Stage 1 or Stage 2. They don't try to avoid problems, they simply handle them as fast as possible.

Insights and Sources:

3. Data warehouse layer

MySQL data is also moved to “cold” storage to be analyzed. This storage is based on Hadoop HDFS (on top of which HBase also runs) and is queried using Hive. Data such as logs, clicks and feeds transit through Scribe and are aggregated and stored in Hadoop HDFS using Scribe-HDFS.

The standard Hadoop HDFS implementation has some limitations. Facebook added a different kind of NameNode called the AvatarNode, which uses Apache ZooKeeper to handle node failure. They also added a RaidNode that uses XOR-parity files to decrease storage usage.

[Diagram: Facebook Hadoop cluster]

Analysis is done using MapReduce. However, analyzing several petabytes of data is hard, and the standard Hadoop MapReduce framework became a limit in 2011. So they developed Corona, a new scheduling framework that separates cluster resource management from job coordination. Results are stored in Oracle RAC (Real Application Cluster).

Insights and Sources:

In the next post I'm going to analyze the other Facebook services: Images, Messages and Search.

Yesterday @lastknight was looking for something to store and query a huge graph dataset. He found Titan, developed by Aurelius and released last August. It is a distributed graph database which can rely on Apache Cassandra, Apache HBase or Oracle BerkeleyDB for storage. It promises to be fully distributed and horizontally scalable; it's really ambitious and the presentation seems really interesting 🙂


Aurelius also develops Faunus, an Apache Hadoop-based graph analytics engine for analyzing massive-scale graphs.

More about Titan and Faunus:

[UPDATE 2013-03-10] The subscription page describes details about the software (developed and planned) by Aurelius. The Fulgora processor seems really interesting. http://thinkaurelius.com/subscription/

In the beginning was RediSQL, a “Hybrid Relational-Database/NOSQL-Datastore written in ANSI C”. After a while they changed the name to AlchemyDB.

Everything is built on top of Redis, adding a Lua interpreter to implement a really interesting technique they call Datastore-Side Scripting. It's like using stored procedures, putting logic into the datastore (see the small example after the list below). They can achieve many different goals using this technique:

  • Implement a SQL-like language, using Lua to decode SQL requests
  • Implement many datatypes not supported by Redis, using Lua to fit the new structures into common Redis types
  • Serve content (like web pages or JSON data) directly from the datastore using a REST API
  • Implement a GraphDB, using SQL for indexes and Lua for graph-traversal logic
  • Implement a document-oriented model using Lua
  • Implement an ObjectDB using Lua
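
To give a taste of datastore-side scripting with nothing more than a stock Redis server and the redis-py client: the Lua snippet below runs inside Redis, next to the data, which is the pattern AlchemyDB generalised. The key name and the counter logic are made up for illustration:

import redis

r = redis.Redis()

# atomically read the old value and increment it, entirely inside the datastore
lua = """
local old = tonumber(redis.call('GET', KEYS[1]) or '0')
local new = redis.call('INCRBY', KEYS[1], ARGV[1])
return {old, new}
"""
print(r.eval(lua, 1, "page:views", 5))   # -> [old_value, old_value + 5]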

Last year Citrusleaf acquired AlchemyDB, and Russ Sullivan (the guy behind AlchemyDB) is incrementally porting its functionality to run on top of Citrusleaf's proven distributed, high-availability, linearly scalable key-value store: Aerospike. It is a distributed NoSQL database, the first solution to claim ACID support, with an extremely fast architecture optimized to run on SSDs.

I haven't tested it yet, but as far as I can see they provide an SDK for the most popular programming languages. The Ruby one requires a native library. To start, you need to add a node:

require "Citrusleaf"
c = Citrusleaf.new
c.add_node "10.1.1.6", 3000

Set, get and delete operations are done as follows:

# Writing Values
c.put 'namespace', 'myset', 'mykey', 'bin_name', value
# Reading Values
rv, gen, value = c.get 'namespace', 'myset', 'mykey', 'binx'
# Deleting Values
c.delete 'namespace', 'myset', 'mykey'

The documentation isn't very useful yet. The only way to understand whether it is cool or not is to test it. That's what I'll do.