Realtime Data Science
How to deal with realtime streams
Micha Gorelick
Outline
Infrastructure + SimpleHTTP
Python + pySimpleHTTP
Redis + Redis
Graphs + Awesome
(there will be a 15min break ~115min in, check out the schedule)
Note:
each section will have ~5 minutes of talking and then a lot of experimenting!
you have all my slides and all the code! i go fast but you don't need to scramble to take notes
Audience
YOU! So ask questions!
We want results NOW
not when hadoop can get around to it
Look at our most recent realtime awesomeness from bitly: rt.ly
Streams move fast
we need some infrastructure to support them
SimpleHttp
- Made by bitly!
- Open Source, http://github.com/bitly/simplehttp/
- Provides simple bindings to various utilities, including pubsubs and queues
- Everything is done with HTTP GET/POST requests, which means there are no compatibility issues
We will focus on PUBSUBS and QUEUES
WTF is a plurbsurb pubsub?
pubsub ~= Town Crier
Takes information from the "king" and distributes it out to the "villagers"
Data comes in from our data sources and the pubsub publishes the data to all of the subscribers
note: there can be more than one data source all publishing to the same pubsub
Looks perfect!
No further work needed... right?
What if data is coming in faster than we can process?
The kernel buffers will fill up, the connection will time out, and when we finally reconnect we will have lost messages! (ie: things go bad)
enter: simplequeue
simple, network based queue system
queue ~= secretary
data processors are busy things! we need someone to help them manage their tasks
queues store messages until they are requested
If a processor is VERY slow, the queue will be storing many messages (and using a lot of RAM)
pubsub
- simple HTTP interface to a pubsub
- /pub to publish a message, /sub to subscribe
simplequeue
- simple HTTP interface to a network queue
- /put to put a message on the queue, /get to get the top item
ps_to_http
- simple pubsub subscriber that pushes messages to a queue
- it's the glue between a pubsub and a simplequeue
also...
curl
- simple way to fetch a URL
- similar to visiting the given URL in the browser, but from the command line!
- try running: curl "http://google.com/"
Start the pubsub
$ pubsub --port=8080
registering callback for path "/pub*"
registering callback for path "/sub*"
registering callback for path "/stats*"
registering callback for path "/clients"
listening on 0.0.0.0:8080
$ curl "http://localhost:8080/stats"
Active connections: 0
Total connections: 0
Messages received: 0
Messages sent: 0
Kicked clients: 0
Start a queue
$ simplequeue --port=8081
Version: 1.3, http://code.google.com/p/simplehttp/
use --help for options
registering callback for path "/put*"
registering callback for path "/get*"
registering callback for path "/mget*"
registering callback for path "/mput*"
registering callback for path "/dump*"
registering callback for path "/stats*"
listening on 0.0.0.0:8081
$ curl localhost:8081/stats
puts:0
gets:0
depth:0
depth_high_water:0
bytes:0
overflow:0
Connect the pubsub to the queue
$ ps_to_http --destination-get-url="http://127.0.0.1:8081/put?data=%s" \
--pubsub-url="http://127.0.0.1:8080/sub?multipart=0"
AUTODETECTING ENCODING FOR http://127.0.0.1:8080/sub?multipart=0
CONNECTING TO SOURCE http://127.0.0.1:8080/sub?multipart=0
Congrats! You set up a data pipeline
(with no data)
Adding data is easy!
$ echo "H3110" | curl -d@- "http://localhost:8080/pub"
Published 1 messages to 1 clients.
And Fetching data is also easy!
$ curl -s "http://localhost:8081/stats" # get the stats for our simplequeue
puts:1
gets:0
depth:1
depth_high_water:1
bytes:5
overflow:0
$ curl -s "http://localhost:8081/get" # fetch the data
H3110
$ curl -s "http://localhost:8081/stats" # look at the stats again
puts:1
gets:1
depth:0
depth_high_water:1
bytes:0
overflow:0
Now You!
Create a pubsub/simplequeue chain, add data to it and retrieve it.
What happens if you have multiple things reading from the queue? Who gets what data?
Try using scripts/count_to_pubsub.sh to help play around with data-flow
15min
You'll be done when you:
- Understand the flow of data through the pubsub/simplequeue chain
- Thought "this is AWESOME" at least 3 times
So... I have to do everything with curl?
or: why python is so amazing
enter: pysimplehttp
(part of bitly's simplehttp project)
pysimplehttp.BaseReader ~= translator
translates the low level simplehttp protocol into something more useful
you simply define tasks on data and the BaseReader will perform all tasks on all data in the simplequeue!
NOTE: BaseReader operates on JSON blobs
pysimplehttp.BaseReader
The important bits:
BaseReader(
simplequeue_address, # location of data ("http://localhost:8081")
all_tasks, # dict of functions, all of which get all data
queuename=None, # optional name for the queue
max_tries=5, # how many times to retry a piece of data
requeue_delay=90, # time data waits after being requeued
mget_items=0, # batch size to fetch from simplequeue
preprocess_method=None, # function to preprocess data
validate_method=None, # function that decides whether to process a given piece of data
requeue_giveup=None, # function to run on data we give up on
)
Read the docs! There are a bunch of other cool options
Simple Example
# examples/01_simple_reader.py
from pysimplehttp.BaseReader import BaseReader
from pysimplehttp.http import pubsub_write

def print_message(data):
    print data
    return True  # we must return True to say our task finished correctly

if __name__ == "__main__":
    tasks = {"print": print_message}  # tasks are named for logging
    reader = BaseReader("http://localhost:8081", tasks, queuename="message_printer")

    # There are also easy utilities to write to a pubsub!
    pubsub_write("http://localhost:8080", {'msg': 'H3110'})

    reader.run()
Simple Example (cont)
$ python examples/01_simple_reader.py
{u'msg': u'H3110'}
Now You!
Connect to a simplequeue that outputs random numbers, \( x \in [0,1) \), using BaseReader (check out scripts/random_to_pubsub.py)
Fail on a message every once in a while... what happens to it?
See what happens when multiple readers connect
Extra: With preprocess_method, only accept numbers in (0.5, 1.0).
Extra: With validate_method, only consume 25% of the stream.
15min
You'll be done when you:
- Experimented with and understand the message guarantees with multiple pubsub subscribers or data consumers
- Extra: can you say anything about the statistics of the numbers through the reader?
Great... what can I do with streams?
Online Algorithms
- Algorithms that update per datapoint
- Perfect for streams: algorithms are single pass and don't require batch operations
Examples
Let's start easy...
Calculate the mean/variance for a stream of random numbers
Knuth's Algorithm
- define: \( x_n = \) observation @ time n
- mean = \( \bar{x}_n = \bar{x}_{n-1} + \frac{x_n - \bar{x}_{n-1}}{n} \)
- aux variable = \( M_{2,n} = M_{2,n-1} + \left( x_n - \bar{x}_{n-1} \right)\left( x_n - \bar{x}_n \right) \)
- sample variance = \( \frac{M_{2,n}}{n-1} \)
- population variance = \( \frac{M_{2,n}}{n} \)
note: this algorithm can even be used in parallel over multiple data consumers!
What is this \( M_{2,n} \) variable?
- Sum of the powers of the difference from the mean, related to central moments
- \( M_{k,n} = \sum_{i=1}^{n} \left( x_i - \bar{x}_n \right)^k \)
- Higher order M's can be used to calculate higher order moments of your distribution
- ie: To get kurtosis, calculate: \( \{ M_{2,n}, M_{3,n}, M_{4,n} \} \)
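Not from the slides, just a minimal plain-Python sketch of the update rules above; the gaussian generator stands in for the stream (in the exercise the numbers would arrive through a BaseReader task instead).

# sketch: online mean/variance (the Knuth/Welford update)
import random

n = 0
mean = 0.0
M2 = 0.0  # running sum of squared differences from the current mean

for x in (random.gauss(5.0, 2.0) for _ in range(100000)):
    n += 1
    delta = x - mean
    mean += delta / n          # mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n
    M2 += delta * (x - mean)   # M2_n = M2_{n-1} + (x_n - mean_{n-1})(x_n - mean_n)

print "mean                =", mean
print "sample variance     =", M2 / (n - 1)
print "population variance =", M2 / n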
This looks amazing!
Critical question: will it blend converge?
Online algorithms are generally more prone to numerical error, which can yield incorrect answers!
(luckily, this one is quite stable!)
slight tangent: numerical stability
Algorithms generally go unstable when:
- Two very similar numbers are subtracted from each other (floating point error)
- High precision is necessary in your calculation (ie: irrational numbers or significance in very small decimals)
- The algorithm itself isn't stable (ie: \( \sum^{\infty}x^2 \))
Rules of Thumb
- Try to keep your numbers as close to 1 as possible (ie: normalize!)
- Simple algebraic manipulations may be enough to reformulate an algorithm to avoid floating point error (see wiki/Floating point error#Accuracy Problems)
- log space is a nice place to be (see the example after this list)
- Sometimes you just have to find a new algorithm or reformulate the problem!
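As a small illustration of the "log space" rule of thumb (not from the slides): multiplying many small probabilities underflows to 0.0, while summing their logs stays perfectly well behaved.

# sketch: why log space is a nice place to be
import math
import random

probs = [random.uniform(1e-5, 1e-3) for _ in range(500)]

product = 1.0
for p in probs:
    product *= p  # underflows to 0.0 long before the loop finishes

log_product = sum(math.log(p) for p in probs)  # stable

print "naive product:  ", product
print "log of product: ", log_product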
Now You!
Connect to a simplequeue of normally distributed random numbers and calculate the mean/variance of the numbers (output the result every N samples)
Try changing the actual mean/variance of the generated numbers... how does the algorithm perform?
What happens when you try drawing random numbers from other distributions? (numpy.random)
30min
You'll be done when you:
- Know what the secret mean/variance of my stream is!
- Feel comfortable with your implementation of the algorithm
- Understand possible stability concerns with the algorithm
Now I can do everything... right?
What happens if:
The algorithm is very complex and you need to have multiple boxes with multiple readers working on it?
We need to share data between readers!
enter: redis
the great dictionary in the sky
about redis:
in memory, network accessible dictionary
lightweight
many different datatypes (ref)
atomic operations (ref)
data expiration time (accurate to the millisecond, ref)
easy to use! (just type redis-server to start)
quick overview of my favorite data structures in redis
set
~= python set
unique list of elements with FAST add/remove/contains
(useful for unique-ifying a set over multiple data processors)
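A quick sketch of the set type in action (assuming the same redis-py setup as the examples below; the key name "seen_users" is made up for illustration):

# sketch: unique-ifying items seen across multiple data processors
import redis

db = redis.StrictRedis(db=1)

for user in ["alice", "bob", "alice", "carol", "bob"]:
    db.sadd("seen_users", user)  # adding a duplicate is a no-op

print "unique users seen:", db.scard("seen_users")              # -> 3
print "seen dave before? ", db.sismember("seen_users", "dave")  # -> False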
hashes
~= python dict
low overhead and fast element retrieval
(useful for storing object properties over multiple data processors)
zset
~= ordered set
sorted set where every element has a numerical score (float)
you can easily get the n-most or n-least elements
(useful for storing distributions and graphs... more on this later)
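And a quick sketch of the "n-most elements" part (illustrative only; zadd is called with (score, member), matching the redis-py convention used in the graph example later in these slides):

# sketch: top-n elements from a zset
import redis

db = redis.StrictRedis(db=1)

db.zadd("clicks", 10.0, "http://usa.gov/a")
db.zadd("clicks", 42.0, "http://usa.gov/b")
db.zadd("clicks", 7.0, "http://usa.gov/c")

# the 2 highest-scoring elements, highest first
print db.zrevrange("clicks", 0, 1, withscores=True)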
example:
Let's connect to redis, set a key, and retrieve it
# examples/02_redis_set.py
import redis
import random
db = redis.StrictRedis(db=1)
db.set("test_value", random.random())
value = db.get("test_value")
print "Random number = ", value
print "Response type = ", type(value)
$ python examples/02_redis_set.py
Random number = 0.358030987168
Response type = <type 'str'>
example:
Let's connect to redis, set a hash, and increment values atomically
# examples/03_redis_hash.py
import redis
db = redis.StrictRedis(db=1)
db.hset("micha", "friends", 1)
db.hset("dan", "friends", 9001)
db.hset("mike", "friends", 24)
for i in range(100):
    db.hincrby("micha", "friends", 10)
print "dan has %s friends" % db.hget("dan", "friends")
print "mike has %s friends" % db.hget("mike", "friends")
print "micha has %s friends" % db.hget("micha", "friends")
$ python 03_redis_hash.py
dan has 9001 friends
mike has 24 friends
micha has 1001 friends
aside: atomic-ity
What happens if I update a value in redis like this with multiple processes?
import redis
db = redis.StrictRedis(db=1)
value = int(db.get("important_number"))
value += 1 # line (A)
db.set("important_number", value)
print "The value is currently %s" % value
We start with a value of n=0 in the db

t | Process 1 (Mr. T)      | Process 2 (Bozo)
--+------------------------+------------------------
0 |                        |
1 | value = db.get("n")    | value = db.get("n")
2 |                        |
3 | value += 1  # line (A) | value += 1  # line (A)
4 |                        |
5 | db.set("n", value)     |
6 |                        | db.set("n", value)

Now the value is n=1 instead of n=2!
this is called a race condition
atomic operations FTW!
Read more in the redis docs, and definitely look at the *incrby operations in redis
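For comparison with the racy read-modify-write above, a small sketch of the same kind of update done with redis's atomic increments (incr / hincrby), which sidestep the race entirely:

# sketch: the safe version of the "important_number" update
import redis

db = redis.StrictRedis(db=1)

new_value = db.incr("important_number")  # get + add + set happen atomically inside redis
db.hincrby("micha", "friends", 10)       # same idea for a hash field

print "The value is currently %s" % new_value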
Now You!
Create a queuereader that reads random integers from different distributions and stores a histogram of them.
Have multiple queuereaders connect to the same stream and the same redis DB
extra: Make the mean/var algorithm work with redis. How will you keep the readers from clobbering each other's data? (hint: atomic transactions)
20min
You'll be done when you:
- You have a sense of which datatype to use for which problem
- You understand the data-flow with multiple queuereaders
- Extra: understand transactions and atomicity of a redis db
- Extra: understand the benefits of a connection pipeline (see the sketch below)
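A quick sketch of what that last Extra points at (assuming the same redis-py client as before): a pipeline buffers commands client-side and sends them in one round trip, which matters when a queuereader makes many small updates per message.

# sketch: batching histogram updates with a pipeline
import redis

db = redis.StrictRedis(db=1)

pipe = db.pipeline()
for bucket in range(10):
    pipe.hincrby("histogram", str(bucket), 1)
results = pipe.execute()  # all 10 increments go over the wire in one round trip

print results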
So... after this I can only deal with random numbers?
data can be any json blob
Let's do something interesting!
1usagov: a data source of clicks on usa.gov links through bitly
First, let's check out the data source!
http://developer.usa.gov/1usagov is the URL of a pubsub!
$ curl -s http://developer.usa.gov/1usagov | head -n 1
{ "a": "Mozilla\/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident\/5.0)", "c": "US", "nk": 0, "tz": "America\/Chicago", "gr": "MN", "g": "lyz2Nv", "h": "lyz2Nv", "l": "bitly", "al": "en-US", "hh": "1.usa.gov", "r": "http:\/\/www.usa.gov\/index.shtml", "u": "http:\/\/answers.usa.gov\/system\/selfservice.controller?CONFIGURATION=1000&PARTITION_ID=1&CMD=VIEW_ARTICLE&USERTYPE=1&LANGUAGE=en&COUNTRY=US&ARTICLE_ID=10564", "t": 1346619003, "hc": 1308822087, "cy": "Minneapolis", "ll": [ 44.982300, -93.306297 ] }
{
"a": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
"c": "US",
"nk": 0,
"tz": "America/Chicago",
"gr": "MN",
"g": "lyz2Nv",
"h": "lyz2Nv",
"l": "bitly",
"al": "en-US",
"hh": "1.usa.gov",
"r": "http://www.usa.gov/index.shtml",
"u": "http://answers.usa.gov/....",
"t": 1346619003,
"hc": 1308822087,
"cy": "Minneapolis",
"ll": [
44.9823,
-93.306297
]
}
Go here for field descriptions
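A small illustrative sketch (not part of the slides) of reading a single click from the 1usagov pubsub in Python and pulling out a couple of fields; it assumes the feed keeps streaming one JSON blob per line, as in the curl output above.

# sketch: grab one click and look at the referrer ("r") and target URL ("u")
import json
import urllib2

stream = urllib2.urlopen("http://developer.usa.gov/1usagov")

line = ""
while not line.strip():  # skip any keep-alive blank lines
    line = stream.readline()

click = json.loads(line)
print "referrer:", click.get("r")
print "target:  ", click.get("u")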
What do you mean by graph?
We will have nodes, representing some piece of data, and connect them together in some meaningful way
How do we represent a graph?
Adjacency list
Node elements are stored in pairs with a weighting between them
The weight represents how "close" the pair are
For example, the sample graph shown on the slide becomes:
A, B, 5
A, C, 4
A, D, 9
C, D, 7
D, E, 9
(graph shamelessly taken from d3)
redis zsets!
(you could also use a hash, but zsets make it easier to implement more complex algorithms)
example:
to store the DIRECTED graph node connections (A, B, 5) and (A, C, 1) we do:
# examples/04_redis_zset_graph.py
import redis

db = redis.StrictRedis(db=1)
db.zadd("A", 5.0, "B")
db.zadd("A", 1.0, "C")

# get all elements in the sorted set (lowest score first)
# returns a list of tuples in the form:
# [ (property, score), (property, score), ... ]
node_a = db.zrange("A", 0, -1, withscores=True)

print "Current graph:"
for link in node_a:
    print "A, %s, %f" % (link[0], link[1])
$ python examples/04_redis_zset_graph.py
A, C, 1.000000
A, B, 5.000000
Now You!
Create a graph using the 1usagov data! Have the referrer URL (key r) link to the target URL (key u), with weight corresponding to the frequency of the event.
extra: How could you easily turn this into an UNDIRECTED graph?
extra: What other cool graphs can you form with the data?
40min
You'll be done when you:
- Get a sense of how to build graphs in redis and what the limitations are
- Can think of some interesting analysis to run on the resulting graph and understand how to fetch the data to do so
- Think: wow, realtime streams really are awesome!
Bonus: try scripts/redis_graph_to_networkx.py to visualize your graph!
And that's all I know
Love,
Micha
Intentionally left blank
(or was it?)