A few days ago I was at Codemotion in Milan and had the opportunity to gain some insight into the technologies used by two of our main competitors in Italy: BlogMeter and Datalytics. It’s quite interesting because, even though the technical challenges are almost the same, each company uses a different approach with a different stack.


Datalytics is a relatively new company founded 4 months ago. They had a desk at Codemotion to show their products and recruit new people. I chatted with Marco Caruso, the CTO (who probably didn’t know who I am, sorry Marco, I just wanted to avoid hostility 😉 ), about the technologies they use and the developer profile they were looking for. The required skills were:

Their tech team is composed of 4 developers (including the CTO) and their main products are Datalytics Monitoring™ (a sort of statistical dashboard that shows buzz stats in real time) and Datalytics Engage™ (a real-time analytics dashboard for live events). I have no technical insight into how their systems work, but I can guess some details by inferring them from the buzzwords they use.

Supported sources are Twitter, Facebook (only public data), Instagram, Youtube, Vine (logos are on their website) and probably Pinterest.

They use DataSift as a data source in addition to the standard APIs. I suppose their processing pipeline uses Storm to manage streaming input, maybe with an importing layer in front of it. Data is crunched using Hadoop and Java, and results are stored in MongoDB (Massimo Brignoli, Italian MongoDB evangelist, advertised their company during his presentation, so I suppose they use it extensively).

Node.js is probably used for the frontend. It is fast enough for near-real-time applications (also using websockets) and plays really well with both Angular.js and MongoDB (the MEAN stack). D3.js is the obvious choice for complex dynamic charts.

I’m not so happy when I discover a new competitor in our market segment. Competition gets harder and this is not fun. Anyway, the guys at Datalytics seem smart (and nice), and competing with them would be a pleasure and will push me to do my best.

Now I’m curious to know if Datalytics is monitoring buzz on the web around its company name. I’m going to tweet about this article using the #Datalytics hashtag. If you find this article please tweet me “Yes, we found it bwahaha” 😛

[UPDATE 2014-12-27 21:18 CET]

@DatalyticsIT favorited my tweet on December 1st. This probably means they found my article but they didn’t read it! 😀


DataSift, as they say on their home page, “aggregate, process and deliver social data”. It is one of the oldest Twitter certified partners and offers data coming from almost every existing social network. I use it every day to “listen” to the net and find the data I need for my analysis.

It’s impressive to watch how fast they collect data from external sources and deliver it to your chosen destination. When I tweet, a couple of minutes later a JSON file lands in my S3 bucket.

Building Internet-scale filtering is not easy. Their infrastructure is really complex and optimized. This is a 2011 diagram of their workflow.


Twitter generates more than 500 million tweets per day, and it is only one of the available sources. The DataSift system performs 250+ million sentiment analyses with sub-100 ms latency, and several TB of augmented data (including gender, sentiment, etc.) transit the platform daily. The data filtering nodes can process up to 10,000 unique streams and can do data lookups on lists of 10,000,000+ usernames in real time. Link augmentation performs 27 million link resolves and lookups plus 15+ million full web page aggregations per day.

C++ is used for the performance-critical components, like the core filtering engine, while PHP is used for the site, the external API server, most of the internal web services, and a custom-built, high-performance job queue manager. Java/Scala are used for batch processing with HBase and MapReduce jobs. Kafka is used as the queuing system and Ruby is used for deploys and provisioning. Thrift is widely used.

MySQL (Percona server) on SSD drives is used as primary storage, HBase cluster over more than 30 Hadoop nodes provides a place to store historical data and Memcached and Redis are used for caching.

Here is a schema of the processing unit which builds the historical database.


Message queues are another critical component of the infrastructure. 0mq (a custom build from the latest alpha branch, with some stability fixes, to use publisher-side filtering) is used in different configurations:

  • PUB-SUB for replication / message broadcasting;
  • PUSH-PULL for round-robin workload distribution;
  • REQ-REP for health checks of different components.
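The delivery semantics of the first two patterns can be sketched in a few lines of plain Python. This is only an illustration of the behaviour, not ZeroMQ code: the function names and the sample topics are made up for the example, and the only ZeroMQ facts assumed are that PUSH sockets distribute messages round-robin and that SUB sockets filter by topic prefix.

```python
from itertools import cycle


def push_pull(messages, workers):
    """PUSH-PULL semantics: each message goes to exactly one worker, in turn."""
    assignments = {worker: [] for worker in workers}
    # cycle() walks the workers round-robin; zip() stops when messages run out.
    for worker, message in zip(cycle(workers), messages):
        assignments[worker].append(message)
    return assignments


def pub_sub(messages, subscriptions):
    """PUB-SUB semantics: every subscriber receives each message whose
    topic starts with the prefix it subscribed to."""
    delivered = {name: [] for name in subscriptions}
    for message in messages:
        for name, prefix in subscriptions.items():
            if message.startswith(prefix):
                delivered[name].append(message)
    return delivered
```

With two workers, `push_pull` alternates messages between them, while in `pub_sub` a subscriber with an empty prefix receives everything.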

Kafka is used for high-performance persistent queues. In both cases they’re working with the developers and contributing bug reports / traces / fixes / client libraries.

All code is pulled from the repo by Jenkins every 5 minutes, automatically tested and verified with several QA tools, packaged as an RPM and moved to the dev package repo. Chef is used to automate deployments and manage configuration. All services emit StatsD events, which are combined with other system-level checks, added to Zenoss and displayed with Graphite.
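To give an idea of what “emitting a StatsD event” means, here is a minimal sketch. StatsD accepts plain-text lines of the form `name:value|type` over UDP, by default on port 8125. The metric names below are invented for the example and say nothing about what DataSift actually measures.

```python
import socket

# Metric types covered by this sketch: counter, gauge, timer (milliseconds).
METRIC_TYPES = {"c", "g", "ms"}


def statsd_line(name, value, metric_type):
    """Format one metric in the StatsD plain-text protocol."""
    if metric_type not in METRIC_TYPES:
        raise ValueError(f"unsupported metric type: {metric_type!r}")
    return f"{name}:{value}|{metric_type}"


def emit(line, host="127.0.0.1", port=8125):
    """Send one metric line to a StatsD daemon over UDP (fire and forget)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("ascii"), (host, port))


# Hypothetical metric names, only for illustration.
counter = statsd_line("filter.matches", 1, "c")
timer = statsd_line("filter.latency", 87, "ms")
```

Since UDP is connectionless, a service can call `emit(counter)` on every event without waiting for the monitoring system to answer.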

The biggest challenge IMHO is filtering. Filtering at this scale requires a different approach. They started with work they did at TweetMeme. The core filter engine is in C++ and is called the Pickle Matrix. Over three years they’ve developed a compiler and their own virtual machine. We don’t know what their technology is exactly, but it might be something like Distributed Complex Event Processing with Query Rewriting.


Almost all the content of this post comes from the wonderful article “DataSift Architecture: Realtime Datamining At 120,000 Tweets Per Second” posted on HighScalability. Some details also come from “Historical Architecture – Data Mining Billions of Tweets” on the DataSift blog.


According to Wikipedia, Klout is “a website and mobile app that uses social media analytics to rank its users according to online social influence via the ‘Klout Score’, which is a numerical value between 1 and 100”.

This is not so different from what I try to do every day. They get signals from social networks, process them in order to extract relevant data, and show some diagrams and a synthetic index of user influence. It’s really interesting for me to observe how their data is stored and processed.

At Hadoop Summit 2012, Dave Mariani (from Klout) and Denny Lee (from Microsoft) presented the Klout architecture and showed the following diagram:


It shows many different technologies, a great example of polyglot persistence 🙂

Klout uses a lot of Hadoop. It’s used to collect signals coming from different Signal Collectors (one for each social network, I suppose). Procedures to enhance data are written using Pig, and Hive is also used for the data warehouse.

Currently MySQL is used only to collect user registrations, which are ingested into the data warehouse system. In the past they used it as a bridge between the data warehouse and their “Cube”, a Microsoft SQL Server Analysis Services (SSAS) instance that they use for business intelligence with Excel and other custom apps. In 2011 data was migrated using Sqoop. Now they can leverage Microsoft’s Hive ODBC driver, and MySQL is no longer needed as a bridge.

The website and the mobile app are based on the Klout API. Data is collected from the data warehouse and stored in HBase (user profiles and scores) and MongoDB (interactions between users). ElasticSearch is used as the search index.

Most of the custom components are written in Scala. The only exception is the website, written in JavaScript/Node.js.

In the end, Klout is probably the biggest company working with both open source tools from the Hadoop ecosystem and Microsoft tools. The Hadoop version for Windows Azure, developed together with Hortonworks, is probably the first product of this collaboration.


Soon after Google released its papers in 2003, many developers started implementing them. Apache Hadoop is the biggest result of that effort. Many other technologies were born around Hadoop, and the Apache Software Foundation helped the most promising ones to grow. Below is an (incomplete) list of Hadoop-related software.

Apache Hadoop (HDFS, MapReduce)

Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

The Hadoop Distributed File System (HDFS) offers a way to store large files across multiple machines. Hadoop and HDFS were derived from Google’s MapReduce and Google File System (GFS) papers.
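To make the MapReduce programming model more concrete, here is a minimal single-process sketch of the classic word count. It does not use Hadoop at all: the function names are mine, and the “shuffle” step that Hadoop performs across the cluster is simulated with a dictionary.

```python
from collections import defaultdict


def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one input record."""
    for word in document.lower().split():
        yield word, 1


def shuffle(pairs):
    """Shuffle: group all the values emitted for the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: combine all the values of one key into a single result."""
    return key, sum(values)


def word_count(documents):
    """Run map, shuffle and reduce in sequence on a list of documents."""
    pairs = (pair for doc in documents for pair in map_phase(doc))
    return dict(reduce_phase(key, values) for key, values in shuffle(pairs).items())
```

In a real cluster the map and reduce functions run in parallel on different machines, which is possible because each call only depends on its own input.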

Apache Hive (Github)

“Hive is a data warehouse system for Hadoop […] Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL.”

The MapReduce paradigm is extremely powerful, but programmers have used SQL to query data for years. HiveQL is a SQL-like language to query data stored on the Hadoop filesystem.

An example of HiveQL:

-- Page views with nested types (array and map columns)
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table';

Apache Pig (Github)

Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […] Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs. […] Pig’s language layer currently consists of a textual language called Pig Latin

If you don’t like SQL, maybe you prefer a sort of procedural language. Pig Latin is different from HiveQL but has the same purpose: querying data.

An example of Pig Latin:

set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;
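For readers who don’t know Pig Latin, the script above (load, group by symbol, average the close price, sort descending) does roughly what this small Python sketch does in memory. The rows are represented as dictionaries, which is my own assumption for the example; Pig would of course run the same logic as MapReduce jobs over files.

```python
from collections import defaultdict


def average_close_by_symbol(rows):
    """Group rows by symbol, average the close price, sort by average descending."""
    closes = defaultdict(list)
    for row in rows:
        # Equivalent of "group daily by symbol".
        closes[row["symbol"]].append(row["close"])
    # Equivalent of "foreach ... generate group, AVG(daily.close)".
    averages = [(symbol, sum(values) / len(values)) for symbol, values in closes.items()]
    # Equivalent of "order average by avg desc".
    return sorted(averages, key=lambda pair: pair[1], reverse=True)
```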

Apache Avro (GitHub)

Avro is a data serialization system.

It’s a framework for performing remote procedure calls and data serialization. It can be used to pass data from one program or language to another (e.g. from C to Pig). It is particularly suited for use with scripting languages such as Pig, because data is always stored with its schema in Avro, and therefore the data is self-describing.

Apache Chukwa (Github)

Chukwa is an open source data collection system for monitoring large distributed systems. It’s built on top of the Hadoop Distributed File System (HDFS) and Map/Reduce framework and inherits Hadoop’s scalability and robustness.

It’s used to process and analyze generated logs and has different components:
  • Agents that run on each machine to collect the logs generated by various applications.
  • Collectors that receive data from the agents and write it to stable storage.
  • MapReduce jobs for parsing and archiving the data.


Apache Drill (Github)

Drill is a distributed system for interactive analysis of large-scale datasets, based on Google’s Dremel. Its goal is to efficiently process nested data. It is a design goal to scale to 10,000 servers or more and to be able to process petabytes of data and trillions of records in seconds.

The idea behind Drill is to build a low-latency execution engine, enabling interactive queries across billions of records instead of using a batch MapReduce process.

Apache Flume (Github)

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

It is a distributed service that makes it very easy to collect and aggregate your data into a persistent store such as HDFS. Flume can read data from almost any source – log files, Syslog packets, the standard output of any Unix process – and can deliver it to a batch processing system like Hadoop or a real-time data store like HBase.

Apache HBase (Github)

HBase is the Hadoop database, a distributed, scalable, big data store.

It is an open source, non-relational, distributed database modeled after Google’s BigTable. It is written in Java and provides a fault-tolerant way of storing large quantities of sparse data. HBase features compression, in-memory operation, and Bloom filters on a per-column basis as outlined in the original BigTable paper. Tables in HBase can serve as the input and output for MapReduce jobs run in Hadoop.
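If you have never met a Bloom filter, the following generic sketch shows the idea: a compact bit set that can answer “definitely not present” or “maybe present”, which lets a store skip files that cannot contain a key. This is not HBase’s implementation; the sizes, the hashing scheme and the class name are assumptions chosen for readability.

```python
import hashlib


class BloomFilter:
    """A tiny Bloom filter: no false negatives, some false positives."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive num_hashes bit positions by salting one hash function.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key):
        for position in self._positions(key):
            self.bits |= 1 << position

    def might_contain(self, key):
        # False means the key was never added; True means "possibly added".
        return all((self.bits >> position) & 1 for position in self._positions(key))
```

A key that was added always returns `True` from `might_contain`; a key that was never added usually returns `False`, with a false-positive rate that depends on the filter size and the number of stored keys.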

Apache HCatalog (Github)

HCatalog is a table and storage management service for data created using Hadoop

Hadoop needs a better abstraction for data storage, and it needs a metadata service. HCatalog addresses both of these issues. It presents users with a table abstraction. This frees them from knowing where or how their data is stored. It allows data producers to change how they write data while still supporting existing data in the old format so that data consumers do not have to change their processes. It provides a shared schema and data model for Pig, Hive, and MapReduce. It will enable notifications of data availability. And it will provide a place to store state information about the data so that data cleaning and archiving tools can know which data sets are eligible for their services.

Apache Mahout (Github)

“The Mahout machine learning library’s goal is to build scalable machine learning libraries”

It’s an implementation of distributed machine learning algorithms on the Hadoop platform. While Mahout’s core algorithms for clustering, classification and batch-based collaborative filtering are implemented on top of Apache Hadoop using the map/reduce paradigm, it does not restrict contributions to Hadoop-based implementations.

Apache Oozie (Github)

“Oozie is a workflow scheduler system to manage Apache Hadoop jobs.”

Tasks performed in Hadoop sometimes require multiple Map/Reduce jobs to be chained together to complete their goal.

Oozie is a Java Web-Application that runs in a Java servlet-container and uses a database to store:

  • Workflow definitions
  • Currently running workflow instances, including instance states and variables

An Oozie workflow is a collection of actions (e.g. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control dependency DAG (Directed Acyclic Graph), specifying the sequence in which actions are executed. This graph is specified in hPDL (an XML Process Definition Language).
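The core idea of a control dependency DAG can be sketched with the Python standard library (`graphlib`, available from Python 3.9). This has nothing to do with Oozie’s real engine or with hPDL: the action names and the dictionary format are invented, and the sketch only shows how a valid execution order is derived from dependencies.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return one valid execution order for a workflow.

    `dependencies` maps each action to the set of actions that must
    finish before it can start. A cycle means the workflow is not a DAG.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        raise ValueError(f"workflow is not a DAG: {exc.args[1]}") from exc


# Hypothetical workflow: import data, clean it, aggregate, then report.
workflow = {
    "import": set(),
    "clean": {"import"},
    "aggregate": {"clean"},
    "report": {"aggregate"},
}
order = execution_order(workflow)
```

A scheduler can then run the actions in that order, or run in parallel those whose dependencies are already satisfied.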

Apache Sqoop (Github)

“Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.”

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse

After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Apache ZooKeeper (Github)

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.”

It is an open source, in-memory, distributed NoSQL database, typically used for storing configuration variables.

Apache Giraph (Github)

“Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google”

While it is possible to do processing on graphs with MapReduce, this approach is suboptimal for two reasons:

  1. MapReduce’s view of the world as keys and values is not the greatest way to think of graphs and often requires a significant effort to pound graph-shaped problems into MapReduce-shaped solutions.
  2. Most graph algorithms involve repeatedly iterating over the graph states, which in a MapReduce world requires multiple chained jobs. This, in turn, requires the state to be loaded and saved between each iteration, operations that can easily dominate the runtime of the computation overall.

Giraph attempts to alleviate these limitations by providing a more natural way to model graph problems:

  1. Think like a vertex!
  2. Keep the graph state in memory during the whole of the algorithm, only writing out the final state (and possibly some optional checkpointing to save progress as we go).

Rather than implementing mapper and reducer classes, one implements a Vertex, which has a value and edges and is able to send and receive messages to other vertices in the graph as the computation iterates.
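To show what “think like a vertex” means, here is a toy single-process sketch of the classic Pregel example: every vertex ends up knowing the maximum value reachable in the graph. It is not Giraph code (Giraph is Java and distributed); the function name and the data layout are my own assumptions.

```python
def propagate_max(values, edges):
    """Vertex-centric maximum propagation.

    `values` maps each vertex to a number, `edges` maps each vertex to the
    list of neighbours it can send messages to. Each loop iteration plays
    the role of one superstep.
    """
    values = dict(values)  # keep the whole graph state in memory

    # Superstep 0: every vertex sends its value to its neighbours.
    inbox = {vertex: [] for vertex in values}
    for vertex, value in values.items():
        for neighbour in edges.get(vertex, []):
            inbox[neighbour].append(value)

    # Following supersteps: run until no messages are in flight.
    while any(inbox.values()):
        outbox = {vertex: [] for vertex in values}
        for vertex, messages in inbox.items():
            if messages and max(messages) > values[vertex]:
                # The vertex learned a bigger value: update and tell neighbours.
                values[vertex] = max(messages)
                for neighbour in edges.get(vertex, []):
                    outbox[neighbour].append(values[vertex])
            # Otherwise the vertex stays silent (it "votes to halt").
        inbox = outbox
    return values
```

Note that the graph state is never written out between iterations, which is exactly the difference from chaining several MapReduce jobs.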

Apache Accumulo (Github)

“The Accumulo sorted, distributed key/value store is a robust, scalable, high performance data storage and retrieval system.”

It is a sorted, distributed key/value store based on Google’s BigTable design. Written in Java, Accumulo has cell-level access labels (useful for security purposes) and server-side programming mechanisms called Iterators that allow users to perform additional processing at the Tablet Server.

Apache S4 (Github)

“S4 is a general-purpose, distributed, scalable, fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”

It was developed by Yahoo (which released the S4 paper) and then open sourced to Apache. It is inspired by MapReduce and by the Actor model of computation. Its basic components are:

  • Processing Element (PE): the basic computational unit, which can send and receive messages called Events.
  • Processing Node (PN): the logical host for PEs.
  • Adapter: injects events into the S4 cluster and receives events from it via the Communication Layer.

Apache Thrift (Github)

“Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services”

It is an interface definition language that is used to define and create services for numerous languages. It is used as a remote procedure call (RPC) framework and was developed at Facebook for “scalable cross-language services development”. It combines a software stack with a code generation engine to build services that work efficiently together.

To put it simply, Apache Thrift is a binary communication protocol.



I always underestimated the contribution of Google to the evolution of big-data processing. I used to think that Google only manages and shows some search results. Not so much data. Not as much as Facebook or Twitter at least…

Obviously I was wrong. Google has to manage a HUGE amount of data, and big-data processing was already a problem in 2002! Its contribution to current processing technologies such as Hadoop, its filesystem HDFS, and HBase was fundamental.

We can split this contribution into two periods. The first (from 2003 to 2008) influenced the technologies we are using today. The second (from 2009 until today) is influencing the products we are going to use in the near future.

The first period gave us

  • GFS, the Google FileSystem (PDF paper), a scalable distributed file system for large distributed data-intensive applications, which later inspired HDFS
  • BigTable (PDF paper), a column-oriented database designed to store petabytes of data across large clusters, which later inspired HBase
  • the concept of MapReduce (PDF paper), a programming model to process large datasets distributed across large clusters. Hadoop implements this programming model over HDFS or similar filesystems.

This series of papers revolutionized the strategies behind data warehousing, and now all the largest companies use the well-known products inspired by these papers.

The second period is less popular at the moment. Google ran into many limits of its previous infrastructure and tried to fix them and move ahead. This effort gave us many other technologies, some of them not yet completely public:

  • Caffeine, a new search infrastructure that uses GFS2, next-generation MapReduce and next-generation BigTable.
  • Colossus, formerly known as Google FileSystem 2, the next-generation GFS.
  • Spanner (PDF paper), a scalable, multi-version, globally-distributed, and synchronously-replicated database, the NewSQL evolution of BigTable.
  • Dremel (PDF paper), a scalable, near-real-time ad-hoc query system for analysis of read-only nested data, and BigQuery, its implementation for GAE.
  • Percolator (PDF paper), a platform for incremental processing which continually updates the search index.
  • Pregel (PDF paper), a system for large-scale graph processing similar to MapReduce for columnar data.

Now the market is different from 2002. Many companies such as Cloudera and MapR are working hard on big data, and so is the Apache Foundation. Anyway, Google has a 10-year advantage and its technologies are still stunning.

Many of these papers are probably going to influence the next 10 years. The first results are already here. Apache Drill and Cloudera Impala implement the Dremel paper specification, Apache Giraph implements the Pregel one, and HBase Coprocessor the Percolator one.

And they are just some examples, a Google search can show you more 😉


Recently I needed to select the best hosted service for some datastores to use in a large and complex project. Starting from the Heroku and AppFog add-ons, I found many free plans that are useful to test a service and/or to use in production if your app is small enough (for example, this blog runs on Heroku PostgreSQL’s Dev plan). Here is the list:


  • Xeround (Starter plan): 5 connections and 10 MB of storage
  • ClearDB (Ignite plan): 10 connections and 5 MB of storage


  • MongoHQ (Sandbox): 50MB of memory, 512MB of data
  • MongoLab (Starter plan): 496 MB of storage


  • RedisToGo (Nano plan): 5MB, 1 DB, 10 connections and no backups.
  • RedisCloud by Garantia Data: 20MB, 1 DB, 10 connections and no backups.
  • MyRedis (Gratis plan): 5MB, 1 DB, 3 connections and no backups.



  • IrisCouch (up to $5): No limits, usage fees for HTTP requests and storage.
  • Cloudant (Oxygen plan): 150,000 requests, 250 MB of storage.

PostgreSQL – Heroku PostgreSQL (Dev plan): 20 connections, 10,000 rows of data
Cassandra – Cassandra.io (Beta on Heroku): 500 MB and 50 transactions per second
Riak – RiakOn! (Sandbox): 512MB of memory
Hadoop – Treasure Data (Nano plan): 100MB (compressed), data retention for 90 days
Neo4j – Heroku Neo4j (Heroku AddOn beta): 256MB of memory and 512MB of data.
OrientDB – NuvolaBase (Free): 100MB of storage and 100,000 records
TempoDB – TempoDB Hosted (Development plan): 50,000,000 data points, 50 series.
JustOneDB – Heroku JustOneDB (Lambda plan): 50MB of data

Facebook has one of the biggest hardware infrastructures in the world, with more than 60,000 servers. But servers are worthless if you can’t use them efficiently. So how does Facebook work?

Facebook was born in 2004 on a LAMP stack: a PHP website with a MySQL datastore. Now it is powered by a complex set of interconnected systems grown on top of the original stack. The main components are:

  1. Frontend
  2. Main persistence layer
  3. Data warehouse layer
  4. Facebook Images
  5. Facebook Messages and Chat
  6. Facebook Search

1. Frontend

The frontend is written in PHP and compiled using HipHop (open sourced on GitHub) and g++. HipHop converts PHP into C++ that you can compile. The Facebook frontend is a single 1.5 GB binary file deployed using BitTorrent. It provides the logic and the template system. Some parts are external and use the HipHop Interpreter (to develop and debug) and the HipHop Virtual Machine to run HipHop “ByteCode”.

It is not clear which web server they are using. Apache was used in the beginning and is probably still used for the main frontend. In 2009 they acquired (and open sourced) Tornado from FriendFeed and are using it for the real-time updates feature of the frontend. Static content (such as frontend images) is served through Akamai.

Everything not in the main binary (or in the ByteCode source) is served using the Thrift protocol. Java services are served without Tomcat or Jetty: the overhead these application servers require is not worth it for the Facebook architecture. Varnish is used to accelerate responses to HTTP requests.

To speed up frontend rendering they use BigPipe, a sort of mod_pagespeed. After an HTTP request, the servers fetch the data and build the HTML structure. The HTML is sent before data retrieval and is rendered by the browser. After rendering, JavaScript retrieves and displays the data (already available by then) asynchronously.

The Memcached infrastructure is based on more than 800 servers with 28TB of memory. They use a modded version of Memcached which reimplements the connection pool buffer (and relies on some mods to the network layer of Linux) to better manage hundreds of thousands of connections.

Insights and Sources:

2. Main Persistence layer

Facebook relies on MySQL as its main datastore. I know, I can’t believe it either…

They have one of the largest MySQL cluster deployments in the world and use the standard InnoDB engine. Like many others, they designed the system to handle MySQL failures easily: they simply enforce backups. Recently Facebook engineers published a post about their 3-level backup stack:

  • Stage 1: each node of the cluster (the whole replica set) has two rack-level backups. One holds the binary logs (to speed up slave promotion) and the other holds a mysqldump snapshot taken every night (to handle node failure).
  • Stage 2: each binlog and backup is copied to a larger Hadoop cluster mapped using HDFS. This layer provides a fast and distributed source of data, useful to restore nodes even in different locations.
  • Stage 3: provides long-term storage by copying data from the Hadoop cluster to a discrete storage in a separate region.

Facebook collects several statistics about failures, traffic and backups. These statistics contribute to building a “score” for each node. If a node fails or its score is too low, an automatic system restores it from Stage 1 or Stage 2. They don’t try to avoid problems, they simply handle them as fast as possible.

Insights and Sources:

3. Data warehouse layer

MySQL data is also moved to a “cold” storage to be analyzed. This storage is based on Hadoop HDFS (which leverages HBase) and is queried using Hive. Data such as logs, clicks and feeds travels through Scribe and is aggregated and stored in Hadoop HDFS using Scribe-HDFS.

The standard Hadoop HDFS implementation has some limitations. Facebook adds a different kind of NameNode called AvatarNode, which uses Apache ZooKeeper to handle node failure. They also add a RaidNode that uses XOR-parity files to decrease storage usage.
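The trick behind XOR parity is easy to show in a few lines. This is a generic sketch of the technique, not Facebook’s RaidNode: I assume equally sized blocks and a single lost block, and the function names are mine. The point is that one parity block can replace a full extra replica of several data blocks.

```python
from functools import reduce


def xor_parity(blocks):
    """Compute the XOR parity of a list of equally sized byte blocks."""
    if len({len(block) for block in blocks}) != 1:
        raise ValueError("need at least one block, all of the same size")
    # zip(*blocks) walks the blocks column by column, one byte from each.
    return bytes(reduce(lambda x, y: x ^ y, column) for column in zip(*blocks))


def recover_missing(surviving_blocks, parity):
    """Rebuild the single missing block from the survivors and the parity."""
    # XOR-ing the parity with every surviving block cancels them out,
    # leaving exactly the bytes of the lost block.
    return xor_parity(list(surviving_blocks) + [parity])
```

With three data blocks and one parity block, any single lost block can be rebuilt while storing much less than three full copies of the data.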


Analyses are run using MapReduce. Anyway, analyzing several petabytes of data is hard, and the standard Hadoop MapReduce framework became a limit in 2011. So they developed Corona, a new scheduling framework that separates cluster resource management from job coordination. Results are stored in Oracle RAC (Real Application Cluster).

Insights and Sources:

In the next post I’m going to analyze the other Facebook services: Images, Messages and Search.

Yesterday @lastknight was looking for something to store and query a huge graph dataset. He found Titan, developed by Aurelius and released last August. It is a distributed graph database that can rely on Apache Cassandra, Apache HBase or Oracle Berkeley DB for storage. It promises to be fully distributed and horizontally scalable; it’s really ambitious and the presentation seems really interesting 🙂

Aurelius also develops Faunus, an Apache Hadoop-based graph analytics engine for analyzing massive-scale graphs.

More about Titan and Faunus:

[UPDATE 2013-03-10] The subscription page describes details about the software (developed and planned) by Aurelius. The Fulgora processor seems really interesting. http://thinkaurelius.com/subscription/