# enter: SimpleHttp

## SimpleHttp

• Open source: http://github.com/bitly/simplehttp/
• Provides simple bindings to various utilities including:
  • Databases
  • Queues
  • Pubsubs
• Everything is done with HTTP GET/POST requests, so anything that can speak HTTP can use it with no compatibility issues

# WTF is a pubsub?

## pubsub ~= Town Crier

### Takes information from the "king" and distributes it out to the "villagers"

Data comes in from our data sources and the pubsub publishes the data to all of the subscribers
note: there can be more than one data source all publishing to the same pubsub

# The Tools

## pubsub

• simple HTTP interface to a pubsub
• /pub to publish a message, /sub to subscribe

## simplequeue

• simple HTTP interface to a network queue
• /put to put a message on the queue, /get to get the top item

## ps_to_http

• simple pubsub subscriber that pushes messages to a queue
• it's the glue between a pubsub and a simplequeue

## curl

• simple way to fetch a URL
• similar to visiting the given URL in the browser, but from the command line!
• try running: curl "http://google.com/"

## Start the pubsub

        $ pubsub --port=8080
        registering callback for path "/pub*"
        registering callback for path "/sub*"
        registering callback for path "/stats*"
        registering callback for path "/clients"
        listening on 0.0.0.0:8080

        $ curl "http://localhost:8080/stats"
        Active connections: 0
        Total connections: 0
        Messages sent: 0
        Kicked clients: 0
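
You can also watch messages arrive by hand: hold a subscriber open in a second terminal (a minimal sketch, assuming the pubsub above is still running on port 8080; re-running the /stats call should then show an active connection):

        # in a second terminal: keep a subscription open and watch messages stream in
        $ curl -N "http://localhost:8080/sub?multipart=0"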


## Start a queue

        $ simplequeue --port=8081
        Version: 1.3, http://code.google.com/p/simplehttp/
        use --help for options
        registering callback for path "/put*"
        registering callback for path "/get*"
        registering callback for path "/mget*"
        registering callback for path "/mput*"
        registering callback for path "/dump*"
        registering callback for path "/stats*"
        listening on 0.0.0.0:8081

        $ curl localhost:8081/stats
        puts:0
        gets:0
        depth:0
        depth_high_water:0
        bytes:0
        overflow:0
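
Before wiring the pubsub in, you can sanity-check the queue by hand with the /put and /get endpoints it just registered (a minimal sketch; "hello" is just a throwaway message):

        $ curl "http://localhost:8081/put?data=hello"   # enqueue one message
        $ curl "http://localhost:8081/get"              # pop it back off
        hello

The puts/gets/depth counters in /stats should tick along as you do this.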


## Connect the pubsub to the queue

        $ ps_to_http --destination-get-url="http://127.0.0.1:8081/put?data=%s" \
                     --pubsub-url="http://127.0.0.1:8080/sub?multipart=0"
        AUTODETECTING ENCODING FOR http://127.0.0.1:8080/sub?multipart=0
        CONNECTING TO SOURCE http://127.0.0.1:8080/sub?multipart=0

# Congrats! You set up a data pipeline

##### (with no data)

## Adding data is easy!

        $ echo "H3110" | curl -d@- "http://localhost:8080/pub"
        Published 1 messages to 1 clients.


## And fetching data is also easy!

        $curl -s "http://localhost:8081/stats" # get the stats for our simplequeue puts:1 gets:0 depth:1 depth_high_water:1 bytes:5 overflow:0$ curl -s "http://localhost:8081/get" # fetch the data
H3110$curl -s "http://localhost:8081/stats" # look at the stats again puts:1 gets:1 depth:0 depth_high_water:1 bytes:0 overflow:0  ## Now You! • ##### Create a pubsub/simplequeue chain, add data to it and retrieve it. • ##### What happens if you have multiple things reading from the queue? Who gets what data? • ##### Try using scripts/count_to_pubsub.sh to help play around with data-flow • ##### 15min ## You'll be done when you: 1. Understand the flow of data through the pubsub/simplequeue chain 2. Thought this is AWESOME at least 3 times # So... I have to do everything with curl? ## or: why python is so amazing # enter: pysimplehttp ### (part of bitly's simplehttp project) ## pysimplehttp.BaseReader ~= translator ### translates the low level simplehttp protocol into something more useful ### you simply define tasks on data and the BaseReader will perform all tasks on all data in the simplequeue! ### NOTE: BaseReader operates on JSON blobs ## pysimplehttp.BaseReader ### The important bits:  BaseReader( simplequeue_address, # location of data ("http://localhost:8081") all_tasks, # dict of functions, all of which get all data queuename=None, # optional name for the queue max_tries=5, # how many times to retry on piece of data requeue_delay=90, # time data waits after being requeued mget_items=0, # batch size to fetch from simplequeue preprocess_method=None, # function to preprocess data validate_method=None, # function that decides to process a given data requeue_giveup=None, # function to run on data we give up on ) ###### Read the docs! There are a bunch of other cool options ## Simple Example  # examples/01_simple_reader.py from pysimplehttp.BaseReader import BaseReader from pysimplehttp.http import pubsub_write def print_message(data): print data return True # we must return True to say our tasked finished correctly if __name__ == "__main__": tasks = {"print" : print_message} # tasks are named for logging reader = BaseReader("http://localhost:8081", tasks, queuename="message_printer") # There is also easy utilities to write to a pubsub! pubsub_write("http://localhost:8080", {'msg' : 'H4110!'}) reader.run()  ## Simple Example (cont) $ python examples/01_simple_reader.py
{u'msg': u'H3110'}


## You'll be done when you:

1. Experimented with and understand the message guarantees with multiple pubsub subscribers or data consumers
2. Extra: can you say anything about the statistics of the numbers through the reader?

# FAST MATH!

## Online Algorithms

• Algorithms that update per datapoint
• Perfect for streams: algorithms are single pass and don't require batch operations

# First, the algorithm!

## Knuth's Algorithm

• define: $$x_n =$$ observation @ time n
• mean = $$\bar{x}_n = \bar{x}_{n-1} + \frac{x_n - \bar{x}_{n-1}}{n}$$
• aux variable = $$M_{2,n} = M_{2,n-1} + \left( x_n - \bar{x}_{n-1} \right)\left( x_n - \bar{x}_n \right)$$
• sample variance = $$\frac{M_{2,n}}{n-1}$$
• population variance = $$\frac{M_{2,n}}{n}$$

## What is this $$M_{2,n}$$ variable?

• Sum of the powers of the difference from the mean, related to central moments
• $$M_{k,n} = \sum_{i=1}^{n} \left( x_i - \bar{x}_n \right)^k$$
• Higher order M's can be used to calculate higher order moments of your distribution
• ie: To get kurtosis, calculate: $$\{ M_{2,n}, M_{3,n}, M_{4,n} \}$$
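
A minimal sketch of the update rules above in Python (the numbers pushed in here are made up; for the exercise you would push values read off the queue instead):

        # online (single pass) mean / variance, following the update rules above
        class RunningStats(object):
            def __init__(self):
                self.n = 0
                self.mean = 0.0   # xbar_n
                self.m2 = 0.0     # M_{2,n}

            def push(self, x):
                self.n += 1
                delta = x - self.mean                 # x_n - xbar_{n-1}
                self.mean += delta / self.n           # xbar_n = xbar_{n-1} + (x_n - xbar_{n-1}) / n
                self.m2 += delta * (x - self.mean)    # M_{2,n} = M_{2,n-1} + (x_n - xbar_{n-1})(x_n - xbar_n)

            def sample_variance(self):
                return self.m2 / (self.n - 1) if self.n > 1 else 0.0

            def population_variance(self):
                return self.m2 / self.n if self.n > 0 else 0.0

        stats = RunningStats()
        for x in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
            stats.push(x)
        print "mean =", stats.mean                                    # 5.0
        print "population variance =", stats.population_variance()   # 4.0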

# Critical question: will it converge?

## slight tangent: numerical stability

### Algorithms generally go unstable when:

• Two very similar numbers are subtracted from each other (floating point error)
• High precision is necessary in your calculation (ie: irrational numbers or significance in very small decimals)
• The algorithm itself isn't stable (ie: $$\sum^{\infty}x^2$$)

### Rules of Thumb

• Try to keep your numbers as close to 1 as possible (ie: normalize!)
• Simple algebraic manipulations may be enough to reformulate an algorithm to avoid floating point error (see wiki/Floating point error#Accuracy Problems)
• log space is a nice place to be
• Sometimes you just have to find a new algorithm or reformulate the problem!
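
A tiny illustration of the first two bullets: the "textbook" variance formula E[x^2] - E[x]^2 subtracts two nearly equal large numbers, so it falls apart when the data sits far from 1, while shifting (normalizing) the data first fixes it (the numbers here are made up):

        # naive variance: subtracts two nearly equal large numbers
        def naive_variance(xs):
            n = float(len(xs))
            sum_x = sum(xs)
            sum_x2 = sum(x * x for x in xs)
            return sum_x2 / n - (sum_x / n) ** 2

        data = [1e9, 1e9 + 1, 1e9 + 2, 1e9 + 3]   # true population variance is 1.25

        print "raw:    ", naive_variance(data)                      # wildly wrong, can even come out negative
        print "shifted:", naive_variance([x - 1e9 for x in data])   # 1.25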

## You'll be done when you:

1. Know what the secret mean/variance of my stream is!
2. Feel comfortable with your implementation of the algorithm
3. Understand possible stability concerns with the algorithm

# quick overview of my favorite important data structures in redis

## example:

### Let's connect to redis, set key and retrieve it

        # examples/02_redis_set.py
        import redis
        import random

        db = redis.StrictRedis(db=1)
        db.set("test_value", random.random())

        value = db.get("test_value")
        print "Random number = ", value
        print "Response type = ", type(value)

        $ python examples/02_redis_set.py
        Random number = 0.358030987168
        Response type = <type 'str'>

## example:

### Let's connect to redis, set hash and increase values atomically

        # examples/03_redis_hash.py
        import redis

        db = redis.StrictRedis(db=1)

        db.hset("micha", "friends", 1)
        db.hset("dan", "friends", 9001)
        db.hset("mike", "friends", 24)

        for i in range(100):
            db.hincrby("micha", "friends", 10)

        print "dan has %s friends" % db.hget("dan", "friends")
        print "mike has %s friends" % db.hget("mike", "friends")
        print "micha has %s friends" % db.hget("micha", "friends")

        $ python 03_redis_hash.py
        dan has 9001 friends
        mike has 24 friends
        micha has 1001 friends

## aside: atomicity

### What happens if I update a value in redis like this with multiple processes?

        import redis

        db = redis.StrictRedis(db=1)

        # db.get returns a string, so cast before doing math
        value = int(db.get("important_number"))
        value += 1  # line (A)
        db.set("important_number", value)

        print "The value is currently %s" % value

### We start with a value of n=0 in the db

        t  | Process 1 (Mr. T)        | Process 2 (Bozo)
        ---+--------------------------+--------------------------
        0  |                          |
        1  | value = db.get("n")      | value = db.get("n")
        2  |                          |
        3  | value += 1  # line (A)   | value += 1  # line (A)
        4  |                          |
        5  | db.set("n", value)       |
        6  |                          | db.set("n", value)


# atomic operations FTW!
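
Both processes read n=0, both add 1 locally, and both write 1 back, so one increment is silently lost. Redis's atomic commands do the read-modify-write on the server in a single step. A minimal sketch, using the same key as the example above:

        # let redis do the read-modify-write in one atomic step
        import redis

        db = redis.StrictRedis(db=1)

        new_value = db.incr("important_number")   # safe with any number of processes
        print "The value is currently %s" % new_value

The hash example earlier used the same idea for hash fields with db.hincrby().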

## You'll be done when you:

1. Have a sense of which datatype to use for which problem
2. Understand the data-flow with multiple queuereaders
3. Extra: understand transactions and atomicity of a redis db
4. Extra: understand the benefits of a connection pipeline (see the sketch below)
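
A minimal sketch of what "pipeline" means in redis-py, for extra #4 (the key here is just illustrative):

        # batch several commands into one network round trip
        # instead of one hop per command
        import redis

        db = redis.StrictRedis(db=1)

        pipe = db.pipeline()
        pipe.set("counter", 1)
        pipe.incr("counter")
        pipe.get("counter")
        print pipe.execute()   # -> [True, 2, '2']

By default redis-py wraps the pipelined commands in MULTI/EXEC, so this also doubles as a simple transaction (extra #3).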

# Let's do something interesting!

## First, let's check out the data source!

### http://developer.usa.gov/1usagov is the URL of a pubsub!


        $ curl -s http://developer.usa.gov/1usagov | head -n 1
        { "a": "Mozilla\/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident\/5.0)", "c": "US", "nk": 0, "tz": "America\/Chicago", "gr": "MN", "g": "lyz2Nv", "h": "lyz2Nv", "l": "bitly", "al": "en-US", "hh": "1.usa.gov", "r": "http:\/\/www.usa.gov\/index.shtml", "u": "http:\/\/answers.usa.gov\/system\/selfservice.controller?CONFIGURATION=1000&PARTITION_ID=1&CMD=VIEW_ARTICLE&USERTYPE=1&LANGUAGE=en&COUNTRY=US&ARTICLE_ID=10564", "t": 1346619003, "hc": 1308822087, "cy": "Minneapolis", "ll": [ 44.982300, -93.306297 ] }

The same hit, unescaped and a little easier to read:

        {
            "a": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
            "c": "US",
            "nk": 0,
            "tz": "America/Chicago",
            "gr": "MN",
            "g": "lyz2Nv",
            "h": "lyz2Nv",
            "l": "bitly",
            "al": "en-US",
            "hh": "1.usa.gov",
            "r": "http://www.usa.gov/index.shtml",
            "u": "http://answers.usa.gov/....",
            "t": 1346619003,
            "hc": 1308822087,
            "cy": "Minneapolis",
            "ll": [ 44.9823, -93.306297 ]
        }

### Go here for field descriptions

# So much possibility!

# Let's try making a graph

## What do you mean by graph?

### We will have nodes, representing some piece of data, and connect them together in some meaningful way

# How do we represent a graph?

## Adjacency list

### Node elements are stored in pairs with a weighting between them

### The weight represents how "close" the pair are

#### becomes

        A, B, 5
        A, C, 4
        A, D, 9
        C, D, 7
        D, E, 9

###### (graph shamelessly taken from d3)

# How will we store this?

# redis zsets!

### (you could also use a hash, but zsets make it easier to implement more complex algorithms)

## example:

### to store the DIRECTED graph node connections (A, B, 5) and (A, C, 1) we do:

        # examples/04_redis_zset_graph.py
        import redis

        db = redis.StrictRedis(db=1)

        db.zadd("A", 5.0, "B")
        db.zadd("A", 1.0, "C")

        # get all elements in the set with scores [0, ∞)
        # returns a list of tuples in the form:
        # [ (property, score), (property, score), ... ]
        node_a = db.zrange("A", 0, -1, withscores=True)

        print "Current graph:"
        for link in node_a:
            print "A, %s, %f" % (link[0], link[1])

        $ python examples/04_redis_zset_graph.py
        A, C, 1.000000
        A, B, 5.000000

# we can also do some other cool stuff with zset
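
A couple of other zset queries that come in handy on a graph like this (a minimal sketch; it assumes the "A" node from the previous example is still in the db):

        # more zset queries on the graph built above
        import redis

        db = redis.StrictRedis(db=1)

        # weight of a single edge (None if the edge doesn't exist)
        print "A -> B weight:", db.zscore("A", "B")

        # A's neighbours, strongest connection first
        print db.zrevrange("A", 0, -1, withscores=True)

        # only the close connections: edges with weight >= 3
        print db.zrangebyscore("A", 3, "+inf", withscores=True)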

## You'll be done when you:

1. Get a sense of how to build graphs in redis and what the limitations are
2. Can think of some interesting analysis to run on the resulting graph and understand how to fetch the data to do so
3. Think: wow, realtime streams really are awesome!
