dream

一个菜鸟程序员的成长历程

0%

thinkphp-queue队列使用

在我们写程序的时候,经常会用到队列来完成一些操作,关于队列的介绍和使用场景,注意事项可以看我的这个文章你不知道的队列使用技巧

在tp里面使用队列

安装

tp框架提供了一个扩展包,叫做think-queue。我们先来安装这个扩展包。

composer require topthink/think-queue

配置消息队列

等待安装完成之后,我们需要进行配置,消息队列的消息存放在哪里,可以配置成redis。

配置在你的config/queue.php里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

'default' => 'redis', //默认是sync,改成redis

'connections' => [
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => env('queue.host', '127.0.0.1'),
'port' => env('queue.port', 6379),
'password' => env('queue.password', ''),
'select' => 0, // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接
],
],

创建消息

配置完成以后我们就可以开始使用了。

在我们的controller里面把一个消息推送到队列里面。这里我们定义一个队列名称叫做message,定义一个处理队列消息的消费者类app\common\queue\consumer。然后调用Queue门面的push方法,把消费者,队列名称,数据传入进去就可以了。这个时候就会把数据放到message这个队列里面。然后消费者取出数据进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
namespace app\api\controller;

use app\BaseController;
use think\facade\Queue;

class Test extends BaseController
{

protected $consumer = 'app\common\queue\consumer'; //消费者类

protected $queue = 'message'; //队列名称

public function test() {
if ($this->request->isPost()) {
//要推送到队列里面的数据
$jobData = [];
$jobData["a"] = 'a';
$jobData['b'] = 'b';

$res = Queue::push($this->consumer, $data, $this->queue);
return json([]);
}
}
}

消费消息

我们接下来实现我们上面定义的消费者。来处理我们的逻辑。

消费者app\common\queue\consumer类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?php
namespace app\common\queue;

use think\queue\Job;

class consumer {
/**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job,$data)
{
// 有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone = $this->check($data);
if(!$isJobStillNeedToBeDone){
$job->delete(); //删除任务
return;
}

//执行任务
$isJobDone = $this->doJob($data);

if ($isJobDone) {
// 如果任务执行成功, 记得删除任务
$job->delete();
}else{
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
$job->delete();
}
}
}

/**
* 有些消息在到达消费者时,可能已经不再需要执行了
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
*/
private function check($data){
return true;
}

/**
* 根据消息中的数据进行实际的业务处理...
*/
private function doJob($data)
{
dump($data);
return true;
}
}

这样我们就完成了代码的逻辑,也就是发布消息,消费消息。

接下来我们启动这个队列。

启动队列

启动队列有两种方式。

  • work
  • listen

work方式启动。这种方式是单进程运行。如果你更新了代码需要手动重启队列。

php think queue:work –queue message //我们刚刚定义的队列名称

listen方式启动。这种方式是master-worker模型。一个master主进程来监听,当请求进来了启动一个work子进程来运行上面的work方式启动。

php think queue:listen –queue message

我更推荐listen方式来运行。这种方式更新代码后也不需要手动重启。

thinkphp-queue队列导致MySQL server has gone away

虽然队列一时爽,不过还是有缺点的,比如当队列运行时间长了会报错 MySQL server has gone away

原因是使用work模式运行时间长了以后没有释放mysql数据库的链接,导致时间长了以后被mysql server端判断超时切断了链接。

可以改用listen模式运行,这样每次都是启动一个新的work进程来运行程序,每次都会新链接数据库。

可以使用tp的断线重连功能。修改配置文件config/database.php

1
2
3
4
5
6
7

// 数据库连接配置信息
'connections' => [
'mysql' => [
// 是否需要断线重连
'break_reconnect' => true,
],

php-mpdf扩展包中文乱码问题

mpdf是一个可以把html网页转换成pdf文件的扩展包。一开始使用的时候,发现中文乱码了。。在网上查了半天,好多方法都不管用。

最后,在他的文档里面找到了问题原因。

想要输出中文,有两个参数至关重要!!!

  • autoLangToFont 这个值一定要设置为true才可以
  • autoScriptToLang 这个值也一定要设置为true才可以

只要上面两个设置为true,那么你的中文就可以正常输出了。相信我,不能正常输出你来打我。

看一下mpdf文档上面的描述。

mpdf

mpdf

可以看到默认值是false,所以我们使用的时候需要改成true。

设置这两个值也很简单。

1
2
3
4
5
6
7
8
9
10
11
12

use Mpdf\Mpdf;

function test() {
$pdf = new Mpdf;
$pdf->autoLangToFont = true;
$pdf->autoScriptToLang = true;

$pdf->writeHTML('<h1>123</h1>');

return $pdf->output('./test.pdf', 'D');
}

其实,mpdf的文档最开始是有错误的,他的文档中写的默认值是true而不是现在的false。不过从他的源码上可以看到他的默认值其实是false

源码位置:vendor/mpdf/mpdf/src/Config/ConfigVariables.php里面。
这个文件里面是很多变量的默认值,在这里面搜索可以看到这两个值是false。

1
2
3
4
5
6
7
8

// AUTOMATIC FONT SELECTION
// Based on script and/or language
// mPDF 6.0 (similar to previously using function SetAutoFont() )
'autoScriptToLang' => false,

// mPDF 6.0 (similar to old useLang)
'autoLangToFont' => false,

我给他们的github上面提了一个issue,他们才把文档改过来了。

mpdf

最后附上mpdf官方文档:

http://mpdf.github.io/fonts-languages/fonts-in-mpdf-7-x.html

我给他们提的issue:

https://github.com/mpdf/mpdf.github.io/issues/141

thinkphp-tp6使用chunk分块操作数据的坑

有的时候我们会遇到需要定时操作数据的需求,比如定时更新所有用户的权益,徽章等等。这个时候你不能一次性取出所有数据来进行操作,因为数据量太大了,我们一次取出全部,先不说mysql数据库会很慢,就算取出来传给你,网络开销也很大。这时候你通过网络接收到数据以后,会把这些数据放到一个变量里面。这个变量是存在内存中的,如果过大还会导致内存溢出,内存不足的问题。

所以我们就需要分页取出数据来进行操作,比如每次取出100条,操作完了再取出下100条。而tp框架提供了一个方便的chunk方法来供我们使用,免去了我们需要手动limit分页的麻烦。

我之前使用过laravel的chunk,以为两个差不多,看了文档也觉得差不多。下图是tp6文档的描述。

tp

其实单表这么写也没有什么问题,不过一旦你使用了连表查询,就出现问题了。。而他的文档并没有说连表的问题。

虽然他的文档有这么一段也说明了主键和排序的问题。

tp

但是,没想到连表的时候是必须,注意,必须!!!传主键,不然他不知道是哪个表的主键。而laravel就没有这个问题。。

我当时写的时候去找了他的源码,才看到这个问题,因为我正常写完后一直报错。。

看一下他的源码。源码位置在./vendor/topthink/think-orm/src/db/query.php里面的chunk方法。

tp

tp

从这里可以看到他有4个参数。

  • count 每次处理的数量
  • callback 处理的回调函数
  • column 处理的字段名 默认 null
  • order 字段排序 默认asc

前两个我们必传,后面的可选。

他的第二行代码,如果你传了第三个参数,那么使用你传的,不然调用getPk这个函数。这个函数在源码里面也有,就是获取主键。假设你不传,你的主键是id,那么column这个变量就是id。

1
$column  = $column ?: $this->getPk();

接下来的代码你会发现你的column参数,还可以传一个数组。如果是一个数组,那么他在这里不使用这个参数。

如果你传的不是一个数组,那么看有没有.也就是连不连表。因为连表你传的是a.id。如果连表那么explode分割成数组[a,id]的形式。

如果你传的就是id那么直接赋值给变量key

1
2
3
4
5
6
7
8
9
10
11
12
13

if (is_array($column)) {
$times = 1;
$query = $this->options($options)->page($times, $count);
} else {
$query = $this->options($options)->limit($count);

if (strpos($column, '.')) {
[$alias, $key] = explode('.', $column);
} else {
$key = $column;
}
}

接下来就是真正获取数据,然后调用回调函数,再重复获取数据的过程了。

1
$resultSet = $query->order($column, $order)->select();

这里可以看到,我们传数组,那么数组就会直接给order函数,如果是连表的主键a.id,那么就会把[a,id]给order函数,如果是单表,那么默认id主键给order函数。

2022年终总结

2022年天灾人祸不断,疫情严重,年中的时候就居家办公了一个月,十一之后又开始了居家办公。好消息是年终的时候疫情结束了,大家一起阳。

这一年里钱没挣到,没攒下。工作也是稀里哗啦。开始了从php转java的路子。刚开始转java很不适应。很多写法,习惯不一样,很多工具不知道,不会用。不断的踩坑,填坑。

到年终的时候总算把java搞得差不多了,写项目也没有在踩坑之类的了。不过关于java的内存,gc,多线程这些还是有待提高。

同样的,因为java的不适应和踩坑,再加上作为第一批转java的,接了难活等等,年底绩效背了c。

2022年花钱如流水,换了3部手机,买了一个mac pro。

也完成了我的求婚。结婚也要提上日程了。

新的一年希望越来越好。

2023年小目标

学习

  • csapp
  • 伯克利cs61b
  • cmu 15-445

自考本科完成

读书

  • 离散数学及其应用
  • 概率论
  • csapp
  • 每个人的经济学
  • 置身事内

证书

  • 公共英语三级
  • 软考高级

java

  • 内存
  • gc
  • 多线程

stream

stream的结果态

当前使用的结果态方法是collect。主要作用是收集。看一下源码。作为结果态方法,不再返回stream类型的对象。接受一个Collector类型的对象作为参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
//声明一个变量
A container;
//判断是否并行流。短路,直接走else
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
//调用evaluate方法
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}

先看一下ReduceOps 类的 makeRef方法。构造一个结果态对象,对引用类型的值执行可变的计算,规约。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

/** Constructs a TerminalOp that implements a mutable reduce on reference values.
形参:
collector – a Collector defining the reduction
返回值:
a ReduceOp implementing the reduction
*/
public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) {

//参数校验不为空并且获取到collector的 supplier
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
// 获取到 collector的 accumulator
BiConsumer<I, ? super T> accumulator = collector.accumulator();
// 获取到 collector的 combiner
BinaryOperator<I> combiner = collector.combiner();
// 内部类
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}

@Override
public void accept(T t) {
accumulator.accept(state, t);
}

@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
//创建一个 ReduceOp 的对象
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}

@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}

看一下ReduceOp类。他根据指定的流类型创建ReduceOp对象。 使用指定的supplier创建 accumulating sinks。 对流进行预估并将结果发送至 accumulating sinks。然后执行计算操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
implements TerminalOp<T, R> {
private final StreamShape inputShape;

/**
* Create a {@code ReduceOp} of the specified stream shape which uses
* the specified {@code Supplier} to create accumulating sinks.
*
* @param shape The shape of the stream pipeline
*/
ReduceOp(StreamShape shape) {
//这里 shape 是 REFERENCE 也就是引用类型
inputShape = shape;
}
}

创建完结果态对象以后,将结果态对象传入evaluate方法。预估管道的结果态操作并产生一个结果。

  • container = evaluate(ReduceOps.makeRef(collector));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
//这里的参数就是上面刚生成的结果态对象。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
//断言类型是否是引用类型
//getOutputShape是ReferencePipeline类的方法,直接返回的就是 REFERENCE, 刚才生成的结果态的shape也是 REFERENCE
assert getOutputShape() == terminalOp.inputShape();
//判断是否已经消费 如果已经消费了 抛出非法状态异常。
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
//标记为已消费
linkedOrConsumed = true;
//如果是并行流调用结果态的并行流方法,否则调用顺序流方法。
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

调用顺序流处理方法之前,参数先调用了sourceSpliterator方法。而在调用sourceSpliterator之前,还调用了结果态的getOpFlags方法。

getOpFlags方法在创建结果态的时候增加的,代码在下面,一起回顾一下。这个方法很简单,就是看collector的characteristics是否无序。

  • 如果无序,返回StreamOpFlag.NOT_ORDERED标志,即32
  • 否则返回0

我们这里返回的是0,因为list并不需要有序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

//创建一个 ReduceOp 的对象
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}

@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};

接下来看sourceSpliterator方法。作用如下:

  • 如果是一个顺序流或无状态并行流,返回初始化的spliterator对象。
  • 如果是一个有状态并行流,返回一个spliterator对象,包含所有最近状态操作的计算结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
