How it works: Datalytics

A few days ago I was at Codemotion in Milan, where I had the opportunity to learn about the technologies used by two of our main competitors in Italy: BlogMeter and Datalytics. It’s quite interesting because, even though the technical challenges are almost the same, each company uses a different approach with a different stack.


Datalytics is a relatively new company, founded 4 months ago. They had a desk at Codemotion to show their products and recruit new people. I chatted with Marco Caruso, the CTO (who probably didn’t know who I am; sorry Marco, I just wanted to avoid hostility 😉), about the technologies they use and the developer profile they were looking for. The required skills were:

Their tech team is made up of 4 developers (including the CTO) and their main products are Datalytics Monitoring™ (a sort of statistical dashboard that shows buzz stats in real time) and Datalytics Engage™ (a real-time analytics dashboard for live events). I have no technical insight into how their systems work, but I can guess some details from the buzzwords they use.

Supported sources are Twitter, Facebook (only public data), Instagram, YouTube, Vine (logos are on their website) and probably Pinterest.

They use DataSift as a data source in addition to the standard APIs. I suppose their processing pipeline uses Storm to manage streaming input, maybe with an import layer in front of it. Data is crunched using Hadoop and Java, and results are stored in MongoDB (Massimo Brignoli, the Italian MongoDB evangelist, mentioned their company during his presentation, so I suppose they use it heavily).

Node.js is probably used for the frontend. It is fast enough for near-real-time applications (also using WebSockets) and plays really well with both Angular.js and MongoDB (the MEAN stack). D3.js is the obvious choice for complex dynamic charts.

I’m not so happy when I discover a new competitor in our market segment. Competition gets harder and this is not fun. Anyway, the guys at Datalytics seem smart (and nice); competing with them will be a pleasure and will push me to do my best.

Now I’m curious to know whether Datalytics is monitoring buzz on the web around its company name. I’m going to tweet about this article using the #Datalytics hashtag. If you find this article, please tweet me “Yes, we found it bwahaha” 😛

[UPDATE 2014-12-27 21:18 CET]

@DatalyticsIT favorited my tweet on December 1st. This probably means they found my article but they didn’t read it! 😀

How it works: DataSift


DataSift, as they say on their home page, “aggregate, process and deliver social data”. It is one of the oldest Twitter certified partners and offers data coming from almost every existing social network. I use it every day to “listen” to the net and find the data I need for my analysis.

It’s impressive to watch how fast they collect data from external sources and deliver it to your chosen destination. When I tweet, a couple of minutes later a JSON file lands in my S3 bucket.

Filtering at Internet scale is not easy. Their infrastructure is really complex and optimized. This is a 2011 diagram of their workflow.

datasift_infrastructure

Twitter generates more than 500 million tweets per day and is only one of the available sources. The DataSift system performs 250+ million sentiment analyses with sub-100 ms latency, and several TB of augmented data (including gender, sentiment, etc.) transit the platform daily. The data filtering nodes can process up to 10,000 unique streams and can do data lookups on lists of 10,000,000+ usernames in real time. Link augmentation performs 27 million link resolves and lookups plus 15+ million full web page aggregations per day.

C++ is used for the performance-critical components, like the core filtering engine. PHP is used for the site, the external API server, most of the internal web services, and a custom-built, high-performance job queue manager. Java/Scala is used for batch processing with HBase and MapReduce jobs. Kafka is used as a queuing system and Ruby is used for deploys and provisioning. Thrift is widely used.

MySQL (Percona Server) on SSD drives is used as primary storage, an HBase cluster over more than 30 Hadoop nodes stores historical data, and Memcached and Redis are used for caching.

Here is a schema of the processing unit which builds the historical database.

datasift_historical

Message queues are another critical component of the infrastructure. 0mq (a custom build from the latest alpha branch, with some stability fixes, to use publisher-side filtering) is used in different configurations:

  • PUB-SUB for replication / message broadcasting;
  • PUSH-PULL for round-robin workload distribution;
  • REQ-REP for health checks of different components.

Kafka is used for high-performance persistent queues. In both cases they’re working with the developers and contributing bug reports / traces / fixes / client libraries.
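The three 0mq configurations listed above differ mainly in who receives each message. Here is a minimal sketch of those delivery rules in plain Python. It does not use ZeroMQ at all, and every function and name in it is made up for illustration; it only shows the routing behaviour, not DataSift's actual setup.

```python
from itertools import cycle


def pub_sub(messages, subscribers):
    """PUB-SUB: every subscriber receives a copy of every message."""
    inboxes = {name: [] for name in subscribers}
    for msg in messages:
        for name in subscribers:
            inboxes[name].append(msg)
    return inboxes


def push_pull(messages, workers):
    """PUSH-PULL: each message goes to exactly one worker, round-robin."""
    inboxes = {name: [] for name in workers}
    for msg, name in zip(messages, cycle(workers)):
        inboxes[name].append(msg)
    return inboxes


def req_rep(components):
    """REQ-REP: one request and one reply per component (a health check).

    `components` maps a hypothetical component name to a callable
    that returns its status.
    """
    return {name: check() for name, check in components.items()}
```

PUB-SUB suits replication because nobody misses a message, while PUSH-PULL suits workload distribution because no message is processed twice.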

All code is pulled from the repo by Jenkins every 5 minutes, automatically tested and verified with several QA tools, packaged as an RPM and moved to the dev package repo. Chef is used to automate deployments and manage configuration. All services emit StatsD events, which are combined with other system-level checks, added to Zenoss and displayed with Graphite.
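As a rough illustration of what emitting a StatsD event involves, here is a minimal sketch using only the Python standard library. A StatsD metric is a short text line of the form name:value|type, usually sent over UDP. The metric names used below are invented for the example; the source does not describe DataSift's actual metrics or client code.

```python
import socket


def statsd_line(name, value, kind):
    """Format one metric in the StatsD plain-text line format."""
    types = {"counter": "c", "timer": "ms", "gauge": "g"}
    return f"{name}:{value}|{types[kind]}"


def send_metric(line, host="127.0.0.1", port=8125):
    """Fire-and-forget UDP send; 8125 is the conventional StatsD port."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("ascii"), (host, port))


# Hypothetical metric names, for illustration only.
matched = statsd_line("filter.matched", 1, "counter")
latency = statsd_line("filter.latency", 42, "timer")
```

Because the transport is UDP, a service can emit metrics without waiting for the collector, which is probably why this approach scales to "all services".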

The biggest challenge IMHO is filtering. Filtering at this scale requires a different approach. They started with work they did at TweetMeme. The core filter engine is in C++ and is called the Pickle Matrix. Over three years they’ve developed a compiler and their own virtual machine. We don’t know what their technology is exactly, but it might be something like Distributed Complex Event Processing with Query Rewriting.

Sources

Almost all the content of this post comes from the wonderful article “DataSift Architecture: Realtime Datamining At 120,000 Tweets Per Second” posted on HighScalability. Some details also come from “Historical Architecture – Data Mining Billions of Tweets” on the DataSift blog.

How it works: Klout


According to Wikipedia, Klout is “a website and mobile app that uses social media analytics to rank its users according to online social influence via the ‘Klout Score’, which is a numerical value between 1 and 100”.

This is not so different from what I try to do every day. They get signals from social networks, process them in order to extract relevant data, and show some diagrams and a synthetic index of user influence. It’s really interesting for me to observe how their data is stored and processed.

At Hadoop Summit 2012, Dave Mariani (from Klout) and Denny Lee (from Microsoft) presented the Klout architecture and showed the following diagram:

klout_architecture

It shows many different technologies, a great example of polyglot persistence 🙂

Klout uses a lot of Hadoop. It’s used to collect signals coming from different Signal Collectors (one for each social network, I suppose). Procedures to enhance data are written using Pig and Hive, which is also used for the data warehouse.

Currently MySQL is used only to collect user registrations, which are ingested into the data warehouse. In the past they used it as a bridge between the data warehouse and their “Cube”, a Microsoft SQL Server Analysis Services (SSAS) instance, which they use for Business Intelligence with Excel and other custom apps. In 2011 data was migrated using Sqoop. Now they can leverage Microsoft’s Hive ODBC driver, and MySQL is no longer used as a bridge.

The website and mobile app are based on the Klout API. Data is collected from the data warehouse and stored in HBase (user profiles and scores) and MongoDB (interactions between users). ElasticSearch is used as the search index.

Most of the custom components are written in Scala. The only exception is the website, written in JavaScript/Node.js.

In the end, Klout is probably the biggest company using both open source tools from the Hadoop ecosystem and Microsoft tools. The Hadoop version for Windows Azure, developed together with Hortonworks, is probably the first product of this collaboration.


The Apache Hadoop ecosystem

Shortly after Google published its papers, starting in 2003, many developers began implementing them. Apache Hadoop is the biggest result of that work. Many other technologies were born around Hadoop, and the Apache Software Foundation helped the most promising ones grow. Below is an (incomplete) list of Hadoop-related software.

Apache Hadoop (HDFS, MapReduce)

Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

The Hadoop Distributed File System (HDFS) offers a way to store large files across multiple machines. Hadoop and HDFS were derived from Google’s MapReduce and Google File System (GFS) papers.
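To make the "simple programming model" concrete, here is a minimal single-machine sketch of the map, shuffle and reduce phases in plain Python, using word counting as the example. It is only an illustration of the idea: the function names are my own, and a real Hadoop job distributes each phase across the cluster and reads its input from HDFS.

```python
from collections import defaultdict


def map_phase(records, mapper):
    """Apply the mapper to every input record, yielding (key, value) pairs."""
    for record in records:
        yield from mapper(record)


def shuffle(pairs):
    """Group all values by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups, reducer):
    """Apply the reducer once per key."""
    return {key: reducer(key, values) for key, values in groups.items()}


def word_mapper(line):
    """Emit (word, 1) for every word in a line of text."""
    for word in line.lower().split():
        yield word, 1


def sum_reducer(key, values):
    """Add up the counts collected for one word."""
    return sum(values)


def word_count(lines):
    return reduce_phase(shuffle(map_phase(lines, word_mapper)), sum_reducer)
```

The programmer writes only the mapper and the reducer; everything else is the framework's job, which is what makes the model easy to scale out.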

Apache Hive (Github)

“Hive is a data warehouse system for Hadoop […] Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL.”

The MapReduce paradigm is extremely powerful, but programmers have been using SQL to query data for years. HiveQL is a SQL-like language for querying data stored on the Hadoop filesystem.

An example of HiveQL:

CREATE TABLE page_view(viewTime INT, userid BIGINT,
    page_url STRING, referrer_url STRING,
    friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
    ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\001'
    COLLECTION ITEMS TERMINATED BY '\002'
    MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

Apache Pig (Github)

Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […] Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs. […] Pig’s language layer currently consists of a textual language called Pig Latin

If you don’t like SQL, maybe you prefer a more procedural language. Pig Latin is different from HiveQL but has the same purpose: querying data.

An example of Pig Latin:

set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;
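For readers who don't know Pig Latin, the same steps (group by symbol, average the closing price, sort in descending order) can be sketched in plain Python. This is only an in-memory equivalent for comparison, assuming each row is a dict with `symbol` and `close` keys; Pig would compile the script above into MapReduce jobs instead.

```python
from collections import defaultdict


def average_close_by_symbol(rows):
    """rows: iterable of dicts with at least 'symbol' and 'close' keys."""
    closes = defaultdict(list)
    for row in rows:                        # bysymbl = group daily by symbol
        closes[row["symbol"]].append(row["close"])
    averages = [                            # average = ... AVG(daily.close)
        (symbol, sum(values) / len(values))
        for symbol, values in closes.items()
    ]
    # sorted = order average by avg desc
    return sorted(averages, key=lambda pair: pair[1], reverse=True)
```

Each Pig statement maps to one line or loop here, which shows how Pig Latin describes a data flow step by step rather than a single declarative query.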

Apache Avro (GitHub)

Avro is a data serialization system.

It’s a framework for performing remote procedure calls and data serialization. It can be used to pass data from one program or language to another (e.g. from C to Pig). It is particularly suited for use with scripting languages such as Pig, because data is always stored with its schema in Avro, and therefore the data is self-describing.

Apache Chukwa (Github)

Chukwa is an open source data collection system for monitoring large distributed systems. It’s built on top of the Hadoop Distributed File System (HDFS) and Map/Reduce framework and inherits Hadoop’s scalability and robustness.

It’s used to process and analyze generated logs and has different components:
  • Agents that run on each machine to collect the logs generated by various applications.
  • Collectors that receive data from the agents and write it to stable storage.
  • MapReduce jobs for parsing and archiving the data.

chukwa_structure

Apache Drill (Github)

Drill is a distributed system for interactive analysis of large-scale datasets, based on Google’s Dremel. Its goal is to efficiently process nested data. It is a design goal to scale to 10,000 servers or more and to be able to process petabytes of data and trillions of records in seconds.

The idea behind Drill is to build a low-latency execution engine, enabling interactive queries across billions of records instead of using a batch MapReduce process.

Apache Flume (Github)

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

It is a distributed service that makes it very easy to collect and aggregate your data into a persistent store such as HDFS. Flume can read data from almost any source – log files, Syslog packets, the standard output of any Unix process – and can deliver it to a batch processing system like Hadoop or a real-time data store like HBase.

Apache HBase (Github)

HBase is the Hadoop database, a distributed, scalable, big data store.

It is an open source, non-relational, distributed database modeled after Google’s BigTable. It is written in Java and provides a fault-tolerant way of storing large quantities of sparse data. HBase features compression, in-memory operation, and Bloom filters on a per-column basis as outlined in the original BigTable paper. Tables in HBase can serve as the input and output for MapReduce jobs run in Hadoop.
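A Bloom filter is worth a short illustration, since it is the least familiar item in that feature list. It is a compact bit array that can say "definitely not present" or "possibly present" for a key, which presumably lets a store skip files that cannot contain the requested row. The sketch below is a generic textbook version in Python; the class name, sizes and hashing scheme are my own choices and are not taken from HBase.

```python
import hashlib


class BloomFilter:
    """Generic Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits)  # one byte per bit, for clarity

    def _positions(self, key):
        # Derive several bit positions by hashing the key with different seeds.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        # False means the key was never added; True means "maybe".
        return all(self.bits[pos] for pos in self._positions(key))
```

The trade-off is memory against accuracy: a larger bit array or a better-tuned number of hashes lowers the false positive rate, but a negative answer is always reliable.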

Apache HCatalog (Github)

HCatalog is a table and storage management service for data created using Hadoop

Hadoop needs a better abstraction for data storage, and it needs a metadata service. HCatalog addresses both of these issues. It presents users with a table abstraction. This frees them from knowing where or how their data is stored. It allows data producers to change how they write data while still supporting existing data in the old format so that data consumers do not have to change their processes. It provides a shared schema and data model for Pig, Hive, and MapReduce. It will enable notifications of data availability. And it will provide a place to store state information about the data so that data cleaning and archiving tools can know which data sets are eligible for their services.

Apache Mahout (Github)

“The Apache Mahout machine learning library’s goal is to build scalable machine learning libraries.”

It’s an implementation of distributed machine learning algorithms on the Hadoop platform. While Mahout’s core algorithms for clustering, classification and batch-based collaborative filtering are implemented on top of Apache Hadoop using the map/reduce paradigm, it does not restrict contributions to Hadoop-based implementations.

Apache Oozie (Github)

“Oozie is a workflow scheduler system to manage Apache Hadoop jobs.”

Tasks performed in Hadoop sometimes require multiple Map/Reduce jobs to be chained together to reach their goal.

Oozie is a Java Web-Application that runs in a Java servlet-container and uses a database to store:

  • Workflow definitions
  • Currently running workflow instances, including instance states and variables

An Oozie workflow is a collection of actions (e.g. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control dependency DAG (Directed Acyclic Graph), specifying the order in which actions are executed. This graph is specified in hPDL (an XML Process Definition Language).
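The DAG idea can be sketched in a few lines of Python: given each action and the actions it depends on, a topological sort yields a valid execution order. This is not hPDL and not how Oozie is implemented. The action names are hypothetical, and the sketch relies on `graphlib`, which is in the standard library from Python 3.9.

```python
from graphlib import TopologicalSorter


def execution_order(workflow):
    """workflow maps each action to the set of actions it depends on."""
    return list(TopologicalSorter(workflow).static_order())


# Hypothetical workflow: each action runs only after its dependencies.
workflow = {
    "import-logs": set(),
    "clean-with-pig": {"import-logs"},
    "aggregate-mapreduce": {"clean-with-pig"},
    "export-report": {"aggregate-mapreduce"},
}

order = execution_order(workflow)
```

Because the graph must be acyclic, a workflow in which two actions wait on each other is rejected up front (`graphlib` raises `CycleError`) instead of deadlocking at run time.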

Apache Sqoop (Github)

“Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.”

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse

After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Apache ZooKeeper (Github)

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.”

It is an open source, in-memory, distributed NoSQL database, typically used for storing configuration variables.

Apache Giraph (Github)

“Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google”

While it is possible to do processing on graphs with MapReduce, this approach is suboptimal for two reasons:

  1. MapReduce’s view of the world as keys and values is not the greatest way to think of graphs and often requires a significant effort to pound graph-shaped problems into MapReduce-shaped solutions.
  2. Most graph algorithms involve repeatedly iterating over the graph states, which in a MapReduce world requires multiple chained jobs. This, in turn, requires the state to be loaded and saved between each iteration, operations that can easily dominate the runtime of the computation overall.

Giraph attempts to alleviate these limitations by providing a more natural way to model graph problems:

  1. Think like a vertex!
  2. Keep the graph state in memory during the whole of the algorithm, only writing out the final state (and possibly some optional checkpointing to save progress as we go).

Rather than implementing mapper and reducer classes, one implements a Vertex, which has a value and edges and is able to send and receive messages to other vertices in the graph as the computation iterates.
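A minimal sketch can make "think like a vertex" more concrete. The Python function below propagates the maximum value through a graph in supersteps: in each round a vertex reads its incoming messages, updates its own value if it learned a larger one, and only then notifies its neighbours. It is a single-process illustration with names of my own choosing, not Giraph's Java API, and it assumes every neighbour listed in `edges` also appears in `values`.

```python
def propagate_max(values, edges):
    """values: {vertex: number}; edges: {vertex: [neighbour, ...]} (directed)."""
    values = dict(values)  # the graph state stays in memory between supersteps

    # Superstep 0: every vertex sends its value to its neighbours.
    inbox = {v: [] for v in values}
    for v, val in values.items():
        for n in edges.get(v, []):
            inbox[n].append(val)

    # Later supersteps: run until no messages are in flight.
    while any(inbox.values()):
        outbox = {v: [] for v in values}
        for v, messages in inbox.items():
            if messages and max(messages) > values[v]:
                values[v] = max(messages)        # the vertex updates its state
                for n in edges.get(v, []):
                    outbox[n].append(values[v])  # and tells its neighbours
        inbox = outbox
    return values
```

Nothing is written out between iterations, which is exactly the saving over chained MapReduce jobs described above.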

Apache Accumulo (Github)

“The Apache Accumulo sorted, distributed key/value store is a robust, scalable, high performance data storage and retrieval system.”

It is a sorted, distributed key/value store based on Google’s BigTable design. Written in Java, Accumulo has cell-level access labels (useful for security purposes) and server-side programming mechanisms called Iterators that allow users to perform additional processing at the Tablet Server.
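Cell-level access labels are easier to picture with a small example. In the Python sketch below each cell carries its own visibility requirement, and a scan returns only the cells that the reader's authorizations satisfy. The representation is a simplification I made up for illustration (a list of alternative label sets); Accumulo itself writes visibility as an expression string attached to each key, and the assumption that unlabeled cells are visible to everyone is mine as well.

```python
def visible(cell_visibility, authorizations):
    """cell_visibility: list of label sets; holding any one full set grants access."""
    if not cell_visibility:  # assumption: unlabeled cells are readable by everyone
        return True
    return any(required <= authorizations for required in cell_visibility)


def scan(cells, authorizations):
    """Return (key, value) pairs the given authorizations are allowed to see."""
    auths = frozenset(authorizations)
    return [(key, value) for key, value, vis in cells if visible(vis, auths)]


# Hypothetical table: (key, value, visibility).
cells = [
    ("row1", "public", []),
    ("row2", "salary", [{"hr"}, {"admin", "audit"}]),  # hr OR (admin AND audit)
    ("row3", "secret", [{"admin"}]),
]
```

Since the check happens per cell on the server side, two users can scan the same table and see different subsets of it.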

Apache S4 (Github)

“S4 is a general-purpose, distributed, scalable, fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”

S4 was developed by Yahoo (which published the S4 paper) and then open sourced through Apache. It is inspired by MapReduce and the Actor model of computation. Its basic components are:

  • Processing Element (PE): the basic computational unit, which can send and receive messages called Events.
  • Processing Node (PN): the logical host for PEs.
  • Adapter: injects events into the S4 cluster and receives events from it via the Communication Layer.

Apache Thrift (Github)

“The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services”

It is an interface definition language used to define and create services for numerous languages. It is used as a remote procedure call (RPC) framework and was developed at Facebook for “scalable cross-language services development”. It combines a software stack with a code generation engine to build services that work efficiently together.

To put it simply, Apache Thrift is a binary communication protocol.


Google and evolution of big-data


I always underestimated Google’s contribution to the evolution of big-data processing. I used to think that Google only manages and shows some search results. Not so much data. Not as much as Facebook or Twitter, at least…

Obviously I was wrong. Google has to manage a HUGE amount of data, and big-data processing was already a problem in 2002! Its contribution to current processing technologies such as Hadoop, its filesystem HDFS, and HBase was fundamental.

We can split this contribution into two periods. The first (from 2003 to 2008) influenced the technologies we are using today. The second (from 2009 until today) is influencing the products we are going to use in the near future.

The first period gave us

  • GFS, the Google File System (PDF paper), a scalable distributed file system for large distributed data-intensive applications, which later inspired HDFS
  • BigTable (PDF paper), a column-oriented database designed to store petabytes of data across large clusters, which later inspired HBase
  • the concept of MapReduce (PDF paper), a programming model for processing large datasets distributed across large clusters. Hadoop implements this programming model on top of HDFS or similar filesystems.

This series of papers revolutionized the strategies behind the data warehouse, and now all the largest companies use the well-known products inspired by them.

The second period is less well known at the moment. Google ran into many limits in its previous infrastructure and tried to fix them and move ahead. This gave us many other technologies, some of them not yet completely public:

  • Caffeine, a new search infrastructure which uses GFS2, next-generation MapReduce and next-generation BigTable.
  • Colossus, formerly known as Google FileSystem 2, the next-generation GFS.
  • Spanner (PDF paper), a scalable, multi-version, globally distributed, and synchronously replicated database, the NewSQL evolution of BigTable.
  • Dremel (PDF paper), a scalable, near-real-time ad hoc query system for analysis of read-only nested data, and BigQuery, its implementation for GAE.
  • Percolator (PDF paper), a platform for incremental processing which continually updates the search index.
  • Pregel (PDF paper), a system for large-scale graph processing, similar to MapReduce for columnar data.

Now the market is different from 2002. Many companies such as Cloudera and MapR are working hard on big data, and so is the Apache Foundation. Anyway, Google has a 10-year advantage and its technologies are still stunning.

Many of these papers are probably going to influence the next 10 years. The first results are already here: Apache Drill and Cloudera Impala implement the Dremel paper, Apache Giraph implements the Pregel one, and HBase Coprocessors the Percolator one.

And they are just some examples, a Google search can show you more 😉
