datasift_logo

DataSift, as they said on their home page, “aggregate, process and deliver social data“. It is one of the oldest Twitter certified partners and offers data coming from almost every existing social network. I use it everyday to “listen” the net and find data I need for my analysis.

It’s impressive to watch how fast they collect data from external sources and deliver it to your chosen destination. When I tweet, a couple of minutes ago a JSON file land my S3 bucket.

To create an Internet scale filtering is not easy. Their infrastructure is really complex and optimized. This is a 2011 diagram of their workflow.

datasift_infrastructure

Twitter generates more than 500 million tweets per day and is only one of the available resources. The DataSift system performs 250+ million sentiment analysis with sub 100ms latency, and several TB of augmented (includes gender, sentiment, etc) data transits the platform daily. Data Filtering Nodes Can process up to 10,000 unique streams. Can do data-lookup’s on 10,000,000+ username lists in real-time. Links Augmentation Performs 27 million link resolves + lookups plus 15+ million full web page aggregations per day.

C++ is used for the performance-critical components, like the core filtering engine and PHP is for the site, external API server, most of the internal web services, and a custom-built, high performance job queue manager. Java/Scala for batch processing with HBase and MapReduce jobs. Kafka is used as queuing system and Ruby is used for deploys and provisioning. Thrift is widely used.

MySQL (Percona server) on SSD drives is used as primary storage, HBase cluster over more than 30 Hadoop nodes provides a place to store historical data and Memcached and Redis are used for caching.

Here is a schema of the processing unit which build the historical database.

datasift_historical

Message queues are another critical component of the infrastructure. 0mq (custom build from latest alpha branch, with some stability fixes, to use publisher-side filtering), used in different configurations:

  • PUB-SUB for replication / message broadcasting;
  • PUSH-PULL for round-robin workload distribution;
  • REQ-REP for health checks of different components.

Kafka for high-performance persistent queues. In both cases they’re working with the developers and contributing bug reports / traces / fixes / client libraries.

All code is pulled from the repo from Jenkins every 5 mins, automatically tested and verified with several QA tools, packaged as an RPM and moved to the dev package repo. Chef is used to automate deployments and manage configuration. All services emit StatsD events, which are combined with other system-level checks, added to Zenoss and displayed with Graphite.

The biggest challenge IMHO is filtering. Filtering at this scale requires a different approach. They started with work they did at TweetMeme. The core filter engine is in C++ and is called the Pickle Matrix. Over three years they’ve developed a compiler and their own virtual machine. We don’t know what their technology is exactly, but it might be something like Distributed Complex Event Processing with Query Rewriting.

Sources

Almost all content of this post come from the wonderful article “DataSift Architecture: Realtime Datamining At 120,000 Tweets Per Second” posted on HighScalability. Some details also from “Historical Architecture – Data Mining Billions of Tweets” from DataSift blog.

Few days after Google released its papers on 2003 many developers started implement them. Apache Hadoop is the biggest result of that implementation. Around Hadoop many other technologies was born and the Apache Software Foundation helped the most promising to grow up. Below there is an (incomplete) list of the Hadoop-related softwares.

Apache Hadoop (HDFS, MapReduce)

apache_hadoop_logoHadoop is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

The Hadoop Distributed File System (HDFS) offers a way to store large files across multiple machines. Hadoop and HDFS was derived from Google’s MapReduce and Google File System (GFS) papers.

Apache Hive (Github)

apache_hive_logoHive is a data warehouse system for Hadoop […] Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL.”

MapReduce paradigm is extremely powerful but programmers use SQL to query data from years. HiveQL is a SQL-like language to query data over the Hadoop filesystem.

An example of HiveQL:

CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
friends ARRAY<BIGINT>, properties MAP<STRING, STRING>
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;

Apache Pig (Github)

pig_logoPig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […] Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs. […] Pig’s language layer currently consists of a textual language called Pig Latin

If you don’t like SQL maybe you prefer a sort of procedural language. Pig Latin is different than HiveQL but have the same purpose: query data.

An example of Pig Latin:

set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;

Apache Avro (GitHub)

avro_logoAvro is a data serialization system.

It’s a framework for performing remote procedure calls and data serialization. It can be used to pass data from one program or language to another (e.g. from C to Pig). It is particularly suited for use with scripting languages such as Pig, because data is always stored with its schema in Avro, and therefore the data is self-describing.

Apache Chukwa (Github)

chukwa_logoChukwa is an open source data collection system for monitoring large distributed systems. It’s built on top of the Hadoop Distributed File System (HDFS) and Map/Reduce framework and inherits Hadoop’s scalability and robustness.

It’s used to process and analyze generated logs and has different components:
  • Agents that run on each machine to collect the logs generated from various applications.
  • Collectors that receive data from the agent and write it to stable storage
  • MapReduce jobs or parsing and archiving the data.

chukwa_structure

Apache Drill (Github)

drill_logoDrill is a distributed system for interactive analysis of large-scale datasets, based on Google’s Dremel. Its goal is to efficiently process nested data. It is a design goal to scale to 10,000 servers or more and to be able to process petabyes of data and trillions of records in seconds.

