Singly and doubly linked lists in python

Singly-linked list implementation with python:

class Node(object):
 
    def __init__(self, data, next):
        self.data = data
        self.next = next
 
 
class SingleList(object):
 
    head = None
    tail = None
 
    def show(self):
        print "Showing list data:"
        current_node = self.head
        while current_node is not None:
            print current_node.data, " -> ",
            current_node = current_node.next
        print None
 
    def append(self, data):
        node = Node(data, None)
        if self.head is None:
            self.head = self.tail = node
        else:
            self.tail.next = node
        self.tail = node
 
    def remove(self, node_value):
        current_node = self.head
        previous_node = None
        while current_node is not None:
            if current_node.data == node_value:
                # if this is the first node (head)
                if previous_node is not None:
                    previous_node.next = current_node.next
                else:
                    self.head = current_node.next
 
            # needed for the next iteration
            previous_node = current_node
            current_node = current_node.next
 
 
s = SingleList()
s.append(31)
s.append(2)
s.append(3)
s.append(4)
 
s.show()
s.remove(31)
s.remove(3)
s.remove(2)
s.show()

Doubly-linked list implementation in python

class Node(object):
 
    def __init__(self, data, prev, next):
        self.data = data
        self.prev = prev
        self.next = next
 
 
class DoubleList(object):
 
    head = None
    tail = None
 
    def append(self, data):
        new_node = Node(data, None, None)
        if self.head is None:
            self.head = self.tail = new_node
        else:
            new_node.prev = self.tail
            new_node.next = None
            self.tail.next = new_node
            self.tail = new_node
 
    def remove(self, node_value):
        current_node = self.head
 
        while current_node is not None:
            if current_node.data == node_value:
                # if it's not the first element
                if current_node.prev is not None:
                    current_node.prev.next = current_node.next
                    current_node.next.prev = current_node.prev
                else:
                    # otherwise we have no prev (it's None), head is the next one, and prev becomes None
                    self.head = current_node.next
                    current_node.next.prev = None
 
            current_node = current_node.next
 
    def show(self):
        print "Show list data:"
        current_node = self.head
        while current_node is not None:
            print current_node.prev.data if hasattr(current_node.prev, "data") else None,
            print current_node.data,
            print current_node.next.data if hasattr(current_node.next, "data") else None
 
            current_node = current_node.next
        print "*"*50
 
 
d = DoubleList()
 
d.append(5)
d.append(6)
d.append(50)
d.append(30)
 
d.show()
 
d.remove(50)
d.remove(5)
 
d.show()

Implementing a fake filesystem using a trie in python

A trie (also sometimes called a prefix tree or radix tree) is an ordered tree data structure. I was looking at ways to implement such a thing the other day, and came up with 3 possible applications, off the top of my head:

  • a URL shortening service
  • a virtual filesystem
  • a search engine

The idea behind a trie is that you can store shared parts of data in an ordered fashion. If you have a URL shortening service that needs to keep track of the url “http://baseurl.com/ABCDE” and “http://baseurl.com/ABCDEF”, you’re storing almost 2 identical strings, with a 1 character difference. How could we do this better? This is what a trie looks like:

secondTrie

It’s basically a map (or dictionary in python) of nested maps. It can be handy to have a delimiter when a node is the last one, but this is not necessary. To exemplify how it works in practice, I’ve started coding a quick and dirty fake filesystem in python. The idea behind using a trie to hold data structures if the same: if I have a directory /var, a directory, /var/www and another directory /var/www/mysite, it’s not necessary to repeat the “var” part 3 times, nor the “www” twice.

There are tons of tricky things about filesystems that complicates matters such as hidden files, creating intermediate directories when they don’t exist, permissions, etc. but I’ve ignored them for the most part here. I mainly wanted to implement an “add” function to see if this worked with a trie, and it sort of looks like this:

    @_input_path_sanitizer
    def add(self, path):
        logging.info("Adding directory {}".format(path))
        pieces = self.get_pieces(path)
        temp_trie = self._paths
        for piece in pieces:
            # if we're at the end of the trie, take out the delimiter.
            if piece in temp_trie and self.__end in temp_trie[piece]:
                del temp_trie[piece][self.__end]
            # setdefault() tries to get the key, if it exists. if it doesn't set it to the second parameter
            temp_trie = temp_trie.setdefault(piece, {})
        # set the delimiter here in all cases
        temp_trie.setdefault(self.__end, self.__end)

The whole thing is on github, but this function does most of the heavy lifting. Then you can do stuff like :

fs = FakeFs()
fs.mkdir("/poo/woo/soo/doo/poo")
fs.mkdir("/etc")
fs.mkdir("/")
fs.mkdir("/tmp")
fs.mkdir("/tmp/whatever")
fs.mkdir("/var/www/mysite")
fs.dir_exists("/sadsad")
fs.dir_exists("/etc")
print(fs.paths)

Which yields something along the lines of:

2014-08-23 12:15:42,449 Adding directory /poo/woo/soo/doo/poo
2014-08-23 12:15:42,449 Adding directory /etc
2014-08-23 12:15:42,449 Adding directory /
2014-08-23 12:15:42,449 Adding directory /tmp
2014-08-23 12:15:42,450 Adding directory /tmp/whatever
2014-08-23 12:15:42,450 Adding directory /var/www/mysite
2014-08-23 12:15:42,450 /sadsad does not exist
2014-08-23 12:15:42,450 /etc exists
['/', '/etc', '/poo/woo/soo/doo/poo', '/tmp/whatever', '/var/www/mysite']

Python threads VS processes

I was revisiting Jeff Knupp’s great article from 2012, Python’s hardest problemIt talks about the Global Interpreter Lock, or GIL, in python. It basically explains how the GIL works, and why it’s such an important problem for python coders.

Probably the most notable consequence of the GIL is that python cannot do “pure” multi-threaded operations, in the sense that only one thread can execute at any time. The GIL prevents strange things from happening when you can have more than one thread write to the same chunk of memory. Knupp also wrote a follow-up to that article, Python’s hardest problem, revisitedwhere he advises people who want to do many things at the same time (parallelism) to use the multiprocessing module.

It’s great advice, I’ve used multiprocessing in the wild. It needs a bit more effort to communicate data between the processes (typically using queues), but it’s well worth the security that separate processes afford you. In essence, every process can then run it’s own thread without sharing any memory.

As I was reading down the article, I noticed he didn’t have any examples! So I started playing around, just for fun. Let’s make a very simple  program that appends integers from 0 to 999998 and discards the list, 50 times.

Version 1: simple single-threaded

import time
 
nb_repeat = 50
 
 
def a_complex_operation(*args):
    a = []
    for x in range(999999):
        a.append(x)
    return None
 
 
t1 = time.time()
for _ in range(nb_repeat):
    a_complex_operation()
print time.time()-t1

Running time: 4.82960796356 seconds

Version 2: with processes

from multiprocessing import Pool
nb_repeat = 50
 
def a_complex_operation(*args):
    a = []
    for x in range(999999):
        a.append(x)
    return None
pool = Pool(processes=nb_repeat)
results = pool.map(a_complex_operation, [None for _ in range(nb_repeat)])

Running time: 2.74916887283 seconds! Almost half the initial time.

Version 3: threaded version

from threading import Thread
import time
 
nb_repeat = 50
 
def a_complex_operation(*args):
    a = []
    for x in range(999999):
        a.append(x)
    return None
 
t1 = time.time()
threads = []
for _ in range(nb_repeat):
    threads.append(Thread(target=a_complex_operation))
 
[x.start() for x in threads]
[x.join() for x in threads]
print time.time()-t1

Running time: 14.0888431072 seconds!

Not extremely surprising, but quite interesting still. As expected, the version with processes is the fastest. But threading our program does not only not improve its running time, it actually slows it down quite a bit. This is probably due to the overhead involved in switching context between 50 threads. The multiprocess version is a nice little optimization, but it’s fair to say that the normal, single-threaded version is running pretty quickly too. When in doubt, keep it simple!

Efficient paging with mysql and redis ordered sets

Paging API results with mysql

Let’s say you’re building an API and need to fetch a bunch of products, based on category. Your query will probably look something like this:

SELECT * FROM products WHERE category=1;

Now, this query might have thousands of results, and you’re certainly not going to return them all at once. You need to page your results – what are the possible solutions? Well, the obvious solution is to use an offset :

SELECT * FROM products WHERE category=1 LIMIT 100 OFFSET 200;

This would return page=2, if you return 100 results per page. But then you’re going to start running into problems very quickly when you get into this sort of territory:

SELECT * FROM products WHERE category=1 LIMIT 100 OFFSET 4000;

As explained here, offset actually needs to run through those 4000 rows before returning the ones you’re after.

There are a few other solutions, such as adding an id in the where clause like such:

SELECT * FROM products WHERE category=1 WHERE id > 12345 LIMIT 100;

But this only works if:

  • We’re building a webpage with a bunch of links for pages, to which we can append something like “last_id_seen=456″ as a querystring parameter and use that as our criterion (basically building the page links all in advance). This example case is for an API, so that won’t work. Also this is not a very elegant solution, I find.
  • Or if your ids are all perfectly consecutive, with no gaps, which will never happen.

Better paging with redis ordered sets

The main idea here is to perform our DB query only once and cache the result (pretty standard). There is an important nuance though, we cache the result in an ordered set, and not a large serialized string, so that we can query only the parts that we want (remember, the result set might be 10 000 rows, we don’t want to load the whole thing, and then slice it). For this we’ll use 2 redis keys

  • products/12345 as an ordered set to hold the products data from category 12345. Ordered sets cannot expire, so we need another key to know when to refresh the data.
  • valid:products/12345 as a normal key/value to tell us whether the data should be expired or not. For this example, I push the length of the result set to this key (I reuse it later to indicate the total length of the result set to the client response), but it could be anything else, really.

As mentioned earlier we need to push data into the ordered set by having continuous indexes we can use to query certain ranges. Redis’ ordered sets fits this perfectly, as you can assign a “score” to each element, and the value that matches it. Assuming a python implementation with Flask, here’s what it might look like:

cache_key = "products/{0}".format(product_category)
cache_key_valid = "valid:{0}".format(cache_key)
 
# get querystring parameters from the query. implement these as you wish.
count, page = filtered_params
offset = count*page
 
# we use a separate key to check whether we need to refresh the data in set
key_is_valid = redisser.get_key_is_valid(current_app.redis_conn, cache_key_valid)
 
# cache has expired, grab the data from db
if key_is_valid is None:
    # first make sure to delete the current redis cached data
    redisser.delete_ordered_set(current_app.redis_conn, cache_key)
 
    # get everything from DB, load up in redis
    products = current_app.db.get_products_for_category(category)
    redisser.add_elements_to_sorted_set(current_app.redis_conn, cache_key, products)
 
    # get the count of all elements, put in valid key element of redis
    cache_count = redisser.get_ordered_set_count(current_app.redis_conn, cache_key)
    redisser.add_key_valid(current_app.redis_conn, cache_key_valid, cache_count)
 
# the correct data is now in redis. fetch the right range from it
cached_products = redisser.get_products(current_app.redis_conn, cache_key, offset, count)
products = [pickle.loads(x) for x in cached_products]

As you’ll notice, we check to see if our “valid” key has expired yet. If the value is None, we reload the data from DB into an ordered set. We need to delete the set first, otherwise we might have extra data leftover if we insert new data that is smaller than the original one that was there (assuming the same key name). For good measure, here is what the other functions could look like:

def get_ordered_set_count(r, key):
    return r.zcount(key, "-inf", "+inf")
 
 
def add_key_valid(r, key, count):
    expiry = 60*60
    r.set(key, count, expiry)
 
 
def get_key_is_valid(r, key):
    result = r.get(key)
    return result
 
 
def delete_ordered_set(r, key):
    return r.delete(key)
 
 
def get_products(r, key, offset, count):
    upper_boundary = offset+int(count)-1
    return r.zrange(key, offset, upper_boundary)
 
 
def add_elements_to_sorted_set(r, key, data):
    f1 = (i for i in range(len(data)))
    f2 = (pickle.dumps(x) for x in data)
    together = zip(f1, f2)
    merged_list = list(itertools.chain(*together))
    r.zadd(key, *merged_list)

The magic really happens in the get_products function, where we query the ordered set and return only a particular range. In add_elements_to_sorted_set we define the score of the elements to just be a number from 0 to len(data). We’re basically rebuilding a small queryable table with consecutive ids, in memory.

This solution still means you have to optimize your original SQL query to be reasonably fast, otherwise the first client to hit the server with no cache available will wait a very long time. I’ve managed to get such queries returning under 1000 milliseconds, and everything else just hits redis. You should also play with the expiry time of the “valid” key, depending on how fresh you need your data to be.

It’s the small things

I few weeks ago, Jetbrains (the ones that make the excellent PHPStorm, IntelliJ, RubyMine and others) released the community version of PyCharm – and since I code python most of the day…woohoo! I had tried other IDEs but nothing really stuck so I just kept using Sublime for the last year or two, which worked so so. Sublime is just a pretty notepad, there isn’t that much more to it (I DO like writing my essays on it though).

After about 2 weeks on PyCharm, I can already feel a difference. I’m not very good with the shortcuts yet, but all the small annoyances that make up that stinky PEP-8 specification are starting to sink in (notice the nice space after every comma?). It also helps you with small idioms and repetitive chunks of code. Case in point:

if a < 10 and a > 20:
    dostuff()

Will yield a squiggly line that you can click and…

if 10 > a > 20:
    dostuff()

Boom! Simplified chained comparison. Of course it also tells you which vars are not used, or referenced before being set and other nice nifty things – but these little tricks add up. I cannot recommend this IDE enough (it’s fast too!), try it – your codebase will definitively benefit from it.

Using a python queue to upload files to S3 using Boto

Not too long ago I wrote a quick article on how to upload files using boto and the multiprocessing module or the Threading module. Here’s yet another variant, using a Queue!

AWS_KEY = ""
AWS_SECRET = ""
 
import time
from boto.s3.connection import S3Connection
from Queue import *
from threading import Thread
 
number_workers = 4
q = Queue()
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
 
# the actual upload
def upload(myfile):
    conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(myfile).set_contents_from_string('some content')
    print "Uploaded %s." % myfile
 
# each worker does this job
def pull_from_queue():
    while True:
        item = q.get()
        print "Found %s in queue" % item
        upload(item)
 
# init the workers
for i in range(number_workers):
    t = Thread(target=pull_from_queue)
    t.daemon = True
    t.start()
 
# put files in the queue
for fname in filenames:
    q.put(fname)
 
while True:
    time.sleep(1)

True can be False and False can be True

In python, you can actually do :

In [1]: True=False
In [2]: True is False
Out[2]: True

Seems a bit dangerous, as you modify the behavior of True/False, but as this post points out, you can do stuff like this:

>>> something=True=False
>>> myVariable=something
>>> myOtherVariable=True
>>> print myVariable
False
>>> print myOtherVariable
False
>>> print True
False

Parallel S3 uploads using Boto and threads in python

A typical setup

Uploading multiple files to S3 can take a while if you do it sequentially, that is, waiting for every operation to be done before starting another one. S3 latency can also vary, and you don’t want one slow upload to back up everything else. Here’s a typical setup for uploading files – it’s using Boto for python :

AWS_KEY = "your_aws_key"
AWS_SECRET = "your_aws_secret"
 
from boto.s3.connection import S3Connection
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
for fname in filenames:
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(fname).set_contents_from_string('some content')
    print "uploaded file %s" % fname

Nothing fancy, this works fine, and it reuses the same S3Connection object. If I print the execution time though, it’s around 1.3 seconds.

How to speed this up

A) Using the multiprocessing module’s ThreadPool (concurrency)

Python has a multiprocessing module, which allows you to “side-step the Global Interpreter Lock by using subprocesses instead of threads”. What this means is that if you have a multi-processor machine, you can leverage them to your advantage. Here’s an example using a ThreadPool:

AWS_KEY = "your_aws_key"
AWS_SECRET = "your_aws_secret"
 
from boto.s3.connection import S3Connection
from multiprocessing.pool import ThreadPool
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
 
def upload(myfile):
        bucket = conn.get_bucket("parallel_upload_tests")
        key = bucket.new_key(myfile).set_contents_from_string('some content')
        return myfile
 
pool = ThreadPool(processes=10)
pool.map(upload, filenames)

Execution time? 0.3 seconds! That’s about 4X faster than our previous example. I’m running this example on a 4-CPU ThinkPad. Note that there’s an overhead cost of starting a 10 process ThreadPool as opposed to just using the same process over and over. Also note that we’re also reusing our S3Connection here, since we’re using subprocesses and not threads per se.

B) Using threads (parallelism)

This solution will effectively spawn new threads of control, which can be quite expensive. Also important to note, we can’t reuse our S3 connection here since Boto’s library isn’t thread-safe, apparently:

AWS_KEY = "your_aws_key"
AWS_SECRET = "your_aws_secret"
 
from boto.s3.connection import S3Connection
import threading
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
def upload(myfile):
    conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(myfile).set_contents_from_string('some content')
    return myfile
 
for fname in filenames:
    t = threading.Thread(target = upload, args=(fname,)).start()

Execution time? 0.018 seconds, about 72X faster than our original script. Not bad at all – but don’t forget, we’re creating 10 threads here, uploading the files in parallel. Your threads will automatically die when the uploads finish, “when its run() method terminate” according to the docs. Please keep in mind that if have tons of files to upload at once, this might not be the best approach – on this topic, here’s a good discussion on How Many Threads is Too Many?

python map()

A map is used to :

Apply function to every item of iterable and return a list of the results. [...] The iterable arguments may be a sequence or any iterable object; the result is always a list.

Let’s create a function that simply adds 1 to any number:

def addone(x): 
    return x+1

Let’s define a list of numbers. If we call the function addone() on every item of “numbers”, we get the following:

numbers = [0,1,2,3,4]
print map(addone, numbers)
>>> [1, 2, 3, 4, 5]

If we wanted, we could actually do this with a list comprehension – it’s just a little longer – something like this:

print [addone(x) for x in numbers]
>>> [1, 2, 3, 4, 5]

So map() is actually just an efficient shortcut for a loop that calls the function on every item:

added_numbers = []
for number in numbers:
    added_numbers.append(addone(number))
print added_numbers
>>> [1, 2, 3, 4, 5]

But now our code is getting out of hand…Let’s try something else. We can pass multiple arguments to a map(), which have to all be of the same size. Otherwise they will be defaulted to None.

Let’s say we have a multiplier function:

def multiply(x, y): 
    return x*y

Now let’s create two lists of numbers to be multiplied together, and use map() to do so:

numbers = [0,1,2,3,4]
multipliers = [10, 20, 30, 40, 50]
print map(multiply, numbers, multipliers)
>>> [0, 20, 60, 120, 200]

