只有记在脑海里的才是自己的,如果还没来得及,请写下来
Dart 中的多线程 与 Future
2020-11-02 / 12 min read

1:Dart中的异步执行

1.1:在开始之前,我们先要了解什么是阻塞与非阻塞?

阻塞:阻塞的系统调用是指,当进行系统调用时,除非出错(被信号打断也视为出错),进程将会一直陷入内核态直到调用完成。

非阻塞:非阻塞的系统调用是指无论I/O操作成功与否,调用都会立即返回。

在《Unix网络编程》一书中提到了五种IO模型,分别是:阻塞IO、非阻塞IO、多路复用IO、信号驱动IO以及异步IO。

关于以上五种IO模型的介绍

1.2:Dart中是如何处理阻塞的

之前有 androidiOS 开发经验的小伙伴都知道什么是异步执行;在移动端都存在多线程这个概念,当我们需要处理一段耗时的代码,就需要开启一个子线程去执行这段代码,防止阻塞主线程,导致界面卡顿。
如果是前端开发的小伙伴就知道,在 js 中,是没有多线程这个概念的,当我们需要去处理耗时操作,我们可以这样写:

new Promise((resolve, reject) =>{
    // 网络请求等耗时操作
}).then((success) => { //成功
   console.log(success);
}).catch((error) => { // 失败
   console.log(error);
});

通过 Promise 对象我们可以异步处理一段阻塞的代码。
js 类似,Dart 也是单线程编程语言,也存在一个特定的方式去处理需要异步执行的代码,如下:

void fut0() {
  final fut0 = Future(() {
    // 网络请求等耗时操作
  }).then((value) => {

  });
}

到这里为止,我们知道在Dart中可以使用Future 对象来处理阻塞的代码。但是具体是如何处理的?

我们知道在操作系统中,提供很多底层的I/O接口,这些接口模型可能是阻塞或者是非阻塞,亦或者是1.1中提到的其他几种模型。

对于不同的I/O模型,Dart 提供了两种不同的处理方式。
在这里,我们先提出以下三种情况,通过对每个情况的分析,来了解异步处理的真正方式

  1. 第一种方式:如果我们调用的方法,在操作系统中存在异步I/O ,则直接交给系统去执行;比如说,网络请求,Socket 本身提供了 select 模型可以异步查询;而文件IO,操作系统也提供了基于事件的回调机制,我们只需要监听到系统执行的回调就行了。
    类似下面的例子:
void fut0() {
  final fut0 = Future(() {
    // 网络请求等耗时操作
    // socket 本身提供了异步查询
  }).then((value) => {

  });   
}
  1. 第二种方式:不是所有的方法系统都有异步I/O,比如我们需要执行数据解析或者复杂的运算,这可能也很耗时;但是这些系统并没有提供异步I/0的操作,我们没办法直接交给系统去执行。那Dart是怎么将这些耗时的操作去异步执行的?

  2. 第三种方式:针对于第二种方式,并且我希望这是一个并发执行的操作

1.3:针对第二种方式处理 -> Dart 中的事件循环与消息队列

  1. 事件循环 (Event Loop):在Dart中,存在isolate的概念,通常我们的代码都运行在一个isolate的独立环境中,在isolate环境中,存在一个事件循环,它在不停干一件事情:从事件队列中取出事件并处理它。
while(true) {
   event = event_queue.first() //取出事件
   handleEvent(event) //处理事件
   drop(event) //从队列中移除
}

类似于iOS的runLoopandroid的Looper,当我们点击屏幕,或者网络请求时,都会产生一个event事件,这些事件会依次进入事件循环中被处理。

在 一个isolate环境中,除了事件循环 (Event Loop),还存在两个消息队列。

  1. event queue事件队列:用来处理外部的事件,如果IO、点击、绘制、计时器(timer)和不同 isolate 之间的消息事件等。
  2. microtask queue微任务队列:处理来自于Dart内部的任务,适合用来不会特别耗时或紧急的任务,微任务队列的处理优先级比事件队列的高,如果微任务处理比较耗时,会导致事件堆积,应用响应缓慢。
import 'dart:async';

main() {
    // 默认添加到事件队列
  new Future(() => print('beautiful'));
  // 收到提交到微任务队列
  Future.microtask(() => print('hi'));
}
打印:
222
111
Exited

