14 Working With Streams¶
Written by Kevin D Moore
Imagine yourself sitting by a creek, having a wonderful time. While watching the water flow, you see a piece of wood or a leaf floating down the stream and you decide to take it out of the water. You could even have someone upstream purposely float things down the creek for you to grab.
You can imagine Dart streams in a similar way: as data flowing down a creek, waiting for someone to grab it. That’s what a stream does in Dart — it sends data events for a listener to grab.
With Dart streams, you can send one data event at a time while other parts of your app listen for those events. Such events can be collections, maps or any other type of data you’ve created.
Streams can send errors in addition to data; you can also stop the stream, if you need to.
In this chapter, you’ll update your recipe project to use streams in two different locations. You’ll use one for bookmarks, to let the user mark favorite recipes and automatically update the UI to display them. You’ll use the second to update your ingredient and grocery lists.
But before you jump into the code, you’ll learn more about how streams work.
Types of streams¶
Streams are part of Dart, and Flutter inherits them. There are two types of streams in Flutter: single subscription streams and broadcast streams.
Single subscription streams are the default. They work well when you’re only using a particular stream on one screen.
A single subscription stream can only be listened to once. It doesn’t start generating events until it has a listener and it stops sending events when the listener stops listening, even if the source of events could still provide more data.
Single subscription streams are useful to download a file or for any single-use operation. For example, a widget can subscribe to a stream to receive updates about a value, like the progress of a download, and update its UI accordingly.
If you need multiple parts of your app to access the same stream, use a broadcast stream, instead.
A broadcast stream allows any number of listeners. It fires when its events are ready, whether there are listeners or not.
To create a broadcast stream, you simply call asBroadcastStream() on an existing single subscription stream.
final broadcastStream = singleStream.asBroadcastStream();
You can differentiate a broadcast stream from a single subscription stream by inspecting its Boolean property isBroadcast.
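To see the difference in practice, here is a minimal sketch in plain Dart. The helper canListenTwice() is a made-up name for this illustration and isn't part of the recipe project; it attaches two listeners to a stream and reports whether the second listen() call is allowed.

```dart
import 'dart:async';

// Hypothetical helper for this illustration: attaches two listeners and
// reports whether the second listen() call is allowed.
bool canListenTwice(Stream<int> stream) {
  final first = stream.listen((_) {});
  try {
    final second = stream.listen((_) {});
    second.cancel();
    return true;
  } on StateError {
    // A single subscription stream throws a StateError on a second listen().
    return false;
  } finally {
    first.cancel();
  }
}

void main() {
  final single = StreamController<int>().stream;
  final broadcast = StreamController<int>().stream.asBroadcastStream();

  print(single.isBroadcast); // false
  print(broadcast.isBroadcast); // true
  print(canListenTwice(single)); // false
  print(canListenTwice(broadcast)); // true
}
```

If you know from the start that several listeners are needed, StreamController<int>.broadcast() creates a broadcast controller directly.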
In Flutter, there are some key classes built on top of Stream that simplify programming with streams.
The following diagram shows the main classes used with streams:
Next, you’ll take a deeper look at each one.
StreamController and sink¶
When you create a stream, you usually use StreamController, which holds both the stream and StreamSink.
Sink¶
A sink is a destination for data. When you want to add data to a stream, you add it to the sink. Since the StreamController owns the sink, it listens for data on the sink and sends the data to its stream listeners.
Here’s an example that uses StreamController:
final _recipeStreamController = StreamController<List<Recipe>>();
final _stream = _recipeStreamController.stream;
To add data to a stream, you add it to its sink:
_recipeStreamController.sink.add(_recipesList);
This uses the sink field of the controller to “place” a list of recipes on the stream. That data will be sent to any current listeners.
When you’re done with the stream, make sure you close it, like this:
_recipeStreamController.close();
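As mentioned at the start of the chapter, a stream can carry errors as well as data. The following sketch, which is not part of the recipe project, shows one way a listener might observe data, an error and the closing of the stream. The function name collectEvents() and the log strings are assumptions made for this illustration.

```dart
import 'dart:async';

// Hypothetical demo: records everything a listener sees on a stream.
Future<List<String>> collectEvents() async {
  final log = <String>[];
  final controller = StreamController<int>();
  final done = Completer<void>();

  controller.stream.listen(
    (value) => log.add('data: $value'),
    // By default, an error does not cancel the subscription.
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      done.complete();
    },
  );

  controller.sink.add(1);
  controller.sink.addError('something went wrong');
  controller.sink.add(2);
  // Closing the controller sends the "done" event to the listener.
  await controller.close();
  await done.future;
  return log;
}

Future<void> main() async {
  print(await collectEvents());
  // [data: 1, error: something went wrong, data: 2, done]
}
```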
StreamSubscription¶
Calling listen() on a stream returns a StreamSubscription. You can use this subscription to stop listening to the stream when you’re done, like this:
StreamSubscription subscription = stream.listen((value) {
  print('Value from controller: $value');
});
...
...
// You are done with the subscription
subscription.cancel();
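Here is a small, self-contained sketch of the same idea that you can run outside of Flutter. The function name receivedBeforeCancel() is made up for this example; it shows that events added after cancel() never reach the listener.

```dart
import 'dart:async';

// Hypothetical demo: returns the values that arrived before cancel().
Future<List<int>> receivedBeforeCancel() async {
  final received = <int>[];
  final controller = StreamController<int>();
  final subscription = controller.stream.listen(received.add);

  controller.add(1);
  controller.add(2);
  // Give the stream a chance to deliver the pending events.
  await Future<void>.delayed(Duration.zero);

  // You are done with the subscription.
  await subscription.cancel();

  // Nobody is listening any more, so this event is dropped.
  controller.add(3);
  await Future<void>.delayed(Duration.zero);

  await controller.close();
  return received;
}

Future<void> main() async {
  print(await receivedBeforeCancel()); // [1, 2]
}
```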
Sometimes, it’s helpful to have an automated mechanism to avoid managing subscriptions manually. That’s where StreamBuilder comes in.
StreamBuilder¶
StreamBuilder is handy when you want to use a stream in a widget. Its two main parameters are a stream and a builder. As you receive data from the stream, the builder takes care of building or updating the UI.
Here’s an example:
final repository = Provider.of<Repository>(context, listen: false);
return StreamBuilder<List<Recipe>>(
  stream: repository.recipesStream(),
  builder: (context, AsyncSnapshot<List<Recipe>> snapshot) {
    // extract recipes from snapshot and build the view
  }
)
...
StreamBuilder is handy because you don’t need to use a subscription directly and it unsubscribes from the stream automatically when the widget is destroyed.
Now that you understand how streams work, you’ll convert your existing project to use them.
Adding streams to Recipe Finder¶
You’re now ready to start working on your recipe project. If you’re following along with your app from the previous chapters, open it and keep using it with this chapter. If not, just locate the projects folder for this chapter and open starter in Android Studio.
Note: If you use the starter app, don’t forget to add your apiKey and apiId in network/recipe_service.dart.
To convert your project to use streams, you need to change the memory repository class to add two new methods that return one stream for recipes and another for ingredients. Instead of just returning a list of static recipes, you’ll use streams to modify that list and refresh the UI to display the change.
This is what the flow of the app looks like:
Here, you can see that the RecipeList screen has a list of recipes. Bookmarking a recipe adds it to the bookmarked recipe list and updates both the bookmark and the groceries screens.
You’ll start by converting your repository code to return Streams and Futures.
Adding futures and streams to the repository¶
Open data/repository.dart and change all of the return types to return a Future. For example, change the existing findAllRecipes() to:
Future<List<Recipe>> findAllRecipes();
Do this for all the methods except init() and close().
Your final class should look like:
Future<List<Recipe>> findAllRecipes();
Future<Recipe> findRecipeById(int id);
Future<List<Ingredient>> findAllIngredients();
Future<List<Ingredient>> findRecipeIngredients(int recipeId);
Future<int> insertRecipe(Recipe recipe);
Future<List<int>> insertIngredients(List<Ingredient> ingredients);
Future<void> deleteRecipe(Recipe recipe);
Future<void> deleteIngredient(Ingredient ingredient);
Future<void> deleteIngredients(List<Ingredient> ingredients);
Future<void> deleteRecipeIngredients(int recipeId);
Future init();
void close();
These updates allow you to have methods that work asynchronously to process data from a database or the network.
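To illustrate why this matters, here is a minimal sketch with made-up names (NoteRepository and MemoryNoteRepository are not part of the recipe project). The in-memory implementation already has its data, so it wraps the result in Future.value(); a database-backed implementation could later fulfill the same contract with real asynchronous work, and callers that use await wouldn’t need to change.

```dart
import 'dart:async';

// Hypothetical contract: every data method returns a Future.
abstract class NoteRepository {
  Future<List<String>> findAllNotes();
  Future<void> deleteNote(String note);
}

// In-memory implementation: the data is already available, so each
// result is wrapped in an already-completed Future.
class MemoryNoteRepository implements NoteRepository {
  final _notes = <String>['milk', 'eggs'];

  @override
  Future<List<String>> findAllNotes() => Future.value(_notes);

  @override
  Future<void> deleteNote(String note) {
    _notes.remove(note);
    return Future.value();
  }
}

Future<void> main() async {
  final NoteRepository repository = MemoryNoteRepository();
  await repository.deleteNote('milk');
  print(await repository.findAllNotes()); // [eggs]
}
```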
Next, add two new Streams after findAllRecipes():
// 1
Stream<List<Recipe>> watchAllRecipes();
// 2
Stream<List<Ingredient>> watchAllIngredients();
Here’s what this code does:
- watchAllRecipes() watches for any changes to the list of recipes. For example, if the user did a new search, it updates the list of recipes and notifies listeners accordingly.
- watchAllIngredients() listens for changes in the list of ingredients displayed on the Groceries screen.
You’ve now changed the interface, so you need to update the memory repository.
Cleaning up the repository code¶
Before updating the code to use streams and futures, there are some minor housekeeping updates.
Open data/memory_repository.dart and import the Dart async library:
import 'dart:async';
Now remove:
import 'package:flutter/foundation.dart';
Then, update the MemoryRepository class definition to remove ChangeNotifier, so it looks like:
class MemoryRepository extends Repository {
Next, add a few new fields after the existing two List declarations, ignoring all the red squiggles:
// 1
Stream<List<Recipe>>? _recipeStream;
Stream<List<Ingredient>>? _ingredientStream;
// 2
final StreamController _recipeStreamController =
    StreamController<List<Recipe>>();
final StreamController _ingredientStreamController =
    StreamController<List<Ingredient>>();
Here’s what’s going on:
- _recipeStream and _ingredientStream are private fields for the streams. These will be captured the first time a stream is requested, which prevents new streams from being created for each call.
- Creates StreamControllers for recipes and ingredients.
And now, add these new methods before findAllRecipes():
// 3
@override
Stream<List<Recipe>> watchAllRecipes() {
  _recipeStream ??= _recipeStreamController.stream as Stream<List<Recipe>>;
  return _recipeStream!;
}
// 4
@override
Stream<List<Ingredient>> watchAllIngredients() {
  _ingredientStream ??=
      _ingredientStreamController.stream as Stream<List<Ingredient>>;
  return _ingredientStream!;
}
These methods will:
- Check to see if you already have the stream. If not, get it from the controller’s stream property and store it, then return it.
- Do the same for ingredients.
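The pattern above relies on the ??= operator, which assigns only when the field is still null. The following sketch isolates that pattern with a made-up class, TemperatureSource, which is not part of the recipe project. As a variation on the chapter’s code, the controller field here is declared with its full generic type, so no cast is needed.

```dart
import 'dart:async';

// Hypothetical class that hands out one cached stream.
class TemperatureSource {
  // Declaring the generic type on the field avoids an `as` cast later.
  final StreamController<double> _controller = StreamController<double>();
  Stream<double>? _stream;

  Stream<double> watchTemperature() {
    // Assigns only on the first call; later calls reuse the cached stream.
    _stream ??= _controller.stream;
    return _stream!;
  }

  void close() => _controller.close();
}

void main() {
  final source = TemperatureSource();
  final first = source.watchTemperature();
  final second = source.watchTemperature();
  print(identical(first, second)); // true
  source.close();
}
```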
Updating the existing repository¶
MemoryRepository is full of red squiggles. That’s because the methods all use the old signatures, and everything’s now based on Futures.
Still in data/memory_repository.dart, replace the existing findAllRecipes() with this:
@override
// 1
Future<List<Recipe>> findAllRecipes() {
  // 2
  return Future.value(_currentRecipes);
}
These updates:
- Change the method to return a Future.
- Wrap the return value with a Future.value().
There are a few more updates you need to make before moving on to the next section.
First, in init(), remove the null from the return statement so it looks like this:
@override
Future init() {
  return Future.value();
}
Then update close() so it closes the streams:
@override
void close() {
  _recipeStreamController.close();
  _ingredientStreamController.close();
}
In the next section, you’ll update the remaining methods to return futures and add data to the stream using StreamController.
Sending recipes over the stream¶
As you learned earlier, StreamController’s sink property adds data to streams. Since this happens in the future, you need to change the return type to Future and then update the methods to add data to the stream.
To start, change insertRecipe() to:
@override
// 1
Future<int> insertRecipe(Recipe recipe) {
  _currentRecipes.add(recipe);
  // 2
  _recipeStreamController.sink.add(_currentRecipes);
  if (recipe.ingredients != null) {
    insertIngredients(recipe.ingredients!);
  }
  // 3
  // 4
  return Future.value(0);
}
Here’s what you have updated:
- Update the method’s return type to be Future.
- Add _currentRecipes to the recipe sink. You might wonder why you call add() with the same list instead of adding a single ingredient or recipe. The reason is that the stream expects a list, not a single value. By doing it this way, you replace the previous list with the updated one.
- Remove the call to notifyListeners().
- Return a Future value. You’ll learn how to return the ID of the new item in a later chapter.
This replaces the previous list with the new list and notifies any stream listeners that the data has changed.
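To make the "send the whole list" idea concrete, here is a minimal sketch that runs without Flutter. The class MemoryTagRepository and the function observedLengths() are made-up names for this illustration; they mirror the shape of insertRecipe() but use plain strings.

```dart
import 'dart:async';

// Hypothetical repository that mirrors the shape of insertRecipe().
class MemoryTagRepository {
  final _tags = <String>[];
  final _controller = StreamController<List<String>>();

  Stream<List<String>> watchAllTags() => _controller.stream;

  Future<int> insertTag(String tag) {
    _tags.add(tag);
    // The stream carries lists, so send the full, updated list.
    _controller.sink.add(_tags);
    return Future.value(0);
  }

  void close() => _controller.close();
}

// Records how long the list is each time the listener is notified.
Future<List<int>> observedLengths() async {
  final repository = MemoryTagRepository();
  final lengths = <int>[];
  repository.watchAllTags().listen((tags) => lengths.add(tags.length));

  await repository.insertTag('vegan');
  await Future<void>.delayed(Duration.zero); // let the event arrive
  await repository.insertTag('quick');
  await Future<void>.delayed(Duration.zero);

  repository.close();
  return lengths;
}

Future<void> main() async {
  print(await observedLengths()); // [1, 2]
}
```

Note that every event carries the same list instance, so a listener always sees the list’s current contents rather than a frozen copy.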
Now that you know how to convert the first method, it’s time to convert the rest of the methods as an exercise. Don’t worry, you can do it! :]
Exercise¶
Convert the remaining methods, as you just did with insertRecipe(). You’ll need to do the following:
- Update MemoryRepository methods to return a Future that matches the new Repository interface methods.
- For all methods that change a watched item, add a call that adds the updated list to the sink.
- Remove all the calls to notifyListeners(). Hint: not all methods have this statement.
- Wrap the return values in Futures.
For a method that returns a Future<void>, what do you think the return will look like? Got it? There might be a future for you yet.
return Future.value();
If you get stuck, check out memory_repository.dart in this chapter’s challenge folder — but first give it your best shot!
After you complete the exercise, MemoryRepository shouldn’t have any more red squiggles, but you still have a few more tweaks to make before you can run your new, stream-powered app.
Note: It’s very important that you add the recipe list to _recipeStreamController.sink and the ingredient list to _ingredientStreamController.sink. To make sure you did it correctly, check the challenge project. You’ll need to do the same for the delete methods as well.
Switching between services¶
In the previous chapter, you created a MockService to provide local data that never changes, but you also have access to RecipeService. It’s still a bit tedious to switch between the two, so you’ll take care of that before integrating streams.
An easy way to do that is with an interface — or, as it’s known in Dart, an abstract class. Remember that an interface or abstract class is just a contract that implementing classes will provide the given methods.
Once you create your interface, it will look like this:
To start creating the interface, go to the network folder, create a new Dart file named service_interface.dart and add the following imports:
import 'package:chopper/chopper.dart';
import 'model_response.dart';
import 'recipe_model.dart';
Next, add a new class:
abstract class ServiceInterface {
  Future<Response<Result<APIRecipeQuery>>> queryRecipes(
    String query,
    int from,
    int to,
  );
}
This defines a class with one method named queryRecipes(). It has the same parameters and return values as RecipeService and MockService. By having each service implement this interface, you can change the providers to provide this interface instead of a specific class.
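The benefit is easier to see in a stripped-down sketch. The names below (GreetingService, RemoteGreetingService, MockGreetingService and loadGreeting()) are invented for this illustration and stand in for ServiceInterface, RecipeService and MockService. The calling code depends only on the abstract class, so you can swap implementations without touching it.

```dart
import 'dart:async';

// Hypothetical contract, standing in for ServiceInterface.
abstract class GreetingService {
  Future<String> fetchGreeting(String name);
}

// Stand-in for a network-backed service; the delay simulates latency.
class RemoteGreetingService implements GreetingService {
  @override
  Future<String> fetchGreeting(String name) async {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    return 'Hello from the network, $name';
  }
}

// Stand-in for a mock service that returns canned data immediately.
class MockGreetingService implements GreetingService {
  @override
  Future<String> fetchGreeting(String name) =>
      Future.value('Hello from the mock, $name');
}

// The caller depends only on the interface, not on a concrete class.
Future<String> loadGreeting(GreetingService service) =>
    service.fetchGreeting('reader');

Future<void> main() async {
  print(await loadGreeting(MockGreetingService()));
  print(await loadGreeting(RemoteGreetingService()));
}
```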
Implementing the new service interface¶
Open network/recipe_service.dart and add the service_interface import:
import 'service_interface.dart';
Now, have RecipeService implement ServiceInterface:
abstract class RecipeService extends ChopperService
    implements ServiceInterface {
Then, add @override right above:
@Get(path: 'search')
Next, do the same in mock_service/mock_service.dart. Add the service_interface import:
import '../network/service_interface.dart';
Now, have the service implement this interface:
class MockService implements ServiceInterface {
Then add @override above queryRecipes().
With this done, you can now change the provider in main.dart.
Changing the provider¶
You’ll now adopt the new service interface instead of the specific services you used in the current code.
Open main.dart and add these imports:
import 'data/repository.dart';
import 'network/recipe_service.dart';
import 'network/service_interface.dart';
Then, remove the existing import of mock_service.dart.
Find // TODO: Update ChangeNotifierProvider, then replace it and ChangeNotifierProvider() with this:
Provider<Repository>(
  lazy: false,
  create: (_) => MemoryRepository(),
),
Because you provide the Repository type, you can change the type of repository you create. Here, you’re using MemoryRepository(), but you could also use something else, as you’ll do in the next chapter.
Now, just below the last change, replace the other Provider() with:
Provider<ServiceInterface>(
  create: (_) => RecipeService.create(),
  lazy: false,
),
Here, you use RecipeService, but if you start having problems with API rate-limiting, you can switch to MockService.
Next, open ui/recipes/recipe_details.dart and replace the memory_repository import with the Repository import:
import '../../data/repository.dart';
If it doesn’t already exist, add an import for the cached image package:
import 'package:cached_network_image/cached_network_image.dart';
Now, replace // TODO: change to new repository and the line beneath it with:
final repository = Provider.of<Repository>(context);
Replace:
// Comment out Align()
Align(
  alignment: Alignment.topCenter,
  child: Image.asset(
    'assets/images/pizza_w700.png',
    height: 200,
    width: 200,
  ),
)
with:
Align(
  alignment: Alignment.topLeft,
  child: CachedNetworkImage(
    imageUrl: recipe.image ?? '',
    alignment: Alignment.topLeft,
    fit: BoxFit.fill,
    width: size.width,
  ),
),
This will use the image from the recipe instead of the placeholder image.
Note: If you still have the commented out Align() from the last chapter, delete it.
Now, you can finally replace the specific service with your interface. Open ui/recipes/recipe_list.dart, replace the existing import of mock_service.dart with the following import:
import '../../network/service_interface.dart';
Next, in _buildRecipeLoader(), replace the line below // TODO: replace with new interface with:
future: Provider.of<ServiceInterface>(context).queryRecipes(
You’re now ready to integrate the new code based on streams. Fasten your seat belt! :]
Adding streams to Bookmarks¶
The Bookmarks page uses Consumer, but you want to change it to a stream so it can react when a user bookmarks a recipe. To do this, you need to replace the reference to MemoryRepository with Repository and use a StreamBuilder widget.
Start by opening ui/myrecipes/my_recipe_list.dart and changing the memory_repository import to:
import '../../data/repository.dart';
Inside _buildRecipeList(), replace the return Consumer and the lines below it through recipes = repository.findAllRecipes(); with the following:
// 1
final repository = Provider.of<Repository>(context, listen: false);
// 2
return StreamBuilder<List<Recipe>>(
  // 3
  stream: repository.watchAllRecipes(),
  // 4
  builder: (context, AsyncSnapshot<List<Recipe>> snapshot) {
    // 5
    if (snapshot.connectionState == ConnectionState.active) {
      // 6
      final recipes = snapshot.data ?? [];
Don’t worry about the red squiggles for now. This code:
- Uses Provider to get your Repository.
- Uses StreamBuilder, which uses a List<Recipe> stream type.
- Uses the new watchAllRecipes() to return a stream of recipes for the builder to use.
- Uses the builder callback to receive your snapshot.
- Checks the state of the connection. When the state is active, you have data.
- Gets the list of recipes from the snapshot, falling back to an empty list if the snapshot has no data.
Now, at the end of _buildRecipeList(), find // TODO: Add else here and replace it with:
} else {
  return Container();
}
This returns a container if the snapshot isn’t ready.
Just like before, if this is still in place, replace:
// TODO: Replace with image from recipe
leading: Image.asset(
  'assets/images/pizza_w700.png',
  height: 200,
  width: 200,
)
with:
leading: CachedNetworkImage(
  imageUrl: recipe.image ?? '',
  height: 120,
  width: 60,
  fit: BoxFit.cover,
),
Note: If you still have the commented out leading: CachedNetworkImage() from the last chapter, delete it.
Next, in deleteRecipe(), change MemoryRepository to Repository so it looks like this:
void deleteRecipe(Repository repository, Recipe recipe) async {
Note: You can always use Android Studio’s Reformat Code command from the Code menu to clean up your formatting.
All these changes ensure that the class depends on the new, generic Repository.
At this point, you’ve achieved one of your two goals: you’ve changed the Bookmarks screen to use streams. Next, you’ll do the same for the Groceries tab.
Adding streams to Groceries¶
Start by opening ui/shopping/shopping_list.dart and replacing the memory_repository.dart import with:
import '../../data/repository.dart';
import '../../data/models/ingredient.dart';
Just as you did in the last section, find the build() method and change the return Consumer line and the lines below up to the line above return ListView.builder( with:
final repository = Provider.of<Repository>(context);
return StreamBuilder(
  stream: repository.watchAllIngredients(),
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.active) {
      final ingredients = snapshot.data as List<Ingredient>?;
      if (ingredients == null) {
        return Container();
      }
Once again, ignore the red squiggles. This is just like the code you added to the Bookmarks page, except it uses watchAllIngredients().
Next, at the end of ListView.builder, replace // TODO: Add else here with:
} else {
  return Container();
}
As before, this just returns a container if the snapshot isn’t ready.
No more red squiggles. Yay! :]
Finally, open ui/recipe_card.dart, find // TODO: Replace with image from recipe and replace:
// TODO: Replace with image from recipe
child: Image.asset(
  'assets/images/pizza_w700.png',
  height: 200,
  width: 200,
),
with:
child: CachedNetworkImage(
  imageUrl: recipe.image,
  height: 210,
  fit: BoxFit.fill,
),
Add the following import:
import 'package:cached_network_image/cached_network_image.dart';
Stop and restart your app and make sure it works as before. Your main screen will look something like this after a search:
Click a recipe. The Details page will look like this:
Next, click the Bookmark button to return to the Recipes screen, then go to the Bookmarks page to see the recipe you just added:
Finally, go to the Groceries tab and make sure the recipe ingredients are all showing.
Congratulations! You’re now using streams to control the flow of data. If the data changes on one screen, the other screens will know about that change and update themselves.
You’re also using the Repository interface so you can go back and forth between a memory class and a different type in the future.
Key points¶
- Streams are a way to asynchronously send data to other parts of your app.
- You usually create streams by using StreamController.
- Use StreamBuilder to add a stream to your UI.
- Abstract classes, or interfaces, are a great way to abstract functionality.
Where to go from here?¶
In this chapter, you learned how to use streams. If you want to learn more about the topic, visit the Dart documentation at https://dart.dev/tutorials/language/streams.
In the next chapter, you’ll learn about databases and how to persist your data locally.