A few days ago I was at Codemotion in Milan and I had the opportunity to discover some insights about the technologies used by two of our main competitors in Italy: BlogMeter and Datalytics. It’s quite interesting because, even if the technical challenges are almost the same, each company uses a different approach with a different stack.

datalytics_logo

Datalytics is a relatively new company founded 4 months ago. They had a desk at Codemotion to show their products and recruit new people. I chatted with Marco Caruso, the CTO (who probably didn’t know who I am, sorry Marco, I just wanted to avoid hostility 😉 ), about the technologies they use and the developer profile they were looking for. Required skills were:

Their tech team is composed of 4 developers (including the CTO) and their main products are: Datalytics Monitoring™ (a sort of statistical dashboard that shows buzz stats in real time) and Datalytics Engage™ (a real-time analytics dashboard for live events). I have no technical insights about how their systems work but I can guess some details, inferring them from the buzzwords they use.

Supported sources are Twitter, Facebook (only public data), Instagram, YouTube, Vine (logos are on their website) and probably Pinterest.

They use DataSift as a data source in addition to the standard APIs. I suppose their processing pipeline uses Storm to manage the streaming input, maybe with an importing layer in front of it. Data is crunched using Hadoop and Java, and results are stored in MongoDB (Massimo Brignoli, the Italian MongoDB evangelist, advertised their company during his presentation so I suppose they use it heavily).

Node.js should be used for the frontend. It is fast enough for near real-time applications (also using WebSockets) and plays really well with both Angular.js and MongoDB (the MEAN stack). D3.js is obviously the only choice for complex dynamic charts.

I’m not so happy when I discover a new competitor in our market segment. Competition gets harder and this is not fun. Anyway, the guys at Datalytics seem smart (and nice); competing with them will be a pleasure and will push me to do my best.

Now I’m curious to know if Datalytics is monitoring the buzz on the web around its company name. I’m going to tweet about this article using the #Datalytics hashtag. If you find this article please tweet me “Yes, we found it bwahaha” 😛

[UPDATE 2014-12-27 21:18 CET]

@DatalyticsIT favorited my tweet on December 1st. This probably means they found my article but they didn’t read it! 😀

Everything started while I was writing my first post about the Hadoop ecosystem. I was relatively new to Hadoop and I wanted to discover all the useful projects. I spent about 9 months collecting projects and building a simple index.

About a month ago I found an interesting thread posted on the Hadoop Users Group on LinkedIn, written by Javi Roman, High Performance Computing Manager at CEDIANT (UAX). He talks about a table which maps the Hadoop ecosystem much like I did with my list.

He published his list on Github a couple of days later and called it the Hadoop Ecosystem Table. It was an HTML table, really interesting but really hard to use for other purposes. I wanted to merge my list with this table so I decided to fork it and add more abstractions.

I wrote a couple of Ruby scripts (thanks, Nokogiri) to extract data from my list and Javi’s table and put it in an agnostic container. After a couple of days spent hacking on these parsers I settled on a simple but elegant solution: JSON.
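The extraction side was something like this (a simplified sketch: the real scripts are messier and the CSS selectors here are hypothetical):

require 'nokogiri'
require 'json'

# Parse the HTML table and map each row to a plain hash.
doc = Nokogiri::HTML(File.read('hadoop-ecosystem-table.html'))
projects = doc.css('table tr').drop(1).map do |row|
  cells = row.css('td')
  {
    'name'        => cells[0].text.strip,
    'description' => cells[1].text.strip,
    'links'       => cells[1].css('a').map { |a| { 'text' => a.text, 'url' => a['href'] } }
  }
end

# One JSON file per project: the "agnostic container".
projects.each do |project|
  filename = project['name'].downcase.gsub(/\W+/, '-')
  File.write("projects/#{filename}.json", JSON.pretty_generate(project))
end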

Information about each project is stored in a separate JSON file:

{
  "name": "Apache HDFS",
  "description": "The Hadoop Distributed File System (HDFS) offers a way to store large files across \nmultiple machines. Hadoop and HDFS was derived from Google File System (GFS) paper. \nPrior to Hadoop 2.0.0, the NameNode was a single point of failure (SPOF) in an HDFS cluster. \nWith Zookeeper the HDFS High Availability feature addresses this problem by providing \nthe option of running two redundant NameNodes in the same cluster in an Active/Passive \nconfiguration with a hot standby. ",
  "abstract": "a way to store large files across multiple machines",
  "category": "Distributed Filesystem",
  "tags": [],
  "links": [
    {
      "text": "hadoop.apache.org",
      "url": "http://hadoop.apache.org/"
    },
    {
      "text": "Google FileSystem - GFS Paper",
      "url": "http://research.google.com/archive/gfs.html"
    },
    {
      "text": "Cloudera Why HDFS",
      "url": "http://blog.cloudera.com/blog/2012/07/why-we-build-our-platform-on-hdfs/"
    },
    {
      "text": "Hortonworks Why HDFS",
      "url": "http://hortonworks.com/blog/thinking-about-the-hdfs-vs-other-storage-technologies/"
    }
  ]
}

It includes: project name, long and short descriptions, category, tags and links.

I merged the data into these files and wrote a couple of generators in order to put the data into different templates. Now I can generate code for my WordPress page and an updated version of Javi’s table.
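Each generator is little more than a template-rendering loop. A minimal sketch (the file names and the template are hypothetical):

require 'erb'
require 'json'

# Load every project file back into memory...
projects = Dir['projects/*.json'].map { |file| JSON.parse(File.read(file)) }

# ...and render them through a template (an HTML table, a WordPress page, etc.).
template = ERB.new(File.read('templates/table.html.erb'))
File.write('output/table.html', template.result(binding))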

Finally I added more data for more generic categories not strictly related to Hadoop (like MySQL forks, Memcached forks and search engine platforms) and built a new version of the table: the Big Data Ecosystem table. The JSON files are available to everyone and will be served directly from a CDN located under the same domain as the table.

This is how I built an open source big data map 🙂

Last summer I had the pleasure of reviewing a really interesting book about Spark written by Holden Karau for PacktPub. She is a really smart woman, currently a software development engineer at Google and active in Spark‘s developer community. In the past she worked for Microsoft, Amazon and Foursquare.

Spark is a framework for writing fast, distributed programs. It’s similar to Hadoop MapReduce but uses a fast in-memory approach. The Spark ecosystem incorporates built-in tools for interactive query analysis (Shark), a large-scale graph processing and analysis framework (Bagel), and a real-time analysis framework (Spark Streaming). I discovered them a few months ago while exploring the extended Hadoop ecosystem.

The book covers how to write distributed MapReduce-style programs. You can find everything you need: setting up your Spark cluster, using the interactive shell, and writing and deploying distributed jobs in Scala, Java and Python. The last chapters look at how to use Hive with Spark to get a SQL-like query syntax with Shark, and at manipulating resilient distributed datasets (RDDs).

Have fun reading it! 😀

Fast data processing with Spark
by Holden Karau

fast_data_processing_with_spark_cover

The title is also listed in the Research Areas & Publications section of the Google Research portal: http://research.google.com/pubs/pub41431.html

Last month, while I was inspecting the Hadoop ecosystem, I found many other pieces of software related to big data. Below is the (again incomplete) list.

N.B. Information and texts are taken from official websites or from the sources referenced at the end of the article.

facebook_scribe_logo

Facebook Scribe (Github)

“Scribe is a server for aggregating log data streamed in real time from a large number of servers. It is designed to be scalable, extensible without client-side modification, and robust to failure of the network or any specific machine.”

facebook_scribe_diagram

Scribe servers are arranged in a directed graph, with each server knowing only about the next server in the graph. This network topology allows for adding extra layers of fan-in as a system grows, and batching messages before sending them between datacenters, without having any code that explicitly needs to understand datacenter topology, only a simple configuration.

Scribe was designed with reliability in mind, but without requiring heavyweight protocols or expansive disk usage.

Facebook McDipper

“McDipper is a highly performant flash-based cache server that is Memcache protocol compatible.”

Facebook used Memcached heavily in its early stages, and when the cost of RAM became too high they had to find a different solution. This is why McDipper was born. It is no different from other projects like Fatcache or Edis: as many DBMSs did, they mimic an in-memory caching engine on top of flash in order to extend the available space. SSDs are fast enough to make the speed loss a non-issue.

Facebook Haystack

“The new photo infrastructure merges the photo serving tier and storage tier into one physical tier. It implements a HTTP based photo server which stores photos in a generic object store called Haystack.”

A system specifically designed to serve static content over HTTP as fast as possible. Haystack can be broken down into these functional layers:

  • HTTP server
  • Photo Store
  • Haystack Object Store
  • Filesystem
  • Storage

I talked about it in a previous post.

Facebook Memcached (Github)

“Here at Facebook, we’re likely the world’s largest user of memcached. We use memcached to alleviate database load. memcached is already fast, but we need it to be faster and more efficient than most installations.”

Twitter Storm (Github)

“Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation.”

twitter_storm_topology

There are just three abstractions in Storm: spouts, bolts, and topologies.

A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere.

A bolt processes any number of input streams and produces any number of new output streams. Most of the logic of a computation goes into bolts, such as functions, filters, streaming joins, streaming aggregations, talking to databases, and so on.

A topology is a network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt.
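Just to fix the vocabulary, here is a toy Ruby model of those three abstractions (this is not Storm’s API, which is JVM-based; it only mimics the dataflow idea with a classic word count):

# A "spout" is a source of tuples; here, a fixed stream of sentences.
spout = ['the quick fox', 'the lazy dog'].each

# Two "bolts": one splits sentences into words, one counts words.
split_bolt = ->(sentence) { sentence.split }
counts = Hash.new(0)
count_bolt = ->(word) { counts[word] += 1 }

# The "topology" wires spout -> split_bolt -> count_bolt.
spout.each { |sentence| split_bolt.call(sentence).each { |word| count_bolt.call(word) } }
puts counts.inspect  # => {"the"=>2, "quick"=>1, "fox"=>1, "lazy"=>1, "dog"=>1}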

Twitter Snowflake (Github)

“Snowflake is a network service for generating unique ID numbers at high scale with some simple guarantees.”

Twitter needed something that could generate tens of thousands of ids per second in a highly available manner. This naturally led us to choose an uncoordinated approach.

These ids need to be roughly sortable, meaning that if tweets A and B are posted around the same time, they should have ids in close proximity to one another since this is how we and most Twitter clients sort tweets. Additionally, these numbers have to fit into 64 bits.

To generate them in an uncoordinated manner, we settled on a composition of: timestamp, worker number and sequence number. Sequence numbers are per-thread and worker numbers are chosen at startup via zookeeper (though that’s overridable via a config file).
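Composing such an ID is mostly bit shifting. A sketch in Ruby using the bit layout of the published Snowflake source (41-bit timestamp, 10-bit worker id, 12-bit sequence):

# Twitter's custom epoch in milliseconds (the "twepoch" constant from the source).
TWEPOCH = 1288834974657

def snowflake_id(worker_id, sequence)
  timestamp_ms = (Time.now.to_f * 1000).to_i
  ((timestamp_ms - TWEPOCH) << 22) |  # 41 bits: milliseconds since epoch
    (worker_id << 12) |               # 10 bits: worker number (0..1023)
    sequence                          # 12 bits: per-thread sequence (0..4095)
end

snowflake_id(1, 0)  # => a 64-bit, roughly time-sortable integer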

Twitter Fatcache (Github)

“Fatcache is memcache on SSD. Think of fatcache as a cache for your big data.”

SSD-backed memory presents a viable alternative for applications with large workloads that need to maintain high hit rate for high performance.

Like Facebook’s McDipper, Fatcache tries to overcome memory limitations. It maintains an in-memory index for all data stored on disk. The in-memory index serves two purposes: cheap object existence checks and on-disk object address storage.

To minimize the number of small, random writes, fatcache treats the SSD as a log-structured object store. All writes are aggregated in memory and written to the end of the circular log in batches – usually multiples of 1 MB.
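The core trick, an in-memory index pointing into an append-only store on disk, is easy to sketch in Ruby (a toy model, nothing to do with Fatcache’s actual C code, which adds slab management, batching and eviction):

class ToyLogCache
  def initialize(path)
    @log = File.open(path, 'ab+')
    @index = {}  # key => [offset, length], kept entirely in RAM
  end

  def set(key, value)
    @log.seek(0, IO::SEEK_END)
    @index[key] = [@log.pos, value.bytesize]  # cheap existence check + on-disk address
    @log.write(value)
    @log.flush
  end

  def get(key)
    offset, length = @index[key]
    return nil unless offset  # a miss is answered from memory, no disk access
    @log.seek(offset)
    @log.read(length)
  end
end

cache = ToyLogCache.new('/tmp/toy.log')
cache.set('user:1', 'marco')
cache.get('user:1')  # => "marco"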

Twitter FlockDB (Github)

“FlockDB is a database that stores graph data, but it isn’t a database optimized for graph-traversal operations. Instead, it’s optimized for very large adjacency lists, fast reads and writes, and page-able set arithmetic queries.”

flockdb_diagram

It is a distributed graph database for storing adjacency lists, with goals of supporting:

  • a high rate of add/update/remove operations
  • potentially complex set arithmetic queries
  • paging through query result sets containing millions of entries
  • ability to “archive” and later restore archived edges
  • horizontal scaling including replication
  • online data migration

Non-goals include: multi-hop queries (or graph-walking queries) and automatic shard migrations.

FlockDB is much simpler than other graph databases such as Neo4j because it tries to solve fewer problems. It scales horizontally and is designed for on-line, low-latency, high-throughput environments such as web sites.

twemcache_logo

Twitter Twemcache (Github)

“We built Twemcache because we needed a more robust and manageable version of Memcached, suitable for our large-scale production environment.”

Apache Kafka (Github)

“Kafka is publish-subscribe messaging rethought as a distributed commit log.”

apache_kafka_diagram

It is designed to support the following:

  • Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.
  • High-throughput: even with very modest hardware Kafka can support hundreds of thousands of messages per second.
  • Explicit support for partitioning messages over Kafka servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.
  • Support for parallel data load into Hadoop.

Kafka is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing on a consumer-scale web site.
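The “distributed commit log” framing is easy to see in miniature: producers only ever append, and every consumer just remembers its own offset, which makes replay and slow consumers cheap. A toy in-memory model in Ruby (no partitions, no persistence, nothing of Kafka’s real implementation):

class ToyLog
  def initialize
    @messages = []
  end

  # Producers only append; the log is the single source of truth.
  def append(message)
    @messages << message
    @messages.size - 1  # the offset assigned to this message
  end

  def read(offset)
    @messages[offset]
  end
end

log = ToyLog.new
log.append('pageview /home')
log.append('pageview /about')

# Each consumer tracks its own offset, so slow consumers don't
# block fast ones and replaying from 0 is trivial.
offset = 0
while (message = log.read(offset))
  puts message
  offset += 1
end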

apache_gora_logo

Apache Gora (Github)

“Gora is an open source framework that provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key value stores, document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce support.”

Apache Mesos (Github)

“Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark.”

apache_mesos_structure

Main features are:

  • Fault-tolerant replicated master using ZooKeeper.
  • Scalability to 10,000s of nodes using fast, event-driven C++ implementation.
  • Isolation between tasks with Linux Containers.
  • Multi-resource scheduling (memory and CPU aware).
  • Efficient application-controlled scheduling mechanism.
  • Java, Python and C++ APIs for developing new parallel applications.
  • Web UI for viewing cluster state.

apache_spark_logo

Apache Spark (Github)

“Spark is an open source cluster computing system that aims to make data analytics fast: both fast to run and fast to write. To run programs faster, Spark provides primitives for in-memory cluster computing: your job can load data into memory and query it repeatedly much more quickly than with disk-based systems like Hadoop MapReduce.”

Spark was initially developed for two applications where keeping data in memory helps: iterative algorithms, which are common in machine learning, and interactive data mining. In both cases, Spark can run up to 100x faster than Hadoop MapReduce. However, you can use Spark for general data processing too.

Shark: Hive on Spark (Github)

“Shark is a large-scale data warehouse system for Spark designed to be compatible with Apache Hive. It can execute Hive QL queries up to 100 times faster than Hive without any modification to the existing data or queries. Shark supports Hive’s query language, metastore, serialization formats, and user-defined functions, providing seamless integration with existing Hive deployments and a familiar, more powerful option for new ones.”

shark_spark_diagram

Shark is built on top of Spark, a data-parallel execution engine that is fast and fault-tolerant. Even if data is on disk, Shark can be noticeably faster than Hive because of the fast execution engine. It avoids the high task-launching overhead of Hadoop MapReduce and does not require materializing intermediate data between stages on disk. Thanks to this fast engine, Shark can answer queries with sub-second latency.

fluentd_logo

Fluentd (Github)

“Fluentd is an open-source tool to collect events and logs. 150+ plugins instantly enables you to store the massive data for Log Search, Big Data Analytics, and Archiving (MongoDB, S3, Hadoop)”

fluentd_diagram

The fundamental problem with logs is that they are usually stored in files although they are best represented as streams (as Adam Wiggins, CTO at Heroku, puts it). Traditionally, they have been dumped into text-based files and collected by rsync hourly or daily. With today’s web/mobile applications, this creates two problems.

  1. Ad-hoc parsing is needed: text-based logs have their own format, and an analytics engineer needs to write a dedicated parser for each format. But that’s probably not the best use of your time. You should be analyzing data to make better business decisions instead of writing one parser after another.
  2. Freshness is lacking: the logs lag. Real-time analysis of user behavior makes feature iterations a lot faster. Nimbler A/B testing will help you differentiate your service from competitors.

This is where Fluentd comes in. We believe Fluentd solves all issues of scalable log collection by getting rid of files and turning logs into true semi-structured data streams.

kestrel_logo

Kestrel (Github)

“Kestrel is a very simple message queue that runs on the JVM. It supports multiple protocols: memcache: the memcache protocol, with some extensions, thrift: Apache Thrift-based RPC, text: a simple text-based protocol”

A cluster of kestrel servers is like a memcache cluster: the servers don’t know about each other, and don’t do any cross-communication, so you can add as many as you like. The simplest clients have a list of all servers in the cluster, and pick one at random for each operation. In this way, each queue appears to be spread out across every server, with items in a loose ordering. More advanced clients can find kestrel servers via ZooKeeper.

impala_logo

Cloudera Impala (Github)

“Impala is an open source Massively Parallel Processing (MPP) query engine that runs natively on Apache Hadoop.”

With Impala, you can query data, whether stored in HDFS or Apache HBase – including SELECT, JOIN, and aggregate functions – in real time.

To avoid latency, Impala circumvents MapReduce to directly access the data through a specialized distributed query engine that is very similar to those found in commercial parallel RDBMSs. The result is order-of-magnitude faster performance than Hive, depending on the type of query and configuration. This performance improvement has been confirmed by several large companies that have tested Impala on real-world workloads for several months now.

Structure diagram:

cloudera_impala_diagram

HadoopDB

“HadoopDB is an Architectural Hybrid of MapReduce and DBMS Technologies for Analytical Workloads”

HadoopDB is:

  • A hybrid of DBMS and MapReduce technologies that targets analytical workloads
  • Designed to run on a shared-nothing cluster of commodity machines, or in the cloud
  • An attempt to fill the gap in the market for a free and open source parallel DBMS
  • Much more scalable than currently available parallel database systems and DBMS/MapReduce hybrid systems.
  • As scalable as Hadoop, while achieving superior performance on structured data analysis workloads

hypertable_logo

Hypertable

“Hypertable is an open source database system inspired by publications on the design of Google’s BigTable. The project is based on the experience of engineers who were solving large-scale data-intensive tasks for many years.”

This project is for the design and implementation of a high performance, scalable, distributed storage and processing system for structured and unstructured data. It is designed to manage the storage and processing of information on a large cluster of commodity servers, providing resilience to machine and component failures. Data is represented in the system as a multi-dimensional table of information. The data in a table can be transformed and organized at high speed by performing computations in parallel, pushing them to where the data is physically stored.

Sources:

database_venn_diagram

Last week I found this diagram on @al3xandru‘s MyNoSQL blog and I was surprised by how many pieces of software I had never heard of before.

Many other systems are missing from the diagram, such as NuoDB (NewSQL), Aerospike (Key-Value), Titan (Graph), FoundationDB (Key-Value), Apache Accumulo (Key-Value), Apache Giraph (Graph) and more, and it includes some companies (like Cloudera, MapR and Xeround) even though they didn’t develop a custom version but just fork and maintain the main one.

Anyway it seems one of the best visual representations of the current database world and I’m going to use it as a base for an updated and more detailed version 😉

Sources: 

A few days after Google released its papers in 2003, many developers started implementing them. Apache Hadoop is the biggest result of that work. Around Hadoop many other technologies were born, and the Apache Software Foundation helped the most promising ones grow. Below is an (incomplete) list of Hadoop-related software.

Apache Hadoop (HDFS, MapReduce)

apache_hadoop_logo

Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

The Hadoop Distributed File System (HDFS) offers a way to store large files across multiple machines. Hadoop and HDFS were derived from Google’s MapReduce and Google File System (GFS) papers.

Apache Hive (Github)

apache_hive_logo

“Hive is a data warehouse system for Hadoop […] Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL.”

The MapReduce paradigm is extremely powerful, but programmers have used SQL to query data for years. HiveQL is a SQL-like language to query data over the Hadoop filesystem.

An example of HiveQL:

CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;

Apache Pig (Github)

pig_logo

“Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […] Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs. […] Pig’s language layer currently consists of a textual language called Pig Latin.”

If you don’t like SQL, maybe you prefer a sort of procedural language. Pig Latin is different from HiveQL but has the same purpose: querying data.

An example of Pig Latin:

set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;

Apache Avro (GitHub)

avro_logo

Avro is a data serialization system.

It’s a framework for performing remote procedure calls and data serialization. It can be used to pass data from one program or language to another (e.g. from C to Pig). It is particularly suited for use with scripting languages such as Pig, because data is always stored with its schema in Avro, and therefore the data is self-describing.
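A schema is itself JSON and travels with the data, which is what makes Avro files self-describing. A minimal (hypothetical) record definition:

{
  "type": "record",
  "name": "PageView",
  "fields": [
    {"name": "url", "type": "string"},
    {"name": "userid", "type": "long"},
    {"name": "referrer", "type": ["null", "string"], "default": null}
  ]
}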

Apache Chukwa (Github)

chukwa_logo

Chukwa is an open source data collection system for monitoring large distributed systems. It’s built on top of the Hadoop Distributed File System (HDFS) and Map/Reduce framework and inherits Hadoop’s scalability and robustness.

It’s used to process and analyze generated logs and has different components:
  • Agents that run on each machine to collect the logs generated from various applications.
  • Collectors that receive data from the agents and write it to stable storage.
  • MapReduce jobs for parsing and archiving the data.

chukwa_structure

Apache Drill (Github)

drill_logo

Drill is a distributed system for interactive analysis of large-scale datasets, based on Google’s Dremel. Its goal is to efficiently process nested data. It is a design goal to scale to 10,000 servers or more and to be able to process petabytes of data and trillions of records in seconds.

The idea behind Drill is to build a low-latency execution engine, enabling interactive queries across billions of records instead of using batch MapReduce processes.

Apache Flume (Github)

flume_logo

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

It is a distributed service that makes it very easy to collect and aggregate your data into a persistent store such as HDFS. Flume can read data from almost any source – log files, Syslog packets, the standard output of any Unix process – and can deliver it to a batch processing system like Hadoop or a real-time data store like HBase.

Apache HBase (Github)

hbase_logo

HBase is the Hadoop database, a distributed, scalable, big data store.

It is an open source, non-relational, distributed database modeled after Google’s BigTable. It is written in Java and provides a fault-tolerant way of storing large quantities of sparse data. HBase features compression, in-memory operation, and Bloom filters on a per-column basis as outlined in the original BigTable paper. Tables in HBase can serve as the input and output for MapReduce jobs run in Hadoop.

Apache HCatalog (Github)

“HCatalog is a table and storage management service for data created using Hadoop.”

Hadoop needs a better abstraction for data storage, and it needs a metadata service. HCatalog addresses both of these issues. It presents users with a table abstraction. This frees them from knowing where or how their data is stored. It allows data producers to change how they write data while still supporting existing data in the old format so that data consumers do not have to change their processes. It provides a shared schema and data model for Pig, Hive, and MapReduce. It will enable notifications of data availability. And it will provide a place to store state information about the data so that data cleaning and archiving tools can know which data sets are eligible for their services.

Apache Mahout (Github)

mahout_logo

“The Mahout machine learning library’s goal is to build scalable machine learning libraries.”

It’s an implementation of distributed machine learning algorithms on the Hadoop platform. While Mahout’s core algorithms for clustering, classification and batch-based collaborative filtering are implemented on top of Apache Hadoop using the map/reduce paradigm, it does not restrict contributions to Hadoop-based implementations.

Apache Oozie (Github)

oozie_logo

“Oozie is a workflow scheduler system to manage Apache Hadoop jobs.”

Tasks performed in Hadoop sometimes require multiple Map/Reduce jobs to be chained together to achieve their goal.

Oozie is a Java Web-Application that runs in a Java servlet-container and uses a database to store:

  • Workflow definitions
  • Currently running workflow instances, including instance states and variables

An Oozie workflow is a collection of actions (i.e. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control dependency DAG (Directed Acyclic Graph), specifying a sequence of actions to execute. This graph is specified in hPDL (an XML Process Definition Language).

Apache Sqoop (Github)

sqoop_logo

“Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.”

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse

After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Apache ZooKeeper (Github)

zookeeper_logo

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.”

It is an open source, in-memory, distributed datastore, typically used for storing configuration variables and coordinating distributed applications.

Apache Giraph (Github)

giraph_logo

“Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google.”

While it is possible to do processing on graphs with MapReduce, this approach is suboptimal for two reasons:

  1. MapReduce’s view of the world as keys and values is not the greatest way to think of graphs and often requires a significant effort to pound graph-shaped problems into MapReduce-shaped solutions.
  2. Most graph algorithms involve repeatedly iterating over the graph states, which in a MapReduce world requires multiple chained jobs. This, in turn, requires the state to be loaded and saved between each iteration, operations that can easily dominate the runtime of the computation overall.

Giraph attempts to alleviate these limitations by providing a more natural way to model graph problems:

  1. Think like a vertex!
  2. Keep the graph state in memory during the whole of the algorithm, only writing out the final state (and possibly some optional checkpointing to save progress as we go).

Rather than implementing mapper and reducer classes, one implements a Vertex, which has a value and edges and is able to send and receive messages to other vertices in the graph as the computation iterates.
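To make “think like a vertex” concrete, here is a toy Ruby superstep loop that propagates the maximum value through a tiny graph (Giraph itself is Java and distributed; this only mimics the message-passing model):

Vertex = Struct.new(:value, :neighbors, :inbox, :active)

graph = {
  a: Vertex.new(3, [:b],     [], true),
  b: Vertex.new(6, [:a, :c], [], true),
  c: Vertex.new(2, [:b],     [], true)
}

# One superstep: every active vertex folds its messages into its value
# and messages its neighbors; it halts when it has nothing new to say.
while graph.values.any?(&:active)
  outbox = Hash.new { |hash, key| hash[key] = [] }
  graph.each_value do |vertex|
    next unless vertex.active
    new_value = (vertex.inbox + [vertex.value]).max
    if new_value > vertex.value || vertex.inbox.empty?
      vertex.value = new_value
      vertex.neighbors.each { |n| outbox[n] << vertex.value }
    end
    vertex.active = false
    vertex.inbox = []
  end
  outbox.each do |id, messages|
    graph[id].inbox = messages
    graph[id].active = true
  end
end

graph.each { |id, vertex| puts "#{id}: #{vertex.value}" }  # => a: 6, b: 6, c: 6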

Apache Accumulo (Github)

accumulo_logo

“The Accumulo sorted, distributed key/value store is a robust, scalable, high performance data storage and retrieval system.”

It is a sorted, distributed key/value store based on Google’s BigTable design. Written in Java, Accumulo has cell-level access labels (useful for security purposes) and a server-side programming mechanism called Iterators that allows users to perform additional processing at the Tablet Server.

Apache S4 (Github)

s4_logo

“S4 is a general-purpose, distributed, scalable, fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”

Developed by Yahoo (which released the S4 paper) and then open sourced to Apache, it is inspired by MapReduce and the Actor model of computation. The basic components are:

  • Processing Element (PE): the basic computational unit, which can send and receive messages called Events.
  • Processing Node (PN): the logical host of PEs.
  • Adapter: injects events into the S4 cluster and receives events from it via the Communication Layer.

Apache Thrift (Github)

“The Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services.”

It is an interface definition language that is used to define and create services for numerous languages. It is used as a remote procedure call (RPC) framework and was developed at Facebook for “scalable cross-language services development”. It combines a software stack with a code generation engine to build services that work efficiently together.

To put it simply, Apache Thrift is a binary communication protocol.
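An interface is declared once in Thrift’s IDL and the compiler generates client and server stubs for every target language. A minimal (hypothetical) definition:

struct User {
  1: i64 id,
  2: string name
}

service UserService {
  User fetch(1: i64 id),
  void save(1: User user)
}

Running thrift --gen rb user.thrift (or --gen java, --gen py) produces the matching stubs.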

Sources

Recently I needed to select the best hosted services for some datastores to use in a large and complex project. Starting from Heroku and AppFog’s add-ons I found many free plans useful to test a service and/or to use in production if your app is small enough (as an example, this blog runs on Heroku PostgreSQL’s Dev plan). Here is the list:

MySQL

  • Xeround (Starter plan): 5 connections and 10 MB of storage
  • ClearDB (Ignite plan): 10 connections and 5 MB of storage

MongoDB

  • MongoHQ (Sandbox): 50MB of memory, 512MB of data
  • MongoLab (Starter plan): 496 MB of storage

Redis

  • RedisToGo (Nano plan): 5MB, 1 DB, 10 connections and no backups.
  • RedisCloud by Garantia Data: 20MB, 1 DB, 10 connections and no backups.
  • MyRedis (Gratis plan): 5MB, 1 DB, 3 connections and no backups.

Memcache

CouchDB

  • IrisCouch (up to $5): no limits, usage fees for HTTP requests and storage.
  • Cloudant (Oxygen plan): 150,000 requests, 250 MB of storage.

PostgreSQL – Heroku PostgreSQL (Dev plan): 20 connections, 10,000 rows of data
Cassandra – Cassandra.io (Beta on Heroku): 500 MB and 50 transactions per second
Riak – RiakOn! (Sandbox): 512MB of memory
Hadoop – Treasure Data (Nano plan): 100MB (compressed), data retention for 90 days
Neo4j – Heroku Neo4j (Heroku AddOn beta): 256MB of memory and 512MB of data
OrientDB – NuvolaBase (Free): 100MB of storage and 100,000 records
TempoDB – TempoDB Hosted (Development plan): 50,000,000 data points, 50 series
JustOneDB – Heroku JustOneDB (Lambda plan): 50MB of data

In a previous post I analyzed Facebook’s main infrastructure. Now I’m going deeper into its services.

Facebook Images

Facebook is the biggest photo sharing service in the world and grows by several million images every week. The pre-2009 infrastructure used three NFS tiers. Even with some optimizations, that solution couldn’t easily scale over a few billion images.

So in 2009 Facebook developed Haystack, an HTTP-based photo server. It is composed of 5 layers: HTTP server, Photo Store, Haystack Object Store, Filesystem and Storage.

Storage is made of storage blades using a RAID-6 configuration which provides adequate redundancy and excellent read performance. The poor write performance is partially mitigated by the RAID controller’s NVRAM write-back cache. The filesystem used is XFS, which manages only storage-blade-local files; no NFS is used.

Haystack Object Store is a simple log structured (append-only) object store containing needles representing the stored objects. A Haystack consists of two files:

the actual haystack store file containing the needles

haystack_content

plus an index file

haystack_index

The Photo Store server is responsible for accepting HTTP requests and translating them into the corresponding Haystack store operations. It keeps an in-memory index of all photo offsets in the haystack store file. The HTTP framework used is the simple evhttp server provided with the open source libevent library.

Insights and Sources

Facebook Messages and Chat

Facebook’s messaging system is powered by a system called Cell. The entire messaging system (email, SMS, Facebook Chat, and the Facebook Inbox) is divided into cells, and each cell contains only a subset of users. Every cell is composed of a cluster of application servers (where business logic is defined) monitored by different ZooKeeper instances.

Application servers use a data access layer to communicate with the metadata storage, an HBase-based system (the old messaging infrastructure relied on Cassandra) which contains all the information related to messages and users.

Cells are the “core” of the system. To connect them to the frontend there are different “entry points”. An MTA proxy parses mail and redirects data to the correct application. Emails are stored in the same structure as photos: Haystack. There are also discovery services to map user-to-Cell (based on hashing) and service-to-Cell (based on ZooKeeper notifications), and everything exposes an API.

There is a “dirty” cache based on Memcached to serve messages (from a cache local to the datacenter) and social information about the users (like social indexes).

facebook_messages_architecture

The search engine for messages is built using an inverted index stored in HBase.

Chat is based on an Epoll server developed in Erlang and accessed using Thrift, and there is also a subsystem for logging chat messages (in C++). Both subsystems are clustered and partitioned for reliability and efficient failover.

Real-time presence notification is the most resource-intensive operation performed (more than sending messages): keeping each online user aware of the online-idle-offline states of their friends. Real-time messaging is done using a variation of Comet, specifically XHR long polling, and/or BOSH.

Insights and Sources

Facebook Search

The original Facebook search engine simply searched cached user information: friends lists, like lists and so on. Typeahead search (the search box at the top of the Facebook frontend) arrived in 2009. It tries to suggest the most interesting results. Performance is really important and results must be available within 100ms. It has to be fast and scalable, and the system is structured as follows:

typeahead_search

The first attempt hits the browser cache, where information about the user is stored (friends, likes, pages). If the cache misses, an AJAX request starts.

Many leaf services search for results inside their indexes (stored in an inverted index called Unicorn). When result references are fetched from the indexes, they are merged and loaded from the global datastore. An aggregator provides a single channel to send data to the client. Obviously, queries are cached.

In 2012 Facebook started from the core part of typeahead search to build a new search tool. Unicorn is the core of the new Graph Search. Formally, it is an in-memory inverted index which maps Facebook content the way a graph database would, and you can query it as a graph traversal tool. To be used for Graph Search, Unicorn was updated to be more than a traversal tool. It now supports nested queries, scoring and different kinds of resources. Results are aggregated at different levels.

unicorn

The query lifecycle is usually made of 2 steps: the Suggestion Phase and the Search Phase.

The Suggestion Phase works like “autocomplete” and is powered by a Natural Language Processing (NLP) module that attempts to parse text based on a grammar. It identifies parts of the query as potential entities and passes these parts down to Unicorn to search for them.

The Search Phase begins when the searcher has made a selection from the suggestions. The parse tree, along with the fbids of the matched entities, is sent back to the Top Aggregator. A user-readable version of this query is displayed as part of the URL.

Currently Graph Search is still in beta.

Insights and Sources

Resources and insights

This post and the previous one are based on and inspired by Michaël Figuière’s answer to the following question on Quora: What is Facebook’s architecture?

Additional stuff:

Yesterday @lastknight was looking for something to store and query a huge graph dataset. He found Titan, developed by Aurelius and released last August. It is a distributed graph database which can rely on Apache Cassandra, Apache HBase or Oracle BerkeleyDB for storage. It promises to be fully distributed and horizontally scalable; it’s really ambitious and the presentation seems really interesting 🙂


Aurelius also develops Faunus, an Apache Hadoop-based graph analytics engine for analyzing massive-scale graphs.

More about Titan and Faunus:

[UPDATE 2013-03-10] The subscription page describes details about the software (developed and planned) by Aurelius. The Fulgora processor seems really interesting. http://thinkaurelius.com/subscription/

In the beginning was RediSQL, a “Hybrid Relational-Database/NOSQL-Datastore written in ANSI C”. After a while they changed the name to AlchemyDB.

Everything is built on top of Redis, adding a Lua interpreter to implement a really interesting technique they call Datastore-Side-Scripting. It is like using stored procedures, putting logic into the datastore. They can achieve many different goals using this technique (see the sketch after the list):

  • Implement a SQL-like language using Lua to decode SQL requests
  • Implement many datatypes not supported by Redis, using Lua to fit the new structures into common Redis types
  • Serve content (like web pages or JSON data) directly from the datastore using a REST API
  • Implement a GraphDB using SQL for indexes and Lua for graph-traversal logic
  • Implement a document-oriented model using Lua
  • Implement an ObjectDB using Lua
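Vanilla Redis also offers server-side Lua scripting (EVAL), which gives the flavor of the technique. A sketch using the redis-rb gem (key names are made up):

require 'redis'

redis = Redis.new

# Increment a counter and record a timestamp atomically, entirely
# inside the datastore: one round trip, no race condition.
script = <<-LUA
  local hits = redis.call('INCR', KEYS[1])
  redis.call('SET', KEYS[2], ARGV[1])
  return hits
LUA

redis.eval(script, keys: ['page:hits', 'page:last_seen'], argv: [Time.now.to_i.to_s])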

Last year Citrusleaf acquired AlchemyDB, and Russ Sullivan (the guy behind AlchemyDB) is incrementally porting functionality to run on top of Citrusleaf’s proven distributed, high-availability, linearly-scalable key-value store: Aerospike. It is a distributed NoSQL database, the first solution to claim ACID support, with an extremely fast architecture optimized to run on SSDs.

I haven’t tested it yet but as far as I can see they provide an SDK for most popular programming languages. The Ruby one requires a native library. To get started you need to add a node:

require "Citrusleaf"
c = Citrusleaf.new
c.add_node "10.1.1.6", 3000

Set, get and delete operations are done as follows:

# Writing Values
c.put 'namespace', 'myset', 'mykey', 'bin_name', value
# Reading Values
rv, gen, value = c.get 'namespace', 'myset', 'mykey', 'binx'
# Deleting Values
c.delete 'namespace', 'myset', 'mykey'

The documentation isn’t that useful yet. The only way to understand whether it is cool or not is to test it. That’s what I’ll do.