14 Working With Streams

Written by Kevin D Moore

Imagine yourself sitting by a creek, having a wonderful time. While watching the water flow, you see a piece of wood or a leaf floating down the stream and you decide to take it out of the water. You could even have someone upstream purposely float things down the creek for you to grab.

You can imagine Dart streams in a similar way: as data flowing down a creek, waiting for someone to grab it. That’s what a stream does in Dart — it sends data events for a listener to grab.

With Dart streams, you can send one data event at a time while other parts of your app listen for those events. Such events can be collections, maps or any other type of data you’ve created.

Streams can send errors in addition to data, and you can stop a stream when you no longer need it.

In this chapter, you’ll update your recipe project to use streams in two different locations. You’ll use one for bookmarks, to let the user mark favorite recipes and automatically update the UI to display them. You’ll use the second to update your ingredient and grocery lists.

But before you jump into the code, you’ll learn more about how streams work.

Types of streams

Streams are part of Dart, and Flutter inherits them. There are two types of streams in Flutter: single subscription streams and broadcast streams.

img

Single subscription streams are the default. They work well when you’re only using a particular stream on one screen.

A single subscription stream can only be listened to once. It doesn’t start generating events until it has a listener and it stops sending events when the listener stops listening, even if the source of events could still provide more data.

Single subscription streams are useful to download a file or for any single-use operation. For example, a widget can subscribe to a stream to receive updates about a value, like the progress of a download, and update its UI accordingly.

If you need multiple parts of your app to access the same stream, use a broadcast stream instead.

A broadcast stream allows any number of listeners. It fires when its events are ready, whether there are listeners or not.

To create a broadcast stream, you simply call asBroadcastStream() on an existing single subscription stream.

final broadcastStream = singleStream.asBroadcastStream();

You can differentiate a broadcast stream from a single subscription stream by inspecting its Boolean property isBroadcast.
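You can check the difference between the two stream types in plain Dart. The following is a minimal sketch, not code from the recipe project. The helper name rejectsSecondListener is made up for illustration. It tries to attach a second listener to a stream and reports whether Dart refuses it.

```dart
import 'dart:async';

/// Hypothetical helper: returns true if [stream] refuses a second listener.
bool rejectsSecondListener(Stream<int> stream) {
  final first = stream.listen((_) {});
  try {
    final second = stream.listen((_) {});
    second.cancel();
    return false;
  } on StateError {
    // A single subscription stream throws StateError on a second listen().
    return true;
  } finally {
    first.cancel();
  }
}

void main() {
  final singleStream = StreamController<int>().stream;
  final broadcastStream = StreamController<int>().stream.asBroadcastStream();

  print(singleStream.isBroadcast); // false
  print(broadcastStream.isBroadcast); // true
  print(rejectsSecondListener(singleStream)); // true
  print(rejectsSecondListener(broadcastStream)); // false
}
```

If you own the controller, StreamController.broadcast() is another way to get a broadcast stream without converting an existing one.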

In Flutter, there are some key classes built on top of Stream that simplify programming with streams.

The following diagram shows the main classes used with streams:

img

Next, you’ll take a deeper look at each one.

StreamController and sink

When you create a stream, you usually use StreamController, which holds both the stream and StreamSink.

Sink

A sink is a destination for data. When you want to add data to a stream, you add it to the sink. Since the StreamController owns the sink, it listens for data on the sink and sends the data to its stream listeners.

Here’s an example that uses StreamController:

final _recipeStreamController = StreamController<List<Recipe>>();
final _stream = _recipeStreamController.stream;

To add data to a stream, you add it to its sink:

_recipeStreamController.sink.add(_recipesList);

This uses the sink field of the controller to “place” a list of recipes on the stream. That data will be sent to any current listeners.

When you’re done with the stream, make sure you close it, like this:

_recipeStreamController.close();
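To see the three steps working together, here is a small self-contained sketch. It uses lists of strings instead of Recipe objects, and the function name sendThroughController is an assumption made for this example. It creates a controller, adds data through the sink, closes the controller and returns what a listener received.

```dart
import 'dart:async';

/// Sends every list in [batches] through a StreamController and returns
/// what a single listener received, in order.
Future<List<List<String>>> sendThroughController(
    List<List<String>> batches) async {
  final controller = StreamController<List<String>>();
  final received = <List<String>>[];

  // forEach() listens to the stream and completes when the stream is done.
  final listening = controller.stream.forEach(received.add);

  for (final batch in batches) {
    controller.sink.add(batch);
  }

  // Closing the controller sends the "done" event to the listener.
  await controller.close();
  await listening;
  return received;
}

Future<void> main() async {
  final received = await sendThroughController([
    ['pasta'],
    ['pasta', 'pizza'],
  ]);
  print(received); // [[pasta], [pasta, pizza]]
}
```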

StreamSubscription

Calling listen() on a stream returns a StreamSubscription. You can use this subscription to stop listening when you’re done, like this:

StreamSubscription subscription = stream.listen((value) {
    print('Value from controller: $value');
});
...
...
// You are done with the subscription
subscription.cancel();
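A subscription can react to more than data. The sketch below, written for illustration only, passes the optional onError and onDone callbacks to listen() and records every event the subscription sees. The function name recordEvents and the error text are assumptions.

```dart
import 'dart:async';

/// Records everything a subscription sees: data, errors and the done event.
Future<List<String>> recordEvents() async {
  final controller = StreamController<int>();
  final log = <String>[];
  final finished = Completer<void>();

  final StreamSubscription<int> subscription = controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete();
    },
  );

  controller.sink.add(1);
  controller.sink.addError('no network');
  controller.sink.add(2);
  await controller.close();
  await finished.future;

  // Cancelling after the stream is done is harmless.
  await subscription.cancel();
  return log;
}

Future<void> main() async {
  print(await recordEvents());
  // [data: 1, error: no network, data: 2, done]
}
```

By default an error doesn’t end the subscription, which is why the second data event still arrives.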

Sometimes, it’s helpful to have an automated mechanism to avoid managing subscriptions manually. That’s where StreamBuilder comes in.

StreamBuilder

StreamBuilder is handy when you want a widget to rebuild from a stream. Its two main parameters are a stream and a builder. As you receive data from the stream, the builder takes care of building or updating the UI.

Here’s an example:

final repository = Provider.of<Repository>(context, listen: false);
return StreamBuilder<List<Recipe>>(
  stream: repository.recipesStream(),
  builder: (context, AsyncSnapshot<List<Recipe>> snapshot) {
    // extract recipes from snapshot and build the view
  },
);
...

StreamBuilder is handy because you don’t need to use a subscription directly and it unsubscribes from the stream automatically when the widget is destroyed.

Now that you understand how streams work, you’ll convert your existing project to use them.

Adding streams to Recipe Finder

You’re now ready to start working on your recipe project. If you’re following along with your app from the previous chapters, open it and keep using it with this chapter. If not, just locate the projects folder for this chapter and open starter in Android Studio.

Note: If you use the starter app, don’t forget to add your apiKey and apiId in network/recipe_service.dart.

To convert your project to use streams, you need to change the memory repository class to add two new methods that return one stream for recipes and another for ingredients. Instead of just returning a list of static recipes, you’ll use streams to modify that list and refresh the UI to display the change.

This is what the flow of the app looks like:

img

Here, you can see that the RecipeList screen has a list of recipes. Bookmarking a recipe adds it to the bookmarked recipe list and updates both the bookmark and the groceries screens.

You’ll start by converting your repository code to return Streams and Futures.

Adding futures and streams to the repository

Open data/repository.dart and change all of the return types to return a Future. For example, change the existing findAllRecipes() to:

Future<List<Recipe>> findAllRecipes();

Do this for all the methods except init() and close().

Your final class should look like:

Future<List<Recipe>> findAllRecipes();

Future<Recipe> findRecipeById(int id);

Future<List<Ingredient>> findAllIngredients();

Future<List<Ingredient>> findRecipeIngredients(int recipeId);

Future<int> insertRecipe(Recipe recipe);

Future<List<int>> insertIngredients(List<Ingredient> ingredients);

Future<void> deleteRecipe(Recipe recipe);

Future<void> deleteIngredient(Ingredient ingredient);

Future<void> deleteIngredients(List<Ingredient> ingredients);

Future<void> deleteRecipeIngredients(int recipeId);

Future init();

void close();

These updates allow you to have methods that work asynchronously to process data from a database or the network.

Next, add two new Streams after findAllRecipes():

// 1
Stream<List<Recipe>> watchAllRecipes();
// 2
Stream<List<Ingredient>> watchAllIngredients();

Here’s what this code does:

  1. watchAllRecipes() watches for any changes to the list of recipes. For example, if the user did a new search, it updates the list of recipes and notifies listeners accordingly.
  2. watchAllIngredients() listens for changes in the list of ingredients displayed on the Groceries screen.

You’ve now changed the interface, so you need to update the memory repository.

Cleaning up the repository code

Before updating the code to use streams and futures, there are some minor housekeeping updates.

Open data/memory_repository.dart and import the Dart async library:

import 'dart:async';

Now remove:

import 'package:flutter/foundation.dart';

Then, update the MemoryRepository class definition to remove ChangeNotifier, so it looks like:

class MemoryRepository extends Repository {

Next, add a few new fields after the existing two List declarations, ignoring all the red squiggles:

// 1
Stream<List<Recipe>>? _recipeStream;
Stream<List<Ingredient>>? _ingredientStream;
// 2
final StreamController _recipeStreamController =
    StreamController<List<Recipe>>();
final StreamController _ingredientStreamController =
    StreamController<List<Ingredient>>();

Here’s what’s going on:

  1. _recipeStream and _ingredientStream are private fields for the streams. These will be captured the first time a stream is requested, which prevents new streams from being created for each call.
  2. Creates StreamControllers for recipes and ingredients.

And now, add these new methods before findAllRecipes():

// 3
@override
Stream<List<Recipe>> watchAllRecipes() {
  _recipeStream ??= _recipeStreamController.stream as Stream<List<Recipe>>;
  return _recipeStream!;
}

// 4
@override
Stream<List<Ingredient>> watchAllIngredients() {
  _ingredientStream ??=
    _ingredientStreamController.stream as Stream<List<Ingredient>>;
  return _ingredientStream!;
}

These methods:

  3. Check whether you already have the stream. If not, read the controller’s stream getter, store the result and return it.
  4. Do the same for ingredients.
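The caching pattern can be sketched on its own as follows. This is an illustration under assumptions, not the project’s code: NameRepository and watchAllNames() are hypothetical names, and the controller field is declared with its full generic type, which differs from the declaration above and means no cast is needed.

```dart
import 'dart:async';

/// Hypothetical repository that hands out one cached stream.
class NameRepository {
  // Typed controller: `stream` is already a Stream<List<String>>.
  final StreamController<List<String>> _controller =
      StreamController<List<String>>();
  Stream<List<String>>? _stream;

  Stream<List<String>> watchAllNames() {
    // Store the stream on the first call and reuse it afterwards.
    return _stream ??= _controller.stream;
  }

  void close() => _controller.close();
}

void main() {
  final repository = NameRepository();
  final first = repository.watchAllNames();
  final second = repository.watchAllNames();
  print(identical(first, second)); // true
  repository.close();
}
```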

Updating the existing repository

MemoryRepository is full of red squiggles. That’s because the methods all use the old signatures, and everything’s now based on Futures.

Still in data/memory_repository.dart, replace the existing findAllRecipes() with this:

@override
// 1
Future<List<Recipe>> findAllRecipes() {
  // 2
  return Future.value(_currentRecipes);
}

These updates:

  1. Change the method to return a Future.
  2. Wrap the return value with a Future.value().
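If Future.value() is new to you, this short sketch shows what it does. The function names are made up for the example. It wraps data you already have in a completed Future, which is also what an async function does with its return value.

```dart
/// Wraps synchronous data in an already completed Future.
Future<List<String>> findAllNames(List<String> names) {
  return Future.value(names);
}

/// Equivalent: an async function wraps its return value automatically.
Future<List<String>> findAllNamesAsync(List<String> names) async {
  return names;
}

Future<void> main() async {
  final a = await findAllNames(['Ada', 'Grace']);
  final b = await findAllNamesAsync(['Ada', 'Grace']);
  print(a); // [Ada, Grace]
  print(b.length); // 2
}
```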

There are a few more updates you need to make before moving on to the next section.

First, in init() remove the null from the return statement so it looks like this:

@override
Future init() {
  return Future.value();
}

Then update close() so it closes the streams.

@override
void close() {
  _recipeStreamController.close();
  _ingredientStreamController.close();
}

In the next section, you’ll update the remaining methods to return futures and add data to the stream using StreamController.

Sending recipes over the stream

As you learned earlier, StreamController’s sink property adds data to streams. Since this happens in the future, you need to change the return type to Future and then update the methods to add data to the stream.

To start, change insertRecipe() to:

@override
// 1
Future<int> insertRecipe(Recipe recipe) {
  _currentRecipes.add(recipe);
  // 2
  _recipeStreamController.sink.add(_currentRecipes);
  if (recipe.ingredients != null) {
    insertIngredients(recipe.ingredients!);
  }
  // 3
  // 4
  return Future.value(0);
}

Here’s what you have updated:

  1. Update the method’s return type to be Future.
  2. Add _currentRecipes to the recipe sink. You might wonder why you call add() with the same list instead of adding a single ingredient or recipe. The reason is that the stream expects a list, not a single value. By doing it this way, you replace the previous list with the updated one.
  3. Remove notifyListeners();.
  4. Return a Future value. You’ll learn how to return the ID of the new item in a later chapter.

This replaces the previous list with the new list and notifies any stream listeners that the data has changed.
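To watch this happen outside the app, here is a minimal sketch with a hypothetical ItemRepository that stores strings instead of recipes. One assumption differs from the chapter’s code: each event is an unmodifiable copy of the list, so the listener keeps a separate snapshot for every insert.

```dart
import 'dart:async';

/// Hypothetical stand-in for MemoryRepository that stores plain strings.
class ItemRepository {
  final List<String> _currentItems = <String>[];
  final StreamController<List<String>> _itemStreamController =
      StreamController<List<String>>();

  Stream<List<String>> watchAllItems() => _itemStreamController.stream;

  Future<int> insertItem(String item) {
    _currentItems.add(item);
    // Assumption for this sketch: emit a copy so each event is a snapshot.
    // The chapter's code adds the list itself.
    _itemStreamController.sink.add(List.unmodifiable(_currentItems));
    return Future.value(0);
  }

  Future<void> close() => _itemStreamController.close();
}

/// Inserts [items] one by one and returns every list the listener received.
Future<List<List<String>>> collectEmissions(List<String> items) async {
  final repository = ItemRepository();
  final seen = <List<String>>[];
  final subscription = repository.watchAllItems().listen(seen.add);

  for (final item in items) {
    await repository.insertItem(item);
  }
  // Give the stream a chance to deliver any pending events.
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();
  await repository.close();
  return seen;
}

Future<void> main() async {
  print(await collectEmissions(['flour', 'eggs']));
  // [[flour], [flour, eggs]]
}
```

Each insert produces one event that carries the whole list, which is what a UI listening to the stream needs in order to redraw.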

Now that you know how to convert the first method, it’s time to convert the rest of the methods as an exercise. Don’t worry, you can do it! :]

Exercise

Convert the remaining methods, just as you did with insertRecipe(). You’ll need to do the following:

  1. Update MemoryRepository methods to return a Future that matches the new Repository interface methods.
  2. For all methods that change a watched item, add a call to add the item to the sink.
  3. Remove all the calls to notifyListeners(). Hint: not all methods have this statement.
  4. Wrap the return values in Futures.

For a method that returns a Future<void>, what do you think the return will look like? Got it? There might be a future for you yet.

return Future.value();
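As a rough illustration of the shape such a method can take, here is a sketch with a hypothetical TagRepository that stores strings. It is not the solution from the challenge folder. The delete method removes the item, adds the updated list to the sink and returns a Future<void>.

```dart
import 'dart:async';

/// Hypothetical repository used only to illustrate a Future<void> method.
class TagRepository {
  final List<String> _tags = ['vegan', 'quick', 'dessert'];
  final StreamController<List<String>> _controller =
      StreamController<List<String>>();

  Stream<List<String>> watchAllTags() => _controller.stream;

  List<String> get currentTags => List.unmodifiable(_tags);

  Future<void> deleteTag(String tag) {
    _tags.remove(tag);
    // Tell listeners about the updated list.
    _controller.sink.add(_tags);
    // Nothing to return, so complete with no value.
    return Future.value();
  }

  Future<void> close() => _controller.close();
}

Future<void> main() async {
  final repository = TagRepository();
  final subscription = repository.watchAllTags().listen(print);

  await repository.deleteTag('quick'); // the listener prints [vegan, dessert]
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();
  await repository.close();
}
```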

If you get stuck, check out memory_repository.dart in this chapter’s challenge folder — but first give it your best shot!

After you complete the exercise, MemoryRepository shouldn’t have any more red squiggles — but you still have a few more tweaks to make before you can run your new, stream-powered app.

Note: It’s very important that you add recipes to _recipeStreamController.sink and ingredients to _ingredientStreamController.sink. To make sure you did it correctly, check the challenge project. You’ll need to do the same for the delete methods as well.

Switching between services

In the previous chapter, you created a MockService to provide local data that never changes, but you also have access to RecipeService. It’s still a bit tedious to switch between the two, so you’ll take care of that before integrating streams.

An easy way to do that is with an interface or, as it’s known in Dart, an abstract class. Remember that an interface or abstract class is just a contract: it guarantees that implementing classes provide the given methods.

Once you create your interface, it will look like this:

img

To start creating the interface, go to the network folder, create a new Dart file named service_interface.dart and add the following imports:

import 'package:chopper/chopper.dart';
import 'model_response.dart';
import 'recipe_model.dart';

Next, add a new class:

abstract class ServiceInterface {
  Future<Response<Result<APIRecipeQuery>>> queryRecipes(
    String query,
    int from,
    int to,
  );
}

This defines a class with one method named queryRecipes(). It has the same parameters and return values as RecipeService and MockService. By having each service implement this interface, you can change the providers to provide this interface instead of a specific class.
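The idea of swapping implementations behind an interface can be sketched without Chopper. Everything in this example is hypothetical (GreetingService, its two implementations and createService()); it only shows that calling code depends on the abstract class, not on a specific service.

```dart
/// Hypothetical interface with a single asynchronous method.
abstract class GreetingService {
  Future<String> fetchGreeting(String name);
}

/// Stand-in for a service that would call the network.
class RemoteGreetingService implements GreetingService {
  @override
  Future<String> fetchGreeting(String name) async {
    // Simulated latency instead of a real request.
    await Future<void>.delayed(const Duration(milliseconds: 10));
    return 'Hello from the network, $name';
  }
}

/// Stand-in for a service that returns fixed local data.
class MockGreetingService implements GreetingService {
  @override
  Future<String> fetchGreeting(String name) => Future.value('Hello, $name');
}

/// Callers only see GreetingService, so switching is a one-line change.
GreetingService createService({required bool useMock}) =>
    useMock ? MockGreetingService() : RemoteGreetingService();

Future<void> main() async {
  final GreetingService service = createService(useMock: true);
  print(await service.fetchGreeting('Kevin')); // Hello, Kevin
}
```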

Implementing the new service interface

Open network/recipe_service.dart and add the service_interface import:

import 'service_interface.dart';

Now, have RecipeService implement ServiceInterface:

abstract class RecipeService extends ChopperService
    implements ServiceInterface {

Then, add @override right above:

@Get(path: 'search')

Next, do the same in mock_service/mock_service.dart. Add the service_interface import:

import '../network/service_interface.dart';

Now, have the service implement this interface:

class MockService implements ServiceInterface {

Then, add @override above queryRecipes().

With this done, you can now change the provider in main.dart.

Changing the provider

You’ll now adopt the new service interface instead of the specific services you used in the current code.

Open main.dart and add these imports:

import 'data/repository.dart';
import 'network/recipe_service.dart';
import 'network/service_interface.dart';

Then, remove the existing import of mock_service.dart.

Find // TODO: Update ChangeNotifierProvider and replace it and ChangeNotifierProvider() with this:

Provider<Repository>(
  lazy: false,
  create: (_) => MemoryRepository(),
),

Because you provide the abstract Repository type, you can change the kind of repository you create. Here, you’re using MemoryRepository(), but you could also use something else, as you’ll do in the next chapter.

Now, just below the last change, replace the other Provider() with:

Provider<ServiceInterface>(
  create: (_) => RecipeService.create(),
  lazy: false,
),

Here, you use RecipeService.create(), but if you start having problems with API rate-limiting, you can switch to MockService().

Next, open ui/recipes/recipe_details.dart and replace the memory_repository import with the Repository import:

import '../../data/repository.dart';

If it doesn’t already exist, add an import for the cached image package:

import 'package:cached_network_image/cached_network_image.dart';

Now, replace // TODO: change to new repository and the line beneath it with:

final repository = Provider.of<Repository>(context);

Replace:

// Comment out Align()
Align(
  alignment: Alignment.topCenter,
  child: Image.asset(
    'assets/images/pizza_w700.png',
    height: 200,
    width: 200,
  ),
)

with:

Align(
  alignment: Alignment.topLeft,
  child: CachedNetworkImage(
    imageUrl: recipe.image ?? '',
    alignment: Alignment.topLeft,
    fit: BoxFit.fill,
    width: size.width,
  ),
),

This will use the image from the recipe instead of the placeholder image.

Note: If you still have the commented out Align() from the last chapter, delete it.

Now, you can finally replace the specific service with your interface. Open ui/recipes/recipe_list.dart, replace the existing import of mock_service.dart with the following import:

import '../../network/service_interface.dart';

Next, in _buildRecipeLoader(), replace the line below // TODO: replace with new interface with:

future: Provider.of<ServiceInterface>(context).queryRecipes(

You’re now ready to integrate the new code based on streams. Fasten your seat belt! :]

Adding streams to Bookmarks

The Bookmarks page uses Consumer, but you want to change it to a stream so it can react when a user bookmarks a recipe. To do this, you need to replace the reference to MemoryRepository with Repository and use a StreamBuilder widget.

Start by opening ui/myrecipes/my_recipe_list.dart and changing the memory_repository import to:

import '../../data/repository.dart';

Inside _buildRecipeList(), replace the return Consumer and the lines below it through recipes = repository.findAllRecipes(); with the following:

// 1
final repository = Provider.of<Repository>(context, listen: false);
// 2
return StreamBuilder<List<Recipe>>(
  // 3
  stream: repository.watchAllRecipes(),
  // 4
  builder: (context, AsyncSnapshot<List<Recipe>> snapshot) {
    // 5
    if (snapshot.connectionState == ConnectionState.active) {
      // 6
      final recipes = snapshot.data ?? [];

Don’t worry about the red squiggles for now. This code:

  1. Uses Provider to get your Repository.
  2. Uses StreamBuilder, which uses a List<Recipe> stream type.
  3. Uses the new watchAllRecipes() to return a stream of recipes for the builder to use.
  4. Uses the builder callback to receive your snapshot.
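  5. Checks the state of the connection. When the state is active, the stream is connected and you can read its data.
  6. Reads the recipes from the snapshot, falling back to an empty list if there’s no data yet.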

Now, at the end of _buildRecipeList(), find // TODO: Add else here and replace it with:

} else {
  return Container();
}

This returns a container if the snapshot isn’t ready.

Just like before, if this is still in place, replace:

// TODO: Replace with image from recipe
leading: Image.asset(
  'assets/images/pizza_w700.png',
  height: 200,
  width: 200,
)

with:

leading: CachedNetworkImage(
  imageUrl: recipe.image ?? '',
  height: 120,
  width: 60,
  fit: BoxFit.cover,
),

Note: If you still have the commented out leading: CachedNetworkImage() from the last chapter, delete it.

Next, in deleteRecipe(), change MemoryRepository to Repository so it looks like this:

void deleteRecipe(Repository repository, Recipe recipe) async {

Note: You can always use Android Studio’s Reformat Code command from the Code menu to clean up your formatting.

All these changes ensure that the class depends on the new, generic Repository.

At this point, you’ve achieved one of your two goals: you’ve changed the Bookmarks screen to use streams. Next, you’ll do the same for the Groceries tab.

Adding streams to Groceries

Start by opening ui/shopping/shopping_list.dart and replacing the memory_repository.dart import with:

import '../../data/repository.dart';
import '../../data/models/ingredient.dart';

Just as you did in the last section, find the build() method and change the return Consumer line and the lines below up to the line above return ListView.builder( with:

final repository = Provider.of<Repository>(context);
return StreamBuilder(
  stream: repository.watchAllIngredients(),
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.active) {
      final ingredients = snapshot.data as List<Ingredient>?;
      if (ingredients == null) {
        return Container();
      }

Once again, ignore the red squiggles. This is just like the code you added to the Bookmarks screen, except it uses watchAllIngredients().

Next, at the end of ListView.builder replace // TODO: Add else here with:

} else {
  return Container();
}

As before, this just returns a container if the snapshot isn’t ready.

No more red squiggles. Yay! :]

Finally, open up ui/recipe_card.dart and find: // TODO: Replace with image from recipe and replace:

// TODO: Replace with image from recipe
child: Image.asset(
  'assets/images/pizza_w700.png',
  height: 200,
  width: 200,
),

with:

child: CachedNetworkImage(
  imageUrl: recipe.image,
  height: 210,
  fit: BoxFit.fill,
),

Add the following import:

import 'package:cached_network_image/cached_network_image.dart';

Stop and restart your app and make sure it works as before. Your main screen will look something like this after a search:

img

Click a recipe. The Details page will look like this:

img

Next, click the Bookmark button to return to the Recipes screen, then go to the Bookmarks page to see the recipe you just added:

img

Finally, go to the Groceries tab and make sure the recipe ingredients are all showing.

img

Congratulations! You’re now using streams to control the flow of data. When the data changes on one screen, the other screens are notified of the change and update themselves.

You’re also using the Repository interface so you can go back and forth between a memory class and a different type in the future.

Key points

  • Streams are a way to asynchronously send data to other parts of your app.
  • You usually create streams by using StreamController.
  • Use StreamBuilder to add a stream to your UI.
  • Abstract classes, or interfaces, are a great way to abstract functionality.

Where to go from here?

In this chapter, you learned how to use streams. If you want to learn more about the topic, visit the Dart documentation at https://dart.dev/tutorials/language/streams.

In the next chapter, you’ll learn about databases and how to persist your data locally.