Documentation for method throttle assembled from the following pages:

Class: Supply §

From Supply

(Supply) method throttle §

Defined as

multi method throttle(Supply:D:
      Int()  $elems,
      Real() $seconds,
      Real() $delay  = 0,
      :$scheduler    = $*SCHEDULER,
      :$control,
      :$status,
      :$bleed,
      :$vent-at,
    )
multi method throttle(Supply:D:
      Int()  $elems,
      Callable:D $process,
      Real() $delay = 0,
      :$scheduler   = $*SCHEDULER,
      :$control,
      :$status,
      :$bleed,
      :$vent-at,
    )

Arguments to .throttle are defined as follows:

Argument               Meaning
$limit                 values per time-unit, or number of simultaneous processes ($elems in the signatures above)
$seconds or $process   time-unit / code to process simultaneously
$delay = 0             initial delay before starting, in seconds
:$control              supply to emit control messages on (optional)
:$status               supply to tap status messages from (optional)
:$bleed                supply to bleed messages to (optional)
:$vent-at              bleed when so many buffered (optional)
:$scheduler            scheduler to use, default $*SCHEDULER

This method produces a Supply from a given one, but makes sure the number of messages passed through is limited.

It has two modes of operation: limiting the number of messages per time-unit, or limiting the number of simultaneous executions of a block of code. The mode is determined by the type of the second positional parameter.

The first positional parameter specifies the limit that should be applied.

If the second positional parameter is a Callable, then the limit indicates the maximum number of parallel processes executing the Callable, which is given the value that was received. The emitted values in this case will be the Promises that were obtained from starting the Callable.
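As a minimal sketch of this mode, the snippet below runs a trivial block on four values, at most two at a time, and collects what the resulting Promises return. The variable name @results and the block { $_ * 10 } are illustrative only. Completion order is not guaranteed, so the results are sorted before printing.

```raku
my @results;

react {
    # Allow at most 2 simultaneous executions of the block.
    # Each value emitted by the throttled Supply is a Promise.
    whenever Supply.from-list(1..4).throttle(2, { $_ * 10 }) -> $promise {
        # .result returns the value the block produced for this input
        @results.push: $promise.result;
    }
}

# Completion order may vary between runs, so sort before showing.
say @results.sort;
```

The react block ends once the throttled Supply is done, which happens after every started block has finished.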

If the second positional parameter is a real number, it is interpreted as the time-unit (in seconds). For example, if you specify .1 as the value, the throttle makes sure the limit is not exceeded in any tenth of a second.
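As a minimal sketch of the time-unit mode, the snippet below passes six values through a throttle that allows two values per half second. The variable name @seen is illustrative only. Exact timing depends on the scheduler, so the sketch only relies on the values arriving in their original order.

```raku
my @seen;

react {
    # Allow at most 2 values per time-unit of 0.5 seconds.
    whenever Supply.from-list(^6).throttle(2, 0.5) -> $value {
        @seen.push: $value;
        say "got $value";
    }
}

# Values that exceeded the limit were buffered, not dropped.
say "received {+@seen} values";
```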

If the limit is exceeded, incoming messages are buffered until there is room to pass them on (or to execute the Callable again).

The third positional parameter is optional: it indicates the number of seconds the throttle will wait before passing on any values.

The :control named parameter optionally specifies a Supply that you can use to control the throttle while it is in operation. Messages that can be sent are strings in the form of "key:value". See control messages below for the types of messages that you can send to control the throttle.

The :status named parameter optionally specifies a Supply that will receive any status messages. If specified, it will send at least one status message after the original Supply is exhausted. See status message below.

The :bleed named parameter optionally specifies a Supply that will receive any values that were either explicitly bled (with the bleed control message), or automatically bled (if there's a vent-at active).

The :vent-at named parameter indicates the number of values that may be buffered before any additional value will be routed to the :bleed Supply. Defaults to 0 if not specified (causing no automatic bleeding to happen). Only makes sense if a :bleed Supply has also been specified.

The :scheduler named parameter indicates the scheduler to be used. Defaults to $*SCHEDULER.

control messages §

These messages can be sent to the :control Supply. A control message consists of a string of the form "key: value", e.g. "limit: 4".

  • limit

Change the number of messages (as initially given in the first positional) to the value given.

  • bleed

Route the given number of buffered messages to the :bleed Supply.

  • vent-at

Change the maximum number of buffered values before automatic bleeding takes place. If the new value is lower than before, buffered values are immediately rerouted to match the new maximum.

  • status

Send a status message to the :status Supply with the given id.

status message §

The status return message is a hash with the following keys:

  • allowed

The current number of messages / callables that are still allowed to be passed / executed.

  • bled

The number of messages routed to the :bleed Supply.

  • buffered

The number of messages currently buffered because of overflow.

  • emitted

The number of messages emitted (passed through).

  • id

The id of this status message (a monotonically increasing number). Handy if you want to log status messages.

  • limit

The current limit that is being applied.

  • vent-at

The maximum number of messages that may be buffered before they're automatically re-routed to the :bleed Supply.
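To illustrate the shape of such a hash, here is a small sketch that turns one into a log line. The hash values are made up, and summarize-status is a hypothetical helper, not part of Supply; only the key names come from the list above.

```raku
# Hypothetical status hash using the documented keys; the values are invented.
my %status =
    allowed  => 0,
    bled     => 2,
    buffered => 5,
    emitted  => 12,
    id       => 7,
    limit    => 3,
    vent-at  => 10;

# Hypothetical helper: condense a status hash into a single log line.
sub summarize-status(%s --> Str) {
    # Nothing more may pass right now and values are waiting in the buffer.
    my $state = %s<allowed> == 0 && %s<buffered> > 0
        ?? 'saturated'
        !! 'has capacity';
    "[{%s<id>}] $state: {%s<emitted>} emitted, {%s<buffered>} buffered, {%s<bled>} bled (limit {%s<limit>})"
}

say summarize-status(%status);
```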

Examples §

Have a simple piece of code announce when it starts running asynchronously, wait a random amount of time, then announce when it is done. Do this 6 times, but don't let more than 3 of them run simultaneously.

my $s = Supply.from-list(^6);  # set up supply 
my $t = $s.throttle: 3,        # only allow 3 at a time 
{                              # code block to run 
    say "running $_";          # announce we've started 
    sleep rand;                # wait some random time 
    say "done $_"              # announce we're done 
}                              # don't need ; because } at end of line 
$t.wait;                       # wait for the supply to be done

The output differs from run to run because of the random sleep. One run might produce:

running 0
running 1
running 2
done 2
running 3
done 1
running 4
done 4
running 5
done 0
done 3
done 5