The big-data environment is really “collaborative” at the moment. Each project is ready to run on almost every available platform, and this is good. However, two factions have recently been forming: people who use the Hadoop 2.0 stack and people who use BDAS.

Hadoop 2.0 Stack

hadoop_stack

The most important difference between Hadoop 2.0 and previous versions is YARN, the new cluster resource manager and next generation MapReduce. It can run almost every kind of big-data project:

  • Traditional MapReduce (the new version is backward compatible) with Hive, Pig or Cascading for queries.
  • Interactive, near real-time MapReduce (using Tez)
  • HBase and Accumulo
  • Storm and S4 for stream processing
  • Giraph for graph processing
  • OpenMPI for message passing
  • Spark as in-memory MapReduce
  • HDFS as distributed filesystem
  • more…

The most interesting companies here are Intel, Cloudera, MapR and Hortonworks.

BDAS (Berkeley Data Analytics Stack)

berkeley_stack

In BDAS everything is built around Mesos, the cluster resource manager. It’s a relatively new project but it is already widely used. Traditional HDFS is accelerated by Tachyon (an in-memory file system). The main integration is around Spark, which is the base for:

Mesos can also run a traditional Hadoop environment and other projects (such as Storm and OpenMPI). You can also run traditional applications (even Rails apps) using Marathon.

The most interesting companies here are Databricks and Mesosphere.

Who will win? 😀

A few hours after I posted about the DataSift architecture, @choult, one of the roughly 25 ninjas who develop the DataSift platform, tweeted me.

The following SlideShare presentation by @stuherbert, another ninja, talks about the use of PHP at DataSift. Contrary to what you may think, PHP is widely used in data processing.

datasift_repo_languges

The system can be decomposed into three major data pipelines:

  • Data Archiving (adds new data to the Historic Archive)
  • Filtering Pipeline (filters and delivers data in real time)
  • Playback Pipeline (filters and delivers data from the Historic Archive)

And PHP is used for many parts of these.

datasift_php

They use a custom build of PHP 5.3.latest with several optimizations and compiled-in extensions (ZeroMQ, APC, XHProf, Redis, XDebug). They also develop some internal components:

  • Frink, TweetMeme’s framework
  • Stone: the foundation of their in-house test tools, Hornet and Storyteller (they will probably open source a fork named Storyplayer).

Unfortunately I wasn’t able to find more details about these. Anyway, here is the presentation:


Everything started while I was writing my first post about the Hadoop Ecosystem. I was relatively new to Hadoop and I wanted to discover all the useful projects. I spent about 9 months collecting projects and building a simple index.

About a month ago I found an interesting thread on the Hadoop Users Group on LinkedIn, written by Javi Roman, High Performance Computing Manager at CEDIANT (UAX). He talks about a table which maps the Hadoop ecosystem, much as I did with my list.

He published his list on Github a couple of days later and called it the Hadoop Ecosystem Table. It was an HTML table, really interesting but really hard to reuse for other purposes. I wanted to merge my list with this table, so I decided to fork it and add more abstractions.

I wrote a couple of Ruby scripts (thanks, Nokogiri) to extract data from my list and Javi’s table and put it into a format-agnostic container. After a couple of days spent hacking on these parsers I found a simple but elegant solution: JSON.

Information about each project is stored in a separate JSON file:

{
  "name": "Apache HDFS",
  "description": "The Hadoop Distributed File System (HDFS) offers a way to store large files across \nmultiple machines. Hadoop and HDFS was derived from Google File System (GFS) paper. \nPrior to Hadoop 2.0.0, the NameNode was a single point of failure (SPOF) in an HDFS cluster. \nWith Zookeeper the HDFS High Availability feature addresses this problem by providing \nthe option of running two redundant NameNodes in the same cluster in an Active/Passive \nconfiguration with a hot standby. ",
  "abstract": "a way to store large files across multiple machines",
  "category": "Distributed Filesystem",
  "tags": [],
  "links": [
    {
      "text": "hadoop.apache.org",
      "url": "http://hadoop.apache.org/"
    },
    {
      "text": "Google FileSystem - GFS Paper",
      "url": "http://research.google.com/archive/gfs.html"
    },
    {
      "text": "Cloudera Why HDFS",
      "url": "http://blog.cloudera.com/blog/2012/07/why-we-build-our-platform-on-hdfs/"
    },
    {
      "text": "Hortonworks Why HDFS",
      "url": "http://hortonworks.com/blog/thinking-about-the-hdfs-vs-other-storage-technologies/"
    }
  ]
}

It includes: project name, long and short description, category, tags and links.

I merged the data into these files and wrote a couple of generators to put the data into different templates. Now I can generate code for my WordPress page and an updated version of Javi’s table.
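To give an idea of what such a generator might look like, here is a minimal sketch in Python (my real generators are not shown in this post, so treat it only as an illustration). It loads a trimmed-down project record shaped like the JSON file above and renders it as one HTML table row. The `render_row` function and the row layout are hypothetical names chosen for this example.

```python
import json
from html import escape

# Trimmed-down project record, shaped like the JSON file shown above.
PROJECT_JSON = """
{
  "name": "Apache HDFS",
  "abstract": "a way to store large files across multiple machines",
  "category": "Distributed Filesystem",
  "tags": [],
  "links": [
    {"text": "hadoop.apache.org", "url": "http://hadoop.apache.org/"}
  ]
}
"""


def render_row(project):
    """Render one project as an HTML table row (hypothetical template)."""
    links = ", ".join(
        '<a href="{}">{}</a>'.format(
            escape(link["url"], quote=True), escape(link["text"])
        )
        for link in project.get("links", [])
    )
    return "<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(
        escape(project["name"]), escape(project["abstract"]), links
    )


project = json.loads(PROJECT_JSON)
row = render_row(project)
print(row)
```

Because the data lives in plain JSON, adding another output format only means writing another small function like this one.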

Finally I added more data in more generic categories not strictly related to Hadoop (like MySQL forks, Memcached forks and search engine platforms) and built a new version of the table: The Big Data Ecosystem table. The JSON files are available to everyone and will be served directly from a CDN located under the same domain as the table.

This is how I built an open source big data map 🙂


Information about VKontakte, the largest European social network, and its infrastructure is scarce and fragmented. The only recent insight, in English, into its technology is a BTI press release which talks about VK’s migration to their infrastructure. Everything was top secret.

Only in 2011, at Moscow HighLoad++, did Pavel Durov and Oleg Illarionov reveal something about the architecture of the social network; their insights are collected in this post (in Russian).

VK seems no different from any other popular social network: it sits on a LAMP stack and uses many other open source technologies.

  • Debian is the base for their custom Linux distro.
  • nginx manages load balancing in front of Apache, which runs PHP using mod_php with XCache as the opcode cache.
  • MySQL is the main datastore, but a custom DBMS (written in C and based on the memcached protocol) is used for some magic. memcached also helps with page caching.
  • XMPP is used for messages and chats and runs on node.js. Availability is ensured by HAProxy, which handles node’s fragility.
  • Multimedia files are stored on xfs and media encoding is done with ffmpeg.
  • Everything is distributed over more than 4 datacenters.

The main difference between VK and other social networks is about server functions: VK servers are multifunctional. There is no clear distinction between database servers and file servers; they are used simultaneously in several roles.

Load balancing between servers follows a multi-layered scheme that includes DNS balancing as well as request routing within the system, where different servers are used for different types of requests.

For example, microblogging works on a tricky scheme that uses the memcached protocol’s ability to request data for a large number of keys in parallel. If the data is not in the cache, the same request is sent to the storage system, and the results are sorted, filtered and trimmed of the excess in PHP code.
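The read path described above can be sketched roughly as follows. This is not VK’s code (which is PHP and not public): plain dictionaries stand in for memcached and for the storage system, and the key names, the `deleted` flag and the `ts` field are assumptions made up for the example.

```python
def multi_get(cache, keys):
    """Stand-in for a memcached multi-key get: returns only the keys found."""
    return {key: cache[key] for key in keys if key in cache}


def load_posts(cache, storage, keys, limit):
    """Fetch many keys at once, fall back to storage, then post-process."""
    found = multi_get(cache, keys)
    # Keys missing from the cache are requested from the storage system.
    for key in keys:
        if key not in found and key in storage:
            found[key] = storage[key]
    # Filtering, sorting and discarding the excess happen in application code.
    posts = [post for post in found.values() if not post["deleted"]]
    posts.sort(key=lambda post: post["ts"], reverse=True)
    return posts[:limit]


# Hypothetical data: one post is cached, the rest live only in storage.
storage = {
    "post:1": {"id": 1, "ts": 10, "deleted": False},
    "post:2": {"id": 2, "ts": 30, "deleted": False},
    "post:3": {"id": 3, "ts": 20, "deleted": True},
    "post:4": {"id": 4, "ts": 5, "deleted": False},
}
cache = {"post:1": storage["post:1"]}

latest = load_posts(cache, storage, list(storage), limit=2)
print([post["id"] for post in latest])
```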

The custom database is still a secret and is widely used in VKontakte. Many services use it: private messages, messages on the walls, statuses, search, privacy, friends lists and probably more. It uses a non-relational data model, and most operations are performed in memory. The access interface is an extended memcached protocol. Specially constructed keys return the results of complex queries. They said it was developed by the “best minds” of Russia.

I wasn’t able to find any other insight into VK’s infrastructure after this talk. They are like the KGB 😀

From the home page:

“Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. […] Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.”

Introduction

Storm enables you to define a Topology (an abstraction of cluster computation) in order to describe how to handle data flow. In a topology you can define some Spouts (entry points for your data, with basic preprocessing) and some Bolts (single steps of data manipulation). This simple strategy enables you to define complex processing of streams of data.

Storm nodes are of two kinds: master and worker. The master node runs Nimbus, which is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures. Worker nodes run a Supervisor. The Supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. All coordination between Nimbus and the Supervisors is done through ZooKeeper.

Libraries

Resources

Books

Last summer I had the pleasure of reviewing a really interesting book about Spark written by Holden Karau for PacktPub. She is a really smart woman, currently a software development engineer at Google and active in Spark’s developer community. In the past she worked for Microsoft, Amazon and Foursquare.

Spark is a framework for writing fast, distributed programs. It’s similar to Hadoop MapReduce but uses a fast in-memory approach. The Spark ecosystem includes built-in tools for interactive query analysis (Shark), large-scale graph processing and analysis (Bagel), and real-time analysis (Spark Streaming). I discovered them a few months ago while exploring the extended Hadoop ecosystem.

The book covers how to write distributed MapReduce-style programs. You can find everything you need: setting up your Spark cluster, using the interactive shell, and writing and deploying distributed jobs in Scala, Java and Python. The last chapters look at how to use Hive with Spark to get a SQL-like query syntax with Shark, and at manipulating resilient distributed datasets (RDDs).

Have fun reading it! 😀

Fast data processing with Spark
by Holden Karau

fast_data_processing_with_spark_cover

The title is also listed in the Research Areas & Publications section of the Google Research portal: http://research.google.com/pubs/pub41431.html


According to Wikipedia, Klout is “a website and mobile app that uses social media analytics to rank its users according to online social influence via the ‘Klout Score’, which is a numerical value between 1 and 100”.

This is not so different from what I try to do every day. They get signals from social networks, process them to extract relevant data, and show some diagrams and a synthetic index of user influence. It’s really interesting for me to observe how their data is stored and processed.

At Hadoop Summit 2012, Dave Mariani (from Klout) and Denny Lee (from Microsoft) presented the Klout architecture and showed the following diagram:

klout_architecture

It shows many different technologies, a great example of polyglot persistence 🙂

Klout uses a lot of Hadoop. It’s used to collect signals coming from different Signal Collectors (one for each social network, I suppose). The procedures that enhance the data are written using Pig and Hive, and Hive is also used for the data warehouse.

Currently MySQL is used only to collect user registrations, which are ingested into the data warehouse system. In the past they used it as a bridge between the data warehouse and their “Cube”, a Microsoft SQL Server Analysis Services (SSAS) instance. They use the Cube for Business Intelligence with Excel and other custom apps. In 2011 data was migrated using Sqoop. Now they can rely on Microsoft’s Hive ODBC driver, and MySQL is no longer used as a bridge.

The website and mobile app are based on the Klout API. Data is collected from the data warehouse and stored in HBase (user profiles and scores) and MongoDB (interactions between users). ElasticSearch is used as the search index.

Most of the custom components are written in Scala. The only exception is the website, written in JavaScript/Node.js.

In the end, Klout is probably the biggest company that uses both open source tools from the Hadoop ecosystem and Microsoft tools. The Hadoop version for Windows Azure, developed together with Hortonworks, is probably the first product of this collaboration.

Sources

I spend a lot of time playing with data in order to import, export and aggregate it. MySQL is one of my primary sources of data because it sits behind many popular projects and is usually the first choice for custom solutions too.

Recently I discovered some really useful, little-known functions which help me export complex data.

GROUP_CONCAT

This function returns a string result with the concatenated non-NULL values from a group. It returns NULL if there are no non-NULL values.

SELECT student_name,
GROUP_CONCAT(test_score SEPARATOR ',')
FROM student
GROUP BY student_name;
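If you want to try the behaviour without a MySQL server at hand, here is a small sketch using Python’s built-in sqlite3 module. It is only an approximation of the query above: SQLite also has a group_concat aggregate, but it takes the separator as a second argument instead of the SEPARATOR keyword, and the table contents are made up for the example.

```python
import sqlite3

# In-memory database with a hypothetical student table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE student (student_name TEXT, test_score INTEGER)")
conn.executemany(
    "INSERT INTO student VALUES (?, ?)",
    [("alice", 90), ("alice", 85), ("bob", 70), ("carol", None)],
)

# SQLite spelling of the aggregate: group_concat(value, separator).
rows = conn.execute(
    "SELECT student_name, GROUP_CONCAT(test_score, ',') "
    "FROM student GROUP BY student_name ORDER BY student_name"
).fetchall()

for name, scores in rows:
    print(name, scores)
```

Note that carol, who has only a NULL score, gets a NULL (None in Python) result, which matches the rule described above.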

SELECT INTO

The SELECT ... INTO OUTFILE statement is intended primarily to let you very quickly dump a table to a text file on the server machine. It is really useful for exporting data as CSV directly from your master server. You can also use DUMPFILE if you need raw output.

SELECT a,b,a+b INTO OUTFILE '/tmp/result.txt'
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
ESCAPED BY '"'
LINES TERMINATED BY '\n'
FROM test_table;

If you plan to read the file with a standard CSV library, refer to RFC 4180 for the correct format in order to avoid parsing errors.
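As a quick check on the consumer side, this is a minimal sketch of reading such a file with Python’s standard csv module. The sample content is invented: it assumes comma-separated fields, optional double-quote enclosure and embedded quotes written as two double quotes, which is the convention RFC 4180 describes and what the export options above aim for.

```python
import csv
import io

# Hypothetical content of /tmp/result.txt: the second field is quoted and
# contains an embedded quote (doubled) or an embedded comma.
exported = '1,"say ""hi""",3\n2,"x,y",4\n'

# The default csv dialect uses ',' as delimiter, '"' as quote character
# and treats a doubled quote inside a quoted field as one literal quote.
rows = list(csv.reader(io.StringIO(exported)))

for row in rows:
    print(row)
```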

Apache Sqoop

If your database is bigger than you are able to manage, you probably need Sqoop. It is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.

You can import data from your MySQL database into a CSV file stored on HDFS and access it from anywhere in your cluster.

sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities

References

For about half a year I have wanted to move my blog away from Heroku. It’s the best PaaS I have ever used, but the free plan has a huge limit: dynos go idle. In a previous post I talked about how to use Heroku to build a reverse proxy in front of AppFog to avoid their custom domain limit, but the idle problem is still there. My blog has fewer than 100 visits per day and almost every visitor has to wait 5-10 seconds to view the home page because the dynos are always idle.

Today I decided to move to another platform suggested by my friend @dani_viga: OpenShift. It’s a PaaS similar to Heroku which uses Git for revision control and has a similar scaling system. The free plan doesn’t have the idle problem, and it’s 10 times faster!

I created a new application using the following cartridges: PHP 5.3, MySQL 5.1 (I’d have liked to use MariaDB, but the cartridge is still in development and I couldn’t install it) and phpMyAdmin 3.4. They require a Git repo to set up the application and provide a WordPress template to start from. I used it as a template, moving the code of my blog into the /php directory.

The hard part was migrating my PostgreSQL database to the new MySQL one. To start, I removed the PG4WP plugin by following its installation instructions in reverse order.

Then I exported my PostgreSQL database using the heroku db:pull command. It’s based on taps and is really useful. I had some problems with my local installation of MySQL because taps has no options for packet size and character set, so you must set them as defaults. I added a few lines to my my.cnf configuration:

# enlarged, before was 1M
max_allowed_packet = 10M
# default to utf-8
skip-character-set-client-handshake
character_set_client=utf8
character_set_server=utf8

At the end of the pull my local database contains an exact copy of the Heroku one, which I can dump to a SQL file and import into the new MySQL cartridge using phpMyAdmin.

The only problem I had was with the SSL certificate. The free plan doesn’t offer an SSL certificate for custom domains, so I had to stop using HTTPS for the login. You can do this in wp-config.php by setting:

define('FORCE_SSL_ADMIN', false);

Now my blog runs on OpenShift and so far it seems incredibly faster 😀

Last month, while I was inspecting the Hadoop ecosystem, I found a lot of other software related to big data. Below is the (again incomplete) list.

N.B. Information and text are taken from official websites or from the sources referenced at the end of the article.

Facebook Scribe (Github)

“Scribe is a server for aggregating log data streamed in real time from a large number of servers. It is designed to be scalable, extensible without client-side modification, and robust to failure of the network or any specific machine.”

Scribe servers are arranged in a directed graph, with each server knowing only about the next server in the graph. This network topology allows for adding extra layers of fan-in as a system grows, and batching messages before sending them between datacenters, without having any code that explicitly needs to understand datacenter topology, only a simple configuration.
Scribe was designed to consider reliability but to not require heavyweight protocols and expansive disk usage.

Facebook McDipper

“McDipper is a highly performant flash-based cache server that is Memcache protocol compatible.”

Facebook used Memcached heavily in its early stages and, when the cost of RAM became too high, had to find a different solution. This is why McDipper was born. It is not different from other projects like Fatcache or Edis. As many DBMSs did, they try to mimic an in-memory caching engine in order to extend the available space. SSDs are fast enough that the speed loss is not a problem.

Facebook Haystack

“The new photo infrastructure merges the photo serving tier and storage tier into one physical tier. It implements a HTTP based photo server which stores photos in a generic object store called Haystack.”

A system specifically designed to serve static content using HTTP as fast as possible. Haystack can be broken down into these functional layers:

  • HTTP server
  • Photo Store
  • Haystack Object Store
  • Filesystem
  • Storage

I talked about it in a previous post.

Facebook Memcached (Github)

“Here at Facebook, we’re likely the world’s largest user of memcached. We use memcached to alleviate database load. memcached is already fast, but we need it to be faster and more efficient than most installations.”

Twitter Storm (Github)

“Storm is a distributed realtime computation system. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation.”

There are just three abstractions in Storm: spouts, bolts, and topologies.

A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere.

A bolt processes any number of input streams and produces any number of new output streams. Most of the logic of a computation goes into bolts, such as functions, filters, streaming joins, streaming aggregations, talking to databases, and so on.

A topology is a network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt.
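To make the three abstractions more concrete, here is a toy, single-process analogy in Python. It does not use Storm’s API at all: generators stand in for the spout and the bolts, function composition stands in for the topology, and all the names (`sentence_spout`, `split_bolt`, `count_bolt`) are invented for this sketch.

```python
from collections import Counter


def sentence_spout():
    """Spout: the source of the stream (a fixed list instead of a queue)."""
    for sentence in ["storm is fast", "storm is scalable"]:
        yield sentence


def split_bolt(stream):
    """Bolt: consumes sentences and emits one tuple per word."""
    for sentence in stream:
        for word in sentence.split():
            yield word


def count_bolt(stream):
    """Bolt: a streaming aggregation that emits a running count per word."""
    counts = Counter()
    for word in stream:
        counts[word] += 1
        yield word, counts[word]


# Topology: each edge is a bolt subscribing to the previous component's output.
topology = count_bolt(split_bolt(sentence_spout()))

results = list(topology)
final_counts = dict(results)  # the last emitted count per word wins
print(final_counts)
```

In a real topology the components run in parallel on different machines and the streams never end; the wiring idea is the same.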

Twitter Snowflake (Github)

“Snowflake is a network service for generating unique ID numbers at high scale with some simple guarantees.”

Twitter needed something that could generate tens of thousands of ids per second in a highly available manner. This naturally led us to choose an uncoordinated approach.

These ids need to be roughly sortable, meaning that if tweets A and B are posted around the same time, they should have ids in close proximity to one another since this is how we and most Twitter clients sort tweets. Additionally, these numbers have to fit into 64 bits.

To generate them in an uncoordinated manner, we settled on a composition of: timestamp, worker number and sequence number. Sequence numbers are per-thread and worker numbers are chosen at startup via zookeeper (though that’s overridable via a config file).
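The composition described above can be sketched in a few lines of Python. This is not Snowflake’s code: the bit widths (41 bits of timestamp, 10 bits of worker number, 12 bits of sequence number) and the custom epoch are assumptions for the example, since the text above only says which parts are combined and that the result must fit into 64 bits.

```python
# Assumed layout, not stated in the text above: 41 + 10 + 12 = 63 bits,
# so the result fits in a signed 64-bit integer.
TIMESTAMP_BITS = 41
WORKER_BITS = 10
SEQUENCE_BITS = 12
EPOCH_MS = 1288834974657  # assumed custom epoch, in milliseconds


def make_id(timestamp_ms, worker_id, sequence):
    """Pack timestamp, worker number and sequence number into one integer."""
    elapsed = timestamp_ms - EPOCH_MS
    if not 0 <= elapsed < (1 << TIMESTAMP_BITS):
        raise ValueError("timestamp out of range")
    if not 0 <= worker_id < (1 << WORKER_BITS):
        raise ValueError("worker id out of range")
    if not 0 <= sequence < (1 << SEQUENCE_BITS):
        raise ValueError("sequence out of range")
    return (
        (elapsed << (WORKER_BITS + SEQUENCE_BITS))
        | (worker_id << SEQUENCE_BITS)
        | sequence
    )


def split_id(value):
    """Unpack an id into (elapsed_ms, worker_id, sequence)."""
    sequence = value & ((1 << SEQUENCE_BITS) - 1)
    worker_id = (value >> SEQUENCE_BITS) & ((1 << WORKER_BITS) - 1)
    elapsed = value >> (WORKER_BITS + SEQUENCE_BITS)
    return elapsed, worker_id, sequence


first = make_id(EPOCH_MS + 1000, worker_id=3, sequence=0)
second = make_id(EPOCH_MS + 1000, worker_id=3, sequence=1)
later = make_id(EPOCH_MS + 1001, worker_id=1, sequence=0)
print(first, second, later)
```

Because the timestamp occupies the most significant bits, ids sort roughly by creation time even when they come from different workers, with no coordination between them.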

Twitter Fatcache (Github)

“Fatcache is memcache on SSD. Think of fatcache as a cache for your big data.”

SSD-backed memory presents a viable alternative for applications with large workloads that need to maintain high hit rate for high performance.

Like Facebook McDipper, Fatcache tries to overcome memory limitations. It maintains an in-memory index for all data stored on disk. An in-memory index serves two purposes: cheap object existence checks and on-disk object address storage.

To minimize the number of small, random writes, fatcache treats the SSD as a log-structured object store. All writes are aggregated in memory and written to the end of the circular log in batches – usually multiples of 1 MB.
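Here is a rough Python sketch of the idea, only to illustrate the two mechanisms described above (the in-memory index and the batched, append-only writes). It is not how fatcache is implemented: a bytearray stands in for the SSD, the log is not circular, nothing is ever evicted, and the class name and batch size are invented for the example.

```python
class LogStructuredCache:
    """Toy cache: in-memory index over an append-only log (the 'SSD')."""

    def __init__(self, batch_size=4):
        self.log = bytearray()   # stand-in for the SSD
        self.index = {}          # key -> (offset, length), kept in memory
        self.pending = []        # writes aggregated in memory
        self.batch_size = batch_size
        self.flushes = 0

    def set(self, key, value):
        self.pending.append((key, value))
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write all pending values to the end of the log in one batch."""
        if not self.pending:
            return
        for key, value in self.pending:
            self.index[key] = (len(self.log), len(value))
            self.log += value
        self.pending.clear()
        self.flushes += 1

    def get(self, key):
        # Values not flushed yet are served straight from memory.
        for pending_key, value in reversed(self.pending):
            if pending_key == key:
                return value
        entry = self.index.get(key)
        if entry is None:
            return None  # existence check answered without touching the log
        offset, length = entry
        return bytes(self.log[offset:offset + length])


cache = LogStructuredCache(batch_size=2)
cache.set("a", b"1")
cache.set("b", b"22")  # second write fills the batch and triggers a flush
print(cache.get("b"), cache.index)
```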

Twitter FlockDB (Github)

“FlockDB is a database that stores graph data, but it isn’t a database optimized for graph-traversal operations. Instead, it’s optimized for very large adjacency lists, fast reads and writes, and page-able set arithmetic queries.”

It is a distributed graph database for storing adjacency lists, with goals of supporting:

  • a high rate of add/update/remove operations
  • potentially complex set arithmetic queries
  • paging through query result sets containing millions of entries
  • ability to “archive” and later restore archived edges
  • horizontal scaling including replication
  • online data migration

Non-goals include: multi-hop queries (or graph-walking queries), automatic shard migrations

FlockDB is much simpler than other graph databases such as Neo4j because it tries to solve fewer problems. It scales horizontally and is designed for on-line, low-latency, high throughput environments such as web-sites.

Twitter Twemcache (Github)

“We built Twemcache because we needed a more robust and manageable version of Memcached, suitable for our large-scale production environment.”

Apache Kafka (Github)

“Kafka is publish-subscribe messaging rethought as a distributed commit log.”

It is designed to support the following:

  • Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.
  • High-throughput: even with very modest hardware Kafka can support hundreds of thousands of messages per second.
  • Explicit support for partitioning messages over Kafka servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.
  • Support for parallel data load into Hadoop.

Kafka is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing on a consumer-scale web site.
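To illustrate what “per-partition ordering” means, here is a tiny in-memory sketch in Python. It has nothing to do with Kafka’s real client API: lists stand in for the partition logs, and choosing the partition with a CRC32 hash of a message key is an assumption made for the example.

```python
import zlib


class PartitionedLog:
    """Toy commit log: one append-only list per partition."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key):
        # Assumption: messages with the same key go to the same partition.
        return zlib.crc32(key.encode("utf-8")) % len(self.partitions)

    def append(self, key, message):
        """Append a message and return its (partition, offset)."""
        partition = self.partition_for(key)
        self.partitions[partition].append(message)
        return partition, len(self.partitions[partition]) - 1

    def read(self, partition, offset):
        """A consumer reads one partition sequentially from a given offset."""
        return self.partitions[partition][offset:]


log = PartitionedLog(num_partitions=3)
p1, o1 = log.append("user-1", "login")
p2, o2 = log.append("user-1", "click")
print(log.read(p1, o1))
```

Messages with the same key land in the same partition and are read back in the order they were written; across different partitions no ordering is promised, which is what lets consumption be spread over a cluster of consumers.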

Apache Gora (Github)

“Gora is an open source framework that provides an in-memory data model and persistence for big data. Gora supports persisting to column stores, key value stores, document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce support.”

Apache Mesos (Github)

“Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark”

Main features are:

  • Fault-tolerant replicated master using ZooKeeper.
  • Scalability to 10,000s of nodes using fast, event-driven C++ implementation.
  • Isolation between tasks with Linux Containers.
  • Multi-resource scheduling (memory and CPU aware).
  • Efficient application-controlled scheduling mechanism.
  • Java, Python and C++ APIs for developing new parallel applications.
  • Web UI for viewing cluster state.

Apache Spark (Github)

“Spark is an open source cluster computing system that aims to make data analytics fast, both fast to run and fast to write. To run programs faster, Spark provides primitives for in-memory cluster computing: your job can load data into memory and query it repeatedly much more quickly than with disk-based systems like Hadoop MapReduce.”

Spark was initially developed for two applications where keeping data in memory helps: iterative algorithms, which are common in machine learning, and interactive data mining. In both cases, Spark can run up to 100x faster than Hadoop MapReduce. However, you can use Spark for general data processing too.

Shark: Hive on Spark (Github)

“Shark is a large-scale data warehouse system for Spark designed to be compatible with Apache Hive. It can execute Hive QL queries up to 100 times faster than Hive without any modification to the existing data or queries. Shark supports Hive’s query language, metastore, serialization formats, and user-defined functions, providing seamless integration with existing Hive deployments and a familiar, more powerful option for new ones.”

Shark is built on top of Spark, a data-parallel execution engine that is fast and fault-tolerant. Even if data are on disk, Shark can be noticeably faster than Hive because of the fast execution engine. It avoids the high task launching overhead of Hadoop MapReduce and does not require materializing intermediate data between stages on disk. Thanks to this fast engine, Shark can answer queries in sub-second latency.

Fluentd (Github)

“Fluentd is an open-source tool to collect events and logs. 150+ plugins instantly enable you to store the massive data for Log Search, Big Data Analytics, and Archiving (MongoDB, S3, Hadoop)”

fluentd_diagram

The fundamental problem with logs is that they are usually stored in files although they are best represented as streams (by Adam Wiggins, CTO at Heroku). Traditionally, they have been dumped into text-based files and collected by rsync in hourly or daily fashion. With today’s web/mobile applications, this creates two problems.

  1. Need ad-hoc parsing: The text-based logs have their own format, and analytics engineers need to write a dedicated parser for each format. But that’s probably not the best use of your time. You should be analyzing data to make better business decisions instead of writing one parser after another.
  2. Lacks Freshness: The logs lag. The realtime analysis of user behavior makes feature iterations a lot faster. A nimbler A/B testing will help you differentiate your service from competitors.

This is where Fluentd comes in. We believe Fluentd solves all issues of scalable log collection by getting rid of files and turning logs into true semi-structured data streams.


Kestrel (Github)

“Kestrel is a very simple message queue that runs on the JVM. It supports multiple protocols: memcache: the memcache protocol, with some extensions, thrift: Apache Thrift-based RPC, text: a simple text-based protocol”

A cluster of kestrel servers is like a memcache cluster: the servers don’t know about each other, and don’t do any cross-communication, so you can add as many as you like. The simplest clients have a list of all servers in the cluster, and pick one at random for each operation. In this way, each queue appears to be spread out across every server, with items in a loose ordering. More advanced clients can find kestrel servers via ZooKeeper.

Cloudera Impala (Github)

“Impala is an open source Massively Parallel Processing (MPP) query engine that runs natively on Apache Hadoop”

With Impala, you can query data, whether stored in HDFS or Apache HBase – including SELECT, JOIN, and aggregate functions – in real time.

To avoid latency, Impala circumvents MapReduce to directly access the data through a specialized distributed query engine that is very similar to those found in commercial parallel RDBMSs. The result is order-of-magnitude faster performance than Hive, depending on the type of query and configuration. (See FAQ below for more details.) Note that this performance improvement has been confirmed by several large companies that have tested Impala on real-world workloads for several months now.

Structure diagram:

cloudera_impala_diagram

HadoopDB

“HadoopDB is an Architectural Hybrid of MapReduce and DBMS Technologies for Analytical Workloads”

HadoopDB is:

  • A hybrid of DBMS and MapReduce technologies that targets analytical workloads
  • Designed to run on a shared-nothing cluster of commodity machines, or in the cloud
  • An attempt to fill the gap in the market for a free and open source parallel DBMS
  • Much more scalable than currently available parallel database systems and DBMS/MapReduce hybrid systems.
  • As scalable as Hadoop, while achieving superior performance on structured data analysis workloads

Hypertable

“Hypertable is an open source database system inspired by publications on the design of Google’s BigTable. The project is based on experience of engineers who were solving large-scale data-intensive tasks for many years.”

This project is for the design and implementation of a high performance, scalable, distributed storage and processing system for structured and unstructured data. It is designed to manage the storage and processing of information on a large cluster of commodity servers, providing resilience to machine and component failures. Data is represented in the system as a multi-dimensional table of information. The data in a table can be transformed and organized at high speed by performing computations in parallel, pushing them to where the data is physically stored.

Sources: