Realtime Data Science

How to deal with realtime streams

Micha Gorelick

@mynameisfiber
micha@bit.ly
github.com/mynameisfiber


w/ Mike Dewar && Dan Frank

Git Repository



Rendered Slides

(those are links!)

Outline

Note:

each section will have ~5 minutes of talking and then a lot of experimenting!


you have all my slides and all the code! i go fast but you don't need to scramble to take notes

Audience

YOU! So ask questions!

Why streams?

THERE IS A LOT OF DATA!

We want results NOW

not when hadoop can get around to it

Look at our most recent realtime awesomeness from bitly: rt.ly

Streams move fast

we need some infrastructure to support them

enter: SimpleHttp

SimpleHttp

  1. Made by bitly!
  2. Open Source, http://github.com/bitly/simplehttp/
  3. Provides simple network utilities, including pubsubs and queues
  4. Everything is done with HTTP GET/POST requests, which means there are no compatibility issues

We will focus on PUBSUBS and QUEUES

WTF is a plurbsurb pubsub?

pubsub ~= Town Crier

Takes information from the "king" and distributes it out to the "villagers"

Data comes in from our data sources and the pubsub publishes the data to all of the subscribers
note: there can be more than one data source all publishing to the same pubsub

Looks perfect!
No further work needed... right?

What if data is coming in faster than we can process?

The kernel cache will fill up, the connection will time out, and when we can finally reconnect we will have lost messages! (ie: things go bad)

unacceptable!

enter: simplequeue

simple, network based queue system

queue ~= secretary

data processors are busy things! we need someone to help them manage their tasks

queues hold on to messages until they are requested

If a processor is VERY slow, the queue will be
storing many messages (and using a lot of ram)

The Tools

pubsub

simplequeue

ps_to_http


also...

curl

Start the pubsub

$ pubsub --port=8080
registering callback for path "/pub*"
registering callback for path "/sub*"
registering callback for path "/stats*"
registering callback for path "/clients"
listening on 0.0.0.0:8080

$ curl "http://localhost:8080/stats"
Active connections: 0
Total connections: 0
Messages received: 0
Messages sent: 0
Kicked clients: 0

Start a queue

$ simplequeue --port=8081
Version: 1.3, http://code.google.com/p/simplehttp/
use --help for options
registering callback for path "/put*"
registering callback for path "/get*"
registering callback for path "/mget*"
registering callback for path "/mput*"
registering callback for path "/dump*"
registering callback for path "/stats*"
listening on 0.0.0.0:8081

$ curl localhost:8081/stats
puts:0
gets:0
depth:0
depth_high_water:0
bytes:0
overflow:0

Connect the pubsub to the queue

$ ps_to_http --destination-get-url="http://127.0.0.1:8081/put?data=%s" \
    --pubsub-url="http://127.0.0.1:8080/sub?multipart=0"
AUTODETECTING ENCODING FOR http://127.0.0.1:8080/sub?multipart=0
CONNECTING TO SOURCE http://127.0.0.1:8080/sub?multipart=0

Congrats! You set up a data pipeline

(with no data)

Adding data is easy!

$ echo "H3110" | curl -d@- "http://localhost:8080/pub"
Published 1 messages to 1 clients.

And fetching data is also easy!

$ curl -s "http://localhost:8081/stats" # get the stats for our simplequeue
puts:1
gets:0
depth:1
depth_high_water:1
bytes:5
overflow:0
$ curl -s "http://localhost:8081/get" # fetch the data
H3110$ curl -s "http://localhost:8081/stats" # look at the stats again
puts:1
gets:1
depth:0
depth_high_water:1
bytes:0
overflow:0

Now You!

You'll be done when you:

  1. Understand the flow of data through the pubsub/simplequeue chain
  2. Have thought "this is AWESOME" at least 3 times

So... I have to do everything with curl?

or: why python is so amazing

enter: pysimplehttp

(part of bitly's simplehttp project)

pysimplehttp.BaseReader ~= translator


translates the low-level simplehttp protocol into something more useful


you simply define tasks, and the BaseReader will run every task on every message in the simplequeue!


NOTE: BaseReader operates on JSON blobs

pysimplehttp.BaseReader

The important bits:


BaseReader(
    simplequeue_address,    # location of data ("http://localhost:8081")
    all_tasks,              # dict of functions, all of which get all data
    queuename=None,         # optional name for the queue
    max_tries=5,            # how many times to retry a piece of data
    requeue_delay=90,       # time data waits after being requeued
    mget_items=0,           # batch size to fetch from simplequeue
    preprocess_method=None, # function to preprocess data
    validate_method=None,   # function that decides whether to process a given message
    requeue_giveup=None,    # function to run on data we give up on
)
Read the docs! There are a bunch of other cool options

Simple Example


# examples/01_simple_reader.py
from pysimplehttp.BaseReader import BaseReader
from pysimplehttp.http import pubsub_write

def print_message(data):
    print data
    return True # we must return True to say our task finished correctly

if __name__ == "__main__":
    tasks = {"print" : print_message} # tasks are named for logging
    reader = BaseReader("http://localhost:8081", tasks, queuename="message_printer")

    # There are also easy utilities to write to a pubsub!
    pubsub_write("http://localhost:8080", {'msg' : 'H3110'})

    reader.run()
        

Simple Example (cont)


$ python examples/01_simple_reader.py
{u'msg': u'H3110'}
        

Now You!

You'll be done when you:

  1. Have experimented with, and understand, the message guarantees with multiple pubsub subscribers or data consumers
  2. Extra: can you say anything about the statistics of the numbers flowing through the reader?

Great... what can I do with streams?

MATH!

FAST MATH!

Online Algorithms

Examples

Let's start easy...

Calculate the mean/variance for a stream of random numbers

First, the algorithm!

Knuth's Algorithm
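
The update rules, one data point at a time (this is Welford's method, from Knuth's TAOCP Vol. 2):

\[ \bar{x}_n = \bar{x}_{n-1} + \frac{x_n - \bar{x}_{n-1}}{n} \]

\[ M_{2,n} = M_{2,n-1} + (x_n - \bar{x}_{n-1})(x_n - \bar{x}_n) \]

\[ \sigma_n^2 = \frac{M_{2,n}}{n} \qquad s_n^2 = \frac{M_{2,n}}{n-1} \]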

note: this algorithm can even be used in parallel over multiple data consumers!
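
To merge the partial results from two consumers \( A \) and \( B \), use the combine rule from Chan et al.:

\[ \delta = \bar{x}_B - \bar{x}_A \qquad \bar{x}_{AB} = \bar{x}_A + \delta \frac{n_B}{n_A + n_B} \qquad M_{2,AB} = M_{2,A} + M_{2,B} + \delta^2 \frac{n_A n_B}{n_A + n_B} \]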

What is this \( M_{2,n} \) variable?
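
It is the running sum of squared deviations from the current mean, \( M_{2,n} = \sum_{i=1}^{n} (x_i - \bar{x}_n)^2 \), so dividing by \( n \) (or \( n-1 \)) gives the variance.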

This looks amazing!

USE IN ALL PLACES!

Critical question: will it blend converge?

Online algorithms are generally more prone to numerical error, which can yield incorrect answers!

(luckily, this one is quite stable!)

slight tangent: numerical stability

Algorithms generally go unstable when:

  1. they subtract two large, nearly equal numbers (catastrophic cancellation)
  2. they accumulate rounding error over many floating point operations
  3. they divide by numbers very close to zero
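
For example, the naive one-pass variance \( E[x^2] - E[x]^2 \) falls into the first trap. A minimal sketch (the numbers here are made up) comparing it against the online algorithm on data with a large offset:

# naive one-pass variance vs. Knuth's online algorithm
import random

data = [1e9 + random.random() for _ in xrange(10000)]
n = len(data)

# naive: subtracts two huge, nearly equal numbers
mean = sum(data) / n
naive_var = sum(x * x for x in data) / n - mean ** 2

# Knuth/Welford: numerically stable online updates
xbar, m2 = 0.0, 0.0
for i, x in enumerate(data, 1):
    delta = x - xbar
    xbar += delta / i
    m2 += delta * (x - xbar)

print "naive variance:  %s" % naive_var      # garbage, often negative!
print "online variance: %s" % (m2 / (n - 1)) # ~0.0833, the variance of uniform(0, 1)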

Rules of Thumb

Now You!

You'll be done when you:

  1. Know what the secret mean/variance of my stream is!
  2. Feel comfortable with your implementation of the algorithm
  3. Understand possible stability concerns with the algorithm

Now I can do everything... right?

...almost.

What happens if:

The algorithm is very complex and you need to have multiple boxes with multiple readers working on it?

We need to share data between readers!

enter: redis

the great dictionary in the sky

about redis: an open source, in-memory key/value store with rich data structures

a quick overview of my favorite data structures in redis

set ~= python set

unique list of elements with FAST add/remove/contains

(useful for unique-ifying a set over multiple data processors)
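
A minimal sketch (the key and member names are made up): sadd is atomic and returns 1 only for elements it hasn't seen, so many data processors can safely share one set:

import redis

db = redis.StrictRedis(db=1)

# sadd returns 1 if the element was new, 0 if it was already in the set
if db.sadd("seen_messages", "msg-42"):
    print "new message, process it!"
else:
    print "duplicate, skip it"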

hashes ~= python dict

low overhead and fast element retrieval

(useful for storing object properties over multiple data processors)

zset ~= ordered set

sorted set where every element has a numerical score (float)

you can easily get the n-most or n-least elements

(useful for storing distributions and graphs... more on this later)

example:

Let's connect to redis, set a key, and retrieve it

# examples/02_redis_set.py
import redis
import random

db = redis.StrictRedis(db=1)
db.set("test_value", random.random())

value = db.get("test_value")
print "Random number = ", value
print "Response type = ", type(value)

$ python examples/02_redis_set.py
Random number =  0.358030987168
Response type =  <type 'str'>

example:

Let's connect to redis, set hash fields, and increment values atomically

# examples/03_redis_hash.py
import redis

db = redis.StrictRedis(db=1)

db.hset("micha", "friends", 1)
db.hset("dan", "friends", 9001)
db.hset("mike", "friends", 24)

for i in range(100):
    db.hincrby("micha", "friends", 10)
    
print "dan has %s friends" % db.hget("dan", "friends")
print "mike has %s friends" % db.hget("mike", "friends")
print "micha has %s friends" % db.hget("micha", "friends")
$ python 03_redis_hash.py 
dan has 9001 friends
mike has 24 friends
micha has 1001 friends

aside: atomic-ity

What happens if I update a value in redis like this with multiple processes?

import redis

db = redis.StrictRedis(db=1)

value = int(db.get("important_number"))  # redis hands back a string
value += 1  # line (A)
db.set("important_number", value)

print "The value is currently %s" % value

We start with a value of n=0 in the db


t  | Process 1 (Mr. T)      | Process 2 (Bozo)
---+------------------------+------------------------
0  |                        |
1  | value = db.get("n")    | value = db.get("n")
2  |                        |
3  | value += 1 # line (A)  | value += 1 # line (A)
4  |                        |
5  | db.set("n", value)     |
6  |                        | db.set("n", value)

Now the value is n=1 instead of n=2!


this is called a race condition

atomic operations FTW!

Read more here and definitely look at the *incrby operations in redis
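
For instance, the broken read-modify-write from the last example collapses into a single safe call (a sketch reusing the same keys):

import redis

db = redis.StrictRedis(db=1)

# one atomic, server-side operation; concurrent processes can't
# interleave in the middle of it, so no increments get lost
db.incr("important_number")        # for plain keys
db.hincrby("micha", "friends", 1)  # for hash fields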

Now You!

You'll be done when you:

  1. Have a sense of which datatype to use for which problem
  2. Understand the data flow with multiple queuereaders
  3. Extra: understand transactions and atomicity of a redis db
  4. Extra: understand the benefits of a connection pipeline

So... after this I can only deal with random numbers?

data can be any json blob

Let's do something interesting!

1usagov: a data source of clicks on usa.gov links shortened through bitly

First, let's check out the data source!

http://developer.usa.gov/1usagov is the URL of a pubsub!


$ curl -s http://developer.usa.gov/1usagov | head -n 1
{ "a": "Mozilla\/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident\/5.0)", "c": "US", "nk": 0, "tz": "America\/Chicago", "gr": "MN", "g": "lyz2Nv", "h": "lyz2Nv", "l": "bitly", "al": "en-US", "hh": "1.usa.gov", "r": "http:\/\/www.usa.gov\/index.shtml", "u": "http:\/\/answers.usa.gov\/system\/selfservice.controller?CONFIGURATION=1000&PARTITION_ID=1&CMD=VIEW_ARTICLE&USERTYPE=1&LANGUAGE=en&COUNTRY=US&ARTICLE_ID=10564", "t": 1346619003, "hc": 1308822087, "cy": "Minneapolis", "ll": [ 44.982300, -93.306297 ] }

which pretty-prints to:
{
    "a": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
    "c": "US",
    "nk": 0,
    "tz": "America/Chicago",
    "gr": "MN",
    "g": "lyz2Nv",
    "h": "lyz2Nv",
    "l": "bitly",
    "al": "en-US",
    "hh": "1.usa.gov",
    "r": "http://www.usa.gov/index.shtml",
    "u": "http://answers.usa.gov/....",
    "t": 1346619003,
    "hc": 1308822087,
    "cy": "Minneapolis",
    "ll": [
        44.9823,
        -93.306297
    ]
}

Go here for field descriptions
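
As a warm-up, here is a hypothetical reader task that tallies clicks per country code (the "c" field) into a redis hash, assuming you've already pointed ps_to_http at the 1usagov pubsub and a local simplequeue on port 8081:

import redis
from pysimplehttp.BaseReader import BaseReader

db = redis.StrictRedis(db=1)

def count_country(data):
    # "c" is the two-letter country code of the click
    db.hincrby("clicks_by_country", data.get("c", "unknown"), 1)
    return True  # tell the BaseReader the task succeeded

if __name__ == "__main__":
    reader = BaseReader("http://localhost:8081", {"country": count_country},
                        queuename="country_counter")
    reader.run()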

So much possibility!

Let's try making a graph

What do you mean by graph?

We will have nodes, representing some piece of data, and connect them together in some meaningful way

How do we represent a graph?

Adjacency list


Node elements are stored in pairs with a weighting between them


The weight represents how "close" the pair are

becomes

A, B, 5
A, C, 4
A, D, 9
C, D, 7
D, E, 9
                

(graph shamelessly taken from d3)

How will we store this?

redis zsets!

(you could also use a hash, but zsets make it easier
to implement more complex algorithms)

example:

to store the DIRECTED graph node connections (A, B, 5) and (A, C, 1) we do:

# examples/04_redis_zset_graph.py
import redis

db = redis.StrictRedis(db=1)

db.zadd("A", 5.0, "B")
db.zadd("A", 1.0, "C")

# get all elements in the set, ordered by score (low to high)
# returns a list of tuples in the form: 
#   [ (property, score), (property, score), ... ]
node_a = db.zrange("A", 0, -1, withscores=True)
print "Current graph:"
for link in node_a:
    print "A, %s, %f" % (link[0], link[1])
        

$ python examples/04_redis_zset_graph.py
Current graph:
A, C, 1.000000
A, B, 5.000000

we can also do some other cool stuff with zset

look into zincrby, zrank and zrangebyscore
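
For instance, a sketch that bumps an edge weight every time a pair co-occurs, then pulls out the strongest neighbors (keyword arguments because zincrby's positional order differs between redis-py versions):

import redis

db = redis.StrictRedis(db=1)

# saw the pair (A, B) again: atomically increment the edge weight
db.zincrby("A", value="B", amount=1.0)

# the two heaviest edges out of A, strongest first
print db.zrevrange("A", 0, 1, withscores=True)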

Now You!

You'll be done when you:

  1. Get a sense of how to build graphs in redis and what the limitations are
  2. Can think of some interesting analysis to run on the resulting graph and understand how to fetch the data to do so
  3. Think: wow, realtime streams really are awesome!

Bonus: try scripts/redis_graph_to_networkx.py to visualize your graph!

And that's all I know

Love,

Micha

Intentionally left blank

(or was it?)
