Think of a stream as a sequence of tokens (strings, integers, etc.)
where each token is not generated by a *producer* procedure until
demanded by a *consumer* procedure. To illustrate the importance of
streams we use an example from the DOS world. Consider what happens on a
DOS machine (version 5.0) when you issue a command such as

    type file.txt | more

The output from the `type` command is generated and stored
somewhere. Then the `more` command is executed on the stored
data. What's wrong with this? If `file.txt` has more than
65,000 characters, the command fails. The reason is that DOS cannot create a file
that large to store the data that `more` takes as input.

Now suppose we view the `more` command as a consumer of tokens
and the `type` command as a producer of tokens only upon demand
from `more`. The `more` command goes into action first
and requests the next token from `type`. Then `type`
produces the next token, `more` grabs it, and outputs it to the
screen. The cycle repeats until either the screen is full or
`type` has no more tokens to produce. There is no DOS-like
barfing in this case and the user sees exactly what is expected.
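
The demand-driven cycle described above can be sketched with Java's standard `java.util.Iterator` standing in for the producer. This is only an illustration of the idea, not the `Stream` class developed in this section; the names `LineProducer`, `Pager`, and `showPage` are hypothetical.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Producer: creates each line only when the consumer asks for it.
class LineProducer implements Iterator<String> {
    private final int total;   // how many lines the "file" holds
    private int produced = 0;  // how many lines have actually been generated

    LineProducer(int total) { this.total = total; }

    public boolean hasNext() { return produced < total; }

    public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        produced++;
        return "line " + produced;
    }

    int producedSoFar() { return produced; }
}

// Consumer: pulls lines until the screen is full or the producer runs dry.
class Pager {
    static int showPage(Iterator<String> producer, int screenRows, StringBuilder screen) {
        int shown = 0;
        while (shown < screenRows && producer.hasNext()) {
            screen.append(producer.next()).append('\n');
            shown++;
        }
        return shown;
    }

    public static void main(String[] args) {
        LineProducer file = new LineProducer(1000000);
        StringBuilder screen = new StringBuilder();
        showPage(file, 24, screen);
        System.out.print(screen);
        // Only the 24 requested lines were ever generated.
        System.out.println("generated: " + file.producedSoFar());
    }
}
```

However large the simulated file is, the producer generates only as many lines as the consumer requests, so nothing has to be stored in between.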

In this section we create a `Stream` class which facilitates
the consumer-producer mechanism illustrated above. A Stream object
`S` (an object of the `Stream` class) is regarded as a
pair consisting of a Token object, representing the first token in
`S`, and a Continuation, or a procedure, that is capable of
producing all the remaining tokens in `S`. The definitions for
producing `S` come from an object of another class. We call
this object a *guest* and the corresponding Stream object its
*host*.

A guest object satisfies one major requirement: the class from which
it is defined must contain a method `next` that returns a
Stream object. The prototype for the `next` method is given
in the `StreamObject` interface which the guest object's class
must implement.

    interface StreamObject {
        public Stream next();
    }

The `next` method should be designed following a special style
that will be illustrated by example later.

Associated with a Stream object are two methods called `first`
and `cont`. The `first` method returns the saved Token
object in the Stream pair. The `cont` method invokes the
`next` method of the guest, which returns a new Stream object
whose saved Token is the next one in sequence. The `next` method
is designed to produce only that Token and then suspend itself until
the next request.

As an example, consider a Successor class that outputs all the integers starting at some specified one. The class may be defined as follows:

    class Successor {
        int number;

        public Successor(int n) { number = n; }

        public void printTokens() {
            System.out.println(number);
            number++;
            printTokens();
        }
    }

Once the `printTokens` method of a Successor object is invoked,
a process sending integers to the console commences. This process can
be terminated only by some abnormal interruption. The same class can
be rewritten to take advantage of streams. We would like a Successor
object to generate a number only when asked by a process that will
consume the number. The Successor should generate only one number and
return it to the consuming process. When requested to continue *by
the same process*, the Successor should produce the next number in
sequence and return it to the consumer. The following code shows how
the `Successor` class can be modified to take advantage of
streams as we have defined them. Since this modification requires
that stream tokens be created from Objects, we also build the
`IntObject` class to manage the integers in the stream. We delay
showing the `Stream` class itself until the end.

    class IntObject {
        int number;

        IntObject(int i) { number = i; }

        int valueOf() { return number; }
    }

    class Successor implements StreamObject {
        int number;

        public Successor(int n) { number = n; }

        public Stream next() {
            return new Stream(new IntObject(number), new Successor(number+1));
        }
    }

A Successor object can be used to create a stream as follows:

    Stream s1 = (new Successor(1)).next();

Thus `s1` is a Stream object that produces all the integers
from 1, when demanded, one at a time. A consumer communicates with
the Successor object through `s1`. A simple example involving
a loop to print the first 15 integers to the console looks like this:

    for (int i=0 ; i < 15 ; i++) {
        System.out.print(((IntObject)(s1.first())).valueOf()+" ");
        s1 = s1.cont();
    }

As a somewhat more interesting example, and as an example of how
streams can be hooked together to form stream networks, consider the
problem of writing a class `Times` that takes an integer Stream
`S` and integer `number` as input and returns a Stream
object that multiplies all integers in `S` by `number`.
The code for this is as follows:

    class Times implements StreamObject {
        Stream S;
        int number;
        int mark;

        public Times(int n, Stream ins) { number = n; S = ins; }

        public Stream next() {
            if (S.isNull()) return new Stream(null);
            mark = number*((IntObject)(S.first())).valueOf();
            return new Stream(new IntObject(mark), new Times(number, S.cont()));
        }
    }

We can use a Successor object as input to a Times object as follows:

    Stream t1 = (new Times(5, (new Successor(1)).next())).next();
    Stream t2 = (new Times(2, (new Successor(4)).next())).next();

Stream `t1` produces the tokens `5, 10, 15, 20, ...` whereas
stream `t2` produces `8, 10, 12, 14, 16, ...`.

We can even merge streams, as the following example shows. Recall that two increasing lists of integers can be merged into a single increasing list containing all integers in the original lists, including repetitions.

    class Merge implements StreamObject {
        Stream stream1;
        Stream stream2;
        IntObject first_token;

        public Merge(Stream s1, Stream s2) { stream1 = s1; stream2 = s2; }

        public Stream next() {
            if (stream1.isNull() && stream2.isNull())
                return new Stream(null);
            else if (stream1.isNull()) {
                first_token = (IntObject)stream2.first();
                return new Stream(first_token, new Merge(stream1, stream2.cont()));
            } else if (stream2.isNull()) {
                first_token = (IntObject)stream1.first();
                return new Stream(first_token, new Merge(stream1.cont(), stream2));
            } else if (((IntObject)(stream1.first())).valueOf() <
                       ((IntObject)(stream2.first())).valueOf()) {
                first_token = (IntObject)stream1.first();
                return new Stream(first_token, new Merge(stream1.cont(), stream2));
            } else {
                first_token = (IntObject)stream2.first();
                return new Stream(first_token, new Merge(stream1, stream2.cont()));
            }
        }
    }

We can merge `t1` and `t2` as follows:

    Stream s3 = (new Merge(t1, t2)).next();

Here is the `Stream` class:

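    class Stream {
        StreamObject func;     // guest whose next() produces the rest of the stream
        Object object = null;  // saved first token

        public Stream(Object obj, StreamObject f) { func = f; object = obj; }

        public Stream(StreamObject f) { func = f; }

        // A stream is null if it has no guest, or if it has no saved
        // token and its guest does not supply one either.
        public boolean isNull() {
            if (func == null) return true;
            if (func != null && object == null)
                if (func.next().first() == null) return true;
            return false;
        }

        // The stream of all tokens after the first.
        public Stream cont() {
            if (isNull()) return new Stream(null);
            return func.next();
        }

        // The saved token, or the guest's first token if none was saved.
        public Object first() {
            if (object == null && func != null) return func.next().first();
            return object;
        }
    }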
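
As a check on how the pieces fit together, the following sketch gathers the classes shown so far into one file and prints the first few tokens of `t1`, `t2`, and their merge. The `StreamDemo` class and its `take` helper are hypothetical additions, and `Merge` is written more compactly here than above, though it is meant to apply the same rule (on a tie, the token from the second stream is taken first).

```java
interface StreamObject { Stream next(); }

class Stream {
    StreamObject func;
    Object object = null;
    Stream(Object obj, StreamObject f) { func = f; object = obj; }
    Stream(StreamObject f) { func = f; }
    boolean isNull() {
        if (func == null) return true;
        if (object == null && func.next().first() == null) return true;
        return false;
    }
    Stream cont() { return isNull() ? new Stream(null) : func.next(); }
    Object first() {
        if (object == null && func != null) return func.next().first();
        return object;
    }
}

class IntObject {
    int number;
    IntObject(int i) { number = i; }
    int valueOf() { return number; }
}

class Successor implements StreamObject {
    int number;
    Successor(int n) { number = n; }
    public Stream next() {
        return new Stream(new IntObject(number), new Successor(number + 1));
    }
}

class Times implements StreamObject {
    Stream S;
    int number;
    Times(int n, Stream ins) { number = n; S = ins; }
    public Stream next() {
        if (S.isNull()) return new Stream(null);
        int mark = number * ((IntObject) S.first()).valueOf();
        return new Stream(new IntObject(mark), new Times(number, S.cont()));
    }
}

class Merge implements StreamObject {
    Stream stream1, stream2;
    Merge(Stream s1, Stream s2) { stream1 = s1; stream2 = s2; }
    static int head(Stream s) { return ((IntObject) s.first()).valueOf(); }
    public Stream next() {
        if (stream1.isNull() && stream2.isNull()) return new Stream(null);
        // Take from stream1 only if stream2 is exhausted or stream1's head is strictly smaller.
        boolean takeFirst = stream2.isNull()
                || (!stream1.isNull() && head(stream1) < head(stream2));
        if (takeFirst) return new Stream(stream1.first(), new Merge(stream1.cont(), stream2));
        return new Stream(stream2.first(), new Merge(stream1, stream2.cont()));
    }
}

class StreamDemo {
    // Hypothetical helper: the first n tokens of an integer stream, space-separated.
    static String take(Stream s, int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n && !s.isNull(); i++) {
            if (i > 0) sb.append(' ');
            sb.append(((IntObject) s.first()).valueOf());
            s = s.cont();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Stream t1 = new Times(5, new Successor(1).next()).next();
        Stream t2 = new Times(2, new Successor(4).next()).next();
        Stream s3 = new Merge(t1, t2).next();
        System.out.println(take(t1, 5));  // 5 10 15 20 25
        System.out.println(take(t2, 5));  // 8 10 12 14 16
        System.out.println(take(s3, 8));  // 5 8 10 10 12 14 15 16
    }
}
```

Because every call to `next` builds new objects instead of changing existing ones, `t1` and `t2` can still be read from the beginning after they have been handed to `Merge`.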

There are a number of problems that would be hard to solve without such a Stream class. Consider the following problem, known as Hamming's problem:

    Given:   A list of prime numbers in increasing order
    Produce: An increasing list of integers, each having at least one of
             the given list of prime numbers as a factor and only prime
             numbers from the given list as prime factors.

A generate-and-test solution is not practical. The following stream-based
approach is much better. Let `hamming(p)` represent the stream of
integers that is the solution to an instance of Hamming's problem given
the list `p` of primes. Let `first(p)` be the first element
of the list of primes `p`. Let `cont(p)` be the list of
primes equal to `p` with `first(p)` removed. Let `+`
denote stream concatenation. Then, symbolically,

    hamming(p) =
        if (p == null) return (null);
        return first(p) + merge(times(first(p), hamming(p)), hamming(cont(p)));

Using our `Stream` class to implement this idea in Java, we get

    class Hamming implements StreamObject {
        Stream primes;

        public Hamming(Stream p) { primes = p; }

        public Stream next() {
            if (primes.isNull()) return new Stream(null);
            int f = ((IntObject)(primes.first())).valueOf();
            Stream h1 = new Stream(new IntObject(1), new Hamming(primes));
            Stream s1 = new Stream(new IntObject(1), new Times(f, h1.cont()));
            Stream rest_tokens = primes.cont();
            if (rest_tokens != null) {
                Stream s2 = new Stream(new IntObject(1), new Hamming(primes.cont()));
                return new Stream(primes.first(), new Merge(s1.cont(), s2.cont()));
            } else {
                return new Stream(primes.first(), new Times(f, h1.cont()));
            }
        }
    }

where the `new IntObject(1)` terms are merely placeholders that
disappear when some `cont` is invoked. Unfortunately this does
not work in Java because the invocations of `cont` result in an
infinite recursion. Keeping out the `cont` invocations allows
us to solve Hamming's problem but with a hitch. The astute reader
will realize that, if the `new IntObject(1)` terms are replaced
by the actual first tokens in the corresponding streams (resulting in
the first two stream tokens being duplicates) and the `cont`
invocations are removed, the number of duplicates in the Hamming
stream, for each token, is never greater than three and all duplicates
are found in succession. Therefore, a reasonable solution to
Hamming's problem is the following:

    class Hamming implements StreamObject {
        Stream primes;

        public Hamming(Stream p) { primes = p; }

        public Stream next() {
            if (primes.isNull()) return new Stream(null);
            int f = ((IntObject)(primes.first())).valueOf();
            Stream h1 = new Stream(primes.first(), new Hamming(primes));
            Stream s1 = new Stream(new IntObject(f*f), new Times(f, h1));
            Stream rest_tokens = primes.cont();
            if (rest_tokens != null) {
                Stream s2 = new Stream(rest_tokens.first(), new Hamming(primes.cont()));
                return new Stream(primes.first(), new Merge(s1, s2));
            } else {
                return new Stream(primes.first(), new Times(f, h1));
            }
        }
    }

plus some code that removes duplicates.
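
The duplicate-removing code is not shown in this section. One possible sketch, written in the same guest style as `Times`, is a hypothetical `Unique` class that skips any token equal to the one it has just delivered. It relies on duplicates appearing in succession, as stated above. The `FromArray` guest and the `UniqueDemo` class are also hypothetical and exist only to exercise `Unique` on a short finite stream.

```java
interface StreamObject { Stream next(); }

class Stream {
    StreamObject func;
    Object object = null;
    Stream(Object obj, StreamObject f) { func = f; object = obj; }
    Stream(StreamObject f) { func = f; }
    boolean isNull() {
        if (func == null) return true;
        if (object == null && func.next().first() == null) return true;
        return false;
    }
    Stream cont() { return isNull() ? new Stream(null) : func.next(); }
    Object first() {
        if (object == null && func != null) return func.next().first();
        return object;
    }
}

class IntObject {
    int number;
    IntObject(int i) { number = i; }
    int valueOf() { return number; }
}

// Hypothetical finite source: the elements of an array, in order.
class FromArray implements StreamObject {
    int[] a;
    int i;
    FromArray(int[] a, int i) { this.a = a; this.i = i; }
    public Stream next() {
        if (i >= a.length) return new Stream(null);
        return new Stream(new IntObject(a[i]), new FromArray(a, i + 1));
    }
}

// Hypothetical filter: passes each token once, dropping consecutive repeats.
class Unique implements StreamObject {
    Stream S;
    Unique(Stream ins) { S = ins; }
    public Stream next() {
        if (S.isNull()) return new Stream(null);
        IntObject head = (IntObject) S.first();
        Stream rest = S.cont();
        // Skip every immediately following token equal to the head.
        while (!rest.isNull() && ((IntObject) rest.first()).valueOf() == head.valueOf())
            rest = rest.cont();
        return new Stream(head, new Unique(rest));
    }
}

class UniqueDemo {
    // Reads a finite integer stream to its end, space-separated.
    static String drain(Stream s) {
        StringBuilder sb = new StringBuilder();
        while (!s.isNull()) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(((IntObject) s.first()).valueOf());
            s = s.cont();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int[] data = {3, 3, 5, 9, 9, 9, 11};
        Stream u = new Unique(new FromArray(data, 0).next()).next();
        System.out.println(drain(u));  // 3 5 9 11
    }
}
```

Under these assumptions, a Hamming stream `h` would be wrapped as `(new Unique(h)).next()` before being consumed.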

The list of prime numbers can be turned into a Stream object as follows:

    class ArrayObject {
        int array[];
        int index;

        public ArrayObject(int n, int a[]) { array = a; index = n; }

        public int[] arrayOf() { return array; }

        public int valueOf() { return array[index]; }

        public int indexOf() { return index; }
    }

    class PrimeList implements StreamObject {
        ArrayObject primes;

        public PrimeList(ArrayObject a) { primes = a; }

        public Stream next() {
            if (primes.indexOf() < 0) return new Stream(null);
            IntObject io = new IntObject(primes.valueOf());
            int[] pao = primes.arrayOf();
            PrimeList pl = new PrimeList(new ArrayObject(primes.indexOf()-1, pao));
            return new Stream(io, pl);
        }
    }

Code for generating the numbers in a Hamming sequence is as follows:

    int a[] = new int[3];
    a[0] = 11;
    a[1] = 5;
    a[2] = 3;
    Stream h = (new Hamming((new PrimeList(new ArrayObject(2, a))).next())).next();
    for (int i=0 ; i < 50 ; i++) {
        if (h.isNull()) break;
        int val = ((IntObject)(h.first())).valueOf();
        System.out.print(val+" ");
        h = h.cont();
    }