Streams in Java

When we use traditional languages, our minds are often completely shut off to the possibility of solutions that are natural in other languages such as Scheme. For example, the C language, with its difficulty handling processes (except through the UNIX operating system, perhaps), does not give us many good opportunities to employ the notion of streams, in the sense of Scheme.

Think of a stream as a sequence of tokens (strings, integers, etc.) where each token is not generated by a producer procedure until demanded by a consumer procedure. To illustrate the importance of streams we use an example from the DOS world. What happens on a DOS machine (version 5.0) when you issue a command such as

   type file.txt | more  ?

The output from the type command is generated and stored somewhere. Then the more command is executed on the stored data. What's wrong with this? If file.txt has more than 65,000 characters, the command barfs. The reason is that DOS cannot create a file that large to store the data that more takes as input.

Now suppose we view the more command as a consumer of tokens and the type command as a producer of tokens only upon demand from more. The more command goes into action first and requests the next token from type. Then type produces the next token, more grabs it, and outputs it to the screen. The cycle repeats until either the screen is full or type has no more tokens to produce. There is no DOS-like barfing in this case and the user sees exactly what is expected.
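
The demand-driven cycle just described can be sketched with Java's standard Iterator interface before any Stream class is introduced. This is only an illustration: the names LineProducer, Pager, page and the produced counter are hypothetical and are not part of the classes developed in this section.

```java
import java.util.Iterator;

// Hypothetical producer: hands out one line at a time, only when asked.
class LineProducer implements Iterator<String> {
    private final String[] lines;
    private int pos = 0;
    int produced = 0;   // how many tokens have actually been generated

    LineProducer(String[] lines) { this.lines = lines; }

    public boolean hasNext() { return pos < lines.length; }

    public String next() {
        produced++;
        return lines[pos++];
    }
}

// Hypothetical consumer: pulls at most one screenful from the producer.
class Pager {
    static String page(Iterator<String> producer, int screenRows) {
        StringBuilder screen = new StringBuilder();
        for (int row = 0; row < screenRows && producer.hasNext(); row++) {
            screen.append(producer.next()).append('\n');
        }
        return screen.toString();
    }
}
```

Calling Pager.page with a two-row screen draws exactly two tokens from the producer; the remaining tokens are not generated until another page is requested, so nothing has to be stored in between.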

In this section we create a Stream class which facilitates the consumer-producer mechanism illustrated above. A Stream object S (an object of the Stream class) is regarded as a pair consisting of a Token object, representing the first token in S, and a Continuation, or a procedure, that is capable of producing all the remaining tokens in S. The definitions for producing S come from an object of another class. We call this object a guest and the corresponding Stream object its host.

A guest object satisfies one major requirement: the class from which it is defined must contain a method next that returns a Stream object. The prototype for the next method is given in the StreamObject interface which the guest object's class must implement.

   interface StreamObject
   {
      public Stream next ();
   }

The next method should be designed following a special style that will be illustrated by example later.

Associated with a Stream object are two methods called first and cont. The first method returns the saved Token object in the Stream pair. The cont method invokes the next method of the guest, which returns a new Stream object holding the following token. The next method is designed to spit out that one Token and suspend itself until the next request.

As an example, consider a Successor class that outputs all the integers starting at some specified one. The class may be defined as follows:

   class Successor
   {
      int number;

      public Successor (int n) { number = n; }

      public void printTokens ()
      {
         System.out.println(number);
         number++;
         printTokens();
      }
   }

Once the printTokens method of a Successor object is invoked, a process sending integers to the console commences. This process can be terminated only by some abnormal interruption.

The same class can be rewritten to take advantage of streams. We would like a Successor object to generate a number only when asked by a process that will consume the number. The Successor should generate only one number and return it to the consuming process. When requested to continue by the same process, the Successor should produce the next number in sequence and return it to the consumer. The following code shows how the Successor class can be modified to take advantage of streams as we have defined them. Since this modification requires that stream tokens be created from Objects, we also build the IntObject class to manage the integers in the stream. We delay showing the Stream class itself until the end.

   class IntObject
   {
      int number;

      IntObject (int i) { number = i; }

      int valueOf () { return number; }
   }

   class Successor implements StreamObject
   {
      int number;

      public Successor (int n) { number = n; }

      public Stream next ()
      {
         return new Stream(new IntObject(number), new Successor(number+1));
      }
   }

A Successor object can be used to create a stream as follows:

   Stream s1 = (new Successor(1)).next();

Thus s1 is a Stream object that produces all the integers from 1, when demanded, one at a time. A consumer communicates with the Successor object through s1. A simple example involving a loop to print the first 15 integers to the console looks like this:

   for (int i=0 ; i < 15 ; i++)
   {
      System.out.print(((IntObject)(s1.first())).valueOf()+" ");
      s1 = s1.cont();
   }

As a somewhat more interesting example, and as an example of how streams can be hooked together to form stream networks, consider the problem of writing a class Times that takes an integer Stream S and an integer number as input and returns a Stream object that multiplies all integers in S by number. The code for this is as follows:

   class Times implements StreamObject
   {
      Stream S;
      int number;
      int mark;

      public Times (int n, Stream ins) { number = n; S = ins; }

      public Stream next ()
      {
         if (S.isNull())  return new Stream(null);
         mark = number*((IntObject)(S.first())).valueOf();
         return new Stream(new IntObject(mark), new Times(number, S.cont()));
      }
   }

We can use a Successor object as input to a Times object as follows:

   Stream t1 = (new Times(5, (new Successor(1)).next())).next();
   Stream t2 = (new Times(2, (new Successor(4)).next())).next();

Stream t1 produces the tokens 5, 10, 15, 20, ... whereas stream t2 produces 8, 10, 12, 14, 16, ....

We can even merge streams as the following example shows. Recall that two increasing lists of integers can be merged to a single increasing list containing all integers in the original lists, including repetitions.

   class Merge implements StreamObject
   {
      Stream stream1;
      Stream stream2;

      IntObject first_token;

      public Merge (Stream s1, Stream s2) { stream1 = s1; stream2 = s2; }

      public Stream next ()
      {
         if (stream1.isNull() && stream2.isNull())  return new Stream(null);
         else
         if (stream1.isNull())
         {
            first_token = (IntObject)stream2.first();
            return new Stream(first_token, new Merge(stream1, stream2.cont()));
         }
         else
         if (stream2.isNull())
         {
            first_token = (IntObject)stream1.first();
            return new Stream(first_token, new Merge(stream1.cont(), stream2));
         }
         else
         if (((IntObject)(stream1.first())).valueOf() < 
             ((IntObject)(stream2.first())).valueOf())
         {
            first_token = (IntObject)stream1.first();
            return new Stream(first_token, new Merge(stream1.cont(), stream2));
         }
         else
         {
            first_token = (IntObject)stream2.first();
            return new Stream(first_token, new Merge(stream1, stream2.cont()));
         }
      }
   }

We can merge t1 and t2 as follows:

   Stream s3 = (new Merge(t1, t2)).next();

Here is the Stream class:

   class Stream
   {
      StreamObject func;
      Object object = null;

      public Stream(Object obj, StreamObject f)
      {
         func = f;
         object = obj;
      }

      public Stream(StreamObject f)  {  func = f; }

      public boolean isNull ()
      {
         if (func == null) return true;
         if (func != null && object == null)
            if (func.next().first() == null) return true;
         return false;
      }
   
      public Stream cont ()
      {
         if (isNull()) return new Stream(null);
         return func.next();
      }

      public Object first ()
      {
         if (object == null && func != null)
            return func.next().first();
         return object;
      }
   }
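
With the Stream class in hand, the fragments shown so far can be assembled into one compilable file. The following is a condensed sketch of the same classes, not a replacement for them. The StreamDemo class and its take helper are hypothetical additions used only to collect the first few tokens, and Merge is folded into a single test that is meant to behave like the if/else chain above.

```java
interface StreamObject { Stream next(); }

class IntObject {
    int number;
    IntObject(int i) { number = i; }
    int valueOf() { return number; }
}

// Same logic as the Stream class above, condensed.
class Stream {
    StreamObject func;
    Object object = null;
    Stream(Object obj, StreamObject f) { func = f; object = obj; }
    Stream(StreamObject f) { func = f; }
    boolean isNull() {
        if (func == null) return true;
        return object == null && func.next().first() == null;
    }
    Stream cont() { return isNull() ? new Stream(null) : func.next(); }
    Object first() {
        return (object == null && func != null) ? func.next().first() : object;
    }
}

class Successor implements StreamObject {
    int number;
    Successor(int n) { number = n; }
    public Stream next() {
        return new Stream(new IntObject(number), new Successor(number + 1));
    }
}

class Times implements StreamObject {
    Stream S;
    int number;
    Times(int n, Stream ins) { number = n; S = ins; }
    public Stream next() {
        if (S.isNull()) return new Stream(null);
        int mark = number * ((IntObject) S.first()).valueOf();
        return new Stream(new IntObject(mark), new Times(number, S.cont()));
    }
}

class Merge implements StreamObject {
    Stream stream1, stream2;
    Merge(Stream s1, Stream s2) { stream1 = s1; stream2 = s2; }
    public Stream next() {
        boolean empty1 = stream1.isNull(), empty2 = stream2.isNull();
        if (empty1 && empty2) return new Stream(null);
        // Take from stream1 when stream2 is exhausted or stream1 is strictly smaller.
        boolean takeFirst = empty2 || (!empty1
            && ((IntObject) stream1.first()).valueOf() < ((IntObject) stream2.first()).valueOf());
        return takeFirst
            ? new Stream(stream1.first(), new Merge(stream1.cont(), stream2))
            : new Stream(stream2.first(), new Merge(stream1, stream2.cont()));
    }
}

class StreamDemo {
    // Hypothetical helper: pull the first n tokens and join them with spaces.
    static String take(Stream s, int n) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < n && !s.isNull(); i++) {
            out.append(((IntObject) s.first()).valueOf()).append(' ');
            s = s.cont();
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        Stream t1 = new Times(5, new Successor(1).next()).next();
        Stream t2 = new Times(2, new Successor(4).next()).next();
        System.out.println(take(t1, 4));                        // 5 10 15 20
        System.out.println(take(t2, 5));                        // 8 10 12 14 16
        System.out.println(take(new Merge(t1, t2).next(), 8));  // 5 8 10 10 12 14 15 16
    }
}
```

Note that the merged stream contains 10 twice, once from each input, in keeping with the remark that merging retains repetitions.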

There are a number of problems that would be hard to solve without such a Stream class. Consider the following problem, known as Hamming's problem:

  Given:   A list of prime numbers in increasing order
          
  Produce: An increasing list of integers, each having at least one
           of the given list of prime numbers as a factor and only
           prime numbers from the given list as prime factors.

A generate-and-test solution is not practical. The following stream-based approach is much better. Let hamming(p) represent the stream of integers that is the solution to an instance of Hamming's problem given the list p of primes. Let first(p) be the first element of the list of primes p. Let cont(p) be the list of primes equal to p with first(p) removed. Let + denote placing a token at the front of a stream. Then, symbolically,

   hamming(p) =
        if (p == null) return (null);
        return first(p) + merge(times(first(p), hamming(p)), hamming(cont(p)));

In Java, using our Stream class to implement this idea we get

   class Hamming implements StreamObject
   {
      Stream primes;
  
      public Hamming (Stream p) {  primes = p;  }

      public Stream next ()
      {
         if (primes.isNull())  return new Stream(null);
         int f = ((IntObject)(primes.first())).valueOf();
         Stream h1 = new Stream(new IntObject(1), new Hamming(primes));
         Stream s1 = new Stream(new IntObject(1), new Times(f, h1.cont()));
         Stream rest_tokens = primes.cont();
         if (!rest_tokens.isNull())
         {
            Stream s2 = new Stream(new IntObject(1), new Hamming(primes.cont()));
            return new Stream (primes.first(), new Merge(s1.cont(), s2.cont()));
         }
         else
         {
            return new Stream (primes.first(), new Times(f, h1.cont()));
         }
      }
   }

where the new IntObject(1) terms are merely placeholders that disappear when some cont is invoked. Unfortunately this does not work in Java because the invocations of cont result in an infinite recursion: Java evaluates h1.cont() immediately, which calls the next method of a new Hamming object on the same primes, which in turn evaluates h1.cont() again, and so on. Leaving out the cont invocations allows us to solve Hamming's problem, but with a hitch: duplicate tokens appear. The astute reader will realize that, if the new IntObject(1) terms are replaced by the actual first tokens in the corresponding streams (resulting in the first two stream tokens being duplicates) and the cont invocations are removed, the number of duplicates in the Hamming stream, for each token, is never greater than three and all duplicates are found in succession. Therefore, a reasonable solution to Hamming's problem is the following:

   class Hamming implements StreamObject
   {
      Stream primes;
  
      public Hamming (Stream p) {  primes = p;  }

      public Stream next ()
      {
         if (primes.isNull())  return new Stream(null);
         int f = ((IntObject)(primes.first())).valueOf();
         Stream h1 = new Stream(primes.first(), new Hamming(primes));
         Stream s1 = new Stream(new IntObject(f*f), new Times(f, h1));
         Stream rest_tokens = primes.cont();
         if (!rest_tokens.isNull())
         {
            Stream s2 = new Stream(rest_tokens.first(), new Hamming(primes.cont()));
            return new Stream (primes.first(), new Merge(s1, s2));
         }
         else
         {
            return new Stream (primes.first(), new Times(f, h1));
         }
      }
   }

plus some code that removes duplicates.
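
One way the duplicate-removing code might look is sketched below. It relies on the observation above that all duplicates appear in succession, so it is enough to compare each token with the tokens that immediately follow it. The names Dedup, ArraySource and DedupDemo are hypothetical; ArraySource exists only to feed the sketch a short stream containing repeats, and the Stream class is repeated in condensed form so the example stands alone.

```java
interface StreamObject { Stream next(); }

class IntObject {
    int number;
    IntObject(int i) { number = i; }
    int valueOf() { return number; }
}

// Condensed copy of the Stream class of this section.
class Stream {
    StreamObject func;
    Object object = null;
    Stream(Object obj, StreamObject f) { func = f; object = obj; }
    Stream(StreamObject f) { func = f; }
    boolean isNull() {
        if (func == null) return true;
        return object == null && func.next().first() == null;
    }
    Stream cont() { return isNull() ? new Stream(null) : func.next(); }
    Object first() {
        return (object == null && func != null) ? func.next().first() : object;
    }
}

// Hypothetical test source: streams the elements of an array from position pos on.
class ArraySource implements StreamObject {
    int[] data;
    int pos;
    ArraySource(int[] d, int p) { data = d; pos = p; }
    public Stream next() {
        if (pos >= data.length) return new Stream(null);
        return new Stream(new IntObject(data[pos]), new ArraySource(data, pos + 1));
    }
}

// Hypothetical guest: passes tokens through, dropping immediate repeats.
class Dedup implements StreamObject {
    Stream in;
    Dedup(Stream s) { in = s; }
    public Stream next() {
        if (in.isNull()) return new Stream(null);
        int v = ((IntObject) in.first()).valueOf();
        Stream rest = in.cont();
        // Skip every directly following token that equals v.
        while (!rest.isNull() && ((IntObject) rest.first()).valueOf() == v) {
            rest = rest.cont();
        }
        return new Stream(new IntObject(v), new Dedup(rest));
    }
}

class DedupDemo {
    // Hypothetical helper: pull up to n tokens and join them with spaces.
    static String take(Stream s, int n) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < n && !s.isNull(); i++) {
            out.append(((IntObject) s.first()).valueOf()).append(' ');
            s = s.cont();
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        int[] data = {3, 9, 9, 9, 15, 15, 25};
        Stream clean = new Dedup(new ArraySource(data, 0).next()).next();
        System.out.println(take(clean, 10));   // 3 9 15 25
    }
}
```

Under the same assumptions, a Hamming stream h could be wrapped as (new Dedup(h)).next() before it is consumed. Because Dedup looks ahead past repeats before returning, it terminates only when each run of duplicates is finite.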

The prime numbers can be made a Stream object as follows:

   class ArrayObject
   {
      int array[];
      int index;

      public ArrayObject (int n, int a[]) { array = a; index = n;  }
      public int[] arrayOf()  { return array; }
      public int valueOf ()   { return array[index];  }
      public int indexOf () { return index; }
   }

   class PrimeList implements StreamObject
   {
      ArrayObject primes;

      public PrimeList (ArrayObject a)  {  primes = a;  }

      public Stream next ()
      {
         if (primes.indexOf() < 0) return new Stream(null);
         IntObject io = new IntObject(primes.valueOf());
         int[] pao = primes.arrayOf();
         PrimeList pl = new PrimeList(new ArrayObject(primes.indexOf()-1, pao));
         return new Stream(io, pl);
      }
   }

Code for generating the numbers in a Hamming sequence is as follows:

   int a[] = new int[3];
   a[0] = 11;
   a[1] = 5;
   a[2] = 3;

   Stream h = (new Hamming((new PrimeList(new ArrayObject(2, a))).next())).next();

   for (int i=0 ; i < 50 ; i++)
   {
      if (h.isNull()) break;
      int val = ((IntObject)(h.first())).valueOf();
      System.out.print(val+" ");
      h = h.cont();
   }
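
As an aside, the symbolic definition of hamming(p) can be written almost literally if the remainder of a stream is held as an unevaluated java.util.function.Supplier, so that nothing is computed until cont is called. The sketch below is a different technique from the Stream class of this section, not a description of it. The class Lazy and its methods times, merge, hamming and take are hypothetical names, the primes are assumed to be given in an increasing array, and null is used to mark the end of a stream.

```java
import java.util.function.Supplier;

// Hypothetical lazy stream: the tail is a Supplier evaluated only on cont().
final class Lazy {
    final int head;
    private final Supplier<Lazy> tail;

    Lazy(int head, Supplier<Lazy> tail) { this.head = head; this.tail = tail; }

    Lazy cont() { return tail.get(); }   // null marks the end of the stream

    // Multiply every token of s by n.
    static Lazy times(int n, Lazy s) {
        return s == null ? null : new Lazy(n * s.head, () -> times(n, s.cont()));
    }

    // Merge two increasing streams, keeping repetitions.
    static Lazy merge(Lazy a, Lazy b) {
        if (a == null) return b;
        if (b == null) return a;
        return a.head < b.head
            ? new Lazy(a.head, () -> merge(a.cont(), b))
            : new Lazy(b.head, () -> merge(a, b.cont()));
    }

    // hamming(p) = first(p) + merge(times(first(p), hamming(p)), hamming(cont(p)))
    // The list p is represented by the array primes read from index 'from' on.
    static Lazy hamming(int[] primes, int from) {
        if (from >= primes.length) return null;
        int f = primes[from];
        return new Lazy(f,
            () -> merge(times(f, hamming(primes, from)), hamming(primes, from + 1)));
    }

    // Pull up to n tokens and join them with spaces.
    static String take(Lazy s, int n) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < n && s != null; i++) {
            out.append(s.head).append(' ');
            s = s.cont();
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        // prints: 3 5 9 11 15 25 27 33 45 55
        System.out.println(take(hamming(new int[] {3, 5, 11}, 0), 10));
    }
}
```

Because the recursive call to hamming sits inside a lambda, it runs only when a consumer asks for the next token, which avoids the infinite recursion. The sketch does not cache tails, so each call to cont recomputes its result; it trades efficiency for closeness to the symbolic definition.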