Monday, February 16, 2015
Timing out Lua coroutines
Time for “that's for another post”—this time, handling timeouts in Lua coroutines.
So, I have this Lua coroutine running, and I want to make a DNS request:
host = dns.request("www.conman.org.","a")
It takes a non-trivial amount of time—time that could be used to run other coroutines. But the operation could take longer than expected (say, two seconds, one second longer than we want to wait) or never even complete (because we lost the request packet, or the reply packet). In that case, we want to timeout the operation.
Step one—let's set a limit to how long we can run:
timeout(1) -- time out in one second host,err = dns.request("www.conman.org.","a") -- DNS query timeout(0) -- no longer need the timeout
The trick now is to implement the code behind timeout()
.
First we need to have some way of storing the coroutines (technically, a
reference to the coroutine) that are waiting to timeout, and some easy way
to determine which ones are timed out. For this, I used a binary
heap, or techically, a “binary min heap” to store the coroutine
reference. The “min” variation because the nodes are ordered by the
“awake” time, and times in the future are larger (and thus, the next
coroutine that will timeout next will always appear in the first spot in the
binary min heap).
Along with the awake value and the coroutine reference, I have a trigger flag. This flag is important for the “cancelling a timeout” case. An earlier version of the code searched sequentially through the list, making “cancelling” an expensive operation (O(n)) compared to “setting” a timeout (O(log(n))). I decided it was easier to just have a flag, and keep a secondary index, keyed by coroutine, to the correct node in the binary min tree. That way, cancelling a timeout is cheap (an amortized O(1)) with the following code:
function timeout(when) local co = coroutine.running() if when == 0 then if TQUEUE[co] then -- guard to make sure we have an entry TQUEUE[co].trigger = false -- disarm timeout trigger end else TQUEUE:insert(when,co) -- otherwise, set the timeout end end
(I should note that the code I
presented last Thursday was buggy—I realized I wasn't keeping the
invariant condition necessary for a binary min heap (child nodes have a
larger value than the parent node) by setting the awake
field
to 0 (a child would then have a smaller value than its parent)—it didn't
break the code but it did make it behave a bit oddly)
I also maintain a run queue—a list of coroutines that are ready to run, used in the main loop:
function mainloop() local timeout = -1 -- the timeout to wait for new packets local now = clock.get() while #TQUEUE > 0 do -- check timeout queue local obj = TQUEUE[1] -- next coroutine to timeout if obj.trigger then -- do we trigger? timeout = obj.awake - now -- if so, when if timeout > 0 then -- if now now (or before) break -- stop looking through the timeout queue end table.insert(RQUEUE,obj) -- insert coroutine into the run queue end TQUEUE:delete() -- delete the entry from the timeout queue end if #RQUEUE > 0 then -- if there's anything in the run queue timeout = 0 -- we set our timeout end -- --------------------------------------------------- -- Receive packets and process---see this post for some details; the actual -- code this isn't -- --------------------------------------------------- for _,event in ipairs(poll:events(timeout)) do event.obj(event) end -- ----------------------------------------------------------- -- run each coroutine in the run queue (some details snipped) -- ----------------------------------------------------------- while #RQUEUE > 0 do local obj = table.remove(RQUEUE,1) if coroutine.status(obj.co) == 'suspended' then coroutine.resume(obj.co) end end return mainloop() end
The code initially sets the timeout to wait for network activity to
-1
, or “wait indefinitely.” It then runs through the timeout
queue, checking each item to see if there's a coroutine that's triggered to
run. If it comes across a coroutine that has been “disarmed” then it's
simply ignored and deleted from the timeout queue. Otherwise, it checks to
see if the awake time is still in the future and if so, it's done with the
timeout queue; otherwise, it moves the coroutine reference to the run queue
and deletes it from the timeout queue.
Eventually, it'll get out of tha loop with a timeout of “indefinite” (if there were no coroutines waiting for a timeout) or some fixed amount of time until the next coroutine times out. It then checks the run queue, and if there is anything in that list, it sets the network activity timeout to 0 (if there is no packets, return immedately).
It then checks for network packets and for each one, process the packet (which in turn might schedule more coroutines to run). After that, it then goes through the run queue, resuming each coroutine in the list. After that, it starts over again.