* stateful parallel pipeline, this is a spliterator describing the results
* of all computations up to and including the most recent stateful
* operation.
*/
@SuppressWarnings("unchecked")
//参数是0
private Spliterator<?> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
//声明 spliterator = null
Spliterator<?> spliterator = null;
//判断 sourceStage指针的sourceSpliterator是否为null
//sourceStage指针指向头节点,所以是判断 头节点的 sourceSpliterator是否为null。
//在最开始初始化的时候,头节点的 sourceSpliterator 指向了一个 ArrayListSpliterator 对象,所以这里不是null
//不是null的话我们会取出 头节点的 sourceSpliterator
if (sourceStage.sourceSpliterator != null) {
// 本地变量 spliterator = 头节点的 ArrayListSpliterator 对象 里面包含我们的源数据 list。
spliterator = sourceStage.sourceSpliterator;
// 删除头节点的 sourceSpliterator
sourceStage.sourceSpliterator = null;
}
//这里在初始化的时候并没有 初始化 头节点的 sourceSupplier ,所以这里是null,并不会走到这里
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}

//判断是否是并行流,触发短路。
if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op
// in the pipeline up to and including this pipeline stage.
// The depth and flags of each pipeline stage are adjusted accordingly.
int depth = 1;
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {

int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
depth = 0;

if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
// Clear the short circuit flag for next pipeline stage
// This stage encapsulates short-circuiting, the next
// stage may not have any short-circuit operations, and
// if so spliterator.forEachRemaining should be used
// for traversal
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
}

spliterator = p.opEvaluateParallelLazy(u, spliterator);

// Inject or clear SIZED on the source pipeline stage
// based on the stage's spliterator
thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}

//直接到这里,由于传进来的是0,所以直接返回了
if (terminalFlags != 0) {
// Apply flags from the terminal operation to last pipeline stage
// 合并结果态操作到最后一个流操作的标志
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}

//直接到这里返回 头节点的 ArrayListSpliterator
return spliterator;
}

回到上面evaluate方法的这段代码terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

  • 第一个参数是this
  • 第二个参数是刚才返回的 头节点的 ArrayListSpliterator
  • 复习一下ArrayListSpliterator的属性
    • list = list
    • index = 0
    • fence = -1
    • expectedModCount = 0

接下来走到了 结果态 对象的 evaluateSequential 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
//调用stream流的wrapAndCopyInto方法
//第一个参数是 makeSink 方法生成的 ReducingSink 对象,第二个参数是 ArrayListSpliterator
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

//makeSink是上面生成 结果态 对象的时候在 ReduceOp 对象里面加的
@Override
public ReducingSink makeSink() {
// 返回了一个 ReducingSink 对象。
return new ReducingSink();
}

// 这个类,提供了三个方法。等用到的时候说。
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}

@Override
public void accept(T t) {
accumulator.accept(state, t);
}

@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}

看一下流的wrapAndCopyInto方法。这个方法在 AbstractPipeline 类里面。

  • 第一个参数是 ReducingSink 对象
  • 第二个参数是 ArrayListSpliterator 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
//调用过来 copyInfo 方法,第一个参数又调用了 wrapSink方法
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}

//参数是 ReducingSink 对象
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
//参数校验
Objects.requireNonNull(sink);

// for循环 p = 当前节点,如果p的深度 > 0, 就循环,然后p = p的前一个节点
// 也就是从当前节点往前遍历,一直到头节点。
// 当前节点按照我们写的就是 filter 节点。
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
// 调用每个节点的 opWrapSink 方法,传入前一个节点的标志位和 sink 对象。第一次是 ReducingSink
// 调用 filter 节点的结束后 把包装好的 sink链 也就是 ChainedReference 对象赋值给 sink。
// 调用 map 节点的时候,传入的sink变成了 filter 返回的 ChainedReference 对象。
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}

回顾一下 filter 节点的 opWrapSink 方法 。这是在中间态节点生成的时候,创建无状态对象的时候添加的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
//增加了opWrapSink方法,第一个参数是标志 = 90,第二个是 ReducingSink 对象
//这个方法会在结果态的时候调用
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
//创建一个 Sink.ChainedReference类的对象并返回。传入 ReducingSink 对象
//简单来说就是包装 sink 对象,并创建出一个 sink 执行链。
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
//这里重写了 ChainedReference 的 begin 方法
public void begin(long size) {
//调用 ReducingSink 的 begin 并传入 -1
downstream.begin(-1);
}

//重写了 accept 方法
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};

static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;

//这里的 downstream = ReducingSink 赋值给对象的 downstream 属性。
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}

@Override
public void begin(long size) {
downstream.begin(size);
}

@Override
public void end() {
downstream.end();
}

@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}

看一下现在的 sink 链

stream5.png

回顾一下 map 节点的 opWrapSink 方法 。这是在中间态节点生成的时候,创建无状态对象的时候添加的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {

//增加了opWrapSink方法,第一个参数是标志 = 95,第二个是 ChainedReference 对象
//这个方法会在结果态的时候调用
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
//再次创建一个 Sink.ChainedReference类的对象并返回。传入一个 ChainedReference 对象
return new Sink.ChainedReference<P_OUT, R>(sink) {

//这里重写了 accpet方法
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};

static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;

//这里的 downstream = ChainedReference 赋值给对象的 downstream 属性。
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}

@Override
public void begin(long size) {
downstream.begin(size);
}

@Override
public void end() {
downstream.end();
}

@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}

看一下现在的 sink 链。

stream6.png

现在来到 copyInto 方法。这个方法两个参数

  • 第一个是包装好的 sink 链。
  • 第二个是 ArrayListSpliterator 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
//参数校验
Objects.requireNonNull(wrappedSink);

//getStreamAndOpFlags方法获取标志位 154
//isKnown返回false
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
// getExactSizeIfKnown = 7 因为list的大小是 7
// 调用了 sink 的 begin。现在的 sink 是 map 的 sink,没有重写 begin
// 所以 begin 是 downstream.begin(size); size = 7
// 而map sink的 downstream指向了 filter 的 sink,所以调用了filter sink 的 begin
// filter 重写了,是 downstream.begin(-1); 调用了 ReducingSink 的 begin
// ReducingSink 的 begin 是给 state 赋值 supplier.get()的值 是 0
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// 循环剩余的数据,传入 sink 链
spliterator.forEachRemaining(wrappedSink);
// 执行结束方法
// 先调用 map sink, 然后 filter sink 然后 collect 的end
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}

//判断这个标志是否在流上设置,是否在操作上设置。是否在流和操作的组合上设置。
boolean isKnown(int flags) {
// 154 & 50331648 == 16777216
// 返回 false
return (flags & preserve) == set;
}

//如果能获取到size 就返回 size,不然返回 -1
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}

//获取大小
public long estimateSize() {
return (long) (getFence() - index);
}

看循环方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public void forEachRemaining(Consumer<? super E> action) {
//声明变量
int i, hi, mc; // hoist accesses and checks from loop
ArrayList<E> lst; Object[] a;
//参数校验,空指针
if (action == null)
throw new NullPointerException();

//list 是我们的数据 不为空并且取出一个元素也不为空
if ((lst = list) != null && (a = lst.elementData) != null) {
//把大小赋值给hi = 7
if ((hi = fence) < 0) {
mc = lst.modCount;
hi = lst.size;
}
else
// mc = 7
mc = expectedModCount;

// index >=0 并且 hi <= list的数量
// i = 0 index = hi = 7
if ((i = index) >= 0 && (index = hi) <= a.length) {
//开始循环 0 < 7, 执行完循环后 ++i, i = 1
for (; i < hi; ++i) {
//取出一个元素
@SuppressWarnings("unchecked") E e = (E) a[i];
//调用 sink 链的 accpet方法 传入 该元素。
action.accept(e);
}
//循环完以后判断一下是否全部都处理完了 完成就返回
if (lst.modCount == mc)
return;
}
}
//抛出异常
throw new ConcurrentModificationException();
}


//map的 accpet方法
@Override
public void accept(P_OUT u) {
//先进行处理 mapper.apply相当于调用了我们map传入的方法
//将结果沿着sink链传播
downstream.accept(mapper.apply(u));
}

//filter的 accept 方法
@Override
public void accept(P_OUT u) {
//predicate.test就相当于调用了我们 filter 的时候传入的方法。
//看过滤结果是否成功,如果成功继续沿着sink链流动。
//不成功则不流动。
if (predicate.test(u))
downstream.accept(u);
}

//结果态的collect的 accept 方法
class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
// 初始化
state = supplier.get();
}

@Override
public void accept(T t) {
// 执行累加器的 accpet
// 这个就是 state.add(t) 具体后面会讲
accumulator.accept(state, t);
}
}

最后,回到开始的collect方法中。evaluate会返回我们刚才一系列操作以后收集到的满足条件的list

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
//声明一个变量
A container;
//判断是否并行流。短路,直接走else
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
//调用evaluate方法
container = evaluate(ReduceOps.makeRef(collector));
}
//判断标志位,collector.characteristics() = IDENTITY_FINISH
// 所以这里直接返回 收集的 list
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}

php yii2框架前端加载css和js文件的方法

这两天有一个以前的项目是用yii2框架写的,前后端没有做分离,现在需要用vue接手后续的前端开发。

把vue的项目放到yii2里面,这时候遇到一个加载静态资源的问题,原来html的引用方式不管用了。

后来看到yii2官方文档里面,需要改一下引用方式。

改成下面这样就可以了。

1
2
$this->registerCssFile("@web/static_vue/css/index.css")
$this->registerJsFile("@web/static_vue/js/index.js")

所有都使用这两个php代码进行引入,引入后就可以了。

wsl

wsl是可以在windows里面运行linux的一个软件。是微软官方发行的。

安装go

先去到安装目录/usr/local/src

从go官网下载go tar包。

1
sudo wget https://golang.org/dl/go1.16.3.linux-amd64.tar.gz

然后解压

1
sudo tar -zxvf go1.16.3.linux-amd64.tar.gz

接下来需要配置环境变量。可以设置在/etc/profile文件里面也可以设置在其他地方,我用的是zsh,所以我的环境变量配置在~/.zshrc文件里面。

执行vim ~/.zshrc。然后添加变量信息

1
2
export GOROOT="/usr/local/src/go"
export PATH="$PATH:$GOROOT/bin"

接下来重新加载一下配置文件

1
source ~/.zshrc

现在go就安装好了。可以执行go version查看go的版本。

创建第一个go 项目

我把项目放在~/go目录下。所以把这个目录添加到配置文件里面。因为这个目录也相当于$HOME/go。所以我们直接添加。

执行vim ~/.zshrc。然后添加变量信息

1
export GOPATH="$HOME/go"

接下来重新加载一下配置文件

1
source ~/.zshrc

在项目目录下面创建第一个文件。

1
vim ~/go/hello.go

添加下面的代码

1
2
3
4
5
6
7
8
9

package main

import "fmt"

func main () {
fmt.Printf("hello world\n")

}

进行编译

1
go build ~/go/hello.go

编译完成出现hello文件,直接执行。

1
~/go/hello

就可以看到输出了。

wsl

wsl是可以在windows里面运行linux的一个软件。是微软官方发行的。

安装php

从php官网下载php tar包。

1
sudo wget https://www.php.net/distributions/php-7.4.12.tar.gz

然后解压

1
sudo tar -zxvf php-7.4.12.tar.gz

接下来需要安装一些扩展来支持php。

1
sudo apt-get install gcc make pkg-config libxml2-dev libssl-dev libsqlite3-dev libcurl4-openssl-dev libonig-dev zlib1g-dev libffi-dev libpng-dev libzip-dev

不安装上面的扩展会导致接下来报错。

切换目录

1
cd ./php-7.4.12

执行configure,注意这里prefix一定要是/usr/local/php7,要不然找不到配置文件php.ini。这里有这个坑。

1
sudo ./configure --enable-fpm --with-mysql --with-pear --with-zip --enable-sockets --enable-soap --with-pdo-mysql  --enable-gd --enable-ftp --with-ffi  --with-zlib  --with-curl --with-openssl --enable-mbstring --prefix=/usr/local/php7 --with-config-file-path=/usr/local/php7 --with-external-gd --with-webp  --with-jpeg  --with-xpm  --with-freetype  --enable-bcmath

执行完上面一步如果没有错误就可以了。

接下来执行make

1
sudo make && sudo make install

复制一些配置文件

1
2
sudo cp /usr/local/src/php7/etc/php-fpm.conf.default /usr/local/src/php7/etc/php-fpm.conf
sudo cp /usr/local/src/php7/etc/php-fpm.d/www.conf.default /usr/local/src/php7/etc/php-fpm.d/www.conf

修改php-fpm的用户

1
sudo vim /usr/local/src/php7/etc/php-fpm.d/www.conf

