How Event Loops work (the ReactPHP case)


If this is the first time you hear the term “Event Loop” used alongside the word PHP, then you are probably not familiar with the advances in asynchronous programming in the PHP world over the last 2-3 years. That’s OK, but there are some really interesting things going on there that allow us to do asynchronous I/O. The event loop is one of several approaches used for that purpose. Implementations of this approach include ReactPHP, RatchetPHP (based on ReactPHP) and Icicle.

I am not going to explain what asynchronous programming is; I assume that you are already familiar with the concept. This blog post aims to explain how an event loop really works. I am sure there are quite a few posts and articles out there, but I haven’t managed to find one that really helps you figure out the inner workings of the event loop. To be honest, I think the reason is that you cannot really master the concept of the event loop unless you dive into the event loop code. So, that’s what I will try to do: explain the event loop by gently walking you through the code of an event loop implementation. Not all of the code! Just a small part. The core part.

There are many event loop implementations out there, but the basic principles are always the same. In this blog post I will use ReactPHP as the example. According to its site, ReactPHP is “a low-level library for event-driven programming in PHP”. Event-driven mainly refers to I/O events, but not exclusively. To my knowledge, ReactPHP is about 5 years old, but event-driven programming has become more and more popular in the PHP world over the last 2 years.

We will start with a small example of how we can use an event loop without doing any I/O at all. It may sound strange, but it is really helpful. So, let’s assume that we want our application to echo something every second. This is the code of our application:

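<?php
// Composer's autoloader (assumes ReactPHP's event loop was installed with Composer)
require __DIR__ . '/vendor/autoload.php';

$loop = React\EventLoop\Factory::create();

// Call the closure every 1 second
$loop->addPeriodicTimer(1, function () {
    echo "Tick\n";
});

// Nothing happens until the loop starts running
$loop->run();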

We use two things: the event loop and a timer. As you can imagine, a timer represents an event scheduled in time, something like an alarm clock; a periodic timer goes off again and again at a fixed interval. In this case, we want to execute the callback function (passed as the second argument) every 1 second (the interval, passed as the first argument). So, we just instantiate an event loop, add a periodic timer and then start the loop. Keep in mind that anything we add to the loop will remain inactive until we start running the loop.

The instantiation of the loop takes place in a very simple factory like this:

namespace React\EventLoop;

class Factory
{
    public static function create()
    {
        // @codeCoverageIgnoreStart
        if (function_exists('event_base_new')) {
            return new LibEventLoop();
        } elseif (class_exists('libev\EventLoop', false)) {
            return new LibEvLoop;
        } elseif (class_exists('EventBase', false)) {
            return new ExtEventLoop;
        }

        return new StreamSelectLoop();
        // @codeCoverageIgnoreEnd
    }
}

As you can see, ReactPHP supports several event loop implementations. Which one is chosen depends on the PHP extensions that are available. Let’s make our life easier and follow the most transparent code flow, the StreamSelectLoop. This is the fallback implementation, and the only one whose loop logic is written in plain PHP on top of built-in functions rather than delegated to a C extension, so all of it is easily accessible. That makes it a good choice for educational purposes. The definition and construction of this event loop implementation is quite trivial.
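To see which implementation the factory above would pick on your own PHP installation, you can repeat its checks by hand. The sketch below only re-runs the same function_exists()/class_exists() tests as Factory::create() and returns a class name as a string, so it does not need ReactPHP installed; the helper name pickLoopImplementation() is hypothetical.

```php
<?php
// Hypothetical helper: mirrors the checks in React\EventLoop\Factory::create()
// without instantiating anything.
function pickLoopImplementation(): string
{
    if (function_exists('event_base_new')) {
        return 'LibEventLoop';      // provided by the "libevent" PECL extension
    } elseif (class_exists('libev\EventLoop', false)) {
        return 'LibEvLoop';         // provided by the "libev" extension
    } elseif (class_exists('EventBase', false)) {
        return 'ExtEventLoop';      // provided by the "event" PECL extension
    }

    return 'StreamSelectLoop';      // pure-PHP fallback based on stream_select()
}

echo pickLoopImplementation(), "\n";
```

On a stock PHP build with none of these extensions loaded, the fallback branch is the one that runs.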

namespace React\EventLoop;

use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use React\EventLoop\Timer\Timers;

/**
 * A stream_select() based event-loop.
 */
class StreamSelectLoop implements LoopInterface
{
    const MICROSECONDS_PER_SECOND = 1000000;

    private $nextTickQueue;
    private $futureTickQueue;
    private $timers;
    private $readStreams = [];
    private $readListeners = [];
    private $writeStreams = [];
    private $writeListeners = [];
    private $running;

    public function __construct()
    {
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timers = new Timers();
    }
    // The rest of the class
    // ...
}

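In our example, we will only be using a timer, so we can ignore the first two lines of the constructor. They create the queues that hold callbacks scheduled with nextTick() and futureTick(), i.e. callbacks deferred to an upcoming iteration of the loop, and we don’t schedule any. (The I/O streams live in the $readStreams and $writeStreams arrays, which are empty in our example.) The $timers property is where Timer instances are placed whenever we add one to our loop. Let’s check out the Timers constructor: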

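namespace React\EventLoop\Timer;

use SplObjectStorage;
use SplPriorityQueue;

class Timers
{
    private $time;       // the current time, refreshed on every tick
    private $timers;     // SplObjectStorage: timer => next scheduled time
    private $scheduler;  // SplPriorityQueue: timers ordered by scheduled time

    public function __construct()
    {
        $this->timers = new SplObjectStorage();
        $this->scheduler = new SplPriorityQueue();
    }

    // The rest of the class
    // ...
}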

It contains an object map (think of it as an associative array whose keys are objects) and a priority queue (a queue in which every value is inserted together with a priority, and values come out in order of priority, highest first). So, the map contains rows of:

[$timer => $theNextTimeThisTimerShouldGoOff]

and the queue holds pairs of:

($timer, $priority)

Every time a Timer instance is added to Timers, it is added to both the map and the queue. The queue tells us in which order we should check the timers in every event loop iteration (also called a “tick”; we will get back to this soon).
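If SplObjectStorage is new to you, here is a quick standalone look at it used as exactly this kind of map: objects as keys, a float timestamp as the value. The stdClass objects are placeholders standing in for Timer instances; nothing here is ReactPHP code.

```php
<?php
// SplObjectStorage used as a map: object => next scheduled time (float seconds).
$timers = new SplObjectStorage();

$timerA = new stdClass();   // placeholder for a Timer instance
$timerB = new stdClass();   // another placeholder

$now = microtime(true);
$timers->attach($timerA, $now + 1.0);   // due in 1 second
$timers->attach($timerB, $now + 5.0);   // due in 5 seconds

// Array-style access reads and writes the value stored for an object.
$timers[$timerA] = $now + 2.0;          // reschedule timerA

// detach() removes an entry; isset() tells whether an object is still attached.
$timers->detach($timerB);
var_dump(isset($timers[$timerB]));      // bool(false)
echo count($timers), "\n";              // 1
```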

So, the code that adds a timer to the event loop (the StreamSelectLoop, but I will be using the generic term most of the time) is:

public function addPeriodicTimer($interval, callable $callback)
{
    $timer = new Timer($this, $interval, $callback, true);

    $this->timers->add($timer);

    return $timer;
}

which, as you can see, calls the add() method of the Timers class:

public function add(TimerInterface $timer)
{
    $interval = $timer->getInterval();
    $scheduledAt = $interval + microtime(true);

    $this->timers->attach($timer, $scheduledAt);
    $this->scheduler->insert($timer, -$scheduledAt);
}

See? Nothing more than what we have already said. The Timer is just a data structure that holds information about the timer (the loop, its interval, its callback and whether it is periodic); nothing more. As soon as we add it to the event loop, the next time the timer needs to go off is calculated: the current time plus the interval.
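Why the minus sign in -$scheduledAt? SplPriorityQueue is a max-heap: extract() always returns the element with the highest priority. Using the negated due time as the priority therefore puts the timer that is due earliest at the top. A small standalone check, with a fixed “current time” and strings standing in for Timer objects:

```php
<?php
// SplPriorityQueue extracts the highest priority first.
$scheduler = new SplPriorityQueue();

$now = 1000.0; // fixed "current time" in seconds, to keep the example deterministic

// Insert with the negated due time, as Timers::add() does with -$scheduledAt.
$scheduler->insert('due in 5s', -($now + 5.0));
$scheduler->insert('due in 1s', -($now + 1.0));
$scheduler->insert('due in 3s', -($now + 3.0));

// The timer that is due earliest comes out first.
$order = [];
while (!$scheduler->isEmpty()) {
    $order[] = $scheduler->extract();
}

echo implode(', ', $order), "\n"; // due in 1s, due in 3s, due in 5s
```

Without the minus sign the order would be reversed, and the loop would look at the timer that is due last first.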

The next step is to see how the event loop runs. Let’s describe it in words before I show you the code. To be precise, I am going to describe a simplified behavior of the event loop, relevant to our basic example: the behavior when only timers are attached to the event loop.

So, running the event loop means executing an infinite loop. In every iteration the following actions take place:

1. The time is updated (we get the current time using the microtime() function).

2. For each attached timer (visited in order of highest to lowest priority, i.e. the one due earliest first):

   - If it is not yet time for it to go off, stop checking the timers (the ones after it are due even later).
   - If it is time for it to go off:
     - remove it from the scheduler queue,
     - execute the callback of this timer,
     - if the timer is periodic, recalculate the next time it needs to go off and put it back into the scheduler queue; otherwise, remove it from the map, too.

3. If there are still timers attached, take the one at the top of the queue (the one due earliest) and check how much time remains until it goes off. If this time is greater than zero, say t seconds, then we know that for the next t seconds the loop would spin without doing anything useful. That busy-waiting would keep the CPU busy for nothing, and we don’t want that. So, we “freeze” the execution for t seconds instead, by calling the stream_select() function with a timeout of t seconds.

Of course, the use of stream_select() implies the existence of streams; after all, the whole purpose of using the event loop is to do asynchronous I/O. But let’s not worry about that right now. (Strictly speaking, when no streams are registered at all, StreamSelectLoop calls usleep() instead, because stream_select() needs at least one stream; the effect is the same.) The important thing for the moment is to understand that, in our case, stream_select() introduces a “pause” of t seconds in the execution of the script.
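To convince yourself that stream_select() can act as a timed pause, give it a stream that never becomes readable plus a timeout. The sketch below uses one end of a local socket pair on which nothing is ever written; STREAM_PF_UNIX assumes a Unix-like system, and the 0.2 second timeout is an arbitrary choice.

```php
<?php
// A connected pair of local sockets; nothing is ever written to $a,
// so $b never becomes readable.
[$a, $b] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$read = [$b];
$write = null;
$except = null;

$start = microtime(true);
// Wait at most 0 seconds + 200000 microseconds (0.2 s) for $b to become readable.
$ready = stream_select($read, $write, $except, 0, 200000);
$elapsed = microtime(true) - $start;

// No stream became ready, so stream_select() returned 0 once the timeout expired.
printf("ready=%d, waited %.2f s\n", $ready, $elapsed);
```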

Now let’s have a look at the code. This is the StreamSelectLoop code that implements the infinite loop (the parts dealing with the tick queues and with I/O streams have been commented out, since our example uses neither):

while ($this->running) {
    // $this->nextTickQueue->tick();

    // $this->futureTickQueue->tick();

    $this->timers->tick();

    // Next-tick or future-tick queues have pending callbacks ...
    // if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
    //     $timeout = 0;
    // } else

    // There is a pending timer, only block until it is due ...
    if ($scheduledAt = $this->timers->getFirst()) {
        $timeout = $scheduledAt - $this->timers->getTime();
        if ($timeout < 0) {
            $timeout = 0;
        } else {
            // seconds -> microseconds
            $timeout = round($timeout * self::MICROSECONDS_PER_SECOND);
        }

    // The only possible event is stream activity, so wait forever ...
    // } elseif ($this->readStreams || $this->writeStreams) {
    //     $timeout = null;

    // There's nothing left to do ...
    } else {
        break;
    }

    $this->waitForStreamActivity($timeout);
}

The tick() method of the Timers class implements step 2 described above: it iterates through the timers attached to the event loop. The waitForStreamActivity() method is the one that uses the stream_select() function with the appropriate timeout (by then converted to microseconds). Its internals are not that interesting in this case; we will revisit it later, when we look at I/O streams. For now, the only thing we need to know is that it pauses the loop for the given timeout.
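The if/elseif chain at the end of each iteration is really a small decision function. Pulled out of the class, it could look like the sketch below. The function name computeTimeout() and its parameters are hypothetical, the constant mirrors MICROSECONDS_PER_SECOND, and returning false stands for the break in the real code.

```php
<?php
const MICROSECONDS_PER_SECOND = 1000000;

/**
 * Hypothetical, stand-alone version of the timeout logic in StreamSelectLoop::run().
 * Returns 0 (don't block), a number of microseconds, null (block until a stream
 * is ready) or false (nothing left to do: the loop should stop).
 */
function computeTimeout(bool $hasPendingTicks, ?float $firstScheduledAt, float $now, bool $hasStreams)
{
    if ($hasPendingTicks) {
        return 0;                       // deferred callbacks are waiting: just poll
    }
    if ($firstScheduledAt !== null) {
        $timeout = $firstScheduledAt - $now;
        if ($timeout < 0) {
            return 0;                   // the timer is already overdue
        }
        // sleep until the timer is due, in microseconds
        return (int) round($timeout * MICROSECONDS_PER_SECOND);
    }
    if ($hasStreams) {
        return null;                    // only streams: wait as long as needed
    }
    return false;                       // no timers, no streams: stop
}

var_dump(computeTimeout(false, 10.25, 10.0, false)); // int(250000)
```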

And this is the Timers class code that iterates through the list of attached timer objects:

public function tick()
{
    $time = $this->updateTime();
    $timers = $this->timers;
    $scheduler = $this->scheduler;

    while (!$scheduler->isEmpty()) {
        $timer = $scheduler->top();

        if (!isset($timers[$timer])) {
            $scheduler->extract();
            $timers->detach($timer);

            continue;
        }

        if ($timers[$timer] >= $time) {
            break;
        }

        $scheduler->extract();
        call_user_func($timer->getCallback(), $timer);

        if ($timer->isPeriodic() && isset($timers[$timer])) {
            $timers[$timer] = $scheduledAt = $timer->getInterval() + $time;
            $scheduler->insert($timer, -$scheduledAt);
        } else {
            $timers->detach($timer);
        }
    }
}

As you may guess, if the callback of a timer takes too long to execute, the next timer may not go off on time. In that case, the late timer is not skipped; it simply runs late. But since tick() schedules the next run of a periodic timer at $time plus the interval, where $time is when the current tick started rather than when the timer was originally due, the delay is never made up and the period drifts. For that reason, we should be careful about the period we choose and keep the execution time of a periodic task well below it.
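The drift is easy to see with a bit of arithmetic. The simulation below applies the same rescheduling rule as Timers::tick() (next run = tick time + interval). Times are integer milliseconds, and the fixed 200 ms lateness per firing is an arbitrary assumption standing in for some slow callback elsewhere in the loop.

```php
<?php
$interval = 1000; // ms, the period of the timer
$delay    = 200;  // ms, assumed lateness of each tick

$scheduledAt = $interval;   // first due time
$fired = [];
for ($i = 0; $i < 3; $i++) {
    $tickTime = $scheduledAt + $delay;      // the loop only gets here late
    $fired[] = $tickTime;                   // the callback runs at this time
    $scheduledAt = $tickTime + $interval;   // same rule as Timers::tick(): $time + interval
}

echo implode(', ', $fired), "\n"; // 1200, 2400, 3600
```

With an ideal fixed schedule the timer would run at 1000, 2000 and 3000 ms; here the lateness accumulates (200, 400, 600 ms) instead of staying constant.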

Let’s recap the inner workings of the event loop. It is just an (almost) infinite loop that executes callbacks every time an event happens. In our trivial example, an event happens when the current system time reaches (or passes) the time at which a timer should go off. The “tick” concept represents one iteration of the loop, like the tick of a clock. In every tick, we check the timers attached to the loop for events.
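To tie the pieces together, here is a deliberately tiny timer-only event loop written from scratch. It is not ReactPHP code: the class MiniTimerLoop and its methods are hypothetical, it pauses with usleep() because there are no streams, and it leaves out timer cancellation and one-shot timers. It does follow the same three steps: update the time, run the due timers in priority order, then sleep until the next one is due.

```php
<?php
// Hypothetical, minimal timer-only event loop; not ReactPHP's API.
final class MiniTimerLoop
{
    private $timers;          // SplObjectStorage: timer => next due time (seconds)
    private $scheduler;       // SplPriorityQueue: timers, priority = -due time
    private $running = false;

    public function __construct()
    {
        $this->timers = new SplObjectStorage();
        $this->scheduler = new SplPriorityQueue();
    }

    public function addPeriodicTimer(float $interval, callable $callback): void
    {
        $timer = (object) ['interval' => $interval, 'callback' => $callback];
        $dueAt = microtime(true) + $interval;
        $this->timers->attach($timer, $dueAt);
        $this->scheduler->insert($timer, -$dueAt);
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function run(): void
    {
        $this->running = true;

        while ($this->running && !$this->scheduler->isEmpty()) {
            // 1. Update the time.
            $now = microtime(true);

            // 2. Run the due timers, earliest first.
            while (!$this->scheduler->isEmpty()) {
                $timer = $this->scheduler->top();
                if ($this->timers[$timer] > $now) {
                    break; // not due yet, so the timers after it are not due either
                }
                $this->scheduler->extract();
                ($timer->callback)($this);

                // Every timer here is periodic: reschedule it relative to this tick's time.
                $dueAt = $now + $timer->interval;
                $this->timers[$timer] = $dueAt;
                $this->scheduler->insert($timer, -$dueAt);
            }

            // 3. Sleep until the next timer is due instead of busy-waiting.
            if ($this->running && !$this->scheduler->isEmpty()) {
                $wait = $this->timers[$this->scheduler->top()] - microtime(true);
                if ($wait > 0) {
                    usleep((int) round($wait * 1000000));
                }
            }
        }
    }
}

$ticks = 0;
$loop = new MiniTimerLoop();
$loop->addPeriodicTimer(0.1, function (MiniTimerLoop $loop) use (&$ticks) {
    echo "Tick\n";
    if (++$ticks === 3) {
        $loop->stop(); // without this, the loop would run forever
    }
});
$loop->run();
```

The script prints “Tick” three times, roughly 0.1 seconds apart, and then returns from run().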

For those who like class diagrams, the following one illustrates the main classes, properties and methods we have dealt with so far:

Now, let’s assume that we have no timers, only I/O streams. As with timers, streams are added together with a callback function (a listener) that will be called when the stream changes state; for streams, this is what an event is. Adding a read stream to the loop means that an event will happen whenever a read on that stream would not block (that is, there is data to read in the stream buffer, or the other end has closed the connection). Technically, adding a stream to the loop means adding the stream resource to the $readStreams property and its callback/listener to the $readListeners property, both keyed by the stream’s integer resource ID:

public function addReadStream($stream, callable $listener)
{
    $key = (int) $stream;

    if (!isset($this->readStreams[$key])) {
        $this->readStreams[$key] = $stream;
        $this->readListeners[$key] = $listener;
    }
}
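The (int) $stream cast is what turns the two arrays into lookup tables: casting a stream resource to int yields its numeric resource ID, which is unique among the resources open at that moment. Below is a stand-alone copy of the same bookkeeping using plain arrays; registerReadStream() is a hypothetical name, and the php://memory streams are just convenient sample resources.

```php
<?php
// Hypothetical stand-alone copy of the bookkeeping done by addReadStream().
function registerReadStream(array &$streams, array &$listeners, $stream, callable $listener): void
{
    $key = (int) $stream;             // the resource's numeric ID
    if (!isset($streams[$key])) {     // registering the same stream twice is a no-op
        $streams[$key] = $stream;
        $listeners[$key] = $listener;
    }
}

$readStreams = [];
$readListeners = [];

$s1 = fopen('php://memory', 'r+');   // sample resources only
$s2 = fopen('php://memory', 'r+');

registerReadStream($readStreams, $readListeners, $s1, function () { return 'first'; });
registerReadStream($readStreams, $readListeners, $s1, function () { return 'ignored'; });
registerReadStream($readStreams, $readListeners, $s2, function () { return 'second'; });

echo count($readStreams), "\n";           // 2
echo $readListeners[(int) $s1](), "\n";   // first
```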

The infinite loop runs as before, but this time, since there are no timers and at least one read or write stream, the timeout passed to stream_select() is null: we want to pause the loop until a stream is ready for some action, however long that takes.

while ($this->running) {
    // $this->nextTickQueue->tick();

    // $this->futureTickQueue->tick();

    // $this->timers->tick();

    // Next-tick or future-tick queues have pending callbacks ...
    // if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
    //     $timeout = 0;

    // There is a pending timer, only block until it is due ...
    // } elseif ($scheduledAt = $this->timers->getFirst()) {
    //     $timeout = $scheduledAt - $this->timers->getTime();
    //     if ($timeout < 0) {
    //         $timeout = 0;
    //     } else {
    //         $timeout = round($timeout * self::MICROSECONDS_PER_SECOND);
    //     }
    // } else

    // The only possible event is stream activity, so wait forever ...
    if ($this->readStreams || $this->writeStreams) {
        $timeout = null; // no timeout: block until a stream is ready

    // There's nothing left to do ...
    } else {
        break;
    }

    $this->waitForStreamActivity($timeout);
}

You can see how this timeout is used in the implementation of waitForStreamActivity():

private function waitForStreamActivity($timeout)
{
    $read  = $this->readStreams;
    $write = $this->writeStreams;

    $available = $this->streamSelect($read, $write, $timeout);
    if (false === $available) {
        // if a system call has been interrupted,
        // we cannot rely on its outcome
        return;
    }

    foreach ($read as $stream) {
        $key = (int) $stream;

        if (isset($this->readListeners[$key])) {
            call_user_func($this->readListeners[$key], $stream, $this);
        }
    }

    foreach ($write as $stream) {
        $key = (int) $stream;

        if (isset($this->writeListeners[$key])) {
            call_user_func($this->writeListeners[$key], $stream, $this);
        }
    }
}

The streamSelect() method is the one that calls the stream_select() function. It returns when at least one stream changes state (or when the timeout, if any, expires). stream_select() modifies the $read and $write arrays in place so that they only contain the streams that are ready, which is why the method works on copies of $this->readStreams and $this->writeStreams. So, when it returns, we walk through the ready read streams and execute their listeners. The listener will usually be a function that reads data from the stream buffer and does something with it. The same goes for the write streams: a change of state means that we can write data to their buffer, and that is what their listener will usually do. Since no stream has to wait for another, and data is read or written whenever a stream is ready, the I/O as a whole happens asynchronously.
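Here is the same read-side mechanism in miniature, outside ReactPHP: a socket pair, one registered read listener, and a single stream_select() call with a null timeout. The data is written before selecting so that the script does not block forever; in a real server the other end would be a remote client. STREAM_PF_UNIX assumes a Unix-like system, and the 8192-byte read size is an arbitrary choice.

```php
<?php
// Two connected local sockets: whatever is written to $client can be read from $server.
[$server, $client] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
stream_set_blocking($server, false);

$received = '';
$readStreams   = [(int) $server => $server];
$readListeners = [(int) $server => function ($stream) use (&$received) {
    // A typical read listener: consume whatever is buffered right now.
    $received .= fread($stream, 8192);
}];

fwrite($client, "hello\n");

// Work on a copy: stream_select() removes the streams that are not ready.
$read = $readStreams;
$write = null;
$except = null;

// Null timeout: block until at least one stream is ready.
if (stream_select($read, $write, $except, null) > 0) {
    foreach ($read as $stream) {
        $key = (int) $stream;
        if (isset($readListeners[$key])) {
            $readListeners[$key]($stream);
        }
    }
}

echo $received; // hello
```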