The Python Oracle

Tweaking Celery for high performance

--


Full question
https://stackoverflow.com/questions/1884...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #django #performance #rabbitmq #celery



ANSWER 1

Score 7


Are you really running on Windows 8 without a virtual machine? I did the following simple test on a 2-core MacBook with 8 GB RAM running OS X 10.7:

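from time import time

from celery import group, shared_task

# Assumes a Celery app is already configured and a worker is running
# (this test used Redis as broker and Postgres as result backend).


@shared_task
def test_task(i):
    return i


# Build a group of 400 trivial tasks, then time queueing and execution separately.
grp = group(test_task.s(i) for i in range(400))
tic1 = time(); res = grp(); tac1 = time()
print('queued in', tac1 - tic1)
tic2 = time(); vals = res.get(); tac2 = time()
print('executed in', tac2 - tic2)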

I'm using Redis as the broker, Postgres as the result backend, and the default worker with --concurrency=4. Guess what the output is? Here it is:

queued in 3.5009469986

executed in 2.99818301201




ACCEPTED ANSWER

Score 4


Well, it turns out I had 2 separate issues.

First off, the task was a member method. After extracting it out of the class, the time went down to about 12 seconds. I can only assume it has something to do with the pickling of self.

The second thing was the fact that it ran on Windows. After running it on my Linux machine, the run time was less than 2 seconds. I guess Windows just isn't cut out for high performance.
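The first issue can be illustrated without Celery. This is only a sketch of the suspected cause (the answer itself only assumes pickling of self was to blame): pickling a bound method drags the whole instance along, while a module-level function is pickled by reference. The class ReportFetcher, its cache attribute, and the fetch functions are hypothetical names made up for this example.

```python
import pickle


class ReportFetcher:
    """Hypothetical class whose instances carry a lot of state."""

    def __init__(self):
        # Stand-in for whatever heavy state the real object holds.
        self.cache = list(range(100_000))

    def fetch(self, url):
        return url


def fetch(url):
    """The same logic extracted to a module-level function."""
    return url


fetcher = ReportFetcher()
call_args = ("http://example.com",)

# A bound method is pickled together with the instance it is bound to,
# so the whole cache ends up in the payload.
method_payload = pickle.dumps((fetcher.fetch, call_args))

# A module-level function is pickled by reference (module + name only).
function_payload = pickle.dumps((fetch, call_args))

print("bound method payload:", len(method_payload), "bytes")
print("plain function payload:", len(function_payload), "bytes")
```

If every one of the 400 task messages carries a payload like the first one, serialization alone can take a noticeable share of the run time.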




ANSWER 3

Score 1


How about using Twisted instead? You can get a much simpler application structure. You can send all 400 requests from the Django process at once and wait for all of them to finish. The requests proceed concurrently because Twisted puts the sockets into non-blocking mode and only reads the data when it's available.
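The "send everything at once, then wait" idea can be sketched with the standard library's asyncio in place of Twisted (a swapped-in event loop, not the API this answer uses). The fake_request coroutine is a hypothetical stand-in that just sleeps instead of doing real network I/O.

```python
import asyncio
import time


async def fake_request(i, delay=0.05):
    # Stand-in for a non-blocking HTTP request: yields to the event loop
    # while "waiting for the network" instead of blocking a thread.
    await asyncio.sleep(delay)
    return i


async def fetch_all(n):
    # Start all n requests at once and wait until every one has finished.
    return await asyncio.gather(*(fake_request(i) for i in range(n)))


start = time.perf_counter()
results = asyncio.run(fetch_all(400))
elapsed = time.perf_counter() - start

print(f"{len(results)} results in {elapsed:.2f}s")
```

Because the waits overlap, the total time stays close to that of a single request rather than 400 times the delay.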

I had a similar problem a while ago and developed a nice bridge between Twisted and Django. I've been running it in a production environment for almost a year now. You can find it here: https://github.com/kowalski/featdjango/. In simple terms, the main application thread runs the main Twisted reactor loop, and the rendering of each Django view is delegated to a thread. It uses a special thread pool, which exposes methods to interact with the reactor and use its asynchronous capabilities.

If you use it, your code would look like this:

from twisted.internet import defer
from twisted.web.client import getPage

import threading


def get_reports(self, urls, *args, **kw):
    ct = threading.current_thread()

    defers = list()
    for url in urls:
        # here the Deferred is created which will fire when
        # the call is complete (args is a tuple, so convert it
        # before concatenating with the list)
        d = ct.call_async(getPage, args=[url] + list(args), kwargs=kw)
        # here we keep it for reference
        defers.append(d)

    # here we create a Deferred which will fire when all the
    # constituent Deferreds are completed
    deferred_list = defer.DeferredList(defers, consumeErrors=True)
    # here we tell the current thread to wait until we are done
    results = ct.wait_for_defer(deferred_list)

    # results is a list of (C{bool} success flag, result) tuples;
    # unpack it below
    reports = list()
    for success, result in results:
        if success:
            reports.append(result)
        else:
            # here handle the failure, or just ignore
            pass

    return reports

There is still a lot you can optimize here. Every call to getPage() creates a separate TCP connection and closes it when it's done. This is as good as it gets, provided that each of your 400 requests is sent to a different host. If that is not the case, you can use an HTTP connection pool, which uses persistent connections and HTTP pipelining. You instantiate it like this:

from feat.web import httpclient

pool = httpclient.ConnectionPool(host, port, maximum_connections=3)

Then a single request is performed like this (this replaces the getPage() call):

d = ct.call_async(pool.request, args=(method, path, headers, body))
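To see what a persistent connection buys you, here is a small sketch using only the standard library rather than feat's ConnectionPool. It starts a throwaway local HTTP/1.1 server and sends three requests over one http.client connection; the Handler class, the paths, and the seen_client_ports list are made up for the example. It shows connection reuse only, not pipelining.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

seen_client_ports = []  # client-side TCP port observed for each request


class Handler(BaseHTTPRequestHandler):
    protocol_version = "HTTP/1.1"  # lets the server keep connections alive

    def do_GET(self):
        seen_client_ports.append(self.client_address[1])
        body = b"ok"
        self.send_response(200)
        # Content-Length is required so the client knows where the body ends.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep the example quiet


# Port 0 asks the OS for any free port.
server = HTTPServer(("127.0.0.1", 0), Handler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# One connection object, reused for several requests.
conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1])
statuses = []
for path in ("/a", "/b", "/c"):
    conn.request("GET", path)
    resp = conn.getresponse()
    resp.read()  # drain the body before sending the next request
    statuses.append(resp.status)
conn.close()

server.shutdown()
server.server_close()

print("statuses:", statuses)
print("distinct TCP connections used:", len(set(seen_client_ports)))
```

All three requests arrive from the same client port, meaning the TCP handshake was paid only once. That is the saving a pool gives you when many requests target the same host.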