DataSift, as they say on their home page, “aggregate, process and deliver social data”. It is one of the oldest Twitter certified partners and offers data from almost every existing social network. I use it every day to “listen” to the net and find the data I need for my analysis.

It’s impressive to watch how fast they collect data from external sources and deliver it to your chosen destination. When I tweet, a couple of minutes later a JSON file lands in my S3 bucket.

Building Internet-scale filtering is not easy. Their infrastructure is complex and heavily optimized. This is a 2011 diagram of their workflow.

datasift_infrastructure

Twitter generates more than 500 million tweets per day and is only one of the available sources. The DataSift system performs 250+ million sentiment analyses with sub-100ms latency, and several TB of augmented data (including gender, sentiment, etc.) transit the platform daily. The data filtering nodes can process up to 10,000 unique streams and can do data lookups on lists of 10,000,000+ usernames in real time. Link augmentation performs 27 million link resolves and lookups plus 15+ million full web page aggregations per day.

C++ is used for the performance-critical components, like the core filtering engine, while PHP is used for the site, the external API server, most of the internal web services, and a custom-built, high-performance job queue manager. Java/Scala is used for batch processing with HBase and MapReduce jobs. Kafka is used as a queuing system and Ruby is used for deploys and provisioning. Thrift is widely used.

MySQL (Percona Server) on SSD drives is used as primary storage, an HBase cluster over more than 30 Hadoop nodes stores historical data, and Memcached and Redis are used for caching.

Here is a schema of the processing unit which builds the historical database.

datasift_historical

Message queues are another critical component of the infrastructure. 0mq (a custom build from the latest alpha branch, with some stability fixes, to use publisher-side filtering) is used in different configurations:

  • PUB-SUB for replication / message broadcasting;
  • PUSH-PULL for round-robin workload distribution;
  • REQ-REP for health checks of different components.
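The difference between the first two patterns can be illustrated without ZeroMQ itself. What follows is a toy in-process model in Python, not the 0mq API: the class names `PubSub` and `PushPull` and the list-based inboxes are invented for this sketch.

```python
from itertools import cycle


class PubSub:
    """Toy broadcast: every subscriber receives every message."""

    def __init__(self, subscribers):
        self.subscribers = subscribers  # list of lists acting as inboxes

    def send(self, message):
        for inbox in self.subscribers:
            inbox.append(message)


class PushPull:
    """Toy work distribution: each message goes to exactly one worker, in turn."""

    def __init__(self, workers):
        self._next_inbox = cycle(workers)  # round-robin over the inboxes

    def send(self, message):
        next(self._next_inbox).append(message)


replicas = [[], []]
workers = [[], []]
broadcast = PubSub(replicas)
distribute = PushPull(workers)

for tweet in ["t1", "t2", "t3", "t4"]:
    broadcast.send(tweet)
    distribute.send(tweet)

print(replicas)  # both inboxes hold all four messages
print(workers)   # messages alternate between the two inboxes
```

Broadcasting suits replication because every replica must see everything, while round-robin suits workload distribution because each item should be processed only once.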

Kafka is used for high-performance persistent queues. In both cases they’re working with the developers and contributing bug reports / traces / fixes / client libraries.

All code is pulled from the repo by Jenkins every 5 minutes, automatically tested and verified with several QA tools, packaged as an RPM and moved to the dev package repo. Chef is used to automate deployments and manage configuration. All services emit StatsD events, which are combined with other system-level checks, added to Zenoss and displayed with Graphite.
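To give an idea of what emitting a StatsD event involves, here is a minimal Python sketch of the plain-text StatsD datagram format (`name:value|type`) sent over UDP. The metric names, the host and the helper names `format_metric` and `send_metric` are assumptions made up for this example; the sources do not describe DataSift's actual metrics or client code.

```python
import socket


def format_metric(name, value, metric_type):
    """Build a StatsD datagram: 'c' = counter, 'ms' = timer, 'g' = gauge."""
    if metric_type not in ("c", "ms", "g"):
        raise ValueError(f"unsupported metric type: {metric_type}")
    return f"{name}:{value}|{metric_type}"


def send_metric(name, value, metric_type, host="127.0.0.1", port=8125):
    """Fire-and-forget UDP send; host is assumed, 8125 is StatsD's usual port."""
    payload = format_metric(name, value, metric_type).encode("ascii")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, port))


# Hypothetical metric names, for illustration only.
print(format_metric("filter.tweets_processed", 1, "c"))
print(format_metric("filter.latency", 87, "ms"))
```

Because the transport is UDP, a service can emit metrics without waiting for an answer, so instrumentation adds very little latency to the service itself.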

The biggest challenge IMHO is filtering. Filtering at this scale requires a different approach. They started with work they did at TweetMeme. The core filter engine is in C++ and is called the Pickle Matrix. Over three years they’ve developed a compiler and their own virtual machine. We don’t know what their technology is exactly, but it might be something like Distributed Complex Event Processing with Query Rewriting.

Sources

Almost all the content of this post comes from the wonderful article “DataSift Architecture: Realtime Datamining At 120,000 Tweets Per Second” posted on HighScalability. Some details also come from “Historical Architecture – Data Mining Billions of Tweets” on the DataSift blog.

Last month, while I was inspecting the Hadoop ecosystem, I found a lot of other software related to big data. Below is the (again incomplete) list.

N.B. Information and text are taken from the official websites or from the sources referenced at the end of the article.

Facebook Scribe (Github)

“Scribe is a server for aggregating log data streamed in real time from a large number of servers. It is designed to be scalable, extensible without client-side modification, and robust to failure of the network or any specific machine.”

facebook_scribe_diagram

Scribe servers are arranged in a directed graph, with each server knowing only about the next server in the graph. This network topology allows for adding extra layers of fan-in as a system grows, and batching messages before sending them between datacenters, without having any code that explicitly needs to understand datacenter topology, only a simple configuration.
Scribe was designed to consider reliability but to not require heavyweight protocols and expansive disk usage.

Facebook McDipper

“McDipper is a highly performant flash-based cache server that is Memcache protocol compatible.”

Facebook used Memcached heavily in its early stages and, when RAM became too expensive, had to find a different solution. This is why McDipper was born. It is not different from other projects like Fatcache or Edis. As many DBMSs did, they try to mimic an in-memory caching engine in order to extend the available space. SSDs are fast enough that the loss of speed is not a problem.

Facebook Haystack

“The new photo infrastructure merges the photo serving tier and storage tier into one physical tier. It implements a HTTP based photo server which stores photos in a generic object store called Haystack.”

A system specifically designed to serve static content over HTTP as fast as possible. Haystack can be broken down into these functional layers:

  • HTTP server
  • Photo Store
  • Haystack Object Store
  • Filesystem
  • Storage

I talked about it in a previous post.

Facebook Memcached (Github)

“Here at Facebook, we’re likely the world’s largest user of memcached. We use memcached to alleviate database load. memcached is already fast, but we need it to be faster and more efficient than most installations.”

Twitter Storm (Github)

“Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation.”

twitter_storm_topology

There are just three abstractions in Storm: spouts, bolts, and topologies.

A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere.

A bolt processes any number of input streams and produces any number of new output streams. Most of the logic of a computation goes into bolts, such as functions, filters, streaming joins, streaming aggregations, talking to databases, and so on.

A topology is a network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt.
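The three abstractions can be mimicked in a few lines of Python. This is only a single-process toy built on generators, not Storm's API (Storm runs on the JVM and distributes the work across a cluster); the function names `sentence_spout`, `split_bolt` and `count_bolt` are invented for the sketch.

```python
def sentence_spout():
    """Spout: a source of tuples (here a fixed list instead of a queue broker)."""
    for sentence in ["storm is fast", "storm is distributed"]:
        yield sentence


def split_bolt(stream):
    """Bolt: consumes a stream of sentences, emits a stream of words."""
    for sentence in stream:
        for word in sentence.split():
            yield word


def count_bolt(stream):
    """Bolt: streaming aggregation, emits a running (word, count) pair."""
    counts = {}
    for word in stream:
        counts[word] = counts.get(word, 0) + 1
        yield word, counts[word]


# Topology: wire the spout into the bolts; each edge is a subscription.
topology = count_bolt(split_bolt(sentence_spout()))
final_counts = dict(topology)  # later pairs overwrite earlier ones
print(final_counts)
```

Each bolt sees one tuple at a time and never the whole dataset, which is the main difference from a batch job.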

Twitter Snowflake (Github)

“Snowflake is a network service for generating unique ID numbers at high scale with some simple guarantees.”

Twitter needed something that could generate tens of thousands of ids per second in a highly available manner. This naturally led us to choose an uncoordinated approach.

These ids need to be roughly sortable, meaning that if tweets A and B are posted around the same time, they should have ids in close proximity to one another since this is how we and most Twitter clients sort tweets. Additionally, these numbers have to fit into 64 bits.

To generate them in an uncoordinated manner, we settled on a composition of: timestamp, worker number and sequence number. Sequence numbers are per-thread and worker numbers are chosen at startup via zookeeper (though that’s overridable via a config file).
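The composition of timestamp, worker number and sequence number can be sketched in Python as follows. Several details are assumptions not stated in the text above: the bit widths (41 bits of timestamp, 10 of worker, 12 of sequence), the custom epoch constant, and the class name `IdGenerator`. For simplicity the sketch guards one shared sequence with a lock instead of keeping per-thread sequences, and takes the worker number as an argument instead of getting it from ZooKeeper.

```python
import threading
import time

# Assumed layout: 41-bit timestamp | 10-bit worker | 12-bit sequence.
WORKER_BITS = 10
SEQUENCE_BITS = 12
MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1
CUSTOM_EPOCH_MS = 1_288_834_974_657  # assumed epoch, in milliseconds


class IdGenerator:
    def __init__(self, worker_id, clock=lambda: int(time.time() * 1000)):
        if not 0 <= worker_id < (1 << WORKER_BITS):
            raise ValueError("worker_id out of range")
        self.worker_id = worker_id
        self.clock = clock  # injectable so the sketch can be tested
        self.last_ms = -1
        self.sequence = 0
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now = self.clock()
            if now < self.last_ms:
                raise RuntimeError("clock moved backwards")
            if now == self.last_ms:
                self.sequence = (self.sequence + 1) & MAX_SEQUENCE
                if self.sequence == 0:
                    # Sequence exhausted in this millisecond: wait for the next.
                    while now <= self.last_ms:
                        now = self.clock()
            else:
                self.sequence = 0
            self.last_ms = now
            return (
                ((now - CUSTOM_EPOCH_MS) << (WORKER_BITS + SEQUENCE_BITS))
                | (self.worker_id << SEQUENCE_BITS)
                | self.sequence
            )


generator = IdGenerator(worker_id=7)
first, second = generator.next_id(), generator.next_id()
print(first, second)
```

Since the timestamp occupies the most significant bits, ids generated later compare greater, which gives the rough sortability described above without any coordination between workers.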

Twitter Fatcache (Github)

“Fatcache is memcache on SSD. Think of fatcache as a cache for your big data.”

SSD-backed memory presents a viable alternative for applications with large workloads that need to maintain high hit rate for high performance.

Like Facebook McDipper, Fatcache tries to overcome memory limitations. It maintains an in-memory index for all data stored on disk. An in-memory index serves two purposes: cheap object existence checks and on-disk object address storage.

To minimize the number of small, random writes, fatcache treats the SSD as a log-structured object store. All writes are aggregated in memory and written to the end of the circular log in batches – usually multiples of 1 MB.
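The combination of an in-memory index and batched, append-only writes can be modeled with a short Python toy. This is not fatcache's code: the class `LogStructuredCache` is invented, a Python list stands in for the SSD, `batch_size` counts items rather than megabytes, and the circular reuse of the log (eviction) is left out.

```python
class LogStructuredCache:
    """Toy model: in-memory index over an append-only log with batched writes."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size  # stand-in for the ~1 MB write batches
        self.log = []                 # stand-in for the SSD, append-only
        self.buffer = []              # writes aggregated in memory
        self.index = {}               # key -> ("buffer" | "log", position)
        self.flushes = 0

    def set(self, key, value):
        self.index[key] = ("buffer", len(self.buffer))
        self.buffer.append((key, value))
        if len(self.buffer) >= self.batch_size:
            self._flush()

    def _flush(self):
        base = len(self.log)
        self.log.extend(self.buffer)  # one sequential write per batch
        for offset, (key, _) in enumerate(self.buffer):
            # Only repoint keys whose newest value lives at this buffer slot.
            if self.index.get(key) == ("buffer", offset):
                self.index[key] = ("log", base + offset)
        self.buffer = []
        self.flushes += 1

    def get(self, key):
        location = self.index.get(key)  # existence check never touches the log
        if location is None:
            return None
        where, position = location
        store = self.buffer if where == "buffer" else self.log
        return store[position][1]


cache = LogStructuredCache(batch_size=3)
for i in range(7):
    cache.set(f"k{i}", i)
cache.set("k0", 100)  # an overwrite is a new append; the index moves on
print(cache.get("k0"), cache.get("k4"), cache.get("missing"), cache.flushes)
```

Overwriting a key never modifies the log in place: the old entry simply becomes unreachable, which is what keeps the writes sequential.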

Twitter FlockDB (Github)

“FlockDB is a database that stores graph data, but it isn’t a database optimized for graph-traversal operations. Instead, it’s optimized for very large adjacency lists, fast reads and writes, and page-able set arithmetic queries.”

flockdb_diagram

It is a distributed graph database for storing adjacency lists, with goals of supporting:

  • a high rate of add/update/remove operations
  • potentially complex set arithmetic queries
  • paging through query result sets containing millions of entries
  • ability to “archive” and later restore archived edges
  • horizontal scaling including replication
  • online data migration

Non-goals include multi-hop queries (or graph-walking queries) and automatic shard migrations.

FlockDB is much simpler than other graph databases such as Neo4j because it tries to solve fewer problems. It scales horizontally and is designed for on-line, low-latency, high throughput environments such as web-sites.
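To make “adjacency lists with set arithmetic and paging” concrete, here is a small in-memory Python sketch. It is not FlockDB's API or storage model: the class `EdgeStore`, its method names and the cursor scheme are assumptions chosen for illustration, and nothing here is distributed.

```python
from collections import defaultdict


class EdgeStore:
    """Toy adjacency-list store; edges are (source, destination) pairs."""

    def __init__(self):
        self.forward = defaultdict(set)   # source -> destinations
        self.backward = defaultdict(set)  # destination -> sources

    def add(self, source, destination):
        self.forward[source].add(destination)
        self.backward[destination].add(source)

    def remove(self, source, destination):
        self.forward[source].discard(destination)
        self.backward[destination].discard(source)

    def page(self, ids, cursor=None, count=2):
        """Return up to `count` ids greater than `cursor`, plus the next cursor."""
        ordered = sorted(i for i in ids if cursor is None or i > cursor)
        chunk = ordered[:count]
        next_cursor = chunk[-1] if len(ordered) > count else None
        return chunk, next_cursor


store = EdgeStore()
for follower, followed in [(1, 10), (2, 10), (3, 10), (1, 20), (3, 20)]:
    store.add(follower, followed)

# Set arithmetic: who follows both user 10 and user 20?
both = store.backward[10] & store.backward[20]

# Paging: walk the followers of user 10 two at a time.
first_page, cursor = store.page(store.backward[10])
second_page, _ = store.page(store.backward[10], cursor)
print(both, first_page, second_page)
```

Every query here touches only one or two adjacency lists; there is no multi-hop walk, which matches the stated non-goals.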

Twitter Twemcache (Github)

“We built Twemcache because we needed a more robust and manageable version of Memcached, suitable for our large-scale production environment.”

Apache Kafka (Github)

“Kafka is publish-subscribe messaging rethought as a distributed commit log.”

apache_kafka_diagram

It is designed to support the following:

  • Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.
  • High-throughput: even with very modest hardware Kafka can support hundreds of thousands of messages per second.
  • Explicit support for partitioning messages over Kafka servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.
  • Support for parallel data load into Hadoop.

Kafka is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing on a consumer-scale web site.
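The idea of a partitioned commit log with per-partition ordering can be shown with a toy Python model. It is not the Kafka client API: the class `PartitionedLog` is invented, and the CRC32-based choice of partition is an assumption made for this sketch, not a description of Kafka's partitioner.

```python
import zlib


class PartitionedLog:
    """Toy commit log: append-only lists, one per partition."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, message):
        # A stable hash keeps every message for a key in the same partition.
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[partition].append((key, message))
        offset = len(self.partitions[partition]) - 1
        return partition, offset

    def read(self, partition, offset):
        """Consumers track their own offset; reading does not remove messages."""
        return self.partitions[partition][offset:]


log = PartitionedLog(num_partitions=3)
for i in range(3):
    log.append("alice", f"alice-event-{i}")
    log.append("bob", f"bob-event-{i}")

alice_partition, _ = log.append("alice", "alice-event-3")
alice_events = [m for k, m in log.read(alice_partition, 0) if k == "alice"]
print(alice_events)  # alice's events, in the order they were appended
```

Ordering is guaranteed only within a partition, so messages that must stay in order need to share a key. Because a read does not delete anything, several consumers can replay the same partition independently.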

Apache Gora (Github)

“Gora is an open source framework that provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key value stores, document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce support.”

Apache Mesos (Github)

“Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark”

apache_mesos_structure

Main features are:

  • Fault-tolerant replicated master using ZooKeeper.
  • Scalability to 10,000s of nodes using fast, event-driven C++ implementation.
  • Isolation between tasks with Linux Containers.
  • Multi-resource scheduling (memory and CPU aware).
  • Efficient application-controlled scheduling mechanism.
  • Java, Python and C++ APIs for developing new parallel applications.
  • Web UI for viewing cluster state.

Apache Spark (Github)

“Spark is an open source cluster computing system that aims to make data analytics fast: both fast to run and fast to write. To run programs faster, Spark provides primitives for in-memory cluster computing: your job can load data into memory and query it repeatedly much more quickly than with disk-based systems like Hadoop MapReduce.”

Spark was initially developed for two applications where keeping data in memory helps: iterative algorithms, which are common in machine learning, and interactive data mining. In both cases, Spark can run up to 100x faster than Hadoop MapReduce. However, you can use Spark for general data processing too.

Shark: Hive on Spark (Github)

“Shark is a large-scale data warehouse system for Spark designed to be compatible with Apache Hive. It can execute Hive QL queries up to 100 times faster than Hive without any modification to the existing data or queries. Shark supports Hive’s query language, metastore, serialization formats, and user-defined functions, providing seamless integration with existing Hive deployments and a familiar, more powerful option for new ones.”

shark_spark_diagram

Shark is built on top of Spark, a data-parallel execution engine that is fast and fault-tolerant. Even if data are on disk, Shark can be noticeably faster than Hive because of the fast execution engine. It avoids the high task launching overhead of Hadoop MapReduce and does not require materializing intermediate data between stages on disk. Thanks to this fast engine, Shark can answer queries in sub-second latency.

Fluentd (Github)

“Fluentd is an open-source tool to collect events and logs. 150+ plugins instantly enables you to store the massive data for Log Search, Big Data Analytics, and Archiving (MongoDB, S3, Hadoop)”

fluentd_diagram

The fundamental problem with logs is that they are usually stored in files although they are best represented as streams (by Adam Wiggins, CTO at Heroku). Traditionally, they have been dumped into text-based files and collected by rsync in hourly or daily fashion. With today’s web/mobile applications, this creates two problems.

  1. Need ad-hoc parsing: The text-based logs have their own format, and analytics engineers need to write a dedicated parser for each format. But that’s probably not the best use of your time. You should be analyzing data to make better business decisions instead of writing one parser after another.
  2. Lacks Freshness: The logs lag. The realtime analysis of user behavior makes feature iterations a lot faster. A nimbler A/B testing will help you differentiate your service from competitors.

This is where Fluentd comes in. We believe Fluentd solves all issues of scalable log collection by getting rid of files and turning logs into true semi-structured data streams.

Kestrel (Github)

“Kestrel is a very simple message queue that runs on the JVM. It supports multiple protocols: memcache: the memcache protocol, with some extensions, thrift: Apache Thrift-based RPC, text: a simple text-based protocol”

A cluster of kestrel servers is like a memcache cluster: the servers don’t know about each other, and don’t do any cross-communication, so you can add as many as you like. The simplest clients have a list of all servers in the cluster, and pick one at random for each operation. In this way, each queue appears to be spread out across every server, with items in a loose ordering. More advanced clients can find kestrel servers via ZooKeeper.
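The “pick a server at random” behaviour and the loose ordering it produces can be simulated in Python. This is a toy, not a Kestrel client: the class `QueueCluster` is invented, the servers are in-process deques, and trying the remaining servers when the first pick is empty is a convenience added for the sketch.

```python
import random
from collections import deque


class QueueCluster:
    """Toy client-side view of independent queue servers."""

    def __init__(self, num_servers, seed=None):
        # The servers share nothing; only the client knows the full list.
        self.servers = [deque() for _ in range(num_servers)]
        self.rng = random.Random(seed)

    def put(self, item):
        self.rng.choice(self.servers).append(item)

    def get(self):
        """Try servers in random order; return None if all are empty."""
        for server in self.rng.sample(self.servers, len(self.servers)):
            if server:
                return server.popleft()
        return None


cluster = QueueCluster(num_servers=3, seed=42)
for i in range(10):
    cluster.put(i)

received = []
while (item := cluster.get()) is not None:
    received.append(item)
print(received)  # all ten items, FIFO per server but only loosely ordered overall
```

Nothing is lost or duplicated, but a consumer must not rely on global ordering. That is the price paid for servers that never need to talk to each other.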

Cloudera Impala (Github)

“Impala is an open source Massively Parallel Processing (MPP) query engine that runs natively on Apache Hadoop”

With Impala, you can query data, whether stored in HDFS or Apache HBase – including SELECT, JOIN, and aggregate functions – in real time.

To avoid latency, Impala circumvents MapReduce to directly access the data through a specialized distributed query engine that is very similar to those found in commercial parallel RDBMSs. The result is order-of-magnitude faster performance than Hive, depending on the type of query and configuration. Note that this performance improvement has been confirmed by several large companies that have tested Impala on real-world workloads for several months now.

Structure diagram:

cloudera_impala_diagram

HadoopDB

“HadoopDB is an Architectural Hybrid of MapReduce and DBMS Technologies for Analytical Workloads”

HadoopDB is:

  • A hybrid of DBMS and MapReduce technologies that targets analytical workloads
  • Designed to run on a shared-nothing cluster of commodity machines, or in the cloud
  • An attempt to fill the gap in the market for a free and open source parallel DBMS
  • Much more scalable than currently available parallel database systems and DBMS/MapReduce hybrid systems.
  • As scalable as Hadoop, while achieving superior performance on structured data analysis workloads

Hypertable

“Hypertable is an open source database system inspired by publications on the design of Google’s BigTable. The project is based on experience of engineers who were solving large-scale data-intensive tasks for many years.”

This project is for the design and implementation of a high performance, scalable, distributed storage and processing system for structured and unstructured data. It is designed to manage the storage and processing of information on a large cluster of commodity servers, providing resilience to machine and component failures. Data is represented in the system as a multi-dimensional table of information. The data in a table can be transformed and organized at high speed by performing computations in parallel, pushing them to where the data is physically stored.

Sources: