Using a Python queue to upload files to S3 with Boto

Not too long ago I wrote a quick article on how to upload files using Boto with either the multiprocessing module or the threading module. Here’s yet another variant, using a Queue! A fixed number of worker threads pull file names off the queue and upload them one by one.

AWS_KEY = ""
AWS_SECRET = ""
 
import time
from boto.s3.connection import S3Connection
from Queue import Queue  # Python 2 name; the module is called "queue" in Python 3
from threading import Thread
 
number_workers = 4
q = Queue()
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
 
# the actual upload
def upload(myfile):
    conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(myfile).set_contents_from_string('some content')
    print "Uploaded %s." % myfile
 
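# each worker does this job
def pull_from_queue():
    while True:
        item = q.get()
        print "Found %s in queue" % item
        try:
            upload(item)
        finally:
            # mark the item as processed so q.join() below can return
            q.task_done()
 
# init the workers
for i in range(number_workers):
    t = Thread(target=pull_from_queue)
    t.daemon = True
    t.start()
 
# put files in the queue
for fname in filenames:
    q.put(fname)
 
# block until every queued file has been processed, then let the script exit
# (the daemon workers are killed automatically when the main thread ends)
q.join()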
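
The worker/queue pattern above can be tried without AWS credentials. The following is a minimal Python 3 sketch, not the original script: `fake_upload`, `upload_all` and the `STOP` sentinel are hypothetical names introduced for illustration, and the "upload" just records the file name. It shows one way to shut the workers down cleanly once the queue is drained.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel that tells a worker to exit


def fake_upload(name, uploaded, lock):
    """Stand-in for the real S3 call: records the name instead of sending it."""
    with lock:
        uploaded.append(name)


def worker(q, uploaded, lock):
    while True:
        item = q.get()
        try:
            if item is STOP:
                return
            fake_upload(item, uploaded, lock)
        finally:
            q.task_done()  # always mark the item as handled


def upload_all(filenames, number_workers=4):
    q = queue.Queue()
    uploaded = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(q, uploaded, lock))
               for _ in range(number_workers)]
    for t in threads:
        t.start()
    for name in filenames:
        q.put(name)
    for _ in threads:
        q.put(STOP)  # one sentinel per worker
    q.join()         # wait until every item, sentinels included, is handled
    for t in threads:
        t.join()
    return uploaded


if __name__ == "__main__":
    print(sorted(upload_all(["%d.json" % i for i in range(1, 11)])))
```

Because every worker receives its own sentinel, no thread is left blocked on `q.get()`, so the threads do not need to be daemons in this variant.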

Parallel S3 uploads using Boto and threads in Python

A typical setup

Uploading multiple files to S3 can take a while if you do it sequentially, that is, waiting for each upload to finish before starting the next one. S3 latency can also vary, and you don’t want one slow upload to hold up everything else. Here’s a typical setup for uploading files, using Boto for Python:

AWS_KEY = "your_aws_key"
AWS_SECRET = "your_aws_secret"
 
from boto.s3.connection import S3Connection
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
for fname in filenames:
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(fname).set_contents_from_string('some content')
    print "uploaded file %s" % fname

Nothing fancy, this works fine, and it reuses the same S3Connection object. If I print the execution time though, it’s around 1.3 seconds.
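
For reference, here is one way such an execution time could be measured. This is a Python 3 sketch under stated assumptions: `fake_upload` and `timed_sequential` are hypothetical names, and the S3 call is replaced by a short sleep so the snippet runs without credentials. The numbers it prints are therefore not comparable to the ones quoted in this article.

```python
import time


def fake_upload(name):
    """Hypothetical stand-in for the S3 call; sleeps to mimic network latency."""
    time.sleep(0.01)
    return name


def timed_sequential(filenames):
    """Upload the files one after another and return (results, elapsed seconds)."""
    start = time.perf_counter()
    done = [fake_upload(name) for name in filenames]
    elapsed = time.perf_counter() - start
    return done, elapsed


if __name__ == "__main__":
    names = ["%d.json" % i for i in range(1, 11)]
    done, elapsed = timed_sequential(names)
    print("uploaded %d files in %.3f seconds" % (len(done), elapsed))
```

The timer wraps the whole loop, so the elapsed time grows with the number of files: each simulated upload has to finish before the next one starts.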

How to speed this up

A) Using the multiprocessing module’s ThreadPool (concurrency)

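Python has a multiprocessing module, which allows you to “side-step the Global Interpreter Lock by using subprocesses instead of threads”. The example below, however, uses multiprocessing.pool.ThreadPool, which offers the same Pool interface but runs its workers as threads inside the current process. That works well here because uploads are I/O-bound: a thread that is waiting on the network releases the GIL, so the other uploads can make progress in the meantime. Here’s an example using a ThreadPool: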

AWS_KEY = "your_aws_key"
AWS_SECRET = "your_aws_secret"
 
from boto.s3.connection import S3Connection
from multiprocessing.pool import ThreadPool
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
 
def upload(myfile):
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(myfile).set_contents_from_string('some content')
    return myfile
 
pool = ThreadPool(processes=10)
pool.map(upload, filenames)

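Execution time? 0.3 seconds! That’s about 4X faster than our previous example. I’m running this example on a 4-CPU ThinkPad, although the speedup here comes from overlapping network waits rather than from the extra CPUs. Note that there’s an overhead cost to starting a pool of 10 worker threads as opposed to doing everything in the main thread. Also note that we’re reusing our S3Connection here: because ThreadPool workers are threads and not subprocesses, the same connection object is shared between them, so the thread-safety caveat mentioned in the next section applies to this example as well.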
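
To check what kind of workers a ThreadPool actually uses, the following Python 3 sketch records the process ID and thread name seen by each task. The helper names `where_am_i` and `run_in_pool` are hypothetical and no S3 call is made. Every task should report the same process ID as the main script, which indicates that the work is done by threads in one process.

```python
import os
import threading
from multiprocessing.pool import ThreadPool


def where_am_i(name):
    """Report which process and thread handled a (hypothetical) file name."""
    return name, os.getpid(), threading.current_thread().name


def run_in_pool(filenames, size=4):
    pool = ThreadPool(processes=size)
    try:
        # map() returns results in the same order as the input
        return pool.map(where_am_i, filenames)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    for name, pid, thread_name in run_in_pool(["1.json", "2.json", "3.json"]):
        print(name, pid, thread_name)
    print("main process:", os.getpid())
```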

B) Using threads (parallelism)

This solution starts one new thread per file, which can get expensive as the number of files grows. Also important to note: each thread creates its own S3 connection here instead of reusing a shared one, since Boto’s library apparently isn’t thread-safe:

AWS_KEY = "your_aws_key"
AWS_SECRET = "your_aws_secret"
 
from boto.s3.connection import S3Connection
import threading
 
filenames = ['1.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json', '10.json']
def upload(myfile):
    conn = S3Connection(aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)
    bucket = conn.get_bucket("parallel_upload_tests")
    key = bucket.new_key(myfile).set_contents_from_string('some content')
    return myfile
 
for fname in filenames:
    # start() returns None, so there is no point in assigning its result
    threading.Thread(target=upload, args=(fname,)).start()

Execution time? 0.018 seconds, about 72X faster than our original script. Take that number with a grain of salt, though: the script never calls join() on its threads, so if the timer stops right after the loop, it measures how long it takes to start 10 threads, not how long the uploads take. Don’t forget, too, that we’re creating 10 threads here, one per file. Each thread dies automatically when its upload finishes, “when its run() method terminates” according to the docs. Please keep in mind that if you have tons of files to upload at once, this might not be the best approach. On this topic, here’s a good discussion on How Many Threads is Too Many?
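
The 0.018 second figure is worth double-checking against the time the uploads really take. The Python 3 sketch below separates the two measurements: how long it takes to start the threads, and how long it takes until all of them have finished. It is only an illustration: `fake_upload` and `launch_and_join` are hypothetical names, and the upload is simulated with a 50 ms sleep.

```python
import threading
import time


def fake_upload(name):
    """Hypothetical stand-in for the S3 call; sleeps to mimic network latency."""
    time.sleep(0.05)


def launch_and_join(filenames):
    """Return (seconds until all threads are started, seconds until all are done)."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_upload, args=(name,))
               for name in filenames]
    for t in threads:
        t.start()
    launched = time.perf_counter() - start
    for t in threads:
        t.join()  # wait for the simulated uploads to complete
    finished = time.perf_counter() - start
    return launched, finished


if __name__ == "__main__":
    launched, finished = launch_and_join(["%d.json" % i for i in range(1, 11)])
    print("threads started after %.4f s, uploads done after %.4f s"
          % (launched, finished))
```

Only the second number says anything about upload speed; the first one mostly reflects the cost of creating threads.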