This first edition was written for Lua 5.0. While still largely relevant for later versions, there are some differences.
The fourth edition targets Lua 5.3 and is available at Amazon and other bookstores.
By buying the book, you also help to support the Lua project.


9.2 – Pipes and Filters

One of the most paradigmatic examples of coroutines is in the producer-consumer problem. Let us suppose that we have a function that continually produces values (e.g., reading them from a file) and another function that continually consumes these values (e.g., writing them to another file). Typically, these two functions look like this:

    function producer ()
      while true do
        local x = io.read()     -- produce new value
        send(x)                 -- send to consumer
      end
    end
    
    function consumer ()
      while true do
        local x = receive()        -- receive from producer
        io.write(x, "\n")          -- consume new value
      end
    end
(In that implementation, both the producer and the consumer run forever. It is an easy task to change them to stop when there is no more data to be handled.) The problem here is how to match send with receive. It is a typical case of a who-has-the-main-loop problem. Both the producer and the consumer are active, both have their own main loops, and both assume that the other is a callable service. For this particular example, it is easy to change the structure of one of the functions, unrolling its loop and making it a passive agent. However, this change of structure may be far from easy in other real scenarios.

Coroutines provide an ideal tool to match producers and consumers, because a resume-yield pair turns upside-down the typical relationship between caller and callee. When a coroutine calls yield, it does not enter into a new function; instead, it returns a pending call (to resume). Similarly, a call to resume does not start a new function, but returns a call to yield. This property is exactly what we need to match a send with a receive in such a way that each one acts as if it were the master and the other the slave. So, receive resumes the producer so that it can produce a new value; and send yields the new value back to the consumer:

    function receive ()
      local status, value = coroutine.resume(producer)
      return value
    end
    
    function send (x)
      coroutine.yield(x)
    end
Of course, the producer must now be a coroutine:
    producer = coroutine.create(
      function ()
        while true do
        local x = io.read()     -- produce new value
          send(x)
        end
      end)
In this design, the program starts calling the consumer. When the consumer needs an item, it resumes the producer, which runs until it has an item to give to the consumer, and then stops until the consumer restarts it again. Therefore, we have what we call a consumer-driven design.

We can extend this design with filters, which are tasks that sit between the producer and the consumer doing some kind of transformation in the data. A filter is a consumer and a producer at the same time, so it resumes a producer to get new values and yields the transformed values to a consumer. As a trivial example, we can add to our previous code a filter that inserts a line number at the beginning of each line. The complete code would be like this:

    function receive (prod)
      local status, value = coroutine.resume(prod)
      return value
    end
    
    function send (x)
      coroutine.yield(x)
    end
    
    function producer ()
      return coroutine.create(function ()
        while true do
          local x = io.read()     -- produce new value
          send(x)
        end
      end)
    end
    
    function filter (prod)
      return coroutine.create(function ()
        local line = 1
        while true do
          local x = receive(prod)   -- get new value
          x = string.format("%5d %s", line, x)
          send(x)      -- send it to consumer
          line = line + 1
        end
      end)
    end
    
    function consumer (prod)
      while true do
        local x = receive(prod)   -- get new value
        io.write(x, "\n")          -- consume new value
      end
    end
The final bit simply creates the components it needs, connects them, and starts the final consumer:
    p = producer()
    f = filter(p)
    consumer(f)
Or better yet:
    consumer(filter(producer()))

If you thought about Unix pipes after reading the previous example, you are not alone. After all, coroutines are a kind of (non-preemptive) multithreading. While in pipes each task runs in a separate process, with coroutines each task runs in a separate coroutine. Pipes provide a buffer between the writer (producer) and the reader (consumer) so there is some freedom in their relative speeds. This is important in the context of pipes, because the cost of switching between processes is high. With coroutines, the cost of switching between tasks is much smaller (roughly the same cost of a function call), so the writer and the reader can go hand in hand.