13 Streams

A future represents a single value that will arrive in the future. On the other hand, a stream represents multiple values that will arrive in the future. Think of a stream as a list of futures.

You can imagine a stream meandering through the woods as the autumn leaves fall onto the water’s surface. Each time a leaf floats by, it’s like the value that a Dart stream provides.

Streaming music online rather than downloading the song before playing it is another good comparison. When you stream music, you get many little chunks of data, but when you download the whole file, you get a single value, which is the entire file — a little like what a future returns. The http.get command you used in the last section was implemented as a stream internally. However, Dart just waited until the stream finished and then returned all the data at once as a completed future.

Streams, which are of type Stream, are used extensively in Dart and Dart-based frameworks. Here are some examples:

  • Reading a large file stored locally where new data from the file comes in chunks.
  • Downloading a file from a remote server.
  • Listening for requests coming into a server.
  • Representing user events such as button clicks.
  • Relaying changes in app state to the UI.

Although it’s possible to build streams from scratch, you usually don’t need to do that. You only need to use the streams that Dart or a Dart package provides. The first part of this chapter will teach you how to do that. The chapter will finish by teaching you how to make your own streams.

Using a Stream

Reading and writing files are important skills to learn in Dart. This will also be a good opportunity to practice using a stream.

The dart:io library contains a File class, which allows you to read data from a file. First, you’ll read data the easy way using the readAsString method, which returns the file’s contents as a future. Then, you’ll do it again by reading the data as a stream of bytes.

Adding an Assets File

You need a text file to work with, so you’ll add that to your project now.

Create a new folder named assets in the root of your project. In that folder, create a file named text.txt. Add some text to the file. Although any text will work, Lorem Ipsum is a good standby:

Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

Then, save the file.

Note

Lorem Ipsum is often used as filler text by graphic designers and app developers when the meaning of the text doesn’t matter. The Latin words were taken from the writings of the Roman statesman and philosopher Cicero but modified to become essentially meaningless.

Reading as a String

Now that you’ve created the text file, replace your Dart code with the following:

import 'dart:io';

Future<void> main() async {
  final file = File('assets/text.txt');
  final contents = await file.readAsString();
  print(contents);
}

Here’s what’s new:

  • File takes the relative path to your text file as the argument.
  • readAsString returns Future<String>, but by using await, you’ll receive the string itself when it’s ready.

File also has a readAsStringSync method, which would run synchronously and avoid awaiting a future. However, doing so would block your app if the reading takes a while. Many of the methods on File have synchronous versions, but to prevent blocking your app, you should generally prefer the asynchronous versions.

Run the code above, and you’ll see the contents of text.txt printed to the console.

Increasing the File Size

If the file is large, you can read it as a stream. This allows you to start processing the data more quickly because you don’t have to wait to finish reading the entire file as you did in the last example.

When you read a file as a stream, Dart reads the file in chunks. The size of the chunks depends on how Dart is implemented on the system you’re using, but it’s probably 65,536 bytes per chunk as it was on the local machines used when writing this chapter. The text.txt file with Lorem Ipsum that you created earlier is only 445 bytes, so trying to stream that file would be no different than simply reading the whole thing as you did before.

To get a text file large enough to stream in chunks, create a new file in the assets folder called text_long.txt. Copy the Lorem Ipsum text and paste it in text_long.txt as new lines so that there are 1000 Lorem Ipsum copies. You can, of course, select all and recopy from time to time, unless you find it therapeutic to paste things a thousand times. Save the file, and you’re ready to proceed.

Alternatively, you can find text_long.txt in the assets folder of the final project that comes with this chapter.

Reading From a Stream

Replace the contents in the body of the main function with the following code:

final file = File('assets/text_long.txt');
final stream = file.openRead();
stream.listen(
  (data) {
    print(data.length);
  },
);

Here are a few points to note:

  • Instead of calling readAsString on file, this time you’re calling openRead, which returns an object of type Stream<List<int>>. That’s a lot of angle brackets, but Stream<List<int>> simply means it’s a stream that periodically produces a list, and that list is a list of integers. The integers are the byte values, and the list is the chunk of data being passed in.
  • To subscribe for notifications whenever new data comes in the stream, you call listen and pass it an anonymous function that takes a single parameter. The data parameter here is of type List<int>, which gives you access to the chunk of data coming in from the file.
  • Because each integer in the list is one byte, calling data.length will tell you the number of bytes in the chunk.

Note

By default, only a single object can listen to a stream. This is known as a single-subscription stream. If you want more than one object to be notified of stream events, you need to create a broadcast stream, which you could do like so:

  final broadcastStream = stream.asBroadcastStream();

Run the code in main, and you’ll see something like the following:

65536
65536
65536
65536
65536
65536
52783

At least on the computer used while writing this chapter, the data was all in 65,536-byte chunks until the final one, which was smaller because it didn’t quite fill up the 65,536-byte buffer size. Your final chunk might be a different size than the one shown here, depending on how therapeutic your copy-and-paste session was.

Using an Asynchronous For-Loop

Just as you can use callbacks or async-await to get the value of a future, you also have two ways to get the values of a stream. In the example above, you used the listen callback. Here is the same example using an asynchronous for loop:

Future<void> main() async {
  final file = File('assets/text_long.txt');
  final stream = file.openRead();
  await for (var data in stream) {
    print(data.length);
  }
}

The await for keywords cause the loop to pause until the next data event comes in. Run this, and you’ll see the same results as before.

Error Handling

Like futures, stream events can also include an error rather than a value.

Be a responsible programmer and plan how to handle errors. Callbacks and try-catch blocks both work.

Using a Callback

One way to handle errors is to use the onError callback like so:

final file = File('assets/text_long.txt');
final stream = file.openRead();
stream.listen(
  (data) {
    print(data.length);
  },
  onError: (Object error) {
    print(error);
  },
  onDone: () {
    print('All finished');
  },
);

Here are a couple of points to note:

  • When an error occurs, it won’t cancel the stream, and you’ll continue to receive more data events. If you want to cancel the stream after an error, listen also has a cancelOnError parameter that you can set to true.
  • When a stream finishes sending all its data, it’ll fire a done event. This gives you a chance to respond with an onDone callback.

Using Try-Catch

The other way to handle errors on a stream is with a try-catch block in combination with async-await. Here is what that looks like:

try {
  final file = File('assets/text_long.txt');
  final stream = file.openRead();
  await for (var data in stream) {
    print(data.length);
  }
} on Exception catch (error) {
  print(error);
} finally {
  print('All finished');
}

In this example, you’re catching all exceptions. A more robust solution would check for specific errors like FileSystemException, which Dart would throw if the file didn’t exist.
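As a rough sketch of that more specific approach, the function below catches only FileSystemException and leaves every other error to the caller. The countBytes name and the choice to return null on failure are assumptions made for this illustration, not part of the chapter’s project:

```dart
import 'dart:io';

/// Hypothetical helper: returns the total number of bytes in the file,
/// or null if the file system reports a problem (such as a missing file).
Future<int?> countBytes(String path) async {
  var total = 0;
  try {
    await for (final chunk in File(path).openRead()) {
      total += chunk.length;
    }
    return total;
  } on FileSystemException catch (error) {
    // Only file-system errors are handled here; anything else propagates.
    print('Could not read ${error.path}: ${error.message}');
    return null;
  }
}

Future<void> main() async {
  print(await countBytes('assets/text_long.txt'));
  print(await countBytes('assets/pink_elephants.txt'));
}
```

Catching the narrow type first keeps unrelated bugs from being silently swallowed by a catch-all handler.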

Run either the callback version or the try-catch version, and you’ll see the same chunk sizes as before, with the additional text “All finished” printed at the end.

Change the filename to something nonexistent, like pink_elephants.txt, and rerun the code. Confirm that you have a FileSystemException.

FileSystemException: Cannot open file, path = 'assets/pink_elephants.txt' (OS Error: No such file or directory, errno = 2)
All finished

Even with the exception, the finally block (or onDone callback if that’s what you used) still printed “All finished”.

Cancelling a Stream

As mentioned above, you may use the cancelOnError parameter to tell the stream that you want to stop listening in the event of an error. But even if there isn’t an error, you should always cancel your subscription to a stream if you no longer need it. This allows Dart to clean up the memory the stream was using. Failing to do so can cause a memory leak.

Replace your Dart code with the following version:

import 'dart:async';
import 'dart:io';

void main() {
  final file = File('assets/text_long.txt');
  final stream = file.openRead();
  StreamSubscription<List<int>>? subscription;
  subscription = stream.listen(
    (data) {
      print(data.length);
      subscription?.cancel();
    },
    cancelOnError: true,
    onDone: () {
      print('All finished');
    },
  );
}

Calling listen returns a StreamSubscription, which is part of the dart:async library. Keeping a reference to that in the subscription variable allows you to cancel the subscription whenever you want. In this case, you cancel it after the first data event.

Run the code, and you’ll only see 65536 printed once. The onDone callback was never called because the stream never completed.
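If you prefer the await for style, leaving the loop early has the same effect: Dart cancels the underlying subscription for you when the loop exits. Here is a minimal sketch of that idea. The firstChunkSize function is a made-up name for this example:

```dart
import 'dart:io';

/// Hypothetical helper: returns the size of the first chunk only.
Future<int> firstChunkSize(Stream<List<int>> stream) async {
  await for (final data in stream) {
    // Returning (or using break) exits the loop, which cancels
    // the subscription to the stream automatically.
    return data.length;
  }
  return 0; // The stream finished without producing any data.
}

Future<void> main() async {
  final stream = File('assets/text_long.txt').openRead();
  print(await firstChunkSize(stream));
}
```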

Transforming a Stream

Being able to transform a stream as the data is coming in is very powerful. In the examples above, you never did anything with the data except print the length of the bytes list. Those bytes represent text, though, so you’re going to transform the data from numbers to text.

For this demonstration, there’s no need to use a large text file, so you’ll switch back to the 445-byte version of Lorem Ipsum in text.txt.

Viewing the Bytes

Replace the contents of main with the following code:

final file = File('assets/text.txt');
final stream = file.openRead();
stream.listen(
  (data) {
    print(data);
  },
);

Run that, and you’ll see a long list of bytes in decimal form:

[76, 111, 114, 101, ... ]

Although different computers encode text files using different encodings, the abbreviated list above is from a computer that uses UTF-8 encoding. You might recall that UTF-16 uses 16-bit, or 2-byte, code units to encode Unicode text. UTF-8 uses one to four 8-bit code units to encode each Unicode code point. For values of 127 and below, the UTF-8 byte value is the same as the Unicode code point, so English text takes only one byte per letter. This makes file sizes smaller than with UTF-16 encoding. The smaller size helps when saving to disk or sending data over a network.

If you look up 76 in Unicode, you see that it’s the capital letter L, 111 is o, and on it goes with Lorem ipsum dolor sit….
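You can check these byte values yourself with the utf8 codec from dart:convert. This small sketch isn’t part of the chapter’s project. It only illustrates how plain English letters take one byte each while a character above 127 takes more:

```dart
import 'dart:convert';

void main() {
  // ASCII letters: one byte each, equal to their Unicode code points.
  print(utf8.encode('Lo')); // [76, 111]

  // U+00E9 (e with an acute accent) is above 127, so UTF-8 uses two bytes.
  print(utf8.encode('\u00E9')); // [195, 169]

  // The same character is a single 16-bit code unit in UTF-16.
  print('\u00E9'.codeUnits); // [233]
}
```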

Decoding the Bytes

Next, you’ll take the UTF-8 bytes and convert them to a string.

Make sure you have the following imports and main method:

import 'dart:convert';
import 'dart:io';

Future<void> main() async {
  final file = File('assets/text.txt');
  final byteStream = file.openRead();
  final stringStream = byteStream.transform(utf8.decoder);
  await for (var data in stringStream) {
    print(data);
  }
}

The main difference here is that you’re using transform. This method takes the input from the original stream, transforms it with a StreamTransformer and outputs a new stream, which you can listen to or loop over as before. In this case, the stream transformer was the dart:convert library’s utf8.decoder, which takes a list of bytes and converts them to a string.

Run the code, and you’ll see the Lorem Ipsum passage printed in plain text.
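Transformers can also be chained. As an illustration, the sketch below adds LineSplitter from dart:convert after the decoder so that each event is one line of text rather than one chunk. The linesOf function name is an assumption for this example, and an in-memory stream stands in for file.openRead() so you can run it without a file:

```dart
import 'dart:convert';

/// Hypothetical helper: decodes UTF-8 byte chunks and splits the text
/// into lines.
Stream<String> linesOf(Stream<List<int>> byteStream) {
  return byteStream
      .transform(utf8.decoder)
      .transform(const LineSplitter());
}

Future<void> main() async {
  // Stand-in for file.openRead(): two chunks that break a line in the middle.
  final chunks = Stream<List<int>>.fromIterable([
    utf8.encode('one\ntw'),
    utf8.encode('o\nthree'),
  ]);
  await for (final line in linesOf(chunks)) {
    print(line); // one, two, three (each on its own line)
  }
}
```

Note that the word split across the two chunks still comes out as a single line, because the transformer holds on to partial lines until the rest arrives.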

Exercise

The following code produces a stream that outputs an integer every second and stops after the tenth time.

Stream<int>.periodic(
  Duration(seconds: 1),
  (value) => value,
).take(10);

  1. Set the stream above to a variable named myStream.
  2. Use await for to print the value of the integer on each data event coming from the stream.

Creating Streams From Scratch

You’ve learned how to use streams. As you advance in your skills, you might want to also create packages with streams for other developers to use.

Say, for example, you’re writing an audio player plugin. You need to take the events that the underlying platform provides and pass them on to Dart. Using a stream is a natural choice for continuous events like playback state changes or the current play position. Because the data comes from outside of Dart, though, you have to create the stream yourself. The rest of this chapter will show you how to do that.

You can create a stream in a few ways:

  • Using Stream constructors.
  • Using asynchronous generators.
  • Using stream controllers.

You’ll start with constructors and move on to the other methods.

Using Stream Constructors

The Stream class has several constructors you can use to create streams. You saw an example in the exercise above with Stream.periodic, which added data at periodic intervals. Here are a few more named constructors:

  • Stream.empty: A stream with no values or errors. It’s done as soon as you listen to it.
  • Stream.value: A stream with a single value.
  • Stream.error: A stream with a single error.
  • Stream.fromFuture: Converts a future to a stream.
  • Stream.fromFutures: Converts multiple futures to a stream.
  • Stream.fromIterable: Converts an iterable collection to a stream.
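Here is a quick sketch of a few of these constructors in action. It collects each stream’s values with toList so you can see them at a glance. The values themselves are arbitrary:

```dart
Future<void> main() async {
  // Completes immediately with no events.
  print(await Stream<int>.empty().toList()); // []

  // Emits one value, then completes.
  print(await Stream<int>.value(42).toList()); // [42]

  // Emits each element of the iterable in order.
  print(await Stream<int>.fromIterable([1, 2, 3]).toList()); // [1, 2, 3]

  // Emits the future's result once it completes.
  print(await Stream<String>.fromFuture(Future.value('hi')).toList()); // [hi]

  // Stream.error delivers its single event to onError instead.
  Stream<int>.error(Exception('cherry')).listen(
    (value) => print(value),
    onError: (Object error) => print(error), // Exception: cherry
  );
}
```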

Feel free to try them all out. The example below will demonstrate building a stream with the fromFutures constructor.

First, create a few futures by replacing the contents of main with the following code:

final first = Future(() => 'Row');
final second = Future(() => 'row');
final third = Future(() => 'row');
final fourth = Future.delayed(
  Duration(milliseconds: 300),
  () => 'your boat',
);

Now, create your stream and listen to it like so:

final stream = Stream<String>.fromFutures([
  first,
  second,
  third,
  fourth,
]);

stream.listen((data) {
  print(data);
});

fromFutures consolidates all your futures into a single stream.

Note

Be sure to add the comma after the last future in the list so they’re formatted vertically. That way, they go gently down the stream. :]

Run your code, and there you have it:

Row
row
row
your boat
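The values above arrived in list order because the futures happened to complete in that order. fromFutures reports results in the order the futures complete, not the order you list them. The following sketch, with made-up delays and a hypothetical completionOrder function, shows the difference:

```dart
/// Hypothetical helper for this illustration.
Future<List<String>> completionOrder() {
  final slow = Future.delayed(Duration(milliseconds: 200), () => 'slow');
  final fast = Future.delayed(Duration(milliseconds: 50), () => 'fast');
  // slow is listed first but completes last.
  return Stream<String>.fromFutures([slow, fast]).toList();
}

Future<void> main() async {
  print(await completionOrder()); // [fast, slow]
}
```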

Using Asynchronous Generators

The Stream constructors are good when they match the data you have, but if you want more flexibility, consider using an asynchronous generator.

A generator is a function that produces multiple values in a sequence. As you may recall, Dart has two types of generators: synchronous and asynchronous.

Reviewing Synchronous Generators

You learned about synchronous generators in Chapter 15, “Iterables”, of Dart Apprentice: Fundamentals. But to review, a synchronous generator returns its values as an iterable. These values are available on demand. You can get them as soon as you need them. That’s why they’re called synchronous.

Here’s an example of a synchronous generator function that provides the squares of all the integers from 1 to 100 as an iterable:

Iterable<int> hundredSquares() sync* {
  for (int i = 1; i <= 100; i++) {
    yield i * i;
  }
}

Recall that sync*, read “sync star”, is what defines the function as a synchronous generator and that yield provides the values to the iterable.
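To see the “on demand” behavior, here is a short usage sketch that repeats the generator so it runs on its own. Asking for only the first three values means the generator body runs only as far as needed to produce them:

```dart
Iterable<int> hundredSquares() sync* {
  for (int i = 1; i <= 100; i++) {
    yield i * i;
  }
}

void main() {
  final squares = hundredSquares();
  // Only the first three values are computed here.
  print(squares.take(3).toList()); // [1, 4, 9]
  // Asking for the last value runs the generator to the end.
  print(squares.last); // 10000
}
```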

By comparison, an asynchronous generator returns its values as a stream. You can’t get them whenever you want. You have to wait for them. That’s why it’s called asynchronous.

Implementing an Asynchronous Generator

When creating an asynchronous generator, use the async* keyword, which you can read as “async star”.

Note

It’s easy to forget the difference between async and async*. Here’s a reminder: Functions with async return futures, and functions with async* return streams.

Add the following top-level function to your project file:

Stream<String> consciousness() async* {
  final data = ['con', 'scious', 'ness'];
  for (final part in data) {
    await Future<void>.delayed(Duration(milliseconds: 500));
    yield part;
  }
}

Here’s what’s happening in the stream of consciousness:

…this is an async* function, so the function returns a stream, and like synchronous generators, this function also uses the yield keyword to return values in the stream, but unlike synchronous generators, you don’t get all the values on demand, so here you’re waiting for 500 milliseconds every time you loop because the data here is just simulating data you might get from a database or the user’s device or the web or something…

Listening to the Stream

Replace the contents of main with the following:

final stream = consciousness();

stream.listen((data) {
  print(data);
});

consciousness gives you the stream, which you listen to in the usual way.

Run that, and you’ll see the following text written to the console one line every half second:

con
scious
ness

Using Stream Controllers

The final way you’ll create a stream is with the low-level StreamController. You could go even more low-level than that, but a stream controller is fine for most practical purposes.

Before diving into the code, it would help to understand how streams work.

Understanding Sinks and Streams

The way to add data or errors to a stream is with what’s called a sink. You can think of this like your kitchen sink with water flowing out of it into a pipe. The water pipe is like a stream. Throwing a grape into the sink is like adding a data value event to the stream. The grape gets washed through the sink’s drain and enters the water stream flowing through the pipe. Alternatively, you could throw a cherry in the sink, and it will have the same fate as the grape. Putting in a cherry is like adding an error event. You can also close the sink. Think of that like putting a plug in the hole. No more data or errors can enter the stream.

Because adding data and errors are events, a sink is also called an event sink.

When you use a stream controller, it creates and manages the sink and stream internally.

Writing the Code

Replace the contents of main with the following code:

// 1
final controller = StreamController<String>();
final stream = controller.stream;
final sink = controller.sink;
// 2
stream.listen(
  (value) => print(value),
  onError: (Object error) => print(error),
  onDone: () => print('Sink closed'),
);
// 3
sink.add('grape');
sink.add('grape');
sink.add('grape');
sink.addError(Exception('cherry'));
sink.add('grape');
sink.close();

Here’s what you’re doing:

  1. Create a stream controller of type String. Internally, the controller will take care of creating a string stream and also a sink.
  2. Listen for and handle data, error and done events on the stream.
  3. Add some data values and errors to the sink. These will flow into the stream. Finally, close the sink. sink is of type StreamSink, which implements EventSink. The EventSink interface ensures you have the add, addError and close methods.

Note

The example above provides a single-subscription stream. If you need a broadcast stream, use StreamController<String>.broadcast(). This allows you to listen to the stream more than once.

Testing It Out

Run your code, and you’ll see the following lines in the console:

grape
grape
grape
Exception: cherry
grape
Sink closed

It works! As you can see, even the low-level solution wasn’t very difficult to implement.

If you were making a library package for other people to use, you would probably make the stream controller and the sink private. Just expose the stream to the library users. See the solution to Challenge 2 for an example.
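The general shape of that pattern might look like the sketch below. CounterService, onCount, increment and dispose are all hypothetical names invented for this illustration. The controller is a broadcast one so that two listeners can subscribe:

```dart
import 'dart:async';

/// Hypothetical service; every name here is illustrative only.
class CounterService {
  // Private to the library: users of the class can't add events directly.
  final _controller = StreamController<int>.broadcast();
  int _count = 0;

  /// The only thing exposed publicly is the stream.
  Stream<int> get onCount => _controller.stream;

  void increment() => _controller.add(++_count);

  /// Closes the controller so that listeners receive a done event.
  Future<void> dispose() => _controller.close();
}

Future<void> main() async {
  final service = CounterService();
  service.onCount.listen((value) => print('A: $value'));
  service.onCount.listen((value) => print('B: $value'));
  service.increment();
  service.increment();
  await service.dispose();
}
```

Each listener receives both values. Keep in mind that a leading underscore makes a member private to the library, so the class would need to live in its own file for main to be truly locked out of the controller.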

Challenges

Before going on to the next chapter, here are some challenges to test your knowledge of streams. It’s best if you try to solve them yourself, but if you get stuck, solutions are available in the challenge folder of this chapter.

Challenge 1: Data Stream

The following code uses the http package to stream content from the given URL:

final url = Uri.parse('https://kodeco.com');
final client = http.Client();
final request = http.Request('GET', url);
final response = await client.send(request);
final stream = response.stream;

Your challenge is to transform the stream from bytes to strings and see how many bytes each data chunk is. Add error handling, and when the stream finishes, close the client.

Challenge 2: Heads or Tails?

Create a coin flipping service that provides a stream of 10 random coin flips, each separated by 500 milliseconds. You use the service like so:

final coinFlipper = CoinFlippingService();

coinFlipper.onFlip.listen((coin) {
  print(coin);
});

coinFlipper.start();

onFlip is the name of the stream.

Key Points

  • A stream, which is of type Stream, is like a series of futures.
  • Using a stream enables you to handle data events as they happen rather than waiting for them all to finish.
  • You can handle stream errors with callbacks or try-catch blocks.
  • You can create streams with Stream constructors, asynchronous generators or stream controllers.
  • A sink is an object for adding values and errors to a stream.

Where to Go From Here?

Streams are powerful, and you can do much more with them. For example, if your app has a “Download Song” button, you don’t want to overload the server when some happy kid presses the button as fast as they can a million times. You can consolidate that stream of button-press events into a single server request. This is called debouncing. It doesn’t come built into Dart, but packages like RxDart support debouncing and many other stream functions.
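To make the idea concrete, here is a hand-rolled sketch of debouncing built from the stream controller you learned about in this chapter. It is not how RxDart implements it. The debounce function and its quietPeriod parameter are assumptions for this example, and the sketch ignores details such as pausing and resuming:

```dart
import 'dart:async';

/// Illustrative debounce: emits a value only after [quietPeriod] passes
/// with no newer value arriving from [source].
Stream<T> debounce<T>(Stream<T> source, Duration quietPeriod) {
  final controller = StreamController<T>();
  StreamSubscription<T>? subscription;
  Timer? timer;
  late T latest;

  controller.onListen = () {
    subscription = source.listen(
      (value) {
        latest = value;
        timer?.cancel(); // A newer value replaces the pending one.
        timer = Timer(quietPeriod, () => controller.add(latest));
      },
      onError: controller.addError,
      onDone: () {
        // Flush a value that is still waiting, then finish.
        if (timer?.isActive ?? false) {
          timer?.cancel();
          controller.add(latest);
        }
        controller.close();
      },
    );
  };
  controller.onCancel = () {
    timer?.cancel();
    return subscription?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  // Three rapid "button presses" collapse into the last one.
  final presses = Stream<int>.fromIterable([1, 2, 3]);
  print(await debounce(presses, Duration(milliseconds: 100)).toList()); // [3]
}
```

In a real app, you’d most likely reach for a well-tested package rather than maintain something like this yourself.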