Idea behind Drill is to build a low-latency execution engine, enabling interactive queries across billions of records instead of using a batch MapReduce process.

Apache Flume (Github)

flume_logoFlume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

It is a distributed service that makes it very easy to collect and aggregate your data into a persistent store such as HDFS. Flume can read data from almost any source – log files, Syslog packets, the standard output of any Unix process – and can deliver it to a batch processing system like Hadoop or a real-time data store like HBase.

Apache HBase (Github)

hbase_logoHBase is the Hadoop database, a distributed, scalable, big data store.

It is an open source, non-relational, distributed database modeled after Google’s BigTable, is written in Java and provides a fault-tolerant way of storing large quantities of sparse data. HBase features compression, in-memory operation, and Bloom filters on a per-column basis as outlined in the original BigTable paper. Tables in HBase can serve as the input and output for MapReduce jobs run in Hadoop.

Apache HCatalog (Github)

HCatalog is a table and storage management service for data created using Hadoop

Hadoop needs a better abstraction for data storage, and it needs a metadata service. HCatalog addresses both of these issues. It presents users with a table abstraction. This frees them from knowing where or how their data is stored. It allows data producers to change how they write data while still supporting existing data in the old format so that data consumers do not have to change their processes. It provides a shared schema and data model for Pig, Hive, and MapReduce. It will enable notifications of data availability. And it will provide a place to store state information about the data so that data cleaning and archiving tools can know which data sets are eligible for their services.

Apache Mahout (Github)

mahout_logoMahout is a machine learning library’s goal is to build scalable machine learning libraries”

It’s an implementations of distributed machine learning algorithms on the Hadoop platform. While Mahout‘s core algorithms for clustering, classification and batch based collaborative filtering are implemented on top of Apache Hadoop using the map/reduce paradigm, it does not restrict contributions to Hadoop based implementations.

Apache Oozie (Github)

oozie_logoOozie is a workflow scheduler system to manage Apache Hadoop jobs.”

Tasks performed in Hadoop sometimes require multiple Map/Reduce jobs to be chained together to complete its goal.

Oozie is a Java Web-Application that runs in a Java servlet-container and uses a database to store:

  • Workflow definitions
  • Currently running workflow instances, including instance states and variables

Oozie workflow is a collection of actions (i.e. Hadoop Map/Reduce jobs, Pig jobs) arranged in a control dependency DAG (Direct Acyclic Graph), specifying a sequence of actions execution. This graph is specified in hPDL (a XML Process Definition Language).

Apache Sqoop (Github)

sqoop_logoSqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.”

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

  • Imports individual tables or entire databases to files in HDFS
  • Generates Java classes to allow you to interact with your imported data
  • Provides the ability to import from SQL databases straight into your Hive data warehouse

After setting up an import job in Sqoop, you can get started working with SQL database-backed data from your Hadoop MapReduce cluster in minutes.

Apache ZooKeeper (Github)

zookeeper_logoZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.”

it is an open source, in-memory, distributed NoSQL database, typically used for storing configuration variables.

Apache Giraph (Github)

giraph_logoGiraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google”

While it is possible to do processing on graphs with MapReduce, this approach is suboptimal for two reasons:

  1. MapReduce’s view of the world as keys and values is not the greatest way to think of graphs and often requires a significant effort to pound graph-shaped problems into MapReduce-shaped solutions.
  2. Most graph algorithms involve repeatedly iterating over the graph states, which in a MapReduce world requires multiple chained jobs. This, in turn, requires the state to be loaded and saved between each iteration, operations that can easily dominate the runtime of the computation overall.

Giraph attempts to alleviate these limitations by providing a more natural way to model graph problems:

  1. Think like a vertex!
  2. Keep the graph state in memory during the whole of the algorithm, only writing out the final state (and possibly some optional checkpointing to save progress as we go).

Rather than implementing mapper and reducer classes, one implements a Vertex, which has a value and edges and is able to send and receive messages to other vertices in the graph as the computation iterates.

Apache Accumulo (Github)

accumulo_logoAccumulo sorted, distributed key/value store is a robust, scalable, high performance data storage and retrieval system.”

It is a sorted, distributed key/value store based on Google’s BigTable design. Written in Java, Accumulo has cell-level access labels (useful for security purpose) and server-side programming mechanisms called Iterators that allows users to perform additional processing at the Tablet Server.

Apache S4 (Github)

s4_logoS4 is a general-purpose, distributed, scalable, fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data.”

Developed by Yahoo (which released the Yahoo’s S4 paper) and then open sourced to Apache. Inspired by MapReduce and Actor model for computation. Basic components are:

  • Processing Element (PE): Basic computational unit which can send and receive messages called Events.
  • Processing Node (PN): The logical hosts to PEs
  • Adapter: injects events into the S4 cluster and receives from it via the Communication Layer.

Apache Thrift (Github)

Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services”

It is an interface definition language that is used to define and create services for numerous languages It is used as a remote procedure call (RPC) framework and was developed at Facebook for “scalable cross-language services development”. It combines a software stack with a code generation engine to build services that work efficiently together.

To put it simply, Apache Thrift is a binary communication protocol.

Sources