The Python Oracle

Python's asyncore to periodically send data using a variable timeout. Is there a better way?

Full question
https://stackoverflow.com/questions/1036...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #sockets #asynchronous



ACCEPTED ANSWER

Score 4


The "select law" doesn't apply to your case, as you have not only client-triggered (pure server) activities, but also time-triggered activities - this is precisely what the select timeout is for. What the law should really say is "if you specify a timeout, make sure you actually have to do something useful when the timeout arrives". The law is meant to protect against busy-waiting; your code does not busy-wait.

I would not set _timeout to the maximum of 0.1 and the time until the next update, but to the maximum of 0.0 and that time. In other words, if an update period has expired while you were doing updates, you should do that specific update right away.
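
As a rough sketch of that clamping, assuming the next-update times are available as a list of absolute timestamps (the function name select_timeout and the idle_timeout fallback are hypothetical, not from the question):

```python
def select_timeout(next_update_times, now, idle_timeout=30.0):
    """Return how many seconds select may block before an update is due.

    next_update_times: list of absolute timestamps (hypothetical input).
    now: the current time, sampled once by the caller.
    idle_timeout: assumed fallback used when nothing is scheduled.
    """
    if not next_update_times:
        return idle_timeout
    # Clamp at 0.0 so an overdue update is handled without any extra wait.
    return max(0.0, min(next_update_times) - now)
```

The returned value could then be assigned to _timeout before each pass through the loop.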

Instead of asking each channel every time whether it wants to update, you could store all channels in a priority queue (sorted by next-update time), and then only run update for the earliest channels (until you find one whose update time has not arrived). You can use the heapq module for that.

You can also save a few system calls by not having each channel ask for the current time, but only poll the current time once, and pass it to .update.
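
The last two suggestions might be combined roughly as follows. This is only a sketch: the class name UpdateQueue is hypothetical, and it assumes each channel's update(now) returns a positive number of seconds until its next update, which is not something the question's code is known to do.

```python
import heapq
import itertools


class UpdateQueue:
    """Channels ordered by next-update time (hypothetical helper)."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so two channels due at the same time are never compared.
        self._counter = itertools.count()

    def schedule(self, when, channel):
        heapq.heappush(self._heap, (when, next(self._counter), channel))

    def run_due(self, now):
        """Update every channel that is due and return the next timeout.

        `now` is sampled once by the caller and passed on to each update.
        Returns None when no channel is scheduled.
        """
        while self._heap and self._heap[0][0] <= now:
            when, _, channel = heapq.heappop(self._heap)
            # Assumption: update() returns a positive period in seconds.
            period = channel.update(now)
            self.schedule(when + period, channel)
        if not self._heap:
            return None
        return max(0.0, self._heap[0][0] - now)
```

Only the channels at the front of the heap are touched on each pass, so channels that are not yet due cost nothing.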




ANSWER 2

Score 4


Maybe you can do this with sched.scheduler, like this (n.b. not tested):

import sched, asyncore, time

# Create a scheduler with a delay function that calls asyncore.loop
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time()) )

# Add the update timeouts with scheduler.enter
# ...

def _poll_loop(timeout, start_time):
  # Poll the sockets once, waiting at most `timeout` seconds
  asyncore.loop(timeout, count=1)
  finish_time = time.time()
  timeleft = timeout - (finish_time - start_time)
  if timeleft > 0:  # there was a message and the timeout delay is not finished
    _poll_loop(timeleft, finish_time) # so wait some more polling the socket

def main_loop():
  while True:
    if scheduler.empty():
      asyncore.loop(30.0, count=1) # just default timeout, use what suits you
      # add other work that might create scheduled events here
    else:
      scheduler.run()
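
The elided "add the update timeouts" step could look something like the sketch below. The helper schedule_periodic and its parameters are hypothetical; it relies only on scheduler.enter, and it asks period_fn for a fresh delay before every run so that the timeout can vary.

```python
import sched
import time


def schedule_periodic(scheduler, period_fn, action):
    """Run `action` repeatedly on `scheduler` (hypothetical helper).

    period_fn() returns the delay in seconds before the next run.
    action() may return False to stop the repetition.
    """
    def _run():
        if action() is not False:
            # Re-enter ourselves; the period may differ on every call.
            scheduler.enter(period_fn(), 1, _run, ())

    return scheduler.enter(period_fn(), 1, _run, ())
```

For example, schedule_periodic(scheduler, lambda: 4.0, send_update) would call a hypothetical send_update every four seconds, with the scheduler's delay function polling the sockets in between.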



ANSWER 3

Score 2


This is basically demiurgus' solution with the rough edges made round. It retains his basic idea, but prevents RuntimeErrors and busy loops and is tested. [Edit: resolved issues with modifying the scheduler during _delay]

import asyncore
import sched
import time

class asynschedcore(sched.scheduler):
    """Combine sched.scheduler and asyncore.loop."""
    # On receiving a signal asyncore kindly restarts select. However the signal
    # handler might change the scheduler instance. This tunable determines the
    # maximum time in seconds to spend in asyncore.loop before reexamining the
    # scheduler.
    maxloop = 30
    def __init__(self, map=None):
        sched.scheduler.__init__(self, time.time, self._delay)
        if map is None:
            self._asynmap = asyncore.socket_map
        else:
            self._asynmap = map
        self._abort_delay = False

    def _maybe_abort_delay(self):
        if not self._abort_delay:
            return False
        # Returning from this function causes the next event to be executed, so
        # it might be executed too early. This can be avoided by modifying the
        # head of the queue. Also note that enterabs sets _abort_delay to True.
        self.enterabs(0, 0, lambda:None, ())
        self._abort_delay = False
        return True

    def _delay(self, timeout):
        if self._maybe_abort_delay():
            return
        if 0 == timeout:
            # Should we support this hack, too?
            # asyncore.loop(0, map=self._asynmap, count=1)
            return
        now = time.time()
        finish = now + timeout
        while now < finish and self._asynmap:
            asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap,
                          count=1)
            if self._maybe_abort_delay():
                return
            now = time.time()
        if now < finish:
            time.sleep(finish - now)

    def enterabs(self, *args, **kwargs):
        # We might insert an event before the currently next event.
        # Arguments are passed through unchanged so that enter(), which calls
        # enterabs internally, keeps working across Python versions.
        self._abort_delay = True
        return sched.scheduler.enterabs(self, *args, **kwargs)

    # Overriding enter is not necessary, because it is implemented using
    # enterabs.

    def cancel(self, event):
        # We might cancel the next event.
        self._abort_delay = True
        return sched.scheduler.cancel(self, event)

    def run(self):
        """Runs as long as either an event is scheduled or there are
        sockets in the map."""
        while True:
            if not self.empty():
                sched.scheduler.run(self)
            elif self._asynmap:
                asyncore.loop(self.maxloop, map=self._asynmap, count=1)
            else:
                break



ANSWER 4

Score 1


I would use Twisted. It has been a long time since I used asyncore, but I think this should be the Twisted equivalent (not tested, written from memory):

from twisted.internet import reactor, protocol
import time

UPDATE_PERIOD = 4.0

class MyClient(protocol.Protocol):

    def connectionMade(self):
        self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update)

    def connectionLost(self, reason):
        self.updateCall.cancel()

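    def update(self):
        # The transport expects bytes, so encode the message before writing.
        self.transport.write(("hello %f\n" % (time.time(),)).encode("ascii"))
        # Re-arm the timer so the update repeats every UPDATE_PERIOD seconds.
        self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update)

    def dataReceived(self, data):
        print("recv:", data)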


f = protocol.ServerFactory()
f.protocol = MyClient

reactor.listenTCP(9090, f)
reactor.run()