Steal This Database
After we built an RSVPs stream for HTTP and WebSockets, it was only natural to extend the realtime functionality to meetups themselves. You could find out about a lot of meetup events by monitoring the RSVPs stream, but only a stream that’s triggered by event creations and changes can keep a listener fully up-to-date on the fundamental details of the event.
The pacing of such a stream is a little irregular, since unlike RSVPs event changes are not an everyday user’s activity. Organizers may create an event and then update it two or three times right after. And for events that repeat, a number of messages are fired rapidly when the event is created and any time the series is edited.
So this stream runs in fits and starts. Unlike the RSVPs stream it would not be fun to just stare at, but it’s just the thing for keeping an always-fresh copy of all the upcoming events in public meetup groups. The longer you consume the stream the more complete your data set is, until you eventually have a meetup mirror with a lag time of less than a second.
With the right libraries, it’s a cinch to consume and store a chunked HTTP stream. For our steal demo app in Python, we opted to use PycURL to consume the stream and sqlite3 to store it. All that’s left to write is glue code. In case that sounds like an exaggeration, here’s all the code:
```python
#!/usr/bin/env python
import stream, storage

if __name__ == "__main__":
    # start piping the stream to our storage
    stream.jsonizer(storage.event_callback)
```
Here we’re just importing the other modules and hooking them up.
```python
import pycurl, json

def jsonizer(event_callback):
    """Passes json dicts to the given callback"""
    # one-element list so consume() can rebind the buffer
    # (closures can't reassign outer names in Python 2)
    last = [""]

    def consume(data):
        """Buffers data and invokes event_callback on completed lines"""
        lines = (last[0] + data).split("\n")
        for l in lines[:-1]:
            event_callback(json.loads(l))
        last[0] = lines[-1]

    conn = pycurl.Curl()
    conn.setopt(pycurl.URL, "http://stream.meetup.com/2/open_events")
    conn.setopt(pycurl.WRITEFUNCTION, consume)
    # perform() blocks until interrupted or connection is lost
    conn.perform()
```
Our stream module reads in the stream, parses completed lines as JSON, then passes them to the supplied event callback function.
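The buffering is the one subtle part: chunks can arrive split anywhere, even mid-line, so only completed lines are parsed and the dangling tail is held until the next chunk. Here’s a minimal standalone sketch of that technique with pycurl swapped out for hand-fed chunks — the `make_line_parser` name and the sample data are ours, not part of the demo:

```python
import json

def make_line_parser(callback):
    """Returns a feed() function that buffers partial lines and
    invokes callback with the parsed JSON of each completed line."""
    last = [""]  # mutable cell so feed() can rebind the buffer

    def feed(data):
        lines = (last[0] + data).split("\n")
        for line in lines[:-1]:
            callback(json.loads(line))
        last[0] = lines[-1]

    return feed

# chunks may split a JSON line anywhere; only whole lines are parsed
events = []
feed = make_line_parser(events.append)
feed('{"name": "Py')           # partial line: nothing parsed yet
feed('thon Meetup"}\n{"na')    # first line completed, second begun
feed('me": "Go Meetup"}\n')    # second line completed
# events == [{"name": "Python Meetup"}, {"name": "Go Meetup"}]
```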
```python
from __future__ import with_statement
import sqlite3

conn = sqlite3.connect("meetups.sqlite")

# create tables if not already there
with conn:
    conn.execute("""create table if not exists event
                    (id text, name text, description text,
                     url text, time integer)""")
    conn.execute("create unique index if not exists event_id on event (id)")

def event_callback(event):
    """Passed event dictionaries as they are streamed in"""
    with conn:
        values = [event.get(k, None)
                  for k in ['id', 'name', 'description', 'event_url', 'time']]
        conn.execute("""insert or replace into event
                        values (?, ?, ?, ?, ?)""", values)
        print("stored %s" % event['name'])
```

And lastly, the storage module creates the database and table if they don’t already exist, and supplies a callback function that stores event objects in that database.
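Once rows start accumulating, the mirror is an ordinary SQLite file, so poking at it is plain SQL. Here’s a hypothetical query against the same schema — we use an in-memory database and one made-up row in place of real streamed data:

```python
import sqlite3, time

conn = sqlite3.connect(":memory:")  # stand-in for meetups.sqlite
conn.execute("""create table if not exists event
                (id text, name text, description text,
                 url text, time integer)""")

# one illustrative row; real rows come from the stream
future_ms = int(time.time() * 1000) + 3600 * 1000
conn.execute("insert into event values (?, ?, ?, ?, ?)",
             ("e1", "Example Meetup", "a sample event",
              "http://example.com", future_ms))

# list the next few upcoming events, soonest first
now_ms = int(time.time() * 1000)
upcoming = conn.execute(
    "select name, url from event where time > ? order by time limit 5",
    (now_ms,)).fetchall()
for name, url in upcoming:
    print("%s  %s" % (name, url))
# prints: Example Meetup  http://example.com
```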
Which is all there is to it. This is a simple example, but it covers the parts of consuming an HTTP stream that might otherwise seem difficult.
Discovering the data’s hidden truths about local group organization is left as an exercise for the thief.