Successor, Times, Merge, and Hamming Streams Using Threads

This example illustrates the use and behavior of stream-based, or demand-driven, execution. For some motivation on this subject see this. A particular function F might need several tokens from another function to complete its task. That function, in turn, might need several tokens from a third function, and so on. Rather than have each function produce all of its tokens and supply the results to F, it is sometimes better to let F demand tokens as needed and let the other functions supply tokens only when demanded. This works especially well when the functions involved generate infinitely many tokens. All five examples below fall into this category.

To accommodate stream-based computation, we develop a Stream class that opens stream-based communication between stream objects. Every communicating object must extend the abstract Stream class. This means implementing the following methods and variable definitions (items between square brackets are optional):

     [Stream from1, from2, ...];
     ...
     public ClassName (type parm, ... , [Stream s1, Stream s2, ...] )
     {
        [do something with parm (e.g. see class Successor)]
        [from1 = s1;  from2 = s2; ... (e.g. see class Merge)]
     }

     public void run ()
     {
        [do some initial setup such as build a stream (e.g. see class Hamming)]
        [use from1.next() ... to demand a token from from1 ... (e.g. see class Merge)]
        [build a token object and use putIt(token_object) to put
         token_object to the parent stream (e.g. see class Hamming)]
        while ( < some condition, usually true > )
        {
           [use from1.next() ... to demand a token from stream from1 ...]
           build a token_object [possibly from from1 ... tokens (e.g. see class Hamming)]
           use putIt(token_object) to send a token to the parent stream
        }
     }

As an example of how functions get hooked to streams, consider hooking into a simple token producer called Successor. The Successor function produces all integers, in increasing order, starting from the one specified when a Successor object is created. We will write code to demand integers from a Successor object and print them to the screen. The code for hooking things up looks like this:

      Stream succ = new Successor(1);

To start demanding tokens from the Successor object use this:

      succ.next();

The same call demands the first token and every subsequent token. The return value of succ.next() is an Object that the caller must cast to the token's actual type before using it. For example, if the Object is an IntObject then we may write:

      Object object = succ.next();
      System.out.println(((IntObject)(object)).valueOf()+" ");

to print the value of the integer object to the console.
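The Stream and IntObject classes themselves are not listed here, so the following is only a minimal sketch of how they might look, under stated assumptions: each stream owns one thread that is started on the first call to next(), tokens are handed over through a java.util.concurrent.SynchronousQueue, and each stream has exactly one consumer. Only the names next(), putIt(), valueOf(), Stream, IntObject and Successor come from the text; the fields, the SuccessorDemo class and its take() helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical token wrapper; only the name and valueOf() come from the text.
class IntObject {
    private final int value;
    IntObject(int value) { this.value = value; }
    int valueOf() { return value; }
}

// Assumed base class: one producer thread per stream, started on first demand.
abstract class Stream implements Runnable {
    private final SynchronousQueue<Object> channel = new SynchronousQueue<>();
    private Thread producer;

    // Demand one token. Assumes a single consumer per stream.
    public Object next() {
        if (producer == null) {
            producer = new Thread(this);
            producer.setDaemon(true);   // a blocked producer must not keep the JVM alive
            producer.start();
        }
        try {
            return channel.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Supply one token; blocks until the consumer demands it with next().
    protected void putIt(Object token) {
        try {
            channel.put(token);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

// Produces start, start+1, start+2, ... one token per demand.
class Successor extends Stream {
    private int current;
    Successor(int start) { current = start; }
    public void run() {
        while (true) putIt(new IntObject(current++));
    }
}

class SuccessorDemo {
    // Hypothetical helper: collect the first n integer tokens of a stream.
    static List<Integer> take(Stream s, int n) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(((IntObject) s.next()).valueOf());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(take(new Successor(1), 5));   // prints [1, 2, 3, 4, 5]
    }
}
```

In this sketch putIt() blocks until the token is taken, so a producer never runs more than one token ahead of its consumer, which is what keeps an infinite stream such as Successor from running away.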

The five Stream extensions in this example are the following:

Console <- Successor

      Stream succ = new Successor(1);
      ...
      while (true)
         System.out.println(((IntObject)(succ.next())).valueOf()+" ");
      ...
which produces the output 1 2 3 4 5 6 ....

Console <- Times 3 <- Successor

      Stream times = new Times(3, new Successor(1));
      ...
      while (true)
         System.out.println(((IntObject)(times.next())).valueOf()+" ");
      ...
which multiplies each token of the Successor stream by 3 to give the output stream 3 6 9 12 15 .... A second such stream, where the multiplier is 5, is the third example. Its output is 5 10 15 20 25 ....

Console <- Merge <- Times 3 and Times 5 <- Successor

      Stream times1 = new Times(3, new Successor(1));
      Stream times2 = new Times(5, new Successor(1));
      Stream merge = new Merge(times1, times2);
      ...
      while (true)
         System.out.println(((IntObject)(merge.next())).valueOf()+" ");
      ...
This example is interesting because two streams, times1 and times2, converge on one stream, merge. The output is 3 5 6 9 10 12 15 15 18 .... Note that 15 appears twice: it occurs in both input streams, and Merge keeps duplicates.
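The merging step on its own can be sketched without threads. The block below is not the Merge class of this example: it swaps in plain java.util.Iterator objects to show only the comparison logic, assuming both inputs are infinite and increasing. The names MergeSketch, multiplesOf, merge and firstN are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class MergeSketch {
    // Infinite iterator over multiplier*1, multiplier*2, ...
    // (what Times(multiplier, Successor(1)) would supply).
    static Iterator<Integer> multiplesOf(int multiplier) {
        return new Iterator<Integer>() {
            private int n = 1;
            public boolean hasNext() { return true; }
            public Integer next() { return multiplier * n++; }
        };
    }

    // Merge two increasing infinite iterators. One token of each input is
    // held back; the smaller is emitted and only that input is asked again.
    // Equal tokens are emitted one after the other, so duplicates survive.
    static Iterator<Integer> merge(Iterator<Integer> a, Iterator<Integer> b) {
        return new Iterator<Integer>() {
            private int headA = a.next();
            private int headB = b.next();
            public boolean hasNext() { return true; }
            public Integer next() {
                int out;
                if (headA <= headB) { out = headA; headA = a.next(); }
                else                { out = headB; headB = b.next(); }
                return out;
            }
        };
    }

    // Collect the first n values of an iterator.
    static List<Integer> firstN(Iterator<Integer> it, int n) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(it.next());
        return out;
    }

    public static void main(String[] args) {
        // prints [3, 5, 6, 9, 10, 12, 15, 15, 18]
        System.out.println(firstN(merge(multiplesOf(3), multiplesOf(5)), 9));
    }
}
```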

Console <- Hamming (Set of Prime Numbers)

      int primes[] = new int[3];
      primes[0] = 11;
      primes[1] = 5;
      primes[2] = 3;
      Stream hamming = new Hamming(primes, 2);
      ...
      while (true)
         System.out.println(((IntObject)(hamming.next())).valueOf()+" ");
      ...
This is the most complicated case of all. The Hamming problem is: given a set P of prime numbers, output all integers greater than 1 whose prime factors all belong to P. The prime numbers may be given as an increasing list, and the output is to be in increasing order as well. (The array above stores the primes in decreasing order, so the smallest prime sits at index 2.) If, for example, the primes are 3 5 11, the output is 3 5 9 11 15 25 27 33 45 55 ....
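For checking the first few terms by hand, a brute-force filter is enough. The sketch below tests every integer from 2 upward by dividing out the given primes; the class and method names are hypothetical, and the approach is only meant for small values.

```java
import java.util.ArrayList;
import java.util.List;

class HammingCheck {
    // True if n has no prime factor outside the given primes.
    static boolean onlyFactors(int n, int[] primes) {
        for (int p : primes) {
            while (n % p == 0) n /= p;   // strip every factor p
        }
        return n == 1;                   // nothing else was left
    }

    // First 'count' integers greater than 1 whose prime factors all lie in primes.
    static List<Integer> firstN(int[] primes, int count) {
        List<Integer> out = new ArrayList<>();
        for (int n = 2; out.size() < count; n++) {
            if (onlyFactors(n, primes)) out.add(n);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [3, 5, 9, 11, 15, 25, 27, 33, 45, 55]
        System.out.println(firstN(new int[] {11, 5, 3}, 10));
    }
}
```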

A generate-and-test solution is impractical. Try it. The following stream-based approach works nicely. Split the desired stream into three parts: the lowest prime itself, one stream containing the integers that do not have the lowest prime as a factor, and one stream containing the integers that do have the lowest prime as a factor, excluding the lowest prime itself. Then the output stream is the lowest prime followed by the merge of the two other streams. Moreover, each of those two streams can itself be written in terms of a Hamming sequence.

Let primes be an array of prime numbers in decreasing order. Let index be the position in primes of the smallest prime to be used (initially the last position in the array). Let Hamming(primes, index) denote the Hamming sequence for the primes primes[0] through primes[index]. Let Times(multiplier, instream) produce the stream obtained by multiplying every integer in instream by multiplier. Then the first of the two streams to merge is Hamming(primes, index-1), which exists only when index is greater than 0, and the second stream is Times(primes[index], Hamming(primes, index)) (that is, the Hamming sequence itself with every number multiplied by the lowest-valued prime in use). Thus, the Hamming class is written using the following code:

 
      Stream s = new Times(primes[index], new Hamming(primes, index));
      if (index != 0) s = new Merge(s, new Hamming(primes, index-1));
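To see those two lines in context, here is a self-contained sketch of the whole Hamming pipeline. It is not the original source: the Stream base class is an assumed design (one thread per stream, started on the first demand, tokens passed through a SynchronousQueue, one consumer per stream), and IntObject, the intOf() helper, HammingDemo and take() are hypothetical. Starting threads lazily matters in this sketch, because every Hamming object builds another Hamming object inside run().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

// Hypothetical token wrapper; only the name and valueOf() come from the text.
class IntObject {
    private final int value;
    IntObject(int value) { this.value = value; }
    int valueOf() { return value; }
}

// Assumed base class: each stream owns one thread, started on the first demand.
abstract class Stream implements Runnable {
    private final SynchronousQueue<Object> channel = new SynchronousQueue<>();
    private Thread producer;

    public Object next() {                 // demand a token (single consumer assumed)
        if (producer == null) {
            producer = new Thread(this);
            producer.setDaemon(true);      // blocked producers must not keep the JVM alive
            producer.start();
        }
        try {
            return channel.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    protected void putIt(Object token) {   // blocks until the consumer demands it
        try {
            channel.put(token);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static int intOf(Object token) { return ((IntObject) token).valueOf(); }
}

// Multiplies every token of its input stream by a constant.
class Times extends Stream {
    private final int multiplier;
    private final Stream from;
    Times(int multiplier, Stream from) { this.multiplier = multiplier; this.from = from; }
    public void run() {
        while (true) putIt(new IntObject(multiplier * intOf(from.next())));
    }
}

// Merges two increasing streams, keeping duplicates.
class Merge extends Stream {
    private final Stream from1, from2;
    Merge(Stream s1, Stream s2) { from1 = s1; from2 = s2; }
    public void run() {
        int a = intOf(from1.next());
        int b = intOf(from2.next());
        while (true) {
            if (a <= b) { putIt(new IntObject(a)); a = intOf(from1.next()); }
            else        { putIt(new IntObject(b)); b = intOf(from2.next()); }
        }
    }
}

// Lowest prime first, then the merge described in the text.
class Hamming extends Stream {
    private final int[] primes;
    private final int index;
    Hamming(int[] primes, int index) { this.primes = primes; this.index = index; }
    public void run() {
        Stream s = new Times(primes[index], new Hamming(primes, index));
        if (index != 0) s = new Merge(s, new Hamming(primes, index - 1));
        putIt(new IntObject(primes[index]));   // the lowest prime leads the stream
        while (true) putIt(s.next());          // then forward the merged stream
    }
}

class HammingDemo {
    // Hypothetical helper: collect the first n integer tokens of a stream.
    static List<Integer> take(Stream s, int n) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(Stream.intOf(s.next()));
        return out;
    }

    public static void main(String[] args) {
        int[] primes = {11, 5, 3};             // decreasing order, smallest at index 2
        // prints [3, 5, 9, 11, 15, 25, 27, 33, 45, 55]
        System.out.println(take(new Hamming(primes, 2), 10));
    }
}
```

Each nested Hamming starts its own threads once it is demanded, so the number of threads in this sketch grows with the number of tokens requested; it is meant for small demonstrations only.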