This first edition was written for Lua 5.0. While still largely relevant for later versions, there are some differences.
The third edition targets Lua 5.2 and is available at Amazon and other bookstores.


9.4 – Non-Preemptive Multithreading

As we saw earlier, coroutines are a kind of collaborative multithreading. Each coroutine is equivalent to a thread. A yield-resume pair switches control from one thread to another. However, unlike "real" multithreading, coroutines are non-preemptive. While a coroutine is running, it cannot be stopped from the outside. It suspends execution only when it explicitly asks to (through a call to yield). For several applications this is not a problem; quite the opposite. Programming is much easier in the absence of preemption. You do not need to be paranoid about synchronization bugs, because all synchronization among threads is explicit in the program. You only have to ensure that a coroutine yields only when it is outside a critical region.
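
As a reminder of the mechanism, the following minimal sketch shows two coroutines that give up control only at their explicit calls to yield. The names `worker` and `log` are illustrative and do not appear elsewhere in this section.

```lua
log = {}   -- records the order in which the steps run

-- Build a coroutine that runs two steps, yielding between them.
function worker (name)
  return coroutine.create(function ()
    table.insert(log, name .. " step 1")
    coroutine.yield()       -- the only point where control can leave
    table.insert(log, name .. " step 2")
  end)
end

a = worker("A")
b = worker("B")

coroutine.resume(a)   -- A runs until its yield
coroutine.resume(b)   -- B runs until its yield
coroutine.resume(a)   -- A continues after its yield and finishes
coroutine.resume(b)   -- B continues after its yield and finishes

print(table.concat(log, ", "))
```

Nothing can interrupt a worker between the start of a step and its yield, which is why no locking is needed around the shared table `log`.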

However, with non-preemptive multithreading, whenever any thread calls a blocking operation, the whole program blocks until the operation completes. For most applications, this behavior is unacceptable, which leads many programmers to disregard coroutines as a real alternative to conventional multithreading. As we will see here, that problem has an interesting (and obvious, with hindsight) solution.

Let us assume a typical multithreading situation: We want to download several remote files through HTTP. Of course, to download several remote files, we must know how to download one remote file. In this example, we will use the LuaSocket library, developed by Diego Nehab. To download a file, we must open a connection to its site, send a request for the file, receive the file (in blocks), and close the connection. In Lua, we can write this task as follows. First, we load the LuaSocket library:

    require "luasocket"

Then, we define the host and the file we want to download. In this example, we will download the HTML 3.2 Reference Specification from the World Wide Web Consortium site:

    host = "www.w3.org"
    file = "/TR/REC-html32.html"

Then, we open a TCP connection to port 80 (the standard port for HTTP connections) of that site:

    c = assert(socket.connect(host, 80))

The operation returns a connection object, which we use to send the file request:

    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
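
Next, we read the file in blocks and write each block out. A minimal sketch of that loop follows, assuming the same 1K block size and "closed" check that the download function later in this section uses. So that the sketch runs without a network, the table `c` here is a hypothetical stand-in for the connection opened above; it only imitates a receive method that returns a string plus a status. The names `blocks` and `total` are likewise illustrative.

```lua
-- Hypothetical stand-in for the connection `c`: it hands out the strings
-- in `blocks` one per call and reports "closed" together with the last one.
blocks = {"<html>", "</html>"}
c = {
  receive = function (self, size)
    local s = table.remove(blocks, 1) or ""
    if table.getn then            -- Lua 5.0
      if table.getn(blocks) == 0 then return s, "closed" end
    elseif select("#", unpack or table.unpack) and next(blocks) == nil then
      return s, "closed"          -- Lua 5.1 and later
    end
    return s, nil
  end
}

total = 0
while true do
  local s, status = c:receive(2^10)   -- ask for up to 1K
  io.write(s)                          -- write the block to standard output
  total = total + string.len(s)
  if status == "closed" then break end
end
io.write("\n")
```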

The receive method always returns a string with what it read plus another string with the status of the operation. When the host closes the connection we break the receive loop.

Finally, we close the connection:

    c:close()

Now that we know how to download one file, let us return to the problem of downloading several files. The trivial approach is to download one at a time. However, this sequential approach, where we only start reading a file after finishing the previous one, is too slow. When reading a remote file, a program spends most of its time waiting for data to arrive. More specifically, it spends most of its time blocked in the call to receive. So, the program could run much faster if it downloaded all files simultaneously. Then, while a connection has no data available, the program can read from another connection. Clearly, coroutines offer a convenient way to structure those simultaneous downloads. We create a new thread for each download task. When a thread has no data available, it yields control to a simple dispatcher, which invokes another thread.

To rewrite the program with coroutines, let us first rewrite the previous download code as a function:

    function download (host, file)
      local c = assert(socket.connect(host, 80))
      local count = 0    -- counts number of bytes read
      c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
      while true do
        local s, status = receive(c)
        count = count + string.len(s)
        if status == "closed" then break end
      end
      c:close()
      print(file, count)
    end

Because we are not interested in the remote file contents, this function only counts the file size, instead of writing the file to the standard output. (With several threads reading several files, the output would intermix all files.) In this new code, we use an auxiliary function (receive) to receive data from the connection. In the sequential approach, its code would be like this:

    function receive (connection)
      return connection:receive(2^10)
    end

For the concurrent implementation, this function must receive data without blocking. Instead, if there is not enough data available, it yields. The new code is like this:

    function receive (connection)
      connection:timeout(0)   -- do not block
      local s, status = connection:receive(2^10)
      if status == "timeout" then
        coroutine.yield(connection)
      end
      return s, status
    end

The call to timeout(0) makes any operation over the connection a non-blocking operation. When the operation status is "timeout", it means that the operation returned without completion. In this case, the thread yields. The non-false argument passed to yield signals to the dispatcher that the thread is still performing its task. (Later we will see another version where the dispatcher needs the timed-out connection.) Notice that, even in case of a timeout, the connection returns whatever it read before the timeout, so receive always returns s to its caller.
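
To see the yield path in isolation, the sketch below runs this non-blocking receive against a stand-in connection instead of a real socket. The function `fake_connection` is hypothetical: it only imitates the behavior described above (a timeout method, and a receive method that returns a string plus a status) and is not part of LuaSocket. Note also that later LuaSocket releases name the method `settimeout` and make `receive` return nil, an error message, and the partial result on failure, so the function would need adapting there.

```lua
-- Hypothetical stand-in for a connection: each entry in `script` is one
-- {data, status} pair handed out by successive calls to receive.
function fake_connection (script)
  local pos = 0
  return {
    timeout = function (self, t) end,   -- accepted and ignored
    receive = function (self, size)
      pos = pos + 1
      return script[pos][1], script[pos][2]
    end
  }
end

-- The non-blocking receive from the text, unchanged.
function receive (connection)
  connection:timeout(0)   -- do not block
  local s, status = connection:receive(2^10)
  if status == "timeout" then
    coroutine.yield(connection)
  end
  return s, status
end

conn = fake_connection{ {"ab", "timeout"}, {"cd", nil}, {"", "closed"} }

count = 0
co = coroutine.create(function ()
  while true do
    local s, status = receive(conn)
    count = count + string.len(s)
    if status == "closed" then break end
  end
end)

ok1, res1 = coroutine.resume(co)   -- stops at the timeout; res1 is the connection
ok2, res2 = coroutine.resume(co)   -- runs to the end; nothing is yielded
print(res1 == conn, res2, count)
```

The two bytes read before the timeout are not lost: after the second resume, receive returns them to its caller, and the final count is 4.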

The next function ensures that each download runs in an individual thread:

    threads = {}    -- list of all live threads
    function get (host, file)
      -- create coroutine
      local co = coroutine.create(function ()
        download(host, file)
      end)
      -- insert it in the list
      table.insert(threads, co)
    end

The table threads keeps a list of all live threads, for the dispatcher.

The dispatcher is simple. It is mainly a loop that goes through all threads, resuming them one by one. It must also remove from the list the threads that finish their tasks. It stops the loop when there are no more threads to run:

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          end
        end
      end
    end
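
The scheduling logic can be exercised with purely local tasks, so that it runs without LuaSocket. The sketch below is a variation on the dispatcher, not the code above. It uses the `#` length operator (Lua 5.1 and later) in place of table.getn. It also checks the first result of resume, because a thread that raises an error makes resume return false plus a message, which the test `not res` alone would take for a thread that is still working. The names `task` and `trace` are illustrative.

```lua
threads = {}   -- list of all live threads
trace = {}     -- order in which the tasks get to run

-- Create a thread that yields `steps` times before finishing. Yielding
-- `true` plays the role of yielding the connection in the text.
function task (name, steps)
  table.insert(threads, coroutine.create(function ()
    for i = 1, steps do
      table.insert(trace, name .. i)
      coroutine.yield(true)
    end
  end))
end

function dispatcher ()
  while true do
    local n = #threads
    if n == 0 then break end   -- no more threads to run
    for i = 1, n do
      local status, res = coroutine.resume(threads[i])
      if not status then       -- thread raised an error: note it and drop it
        table.insert(trace, "error")
        table.remove(threads, i)
        break
      elseif not res then      -- thread finished its task
        table.remove(threads, i)
        break
      end
    end
  end
end

task("a", 2)
task("b", 1)
table.insert(threads, coroutine.create(function () error("boom") end))
dispatcher()
print(table.concat(trace, " "))
```

The trace shows the round-robin order: each live thread gets one turn per pass, and the failing thread is dropped on its first turn instead of being resumed forever.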

Finally, the main program creates the threads it needs and calls the dispatcher. For instance, to download four documents from the W3C site, the main program could be like this:

    host = "www.w3.org"
    
    get(host, "/TR/html401/html40.txt")
    get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
    get(host, "/TR/REC-html32.html")
    get(host,
        "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
    
    dispatcher()   -- main loop

My machine takes six seconds to download those four files using coroutines. With the sequential implementation, it takes more than twice that time (15 seconds).

Despite the speedup, this last implementation is far from optimal. Everything goes fine while at least one thread has something to read. However, when no thread has data to read, the dispatcher does a busy wait, going from thread to thread only to check that they still have no data. As a result, this coroutine implementation uses almost 30 times more CPU than the sequential solution.
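
A small simulation makes the busy wait visible. It is only a sketch under simplifying assumptions: `slow_task` is a hypothetical stand-in for a download that finds no data on its first 1000 turns, and the counters `resumes` and `useful` are illustrative. It uses the `#` operator, so it needs Lua 5.1 or later.

```lua
resumes = 0   -- how many times the dispatcher woke a thread
useful = 0    -- how many of those wake-ups found data

-- A simulated download: no data for `idle` turns, then it completes.
function slow_task (idle)
  return coroutine.create(function ()
    for i = 1, idle do
      coroutine.yield(true)   -- "timeout": nothing to read yet
    end
    useful = useful + 1       -- data finally arrived
  end)
end

threads = { slow_task(1000), slow_task(1000) }

-- Same polling strategy as the dispatcher above, with a counter added.
while #threads > 0 do
  for i = 1, #threads do
    resumes = resumes + 1
    local status, res = coroutine.resume(threads[i])
    if not res then           -- thread finished its task
      table.remove(threads, i)
      break
    end
  end
end

print(resumes, useful)
```

The loop performs 2002 resumes to complete 2 pieces of useful work; every other wake-up only confirms that there is still nothing to read.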

To avoid this behavior, we can use the select function from LuaSocket. It allows a program to block while waiting for a status change in a group of sockets. The changes in our implementation are small. We only have to change the dispatcher. The new version is like this:

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        local connections = {}
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          else    -- timeout
            table.insert(connections, res)
          end
        end
        if table.getn(connections) == n then
          socket.select(connections)
        end
      end
    end

In the inner loop, this new dispatcher collects the timed-out connections in table connections. Remember that receive passes such connections to yield; thus resume returns them. When all connections time out, the dispatcher calls select to wait for any of those connections to change status. This final implementation runs as fast as the first implementation with coroutines. Moreover, as it does no busy waits, it uses just a little more CPU than the sequential implementation.
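
The collect-then-wait logic can be checked without a network by replacing the library with a stand-in that merely records what it was asked to wait on. In the sketch below, the global `socket` table is such a hypothetical stand-in and not LuaSocket itself; `get` takes a fake connection instead of a host and file; and the counters `select_calls` and `finished` are illustrative. The dispatcher uses `#` (Lua 5.1 and later) where the text uses table.getn. In later LuaSocket releases, select takes separate tables of sockets to read from and to write to, plus an optional timeout, so a real call would need adapting.

```lua
select_calls = 0

-- Stand-in for the socket library: select counts its calls and marks
-- every connection it was given as having data.
socket = {
  select = function (connections)
    select_calls = select_calls + 1
    for _, c in ipairs(connections) do c.ready = true end
  end
}

threads = {}
finished = 0

-- A simulated download that yields its connection until data is ready.
function get (conn)
  table.insert(threads, coroutine.create(function ()
    while not conn.ready do
      coroutine.yield(conn)     -- timed out: hand the connection over
    end
    finished = finished + 1
  end))
end

function dispatcher ()
  while true do
    local n = #threads
    if n == 0 then break end    -- no more threads to run
    local connections = {}
    for i = 1, n do
      local status, res = coroutine.resume(threads[i])
      if not res then           -- thread finished its task
        table.remove(threads, i)
        break
      else                      -- timeout
        table.insert(connections, res)
      end
    end
    if #connections == n then   -- every thread is waiting
      socket.select(connections)
    end
  end
end

get({ready = false})
get({ready = false})
get({ready = true})
dispatcher()
print(select_calls, finished)
```

All three simulated downloads finish with a single call to select, made on the one pass in which every remaining thread had timed out.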