Same result, with a list comprehension (slightly longer) :

print [multiply(numbers[x], multipliers[x]) for x in xrange(len(numbers))]
>>> [0, 20, 60, 120, 200]

And just to be fully ridiculous, with the complete loop :

multiplied_numbers = []
for number_index in xrange(len(numbers)):
    multiplied_numbers.append(multiply(numbers[number_index], multipliers[number_index]))
print multiplied_numbers
>>> [0, 20, 60, 120, 200]

But really, don’t do this. Just use map()!

Create groups from lists with itertools.groupby

Simple lists

Given a list that looks like this:

animals = ['cow', 'cow', 'bird', 'pony', 'pony', 'pony', 'fish', 'cow']

Let’s say we wanted to ‘group’ together all the animals that are the same. Instead of looping through all the elements and keep temporary lists, let’s use itertools.groupby.

import itertools
for key, group in itertools.groupby(animals):
    print key, group

We effectively get :

cow <itertools._grouper object at 0x7faa6266af50>
bird <itertools._grouper object at 0x7faa6266af90>
pony <itertools._grouper object at 0x7faa6266af50>
fish <itertools._grouper object at 0x7faa6266af90>
cow <itertools._grouper object at 0x7faa6266af50>

From the docs:

Make an iterator that returns consecutive keys and groups from the iterable. The key is a function computing a key value for each element. If not specified or is None, key defaults to an identity function and returns the element unchanged. Generally, the iterable needs to already be sorted on the same key function.

The returned group is itself an iterator that shares the underlying iterable with groupby(). Because the source is shared, when the groupby() object is advanced, the previous group is no longer visible. So, if that data is needed later, it should be stored as a list:

You’ll notice we have multiple groups of the same animal. To group ALL of them together, just sort the list.

import itertools
for key, group in itertools.groupby(sorted(animals)):
    print key, group
bird <itertools._grouper object at 0x7f6e21c97f10>
cow <itertools._grouper object at 0x7f6e21c97f50>
fish <itertools._grouper object at 0x7f6e21c97f10>
pony <itertools._grouper object at 0x7f6e21c97f50>

And since we sorted, we also get things in alphabetical order.

Lists of dictionaries

Now given the following dict :

animals = [
    {'name':'cow', 'size':'large'}
    {'name':'bird', 'size':'small'},
    {'name':'fish', 'size':'small'},
    {'name':'rabbit', 'size':'medium'},
    {'name':'pony', 'size':'large'},
    {'name':'squirrel', 'size':'medium'},
    {'name':'fox', 'size':'medium'}
]

Let’s say we wanted to group by animal size. We could do something like:

for x in xrange(len(animals)):
    if x>0 and animals[x]['size'] == animals[x-1]['size']:
        #add to a new dict or something...

But that wouldn’t be very pythonic…With itertools:

import itertools
for key, group in itertools.groupby(animals, key=lambda x:x['size']):
    print key, group
large <itertools._grouper object at 0x7fce989a2f50>
small <itertools._grouper object at 0x7fce989a2f90>
medium <itertools._grouper object at 0x7fce989a2f50>
large <itertools._grouper object at 0x7fce989a2f90>
medium <itertools._grouper object at 0x7fce989a2f50>

Once again, let’s sort that list:

import itertools
from operator import itemgetter
sorted_animals = sorted(animals, key=itemgetter('size'))
for key, group in itertools.groupby(sorted_animals, key=lambda x:x['size']):
    print key,
    print list(group)

And the result:

large [{'name': 'cow', 'size': 'large'}, {'name': 'pony', 'size': 'large'}]
medium [{'name': 'rabbit', 'size': 'medium'}, {'name': 'squirrel', 'size': 'medium'}, {'name': 'fox', 'size': 'medium'}]
small [{'name': 'bird', 'size': 'small'}, {'name': 'fish', 'size': 'small'}]