当我们程序启动时,默认会创建一个主的isolate(隔离)(相当于一个运行环境),启动事件循环,事件会按照FIFO的顺序依次执行,优先处理 microtask queue中的任务,然后处理 event queue中的任务。

按照第二种方式,我们执行如下任务:

void fut0() {
  print('1');
  final fut0 = Future(() {
    for (var i=0; i< 10000; i++) {
      for (var j=0; j<10000; j++) {
        
      }
    }
    return 'xxxxxx';
  }).then((value) => {
    print(value)
  });
  Future.microtask(() => print('3'));
  print('2');
}

其中执行了一个耗时的操作,最后输出:

1
2
3
xxxxxx

事件按照FIFO的方式,fut0 中的print('1')和print('2')方法会优先执行,然后在处理microtask queue中的任务输出 3,最后处理 event queue中的任务输出 xxxxxx
到这里,我们可以知道,一个 isolate(隔离) 相当于一个线程,因为事件处理会按照顺序执行,一个 isolate(隔离) 无法实现多线程并行的方式

1.4:针对第三种方式处理(多线程并行) -> Dart 中的 isolate(隔离)

到这里,我们需要重新去了解 Dart,在 1.2 中,我们提到 Dart 是单线程编程语言,这是因为我们在上层写代码的时候,不需要考虑并发问题,不需要对你的状态上锁这些,但其实它是由多个线程配合实现的单线程环境,那么 Dart 是如何实现多线程功能的?

在 1.3 中,我们提到 isolate隔离,默认情况下,在一个isolate隔离中的代码都是按照顺序执行的,所有无法实现多线程,那么我们是否可以创建多个isolate隔离来实现多线程?

  • 让我们先来了解一下isolate隔离是什么?
    isolate:英文翻译为隔离,它是类似于线程(thread)但不共享内存的独立运行的worker ,类似于一个进程
    我们可以查看文档中对 isolate的解释;大致翻译如下:
  1. 一个Isolate对象,是对一个 isolate隔离的引用,当我们创建一个新的Isolate 对象时,如果成功会生成一个新的isolate隔离(开辟一个新的独立的内存空间),在新的isolate隔离中,运行的代码会隔离在自己的事件循环中。按照这样,这样我们可以创建多个isolate隔离实现并发,因为每个isolate隔离是相互独立的。
  2. Isolate对象 允许被其他isolate控制、监听它所代表的isolate的事件循环,例如当这个isolate发生未捕获错误时,可以暂停(pause)isolate或获取(addErrorListener)错误信息。按照这样 isolate隔离是可以相互通信的。
  3. 代码默认运行在一个 main isolate隔离 中。

以下是网上找到的例子:这里创建了一个新的isolate隔离,然后在 main isolate隔离 去监听执行结果

import 'dart:isolate';
main(List<String> args) async {
  print('test1');
  await start();
  for (int i=0; i< 100; i++) {
    print('testxxx$i');
  }
  print('test3');
}
Isolate isolate;
start() async {
  print('test5');
  //创建接收端口,用来接收子线程消息
  ReceivePort receivePort = ReceivePort();
  //创建并发Isolate,并传入主线程发送端口
  isolate = await Isolate.spawn(entryPoint, receivePort.sendPort);
  //监听子线程消息
  receivePort.listen((data) {
    print('Data:$data');
  });
}
//并发Isolate
entryPoint(SendPort sendPort) {
  for (var i=0; i< 3000; i++) {
    print(i);
  }
  sendPort.send('xxxxx');
}

在执行之前,我们先猜想一下,有以下几种结果:
1:如果 await start(); 返回的是一个普通Future,那么输出顺序:

test1 -> test5 -> testxxx1 ->testxxx99 -> test3 -> Future中的输出

2:先执行 start() async中的代码,输出顺序:

test1 -> test5 -> test1 ->test2999 -> testxxx1 ->testxxx99 -> test3

3:以上两种都是按照顺序执行的,那么是否会出现并行?

test1 -> test5  -> ?(交错输出) -> test3

执行结果:

test1
test5
0
1
2
...
testxxx0
testxxx1
testxxx2
testxxx3
testxxx4
...
2694
2695
2696testxxx44
testxxx45
...
2901
testxxx93
testxxx94
testxxx95
testxxx96
testxxx97
testxxx98
testxxx99
test3
...
2998
2999
Data:xxxxx