找到 usergroup 这两个参数,原来的值是 nobody改成www-data,然后保存退出。

建立软连接或者环境变量。我们要配置全局的环境变量有两种方式。

  • 在环境变量目录里面增加软连接
  • 把php目录增加到环境变量里面

我采用的是软连接的方式。

1
2
3
sudo ln -s /usr/local/php7/bin/php /usr/local/bin/php
sudo ln -s /usr/local/php7/bin/phpize /usr/local/bin/phpize
sudo ln -s /usr/local/php7/sbin/php-fpm /usr/local/bin/php-fpm

接下来就可以全局使用php命令了

php-fpm启动,重启方法

启动

1
sudo php-fpm

重启 先找到进程 然后发送USR2信号

1
2
3
ps -aux | grep php

sudo kill -USR2 进程id

安装nginx

访问nginx的官网进行下载。

复制下载地址。比如我下载的1.19.5,直接下载

1
sudo wget https://nginx.org/download/nginx-1.19.5.tar.gz

然后解压

1
sudo tar -zxvf nginx-1.19.5.tar.gz

接下来需要安装一些扩展来支持php。

1
sudo apt-get install libpcre3 libpcre3-dev

切换目录

1
cd ./nginx-1.19.5

执行安装

1
2
sudo ./configure --with-file-aio --with-http_ssl_module --with-http_v2_module --with-http_realip_module --prefix=/usr/local/src/nginx
sudo make && sudo make install

好了,安装完成。

同意,建立软连接。

1
sudo ln -s /usr/local/src/nginx/sbin/nginx /usr/local/bin/nginx

接下来启动nginx看看效果

1
sudo nginx

访问localhost就可以看到效果了。

配置网站

接下来配置一下网站。

修改nginx.conf

1
sudo vim /usr/local/src/nginx/conf/nginx.conf

在http块里面加上这句话,引入其他的配置文件

1
include     conf.d/*.conf;

然后我们创建这个目录

1
sudo mkdir /usr/local/src/nginx/conf/conf.d

配置我们的网站文件

1
sudo vim /usr/local/src/nginx/conf/conf.d/test.com.conf

复制下面内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
server {
listen 80;
server_name test.com;
root "/home/wwwroot/test/public/";
location / {
index index.php index.html error/index.html;
autoindex off;
if (!-e $request_filename) {
rewrite ^(.*)$ /index.php?s=/$1 last;
break;
}
}
location ~ \.php(.*)$ {
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_split_path_info ^((?U).+\.php)(/?.+)$;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
fastcgi_param PATH_INFO $fastcgi_path_info;
fastcgi_param PATH_TRANSLATED $document_root$fastcgi_path_info;
include fastcgi_params;
}
}

在修改hosts文件就好了。

redis扩展

如果要装redis扩展,那么手动下载

1
http://pecl.php.net/package/redis

找到tar包下载

1
sudo wget http://pecl.php.net/get/redis-5.3.2.tgz

解压缩

1
sudo tar -zxvf redis-5.3.2.tgz

进去执行phpize

1
2
cd ./redis-5.3.2
sudo phpize

然后编译

1
2
sudo ./configure --with-php-config=/usr/local/php7/bin/php-config
sudo make && sudo make install

接下来会出现下面的目录

1
/usr/local/php7/lib/php/extensions/no-debug-non-zts-20190902

修改我们的php.ini

1
sudo vim /usr/local/php7/lib/php.ini

修改下面这个

1
extension_dir=/usr/local/php7/lib/php/extensions/no-debug-non-zts-20190902

增加redis扩展

1
extension="redis.so"

重启nginx和php-fpm就好了。

如何生成ssh-key

打开命令终端,或者使用git bash都可以。

打开以后先查看你之前是否生成过ssh-key。生成之后会在目录~/.ssh/下面出现两个文件id_rsa私钥和id_rsa.pub公钥。

cd ~/.ssh/
ls

如果没看到这两个文件,那么开始执行生成指令。

ssh-keygen -t rsa -C “你的邮箱” //这里一般使用github的邮箱

运行之后,会出现提示让你输入一些东西,这里不需要管,不需要输入,直接回车即可。

一直回车。直到指令执行完。

再次查看,发现已经生成了两个文件。接下来使用的时候只需要把公钥给到别人就可以了。