먼저 Stream을 왜 쓰는지부터 생각해보자.
Stream은 데이터가 사용 가능해지면 생산자에서 소비자로 데이터를 전달한다. 이는 사용하지 못하는 데이터의 접근을 방지해 준다.
두번재로 보통 데이터를 영역과 UI영역을 분리하는데 이때 데이터를 UI에 전달 해주는 매개체가 필요할 것이다. Stream은 데이터를 수월하게 전달하는 매개체 역할을 한다. Stream은 생산자와 소비자로 영역을 분리하여 먼 곳까지 데이터를 전달해 줄 수 있다.
- 데이터가 사용 가능한 상태 일때 기다렸다가 데이터를 사용하게 해준다.
- 데이터 영역과 처리영역으로 코드를 분리 할수 있다.
- 데이터를 쉽게 전달해주는 인터페이스를 제공 해준다.
- 데이터를 가공하고 재생산 할수 있어 원본 데이터의 재사용성을 극대화 한다.
데이터 하나하나를 수신하여 처리하려 할때 사용한다. 데이터를 수신하면서 변경/가공 할수 있는 함수들을 제공해주고 있다. Iterable을 리턴해야하고 함수에 sync* 키워드를 붙인다. yield가 호출될때마다 Iterable에 값을 넣어주고 쌓인 값을 리턴한다.
Iterable<String> stm() sync*{
for (int i = 1; i <= 10; i++) {
String sendMsg = "message:$i";
print(sendMsg);
yield sendMsg;
}
}
stm().forEach((receive) => print('receive = $receive'));
/* 출력
message:1
receive = message:1
...
receive = message:10
/*
출력 내용을 보면 한번에 데이터를 보내는것이 아닌 yield가 호출될 때마다 수신한다. 데이터를 생성하는 부분과(Iterable) 처리부가(forEach) 나눠지는 것을 볼수 있다. 이러한 장점은 데이터를 가공할때 원본 데이터를 수정/변경 하지 않고 데이터를 가공하여 만들수 있다.
Note : sync*와 Iterable 리턴을 한쌍으로 함수를 만든다.
async*, yeild는 데이터를 비동기로 주기적으로 받아야 할때 사용한다. Iterable과 같이 변경/가공 할수 있는 기능들을 제공해 준다.
Stream을 리턴해야하고 함수에 async* 키워드를 붙인다. yield는 구독하는 곳에 리턴할 데이터이다. 그리고 Stream.listen으로 생산된 데이터를 전달 받을 수 있다.
Note : async/yeild 로만든 Stream은 데이터 생산자, Stream.listen은 소비자 이다.
Stream<String> count() async* {
for (int i = 1; i <= 10; i++) {
await Future.delayed(Duration(seconds: 1)); // 없어도됨. 비동기 코드가 들어가되 되는지 확인하려 넣은 코드
yield 'message:$i';
}
}
print("read");
stream.listen((receive) { print('receive = $receive');});
print("end");
//출력
//read
//end
//receive = message:1
//receive = message:2
//....
Note : async*와 Stream 리턴을 한쌍으로 함수를 만든다.
비동기로 출력하는 것을 볼수 있다. 작업이 완료되고 결과를 받고 싶거나 Future처럼 await걸어 옵션널하게 동기화를 걸고 싶을 수 있다. 하지만 stream은 Future가 아니기 때문에 await를 걸수 없다. 대신 Future를 return하는 함수를 제공함으로써 await 및 Future의 기능을 활용할 수 있게 해준다. 대표적인 것이 forEach 함수이다.
await stream.forEach((receive) {print('receive = $receive'); });
//또는 for in 도 같은 역할을함
// await for(var receive in stream){print('receive = $receive');}
//출력
//read
//receive = message:1
//receive = message:2
//....
//end
yield는 값을 리턴하는 반면 yield* Stream을 리턴한다. yield*은 다른 스트림과 연결하는데 사용한다고 볼수 있다.
Stream<int> stm(int count) async*{
yield* stm2(count);
}
Stream<int> stm2(int count) async*{
yield count*10;
}
stm(3).listen(print);
/* 출력
30
*/
Note : yield*은 재귀 호출로 사용할수 있다.
Stream은 lazy로 실행된다. 즉 요구가 있을때 그때 코드를 실행하게 된다. Stream의 경우 listen이 있을때 해당 코드를 lazy하게 실행하게 된다.
Stream<int> stm(int count) async*{
print('3초 후 실행');
yield count;
}
Stream<int> stream = stm(5);
Future.delayed(Duration(seconds: 3), (){
stream.listen((event) {print('listen:$event');});
});
/* 출력
3초 후 실행
listen:5
*/
출력 내용과 같이 listen이 요구될때 stm을 실행하는것을 볼수 있다. lazy로딩은 때문에 Stream 객체를 만들놓고 필요한 곳에서 listen을 할 수 있다. 이것은 실제 데이터 등은 만드는 곳과 데이터를 처리하는 영역을 분리할 수 있음을 의미한다.
일반적으로 lazy로딩은 작업을 분산시켜 성능에 향상에 도움이 된다. 하지만 로딩(앱의 인트로 같은곳) 타임에 오래걸리는 데이터 연산을 하지 않고 lazy하게 한다면 데이터가 필요한 시점에 연산을 하게되어 원치 않는 delay를 줄 수 있어 이점에 주의해야 한다. 연산시간을 할얘 받는곳이 있다면 여기서 연산을 하고 Stream은 그 결과를 보내주는 형태로 만드는 것이 좋다.
Stream은 이벤트나 특정 작업에 데한 데이터를 주기적으로 받는 구독이란 개념을 가지고 있다. 따라서 구독을 해제 한다던지 구독중 오류나 구독인 만료 되었을때 처리가 필요한다. 이때 쓰이는 것이 StreamSubscription 이다. stream.listen 함수는 StreamSubscription을 리턴하고 있다. StreamSubscription으로 Stream 구독을 cancel 할수 있고 완료와 에러처리도 지원한다.
StreamSubscription subscription = stream.listen((event) => print(event));
subscription.cancel(); // 구독 취소
subscription.onError(()=>print('error')); // 에러. 에러를 구현한 추가적인 코드가 필요하다.
subscription.onDone(()=>print('done')); // 완료
subscription.onData((data)=>print('data=$data')); //onData를 사용하면 listen안에 함수를 실행하지 않고 onData 안에 함수를 인터셉터하여 실행한다.
//또다른 방법이 있는데 Stream listen함수 안에 onError 등을 매개변수로 지원한다.
//StreamSubscription 동작을 한다.
stream.listen((data)=>print('data=$data'),
onError: ()=>print('error'),
onDone: ()=>print('done'),
);
주의 해야할점은 stream.cancel에 제약이 있다는 것이다.
Stream<int> stm() async*{
int count = 5;
for (int c = 1; c <= count; c++) {
for (int i = 0; i < 100000; i++) {
for (int j = 0; j < 15000; j++) {
100000 * 100;
}
}
yield c;
}
}
void main() async {
print('ready');
StreamSubscription? subscription = null;
Future.delayed(Duration(seconds: 1), (){
subscription!.cancel();
print('cancel');
});
subscription = stm().listen((event) => print('listen:$event'));
print('end');
}
/* 출력
ready
end
listen:1
...
listen:5
cancel
*/
delay가 1초가 지나면 cancel되게 했다. 그리고 listen보다 먼저 실행했다. 1초가 충분히 지났음에도 불구하고 출력내용을 보면 cancel이 맨 마지막에 나온다.
Stream에 컴퓨팅 작업을 하게되면 Block가 걸리게 된다. 이후 cancel 코드는 Stream 작업 이후 불리게 된다. 따라서 cancel을 Future로 호출한다 하더라도 맨마지막에 호출되어 cancel 되지 않는다.
위의 내용으로 2가지를 알수 있다.
- Stream 안에 코드가 다실행 될때까지 Block이 걸린다.
- Stream의 관련 코드는 해당 코드블럭에 맨 마지막에 실행된다.
Stream은 사실 완전하게 비동기로 동작하지 않는 것을 볼수 있다. 같은 레벨의 코드 블럭의 맨 나중에 실행되기 때문에 비동기 같은 느낌이 나는 것이다.
아래는 Stream의 listen이 비동기가 아니라 맨 코드블럭이 끝나고 호출되는 예제이다.
Stream<int> stm() async* {
int count = 5;
for (int c = 1; c <= count; c++) {
for (int i = 0; i < 100000; i++) {
for (int j = 0; j < 15000; j++) {
100000 * 100;
}
}
yield c;
}
}
void main() async {
print('ready');
StreamSubscription subscription = stm().listen((event) => print('listen:$event'));
for (int c = 1; c <= 3; c++) {
for (int i = 0; i < 100000; i++) {
for (int j = 0; j < 15000; j++) {
100000 * 100;
}
}
print("code:$c");
}
print('end');
/* 출력
ready
code:1
code:2
code:3
end
listen:1
...
listen:t
/*
}
결론적으로 Stream은 Future와 마찬가지로 연산코드는 비동기로 실행되지 않는다. Stream에 연산코드가 많다면 프로그램이 block이 걸려 실행이 느려질수 있다. 시간 delay나 사용자 입력 리스닝 같은 연산없이 사용해야 좋다. 연산에 대한 비동기가 필요하다면 리스닝 후 isolate를 이용하여 비동기를 실행하여야 할 것이다.
Note : Stream은 같은 레벨의 코들 블럭의 컴퓨팅만큼 block이 걸린다.
아래는 연산없이 time delay이로 대기후 알림을 주는 구독형태의 올바 스트림 예제이다.
Stream<int> stm() async*{
for(int i=1;i<=10;i++){
await Future.delayed(Duration(seconds:1));
yield i;
}
}
void main() async{
print('ready');
StreamSubscription subscription= stm().listen((event){print('$event초');});
//3초후 구독을 취소한다.
Future.delayed(Duration(seconds:3), ()=>subscription.cancel());
print('end');
}
/* 출력
ready
1초
2초
end
*/
Stream 자체를 옵저버 형태로 사용하기에는 좀 어려움이 있다. UI나 특정 이벤트를 옵저빙 하려 할때는 사용하기 힘들다. 아래 코드처럼 사용자의 input값을 받아 처리한다고 가정하자.
Stream<int> stm(int count) async*{
yield count;
}
int input = 1;
stm(input).listen((event) => print('listen:$event'));
input = 2;
stm(input).listen((event) => print('listen:$event'));
input = 3;
stm(input).listen((event) => print('listen:$event'));
여기에는 두가지 문제점이 있다.
- 사용자 input은 미리 정해져 있지 않기 때문에 연산영역(Stream 객체)와 처리영역(listen)을 분리 할수 없다.
- input이 들어 올때마다 listen을 해줘야한다.
데이터 영역과 UI영역이 분리된 상태에서 특정 이벤트가 발생했을때 데이터를 받고 싶을 때가 있다. 다시 말해 단순히 데이터를 전달 받는 옵저버 형태의 Stream이 필요할때가 많다. 옵저버의 조건은 이벤트가 발생할떄 까지 기다리는 '대기'와 이벤트가 발생할때 마다 알려주는 '알림(구독)' 한다는 조건이 있다.
위 코드를 보면 사용자의 입력에 따른 대기도 할수 없으며 하나의 Stream에 listen 한번만 허용되기 때문에 사용자 입력이 발생할때 마다 알림을 수도 없다.
StreamController<int> streamController = StreamController<int>();
streamController.stream.listen((event) => print('listen:$event'));
streamController.sink.add(1);
streamController.sink.add(2);
streamController.sink.add(3);
//사용이 다끝나면 반드시 종료 해줘야함
streamController.close();
/* 출력
listen:1
listen:2
listen:3
*/
StreamController클래스 내부 stream이 객체를 생성한다. stream 객체에 listen으로 데이터를 구독 할수 있다. 이후 필요할때 마다streamController.sink.add 호출하여 listen하는 곳에 데이터를 전달할 수있다. sink.add(data)의 매개변수(data)가 listen((event)..)의 event로 전달된다.
Note : StreamController.add와 sink.add의 차의점
streamController.add와 Sink.add는 동일한 기능을 하는데 두가지의 기능적 차이점은 없다.
다만 streamController.sink는 StreamSink의 인터페이스로 객체로 sink의 기능들만 사용할수 있게 된다.
참고자료 : https://api.dart.dev/stable/2.5.0/dart-async/StreamController/sink.html
streamController의 stream은 무한정 대기하며 이벤트를 기다린다. 따라서 필요가 없을시 대기를 종료해주는 streamController.close를 반드시 종료 해줘야 한다.
close로 Stream을 닫았을때 더이상 사용할수 없고 add를 하게되면 'Cannot add new events after calling close' 에러가 발생한다.
Note : streamController.close는 Future를 리턴하는데 Stream이 즉시 종료되지 않을 수 있음을 의미한다. Stream의 종료 이후 필요한 작업은 Future의 then을 사용하는 것이 안전할 것이다.
Stream은 한곳에서만 구독을(listen) 할수 있다. Broadcast로 여러곳에서 구독을 할 수 있다.
StreamController.broadcast()로 StreamController객체를 생성하면 여러곳에서 listen을 할수 있다.
StreamController<int> streamController = StreamController.broadcast();
streamController.stream.listen((event) => print('listen1:$event'));
streamController.stream.listen((event) => print('listen2:$event'));
streamController.sink.add(1);
streamController.sink.add(2);
streamController.close();
/* 출력
listen1:1
listen2:1
listen1:2
listen2:2
*/
UI화면은 StatelessWidget과 StatefulWidget으로 상태관리에 따라 widget을 나누게 된다.
주기적으로 상태에 따라 UI화면을 갱신하기 위해 Stream을 사용할수 있다.
아래는 Stream에 값에 따라 색상이 변하는 예제이다.
class MyApp extends StatefulWidget {
@override
State<StatefulWidget> createState() {
return _MyApp();
}
}
class _MyApp extends State<MyApp> {
Color color = Colors.red;
Stream<Color> chageColor() async* {
await Future.delayed(Duration(seconds: 3));
yield Colors.blue;
await Future.delayed(Duration(seconds: 3));
yield Colors.yellow;
}
@override
void initState() {
super.initState();
chageColor().listen((newColor) {
setState(() {
color = newColor;
});
});
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
body: Center(
child: Container(
width: 300,
height: 300,
color: color,
),
),
),
);
}
}
위 코드의 실행 기준으로 단계를 보면 3가지가 있다.
- Stream 객체 생성
- Stream Listen
- 데이터 수신시 setState 사용
데이터를 동적으로 수신하고 UI를 갱신하기 위해서는 StatefulWidget을 써야한다. 데이터 수신시 상태 변경을 위해 setState를 필수적으로 호출할 것이다. setState는 State클래스의 함수이기 때문에 데이터 수신부를 다른 클래스나 파일로 분리하게 된다면 State객체를 넘겨줘야 할것이다. 이는 전체적인 구조를 잡는데 불편할 뿐만 아니라 코드가 복잡해 지고 문제가 발생할 여지가 있다. 이 밖에도 Widget이 dispose될때 취소도 해줘야한다.
Flutter에선 이런 반복적인 일을 대신 주는 패턴이 있는데 바로 Builder이다. Flutter는 여러가지 Builder들을 제공하는데 그 중 하나가 StreamBuilder이다.
Stream을 사용하면 매번 initState에 listen코드를 넣고 dispose에 cancel을 넣어야 한다. 그리고 화면을 갱신하기 위해 setState도 호출 해줘야한다. 같은 코드를 반복하다 보면 재사용하기 위한 공통클래스를 만들기 마련이다. Flutter에서는 이미 StreamBuilder로 만들어 놨다.
StreamBuilder클래스는 StatefulWidget을 상속 했으며 따라서 Widget고 Widget처럼 사용하면 된다. StatefulWidget의 상태와 관련된 라이프 싸이클도 가지고 있다는 뜻도 된다. 내부적으로 initState와 dispose에 구독,취소를 구현해 놨다. 그리고 데이터 수신부에 setState를 호출해준다.
아래는 StreamBuilder를 사용한 예이다
class MyApp extends StatelessWidget {
Color? color = Colors.red;
Stream<Color> chageColor() async* {
await Future.delayed(Duration(seconds: 3));
yield Colors.blue;
await Future.delayed(Duration(seconds: 3));
yield Colors.yellow;
await Future.delayed(Duration(seconds: 3));
}
late Stream<Color> stm;
MyApp() {
stm = chageColor();
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
body: Center(
child: StreamBuilder<Color>(
stream: stm,
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.none) {
print('none');
} else if (snapshot.connectionState == ConnectionState.waiting) {
print('waiting');
} else if (snapshot.connectionState == ConnectionState.active) {
print('active:${snapshot.data}');
color = snapshot.data;
} else if (snapshot.connectionState == ConnectionState.done) {
print('done');
}
return Container(
width: 300,
height: 300,
color: color,
);
},
),
),
),
);
}
}
StreamBuilder의 builder에 구현된 내용은 ConnectionState 상태에 따라 여러번 호출된다. 리턴값으로는 Widget을 리턴 해야한다.
상태에 따라 위젯을 갱신하는 패턴이라 볼수 있다.
주의 해야할 점은 builder가 여러번 호출될 수 있다는 것이다. 그래서 연산이나 객의 참조의 값을 변경하는 작업을 하다보면 예상치 못한 일이 발생할 수 있다.
따라서 Stream에서는 Widget을 결정하는 단순한 상태값만 보내고 builder에서는 상태에 따라 Widget을 갱신하는 로직만 포함하는것이 안전하다.
- SteamController.onListen : SteamController를 사용할 경우만 해당. 초기화 작업을 할때 유용하다
- none : Stream과 연결이 준비되지 않은 상태
- waiting : Stream이 데이터를 보내기를 기다리는 상태
- done : Stream이 종료된 상태
위 예제 코드에는 ConnectionState가 active일때만 새로운 데이터를 전달 받았다. 하지만 아래처럼 마지막에 delay 코드를 삭제 한다면 이야기가 다르다.
Stream<Color> chageColor() async* {
await Future.delayed(Duration(seconds: 3));
yield Colors.blue;
await Future.delayed(Duration(seconds: 3));
yield Colors.yellow;
// await Future.delayed(Duration(seconds: 3)); //delay 코드 삭제
}
...
StreamBuilder<Color>(
stream: stm,
builder: (context, snapshot) {
...
if (snapshot.connectionState == ConnectionState.done) {
// done에 새로운 데이터가 보내짐
print('done:${snapshot.data}');
}
...
Stream이 데이터를 보내고 바로 빠르게 종료 된다면 active가 아닌 done에 데이터가 실려 보내 진다. 따라서 Stream이 빠르게 종료가 되는지 아닌지를 확인하고 data에 대한 대응을 해야한다.
이런일은 상당히 머리를 아프게 한다. 따라서 더더욱이 builder에는 단순히 상태에 따른 Widget만 넘기는 로직만 남기고 done 상태에는 data와 크게 연관성 없게 구성하는 것이 좋다. 아니면 active, done과 상관없이 data에만 반응하도록 구성하는것이 좋다.
위 예제는 ConnectionState와 상관없고 data에만 연관이 있기 때문에 아래 코드처럼 바꾸는것이 좋다.
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
body: Center(
child: StreamBuilder<Color>(
stream: stm,
builder: (context, snapshot) {
if(snapshot.hasData){
color = snapshot.data;
}
return Container(
width: 300,
height: 300,
color: color,
);
},
),
),
),
);
}
상태를 전달하는 것에 있어 StreamController를 사용하면 좀더 직관적으로 StreamBuilder에 데이터를 전달할 수있다. StreamController를.sink.add 통해 상태 데이터를 바로 보낼 수 있다.
class MyApp extends StatelessWidget {
Color? color = Colors.red;
StreamController<Color> controller = StreamController();
MyApp(){
controller.onListen = () async{
print('onListen');
await Future.delayed(Duration(seconds: 3));
controller.sink.add(Colors.cyan);
await Future.delayed(Duration(seconds: 3));
controller.sink.add(Colors.amber);
controller.close();
};
}
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
body: Center(
child: StreamBuilder<Color>(
stream: controller.stream,
builder: (context, snapshot) {
if(snapshot.hasData){
color = snapshot.data;
}
return Container(
width: 300,
height: 300,
color: color,
);
},
),
),
),
);
}
}
StreamController를 종료하지 않으면 필요할때 상태를 전송할수 있어 유용하다. UI의 이벤트가 발생했을때 변경이 필요한 Widget에 상태를 바꿔줄 수 있다.
class MyApp extends StatelessWidget {
Color? color = Colors.red;
StreamController<Color> controller = StreamController();
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
body: Center(
child: StreamBuilder<Color>(
stream: controller.stream,
builder: (context, snapshot) {
if (snapshot.hasData) {
color = snapshot.data;
}
return InkWell(
child: Container(
width: 300,
height: 300,
color: color,
),
onTap: (){
//클릭 했을때 색상을 바꿔줌
if(color == Colors.red){
controller.sink.add(Colors.blue);
}else{
controller.sink.add(Colors.red);
}
},
);
},
),
),
),
);
}
}
위에 코드는 잘 동작하지만 한가지 문제가 있다. StreamController사용하면 반드시 close를 해줘야하기 떄문이다. 그래서 보통 화면이 dispose 시점에 close하는 경우가 많아 StreamController를 사용할 때에는 StatelessWidget을 써야할 경우가 많다.
아래처럼 StreamController를 반드시 종료해 줘야 한다.
Note: 보통 StatelessWidget dispose 시점에 StreamController를 사용하지 않음으로 dispose에 close를 한다.
class MyApp extends StatefulWidget{
@override
State<StatefulWidget> createState() {
return _MyApp();
}
}
class _MyApp extends State<MyApp> {
Color? color = Colors.red;
StreamController<Color> controller = StreamController();
StreamBuilder<int> b = StreamBuilder(builder: (context, snapshot) => Text(''));
@override
Widget build(BuildContext context) {
return MaterialApp(
home: Scaffold(
body: Center(
child: StreamBuilder<Color>(
stream: controller.stream,
builder: (context, snapshot) {
if (snapshot.hasData) {
color = snapshot.data;
}
return InkWell(
child: Container(
width: 300,
height: 300,
color: color,
),
onTap: (){
//클릭 했을때 색상을 바꿔줌
if(color == Colors.red){
controller.sink.add(Colors.blue);
}else{
controller.sink.add(Colors.red);
}
},
);
},
),
),
),
);
}
@override
void dispose() {
//반드시 close를 해야한다.
controller.close();
super.dispose();
}
}
단일 StatefulWidget만들고 앱을 종료하면 dispose가 호출되지 않는 것을 볼수 있다.
dispose 호출은 화면이 사라질때라고 생각하기 쉽지만 사실 Navigator의 pop이 되었을때 호출된다.
문제는 첫페이지에 같은 경우 dispose가 호출되지 않는 것이다. Android Activity가 종료되는 시점을 알아야 되기 때문에 Activity 라이프 사이클을 가져오는 추가적인 코드가 필요하다.
참고문서 : https://docs.flutter.dev/get-started/flutter-for/android-devs#how-do-i-listen-to-android-activity-lifecycle-events