我们可以通过打印输出观察到,确实是第三种输出,输出的顺序是不固定的,也就是说 await Isolate.spawn(entryPoint, receivePort.sendPort); 创建了一个新的isolate隔离,新的isolate隔离main isolate隔离同时执行。

结论:Dart可以通过isolate隔离来实现多线程并发执行
(在测试中,上面循环如果少于3000,无法产生我们预想的结果,原因未知)

通常情况下,我们的功能需求通过第二种方式中就可以完成,不需要新创建isolate隔离来实现

2:Future常见的创建方法

Future 提供了多种不同的 factory 方法,通过不同的方法,可以创建多种功能的Future

2.1:最基础的工厂方法
void futu1() {
  Future(() {
    for (var i=0; i< 10; i++) {
      print(i);
    }
    return 'xxx';
  }).then((value) => {
    print(value)
  });
}
2.2:Future.microtask 将事件添加到 microtask queue队列中
void futu2() {
  Future.microtask(() => {
    for (var i=0; i< 10; i++) {
      print(i)
    }
  }).then((value) => {
    print(value)
  });
}
2.3:Future.sync,future中的任务同步执行
void futu3() {
  print('1');
  Future.sync(() => {
    for (var i=0; i< 10; i++) {
      print(i)
    }
  }).then((value) => {
    
  });
  print('3');
}

输出:

1
test0
test1
test2
test3
test4
test5
test6
test7
test8
test9
3
2.4:Future.value()
Future<String> futu4() {
  return Future.value('3');
}
2.5:Future.delayed 延时执行操作
void futu5() {
  Future.delayed(new Duration(seconds: 2)).then((value) => {
    print('22')
  });
}
2.6:Future.wait[xxx] 等待 [xxx]中的任务都执行完之后,才执行
void futu6() {
  print('11');
  Future.wait([Future.delayed(new Duration(seconds: 2)).then((value) => {
    print('22')
  }), Future(() {
    print('33');
  })]).then((value) => {
    print('44')
  });
  print('55');
}

输出:

11
55
33
22
44
2.7:Future.any[xxx] 只要 [xxx]中的任务有一个执行完成,就执行
void futu7() {
  print('11');
  Future.any([Future.delayed(new Duration(seconds: 2)).then((value) => {
    print('22')
  }), Future(() {
    print('33');
  })]).then((value) => {
    print('44')
  });
  print('55');
}

输出:

11
55
33
44
22

3:Future常见用法

3.1:获取 future 的执行结果
  1. 使用then获取future的返回值
void futu1() {
  Future(() {
    return 'xxx';
  }).then((value) => {
    print(value)
  });
}
  1. 使用asyncawait来获取返回值
futu1() async {
  var value = await Future(() {
    for (var i=0; i< 10; i++) {
      print(i);
    }
    return 'xxx';
  });
  print(value);
}
3.2:使用 catchError 拦截future中的错误
void futu2() {
  Future(() => {
    throw new Exception("3")
  }).then((value) => {
    print(value)
  }).catchError((error) {
    print(error);
  });
}
3.3:使用 whenComplete 监听 future 执行结束
void futu2() {
  Future(() => {
    throw new Exception("3")
  }).then((value) => {
    print(value)
  }).catchError((error) {
    print(error);
  }).whenComplete(() => {
    print('complete')
  });
}
3.4:使用 timeout 判断 future 执行超时
void futu2() {
  Future.delayed(Duration(seconds: 3), () {
    return 'xxx';
  }).timeout(Duration(seconds: 2)).then((value) => {
    print(value)
  }).catchError((error) {
    print(error);
  }).whenComplete(() => {
    print('complete')
  });
}

输出:

TimeoutException after 0:00:02.000000: Future not completed
complete

4:FutureOr

FutureOr<T>:表示Future<T>T的值的类型
获取 FutureOr<T>的值,需要先判断类型,通常使用如下判断:

if (value is Future<T>) {
    var result = await value;
    print(result);
}else {
    print(value);
}

参考:
Dart 异步编程
阻塞IO与非阻塞IO
搞懂Dart异步并封装Isolate
Dart中的Isolate