"""Lowest-common-denominator implementations of platform functionality.""" from __future__ import absolute_import, division, print_function, with_statement import errno import socket from tornado.platform import interface class Waker(interface.Waker): """Create an OS independent asynchronous pipe. For use on platforms that don't have os.pipe() (or where pipes cannot be passed to select()), but do have sockets. This includes Windows and Jython. """ def __init__(self): # Based on Zope async.py: http://svn.zope.org/zc.ngi/trunk/src/zc/ngi/async.py self.writer = socket.socket() # Disable buffering -- pulling the trigger sends 1 byte, # and we want that sent immediately, to wake up ASAP. self.writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) count = 0 while 1: count += 1 # Bind to a local port; for efficiency, let the OS pick # a free port for us. # Unfortunately, stress tests showed that we may not # be able to connect to that port ("Address already in # use") despite that the OS picked it. This appears # to be a race bug in the Windows socket implementation. # So we loop until a connect() succeeds (almost always # on the first try). See the long thread at # http://mail.zope.org/pipermail/zope/2005-July/160433.html # for hideous details. a = socket.socket() a.bind(("127.0.0.1", 0)) a.listen(1) connect_address = a.getsockname() # assigned (host, port) pair try: self.writer.connect(connect_address) break # success except socket.error as detail: if (not hasattr(errno, 'WSAEADDRINUSE') or detail[0] != errno.WSAEADDRINUSE): # "Address already in use" is the only error # I've seen on two WinXP Pro SP2 boxes, under # Pythons 2.3.5 and 2.4.1. raise # (10048, 'Address already in use') # assert count <= 2 # never triggered in Tim's tests if count >= 10: # I've never seen it go above 2 a.close() self.writer.close() raise socket.error("Cannot bind trigger!") # Close `a` and try again. Note: I originally put a short # sleep() here, but it didn't appear to help or hurt. a.close() self.reader, addr = a.accept() self.reader.setblocking(0) self.writer.setblocking(0) a.close() self.reader_fd = self.reader.fileno() def fileno(self): return self.reader.fileno() def write_fileno(self): return self.writer.fileno() def wake(self): try: self.writer.send(b"x") except (IOError, socket.error): pass def consume(self): try: while True: result = self.reader.recv(1024) if not result: break except (IOError, socket.error): pass def close(self): self.reader.close() self.writer.close()