How it works: DataSift


DataSift’s home page says they “aggregate, process and deliver social data“. It is one of the oldest Twitter certified partners and offers data coming from almost every existing social network. I use it every day to “listen” to the net and find the data I need for my analyses.

It’s impressive to watch how fast they collect data from external sources and deliver it to your chosen destination. When I tweet, a couple of minutes later a JSON file lands in my S3 bucket.

Creating Internet-scale filtering is not easy. Their infrastructure is really complex and highly optimized. Below is a 2011 diagram of their workflow.

[Diagram: DataSift infrastructure workflow, 2011]

Twitter generates more than 500 million tweets per day and is only one of the available sources. Some numbers about the platform:

  • The DataSift system performs 250+ million sentiment analyses with sub-100ms latency, and several TB of augmented data (including gender, sentiment, etc.) transit the platform daily.
  • Data filtering nodes can process up to 10,000 unique streams and can do data lookups against 10,000,000+ username lists in real time.
  • Link augmentation performs 27 million link resolves and lookups plus 15+ million full web page aggregations per day.

C++ is used for the performance-critical components, like the core filtering engine, and PHP for the site, the external API server, most of the internal web services, and a custom-built, high-performance job queue manager. Java/Scala are used for batch processing with HBase and MapReduce jobs. Kafka is used as the queuing system and Ruby is used for deploys and provisioning. Thrift is widely used.

MySQL (Percona Server) on SSD drives is used as primary storage, an HBase cluster spread over more than 30 Hadoop nodes provides a place to store historical data, and Memcached and Redis are used for caching.

Here is a schema of the processing unit that builds the historical database.

[Diagram: DataSift historical data processing unit]

Message queues are another critical component of the infrastructure. 0mq (a custom build from the latest alpha branch, with some stability fixes, chosen to use publisher-side filtering) is used in different configurations:

  • PUB-SUB for replication / message broadcasting;
  • PUSH-PULL for round-robin workload distribution;
  • REQ-REP for health checks of different components.

Kafka is used for high-performance persistent queues. In both cases they work with the developers, contributing bug reports, traces, fixes and client libraries.
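
For illustration, here is a minimal sketch of the PUB-SUB pattern with publisher-side topic filtering, written in Ruby with the ffi-rzmq gem. This is my own example, not DataSift code; topic names and endpoints are made up.

require "ffi-rzmq"

context = ZMQ::Context.new

# publisher broadcasts messages prefixed with a topic
publisher = context.socket(ZMQ::PUB)
publisher.bind("tcp://*:5556")

# subscriber receives only the topics it subscribed to
# (with recent ZeroMQ versions the filtering happens on the publisher side)
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(ZMQ::SUBSCRIBE, "interaction.twitter")
sleep 0.1 # give the subscription time to propagate (slow-joiner issue)

publisher.send_string('interaction.twitter {"text":"hello"}')
publisher.send_string('interaction.facebook {"text":"ignored"}')

message = ""
subscriber.recv_string(message)
puts message # only the twitter interaction is delivered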

All code is pulled from the repo by Jenkins every 5 minutes, automatically tested and verified with several QA tools, packaged as an RPM and moved to the dev package repo. Chef is used to automate deployments and manage configuration. All services emit StatsD events, which are combined with other system-level checks, added to Zenoss and displayed with Graphite.
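
As a small illustration of the monitoring side, emitting StatsD metrics from Ruby takes a couple of lines with the statsd-ruby gem; the host, port and metric names below are placeholders, not DataSift's.

require "statsd"

# UDP client pointed at the local StatsD daemon
statsd = Statsd.new("localhost", 8125)

statsd.increment("filter.matched_interactions") # counter
statsd.timing("filter.latency_ms", 42)          # timer, in milliseconds
statsd.gauge("queue.depth", 1200)               # gauge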

The biggest challenge, IMHO, is filtering. Filtering at this scale requires a different approach. They started with the work they did at TweetMeme. The core filter engine is written in C++ and is called the Pickle Matrix. Over three years they have developed a compiler and their own virtual machine. We don’t know exactly what their technology is, but it might be something like Distributed Complex Event Processing with Query Rewriting.

Sources

Almost all the content of this post comes from the wonderful article “DataSift Architecture: Realtime Datamining At 120,000 Tweets Per Second” posted on HighScalability. Some details also come from “Historical Architecture – Data Mining Billions of Tweets” on the DataSift blog.

Are three deploy environments enough?

When I used Rails for the first time I was impressed by the use of multiple environments to handle different setups, policies and behaviors of an application. A few years ago the use of environments wasn’t so common, and switching between development, test and production was an innovation for me.

Anyway, big projects built on custom frameworks introduced this structure several years earlier. I have had the pleasure of working on a lot of legacy code that implements different environment setups. For example, the classified ads channel of Repubblica.it (built by my mentor @FabiolousMate) uses dev, demo and production. Other projects I worked on use staging. After listening to a lot of opinions I asked myself which environments are the most important and whether three are enough.

I’m mostly a Ruby developer and I know the Rails ecosystem, which uses 3 basic environments:

  • development is used when you code. Source code is reloaded on each request. Logging is EXTREMELY verbose. Libraries include debug and error logging features. The database is full of garbage data.
  • test is for automated testing. Data is loaded and cleaned automatically every time you run tests. Everything can be mocked (database, APIs, external services, …). Libraries include testing frameworks and logging is just for test output.
  • production is meant to be safe. Logging is just for errors. Sometimes there is a caching layer. Libraries are loaded once. Data is replicated. Everything is set up to improve both performance and robustness.
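
In Rails these differences boil down to per-environment config files. A condensed sketch of the settings that matter most (Rails 3 style; MyApp is a placeholder application name and the values are simplified):

# config/environments/development.rb
MyApp::Application.configure do
  config.cache_classes = false  # reload source code on each request
  config.log_level     = :debug # extremely verbose logging
end

# config/environments/production.rb
MyApp::Application.configure do
  config.cache_classes = true   # load code and libraries once
  config.log_level     = :error # log errors only
end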

These environments are really useful for managing application development. Unfortunately they are not enough to handle every situation. For example, production is not appropriate for testing new features because of the sparse logging, the heavy optimization (and the precious production data), and it is not appropriate for demos either, because it has to be used by customers. Development is likewise not appropriate for finding bottlenecks because of the messy data and the debug code.

In my experience I usually add three more environments to my applications to cover these situations. In most cases they are enough.

  • staging is for deep testing of new features. Production data with development logging and libraries. It lets you test the side effects of your new features in the real world. If a change works here, it will probably work in production too.
  • demo is for showtime. A production environment with sandboxed features and demo data. You can open this environment to anyone and they can play with whatever they want without danger.
  • profile is to find bottlenecks. A development environment with specific libraries to profile and fine-tune your processes. You can adjust the data to stress your system without worrying about data coherence.
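
Adding one of these custom environments to a Rails app is mostly a matter of a new config file (plus the corresponding database.yml entry). A minimal sketch for staging, again assuming a Rails 3 application called MyApp; names and settings are illustrative:

# config/environments/staging.rb
# start from the production settings, then turn logging back up
require File.expand_path("../production", __FILE__)

MyApp::Application.configure do
  config.log_level = :debug                    # development-style logging
  config.action_mailer.delivery_method = :test # never reach real users
end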

This is, IMHO, a good setup for your deploy environments. Depending on the project some of these aren’t useful, but in a large project each one can save your life.

Performance driven data modeling using MongoDB – Part 1

This week my problem was modeling a semi-relational structure. We decided to use MongoDB because (so people say) it is fast, scalable and schema-less. Unfortunately I’m not a good MongoDB designer yet. Data modeling was mostly easy because I could copy the relational part of the schema. The biggest data modeling problem concerns many-to-many relations: how do you decide whether or not to embed the relation keys into the documents? To make the right choice I decided to test different design solutions.

Foreign keys embedded:

class A
  include Mongoid::Document
  field :name, type: String
  has_and_belongs_to_many :bs
end

class B
  include Mongoid::Document
  field :name, type: String
  has_and_belongs_to_many :as
end

# each A embeds the ids of `large` B documents (stored in its b_ids array)
def direct(small, large)
  small.times do |i|
    a = A.new
    a.name = "A#{i}"
    large.times do |j|
      b = B.create(name: "B#{j}")
      a.bs << b
    end
    a.save
  end
end

Foreign keys stored in an external document:

class C
  include Mongoid::Document
  field :name, type: String
  has_many :rels
end

class D
  include Mongoid::Document
  field :name, type: String
  has_many :rels
end

class Rel
  include Mongoid::Document
  belongs_to :c
  belongs_to :d
end

# each link between C and D is stored as a separate Rel document
def with_rel(small, large)
  small.times do |i|
    c = C.new
    c.name = "C#{i}"
    large.times do |j|
      d = D.create(name: "D#{j}")
      Rel.create(c: c, d: d)
    end
    c.save # persist C as well, mirroring a.save in direct
  end
end

I tested the insert time for a database with 10 objects, each related to a growing number of other objects (from 100 to 5000 per iteration).

def measure(message)
  cleanup # reset the collections between runs (implementation not shown)
  start = Time.now.to_f
  yield
  finish = Time.now.to_f - start
  puts "#{message}: #{'%0.3f' % finish}"
end

(1..50).each do |e|
  measure "10 A embedding #{e*100} B each" do
    direct(10, e*100)
  end
  measure "10 A linked to #{e*100} B with external relation" do
    with_rel(10, e*100)
  end
end

Results are really interesting:

Relations per element | Insert time, embedded keys (s) | Insert time, external relation (s)
100   |  0.693 |  1.021
200   |  1.435 |  2.006
300   |  1.959 |  2.720
400   |  2.711 |  3.587
500   |  3.477 |  4.531
600   |  4.295 |  5.414
700   |  5.106 |  6.369
800   |  5.985 |  7.305
900   |  6.941 |  8.221
1000  |  7.822 |  8.970
1200  | 12.350 | 13.946
1400  | 14.820 | 15.532
1600  | 15.806 | 17.344
1800  | 18.722 | 18.372
2000  | 21.552 | 20.732
3000  | 36.151 | 29.818
4000  | 56.060 | 38.154
5000  | 82.996 | 47.658

As you can see, once the number of embedded relation keys goes over 2000 the insert time grows much faster than linearly, while the external-relation version keeps growing roughly linearly.

I know this is not a real-world test, so we can’t conclude that embedded relations are worse than external ones. Still, it is really interesting to observe that the limits are the same in both the SQL and NoSQL worlds: when you hit a memory limit and need to go to disk, performance degrades.
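
A quick way to see why the embedded design degrades is to look at the shape of the stored documents: every new link makes the parent document bigger (its array of foreign keys grows and the document may be rewritten on disk), while in the external design each link is a separate, constant-size Rel document. A small inspection sketch from a Mongoid console; the printed shapes are indicative, not exact output.

direct(1, 3)
puts A.first.attributes.inspect
# => {"_id"=>..., "name"=>"A0", "b_ids"=>[..., ..., ...]} # grows with every link

with_rel(1, 3)
puts Rel.first.attributes.inspect
# => {"_id"=>..., "c_id"=>..., "d_id"=>...} # constant size per link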

In an upcoming post I’m going to analyze read performance.

Persistence in the Amazon AWS Cloud

I’m developing a new project that requires a data structure which is not yet well defined. We are evaluating different solutions for persistence, and Amazon AWS is one of the providers we are considering. Here I try to recap the solutions it offers.

Amazon Relational Database Service (RDS)

A relational database, similar to MySQL and PostgreSQL. It offers 3 different engines (with different costs), and each one should be fully compatible with the protocol of the corresponding DBMS: Oracle, MySQL and Microsoft SQL Server.

You can use it easily with ActiveRecord (with the MySQL adapter) on Rails or Sinatra. Simply replace your database.yml with the given parameters:

production:
  adapter: mysql2
  host: myprojectname.somestuff.amazonaws.com
  database: myprojectname
  username: myusername
  password: mypass

Amazon DynamoDB

A Key/Value store similar to Riak and Cassandra. It is still in beta, but Amazon released a paper (PDF) about its structure a few years ago which inspired many other products.

You can access it using Ruby and the aws-sdk gem. I’m not an expert, but the following code should work for basic interaction (not tested yet).

require "aws-sdk"

# set connection parameters
AWS.config(
  access_key_id: ENV["AWS_KEY"],
  secret_access_key: ENV["AWS_SECRET"]
)

# open connection to DB
DB = AWS::DynamoDB.new

# create the table if it doesn't exist yet, then load its schema
TABLES = {}

begin
  TABLES["your_table_name"] = DB.tables["your_table_name"].load_schema
rescue AWS::DynamoDB::Errors::ResourceNotFoundException
  # 10 read / 5 write capacity units, with a string "id" as hash key
  table = DB.tables.create("your_table_name", 10, 5, hash_key: { id: :string })
  # it takes time to be created
  sleep 1 while table.status == :creating
  TABLES["your_table_name"] = table.load_schema
end

After that you can interact with the table:

# Create a new element
record = TABLES["your_table_name"].items.create(id: "andrea-mostosi")
record.attributes.add(name: ["Andrea"])
record.attributes.add(surname: ["Mostosi"])

# Search for value "andrea-mostosi" inside table
TABLES["your_table_name"].items.query(
  hash_value: "andrea-mostosi",
)

Amazon Redshift

A relational DBMS based on PostgreSQL, structured for petabyte-scale amounts of data (data warehousing). It was released to the public in the last few days and the SDK isn’t well documented yet. It seems very interesting for big-data processing on a relational structure.
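
Since Redshift speaks the PostgreSQL wire protocol, connecting from Ruby should work with the standard pg gem. This is an untested sketch; the endpoint, port and credentials are placeholders:

require "pg"

# Redshift exposes a PostgreSQL-compatible endpoint (default port 5439)
conn = PG.connect(
  host:     "myprojectname.abc123.us-east-1.redshift.amazonaws.com",
  port:     5439,
  dbname:   "myprojectname",
  user:     "myusername",
  password: "mypass"
)

conn.exec("SELECT COUNT(*) FROM events") do |result|
  puts result.getvalue(0, 0)
end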

Amazon ElastiCache

An in-RAM caching system based on the Memcached protocol. It can be used to cache any kind of object, just like Memcached. It differs from Redis (and is worse, IMHO) because it doesn’t offer persistence. I prefer a different kind of caching, but it may be a good choice if your application already uses Memcached.
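
Because ElastiCache speaks the Memcached protocol, any Memcached client should work. A minimal sketch with the dalli gem; the node endpoint is a placeholder:

require "dalli"

# point the client at the ElastiCache node endpoint
cache = Dalli::Client.new("myprojectname.abc123.cache.amazonaws.com:11211")

cache.set("user:andrea", { name: "Andrea" }, 300) # cache for 5 minutes
puts cache.get("user:andrea").inspect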

Amazon SimpleDB

A RESTful Key/Value store using only strings as data types. You can use it with any REST ORM like ActiveResource, dm-rest-adapter or, my favorite, Her (see my previous article). If you prefer, you can use it with any HTTP client like Faraday or HTTParty.

[UPDATE 2013-02-19] SimpleDB isn’t listed in the “Database” menu anymore and seems to be no longer available for activation.

Other DBMSs on the Marketplace

Many companies offer support for their software deployed on EC2 instances. Engines include MongoDB, CouchDB, MySQL, PostgreSQL, Couchbase Server, DB2, Riak, Memcached and Redis.

Sources