dream

一个菜鸟程序员的成长历程

0%

网络原理自顶向下三可靠数据传输原理实现SR选择重传协议

SR

在SR中,和GBN不同,SR是给每一个分组设置定时器,发送端只确认重传当前分组,而不是所有分组。接收端在接收到乱序的分组的时候会进行缓存,当前面的分组到达以后一起提交给应用层。

发送端

  • 等待上层调用。这里和GBN一样
  • 超时。超时哪个重传哪个
  • 收到ACK。如果收到的是最小序号的ACK,那么base可以往前移动,也就是窗口滑动。如果收到其他序号的ACK。那么把这些ACK缓存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//序号
$base = 1;
$nextSeqNum = 1;
function rdt_send($data) {
//生成校验和
$checkSum = generateCheckSum($data);
//组装报文
$packet[$nextSeqNum] = make_pkt($data, $checkSum, $nextSeqNum);
//调用网络层传输
udt_send($packet[$nextSeqNum]);
//每个启动一个定时器
start_timer($nextSeqNum);
//如果超时
if ($timeNum = timeout()) {
//重传超时的分组
udt_send($packet[$timeNum]);
} else if ($isAck = rdt_rev() && check($isAck)) {
//等待接收方回传ack 并且没有出现错误
//获取确认的序号
$ackNum = getAckNum($isAck);
stop_timer($ackNum);
//判断这个ACK是不是base
if ($ackNum == $base) {
++$base;
//判断缓存有没有
while(array_key_exists(++$ackNum, $cache)) {
//如果下一个ack已经收到了,那么就把base接着往前移动
++$base;
}
} else {
//缓存起来
$cache[$ackNum] = $ackNum;
}
}
}

接收端

  • 序号在rcv_base 至 rcv_base + N - 1内的分组被正确接受。如果该分组不是期望的分组,那么缓存,如果是,那么给应用层并且看缓存里面有没有后续,有就直接一起给应用层
  • 序号在rcv_base - N 至 rcv_base - 1内的分组被正确接受。返回一个确认ACK。表示我已经收到了。
  • 其他情况。忽略分组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//期望的序号
$expackNum = 1;
function rdt_rev($packet) {
//差错检测通过了并且报文序号正确
if (check($packet)) {
if ($packet['num'] > $expackNum && $packet['num'] < $expackNum + N - 1) {
if ($packet['num'] == $expackNum) {
//是我们期望的,直接给应用层
//解析报文
$data = extract($packet);
//序号对的
//没有错,把数据交付给应用层并回传ack
//把数据给应用层
deliver_data($data);
//回传ACK
$ack = make_pkt(1, $expackNum);
$expackNum++;
udt_send($ack);
//查询缓存里面有没有
$key = $expackNum + 1;
while(array_key_exists($key, $cache)) {
//如果下一个分组已经收到了,那么给应用层,并且滑动窗口
deliver_data($cache[$expackNum]);
++$expackNum;
$key++;
}
} else {
//缓存分组
$cache[$packet['num']] = $packet;
//回传ACK
$ack = make_pkt(1, $packet['num']);
udt_send($ack);
}
}
} else {
//没通过差错检测或者序号错误,我们回传一个上一个ack,告诉发送端上一个分组我们收到了,当前分组没收到。
//回传ACK
$ack = make_pkt(1, $expackNum);
udt_send($ack);
}
}

网络原理自顶向下三可靠数据传输原理实现停等协议

这里仅考虑单向可靠数据传输。而不是双向可靠数据传输

构造可靠数据传输协议

经完全可靠信道的可靠数据传输 rdt1.0版本

首先考虑最简单的版本,底层信道完全可靠。

发送端

发送端应用层只需要调用rdt_send函数。网络层提供了一个函数udt_send来给运输层调用。现在假设udt_send是可靠的。

1
2
3
4
5
6
function rdt_send($data) {
//组装报文
$packet = make_pkt($data);
//调用网络层传输
udt_send($packet);
}

接收端

接收端网络层只需要调用rdt_rev函数。应用层提供了一个deliver_data函数来接受运输层的数据。

1
2
3
4
5
6
function rdt_rev($packet) {
//解析报文
$data = extract($packet);
//把数据给应用层
deliver_data($data);
}

有限状态机

再来画一下对应的有限状态机(FSM).

发送端只有一个状态,等待调用

接收端也只有一个状态,等待调用

经比特差错信道的可靠数据传输rdt2.0

现在底层信道有可能造成比特的错误。

回想一下打电话的时候,如果我们说的话对方没听清,会怎么样。会再说一遍也就是重传

那么什么情况下会重传。当接收方说我没听清的时候。

所以在rdt2.0里面我们让接收方接受完信息后回传一个标志,告诉我们正确还是错误

如果正确,那么我们继续等待调用。

如果错误,那么我们重传

基于这样重传机制的可靠数据传输协议称为自动重传请求协议(Automatic Repeat reQuest)ARQ,需要下面三个功能

  • 差错检测
  • 接收方回传ack或者nak
  • 重传

发送端

看一下发送端的简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function rdt_send($data) {
//生成校验和
$checkSum = generateCheckSum($data);
//组装报文
$packet = make_pkt($data, $checkSum);
//调用网络层传输
udt_send($packet);
//等待接收方回传ack或者nak
$isAck = rdt_rev();
//判断ack
if ($isAck == 1) {
//收到了Ack分组,可以结束了
return true;
} else if ($isAck == 0) {
//收到了Nak分组,需要重传
return rdt_send($data);
}
}

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function rdt_rev($packet) {
//差错检测
if (check($packet)) {
//解析报文
$data = extract($packet);
//没有错,把数据交付给应用层并回传ack
//把数据给应用层
deliver_data($data);
//回传ACK
$ack = make_pkt(1);
udt_send($ack);
} else {
//有错,回传一个nak,不交付数据
$nak = make_pkt(0);
udt_send($nak);
}
}

状态机

现在发送端有两个状态

  • 等待调用
  • 等待返回ack或nak

接收端还是一个状态

  • 等待调用

rdt2.0也被称为停等协议。因为发送端处于等待ack状态是不能被上层调用的。

ack受损rdt2.1

从上面的代码可以看出来,接收端发送ack使用的是udt_send函数,这个函数是不可靠的。那么如果我们的ack或者nak损坏了怎么办。

这时候可以像处理损坏分组一样。我们校验ack是否受损,如果受损,那么我们重传分组。

可是重传分组就会造成接收方不知道这个分组我有没有收到过。所以我们需要增加分组序号

对于停等协议来说,0和1就够用了。因为停等协议只有两个状态,发完会等待ack。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//序号
$num = 0;
function rdt_send($data) {
//生成校验和
$checkSum = generateCheckSum($data);

//组装报文
$packet = make_pkt($data, $checkSum, $num);
//调用网络层传输
udt_send($packet);
//等待接收方回传ack或者nak
$isAck = rdt_rev();
//差错检测
if (check($isAck)) {
//没出问题,那么把改变序号
$num = !$num;
//判断ack
if ($isAck == 1) {
//收到了Ack分组,可以结束了
return true;
} else if ($isAck == 0) {
//收到了Nak分组,需要重传
return rdt_send($data);
}
} else {
//ack出问题了,那么这个时候重传
return rdt_send($data);
}
}

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//序号
$num = 0;
function rdt_rev($packet) {
//差错检测
if (check($packet)) {
//判断报文序号
if ($packet['num'] == $num) {
//解析报文
$data = extract($packet);
//序号对的
//没有错,把数据交付给应用层并回传ack
//把数据给应用层
deliver_data($data);
//回传ACK
$ack = make_pkt(1, $num);
udt_send($ack);
} else {
//序号错了,说明这不是我们要的,我们回传一个ack,告诉发送端这个分组我们收到了。
//回传ACK
$ack = make_pkt(1, $num);
udt_send($ack);
}
} else {
//有错,回传一个nak,不交付数据
$nak = make_pkt(0, $num);
udt_send($nak);
}
}

状态机

发送端有4个状态

  • 等待调用0
  • 等待ack0或者nak0
  • 等待调用1
  • 等待ack1或者nak1

接收端有2个状态

  • 等待调用0
  • 等待调用1

去掉nak分组的rdt2.2

从上面的代码可以看出来,发送端在接收到nak的时候和丢失ack或者nak的时候都是重传。
所以我们只需要判断ack就可以了。那么同样接收方只需要回传ack就可以了。
这样一来,代码更见简单了。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//序号
$num = 0;
function rdt_send($data) {
//生成校验和
$checkSum = generateCheckSum($data);
//组装报文
$packet = make_pkt($data, $checkSum, $num);
//调用网络层传输
udt_send($packet);
//等待接收方回传ack
$isAck = rdt_rev();
//差错检测
if (check($isAck) && $isAck['num'] == $num) {
//没出问题,那么把改变序号
$num = !$num;
//收到了Ack分组,可以结束了
return true;
} else {
//ack出问题了,那么这个时候重传
return rdt_send($data);
}
}

接收端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//序号
$num = 0;
function rdt_rev($packet) {
//差错检测通过了并且报文序号正确
if (check($packet) && $packet['num'] == $num) {
//解析报文
$data = extract($packet);
//序号对的
//没有错,把数据交付给应用层并回传ack
//把数据给应用层
deliver_data($data);
//回传ACK
$ack = make_pkt(1, $num);
udt_send($ack);
} else {
//没通过差错检测或者序号错误,我们回传一个上一个ack,告诉发送端上一个分组我们收到了,当前分组没收到。
//回传ACK
$ack = make_pkt(1, !$num);
udt_send($ack);
}
}

状态机

发送端有4个状态

  • 等待调用0
  • 等待ack0或者nak0
  • 等待调用1
  • 等待ack1或者nak1

接收端有2个状态

  • 等待调用0
  • 等待调用1

经具有比特差错的丢包信道的可靠数据传输rdt3.0

现在底层信道除了会出错,还会丢包了。

如果遇到丢包怎么办呢,也就是接收方接收不到数据了。这个时候也就回传不了ack

那么可以在发送端加上超时机制。如果长时间没收到ack。那么就重传分组。

发送端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//序号
$num = 0;
function rdt_send($data) {
//生成校验和
$checkSum = generateCheckSum($data);
//组装报文
$packet = make_pkt($data, $checkSum, $num);
//调用网络层传输
udt_send($packet);
//启动一个定时器
start_timer();
//等待接收方回传ack 并且没有超时
if ($isAck = rdt_rev() && !timeout()) {
//差错检测
if (check($isAck) && $isAck['num'] == $num) {
//没出问题,那么把改变序号
$num = !$num;
//收到了Ack分组,可以结束了
return true;
} else {
//ack出问题了,那么这个时候重传
return rdt_send($data);
}
} else {
//没接收到ack或者超时 重发
return rdt_send($data);
}
}

接收端

无变化

状态机

发送端

流水线可靠数据传输协议

停等协议的缺点是性能受限。因为每次要等待上一个ack回来才能发送下一个报文。

而采用流水线就是不等待ack直接发送下一个报文。

这样会有下面的影响

  • 必须增加序号范围,因为每个分组必须有序号
  • 协议的发送方和接收方两端也许不得不缓存多个分组。
  • 所需序号范围和对缓冲的要求取决于数据传输协议如何处理丢失,损坏及时延大的分组。解决流水线的差错恢复有两种基本方法:
    • 回退N步(Go Back N)GBN
    • 选择重传(Selective Repeat)SR

GBN

滑动窗口协议

SR

选择重传协议

运输层

运输层和网络层的关系

网络层是主机到主机,端到端的逻辑传输。

运输层是应用到应用,端口到端口的逻辑传输。

先由网络层送到主机,再通过运输层送到对应的端口程序中。

运输层将应用层报文封装成报文段交给网络层

将主机间交付扩展到进程间交付被称为运输层的多路复用和多路分解

UDP提供不可靠服务

  • 差错检测
  • 数据交付

TCP提供可靠服务

  • 流量控制
  • 序号
  • 确认
  • 定时器
  • 拥塞控制

多路复用和多路分解

源主机使用多路复用把多个套接字进程的报文发送给目的主机

目的主机使用多路分解把报文发送给多个套接字进程

运输层需要再首部信息加入

  • 源端口号
  • 目的端口号

通过端口号来区分进程

UDP套接字通过二元组来标识

  • 目的端口号
  • 目的ip
    只要你的目的ip和端口号相同就算你的源ip和端口不一样,也会分解到同一个套接字

TCP通过四元组来标识

  • 源ip
  • 源端口
  • 目的Ip
  • 目的端口

这是因为TCP会创建链接,一个TCP进程有一个“欢迎套接字”,用来等待程序过来,然后创建一个新的套接字来进行通信。

如果四元组一致会分解到一个套接字,不一致,会分解到另外的套接字。

web服务器和TCP

现在的计算机有线程的概念,所以TCP链接一般也不会创建多个进程来服务不同的客户端,而是创建多个线程套接字,通过分解到不同的线程套接字来服务多个客户端。

无连接运输:UDP

UDP只提供了运输层最低限度的东西。

  • 差错检测
  • 复用分解

DNS是使用UDP的一个应用层协议.

UDP存在的意义及优势:

  • 关于发送什么数据以及何时发送的应用层控制更为精细。
    采用UDP时,只要应用将数据给UDP,UDP就会直接传递给IP网络层。TCP有拥塞控制和重发机制,但是这样会需要更长的时间。因为实时应用通常要求最小的发送速率,不希望过分的延迟报文段的传送,且能容忍一部分数据丢失,TCP服务模型并不是特别适合这些应用的需要。这些应用使用UDP,并可以再应用层实现所需的,超出UDP的额外功能。
  • 无需链接建立
    TCP需要三次握手建立链接,UDP不需要。因此没有建立链接的时延。
  • 无连接状态
    TCP需要再端系统中维护链接状态。此链接状态包括接受和发送缓存,拥塞控制参数以及序号与确认号的参数。UDP一般能支持更多的活跃用户。
  • 分组首部开销小
    每个TCP报文段首部有20字节,UDP只有8字节

UDP也可以通过应用层实现可靠性传输。比如chrome的QUIC协议

UDP报文段结构

  • 首部字段 8字节
    • 源端口号 2字节
    • 目的端口号 2字节
    • 长度 2字节 数据长度
    • 校验和 2字节 差错检测
  • 数据

UDP校验和

发送方的UDP对报文段中的所有16比特字的和进行反码运算,求和时候遇到的任何溢出都被回卷。得到的结果被放在UDP报文段中的校验和字段。

例子

如果有三个16比特的字:

0110011001100000
0101010101010101
1000111100001100

计算他们的和

0110011001100000
+0101010101010101
=1011101110110101
+1000111100001100
=0100101011000001

这个时候溢出了,把溢出的1加到后面
=0100101011000010

进行反码运输,把1变0,0变1
=1011010100111101

这就变成了校验和。

接收方将三个16比特和校验和加在一起,如果全是1,那么就没问题。如果有0,那么就有差错。

把上面的和加上校验和算一下
0100101011000010
+1011010100111101
=1111111111111111

UDP虽然实现了差错检测,但是没有差错恢复。他只是丢弃受损的报文段。其他实现是将受损的报文段交给应用程序并给出警告。

TCP

TCP提供全双工服务。双方都可以发送数据。

TCP首先建立连接。然后把数据引导到发送缓存中。从发送缓存中取出数据进行发送。取出的数量受限于最大报文段长度MSS。MSS一般根据最大链路层帧长度MTU。以太网和PPP链路层都具有1500MTU。所以MSS的典型值在1460 + 40字节的TCP/IP首部长度。

TCP报文段结构:

  • 源端口号 16比特
  • 目的端口号 16比特
  • 序号 32比特
  • 确认号 32比特
  • 接收窗口 16比特 用于流量控制
  • 首部长度 4比特
  • 选项字段 可选变长
  • 标志字段 6比特
    • ACK用于确认
    • RST, SYN, FIN用于连接建立和删除
    • 拥塞通告中使用 CWR和ECE字段
    • PSH置位标识接收方应立即将数据交给上层
    • URG用来指示紧急数据
  • 紧急数据指针字段 16比特
  • 校验和 16比特

往返时间的估计与超时

估计往返时间

估计一个SampleRTT作为样本,根据SampleRTT取平均值,也就是EstimatedRTT。

EstimatedRTT = (1 - a) * EstimatedRTT + a * SmapleRTT

a的推荐值是0.125。

RTT的偏差用DevRTT表示

DevRTT = (1 - b) * DevRTT + b * |SampleRTT - EstimatedRTT|

b的推荐值是0.25

设置和管理重传超时间隔

超时间隔应该 >= EstimatedRTT。但是也不能大太多

TimeoutInterval = EstimatedRTT + 4 * DevRTT

TCP设置单一的定时器,每当超时一次超时时间会加倍,以免造成网络拥塞。

TCP收到同一个ACK三次,会进行快速重传而不等待定时器超时。

TCP采用的是GBN和SR的混合体。

流量控制

TCP有发送缓存和接收缓存。接收方把数据放入接收缓存,然后读取到应用层。

当发送数据太多,为防止接收缓存装不下,需要控制发送端发送数量。

接收窗口字段就是这个作用,用来控制接收方还有多少空间。发送方就可以根据这个调整发送数量。

如果接收缓存已经满了。那么发送方依旧会发送1比特的数据。这样才能知道接收方变化后的接收缓存大小。

建立连接

  • 第一步:发送一个SYN = 1 Seq = 随机序号
  • 第二步:返回一个SYN = 1 Seq = 随机序号 Ack = Seq + 1
  • 第三步:SYN = 0 Seq = Ack Ack = Seq + 1 可以携带数据

关闭连接

  • 第一步:发送一个FIN = 1
  • 第二步:接收一个ACK
  • 第三步:接收一个FIN = 1
  • 第四步:发送一个ACK

拥塞控制

  • 端到端的拥塞控制。网络层不提供支持,端系统观察网络层得到结果。

  • 网络辅助的拥塞控制。

  • 丢失的报文段表示拥塞,降低发送速率。

  • 确认报文段表示顺利,提高发送速率

  • 带宽检测,逐步提高发送速率,如果丢失超时就降低然后接着提高。

慢启动

开始速率较小。大概MSS/RTT.
每当收到一个确认,就增加一个MSS。也就是指数级增长,不断翻倍。
何时结束?

  • 遇到丢包超时
  • 达到一个阈值

结束慢启动进入拥塞避免模式

拥塞避免模式

不再郑家指数级,而是一个一个增长。线性增长

supervisor

supervisor是一个守护进程软件。

安装

ubuntu安装很简单。直接apt

1
sudo apt-get install supervisor

配置

修改配置项,加入我们的命令

1
sudo vim /etc/supervisor/conf.d/test.conf

复制下面的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[program:命令名称]
;目录
;directory=/test 这里我注释掉了,如果你加上以后报错可以注释掉试试
;执行的命令如果没有配置环境变量或者软连接请使用全路径
command=php /test/think queue:listen --queue im
process_name=%(program_name)s_%(process_num)02d;
numprocs = 3
autostart = true
startsecs = 5
autorestart = true
startretries = 3
;执行的linux用户
user=root
redirect_stderr = true
stdout_logfile_maxbytes = 50MB
stdout_logfile_backups = 20
;日志位置
stdout_logfile = /var/log/supervisor/queue_worker.log
loglevel=info

然后启动supervisor,因为是apt安装的,所以很好启动

1
sudo service supervisor start

查看启动了没有,有两种方法,一种status命令,还有一种查看linux进程

1
sudo service supervisor status

查看进程

1
ps -aux |grep super

再查看我们的命令挂起来没有。我们执行的php命令所以直接查看php

1
ps -aux|grep php

如果看见刚刚的命令就成功了。

php-simplexml解析一个或多个结构的坑

php解析xml还是挺方便的,不管是正常的xml,还是加了一个命名空间或者前缀的xml。都可以通过simplexml_load_string函数来解析成数组或者对象。

simplexml_load_string

来看一下使用方法。

1
2
3
$xml = "<reports><report><id>1</id><name>张三</name></report></reports>"

$arr = (array)simplexml_load_string($xml);

是不是很简单,如果你的带啦前缀或者命名空间也可以使用。下面带了s前缀

1
2
3
$xml = "<s:reports><s:report><s:id>1</id><s:name>张三</name></report></reports>"

$arr = (array)simplexml_load_string($xml,'SimpleXMLElement',0,'s',true);

但是如果带了多个前缀,这个函数就无能为力了,可以使用别的方法解析。

bug

不过这个函数解析一个和多个结果是不一样的,这里解析出来一定要做判断!!!

下面有两个report,解析出的结果是一个数组。

1
2
3
$xml = "<reports><report><id>1</id><name>张三</name></report><report><id>1</id><name>张三</name></report></reports>"

$arr = (array)simplexml_load_string($xml);

而上面只有一个report的时候,解析出来就是一个对象!!!

php-thinkphp-报错Creating default object from empty value

报错第一步,打印,打印日志,在你用到对象的地方,把对象都打印出来看看,你以为他是个对象,但他。。。不一定是个对象!!!

如果你确定你从数据库查询出来的没有错,是个对象,那么。。请看一下你别的对象是不是一个对象!!!

要相信报错,一定是对象错了,但你不确定,所以,请打印日志,如果你这里没问题,别人那里有问题,那么可能是参数的问题。打印日志在别人那就能看到你想的对象可能在他那里不是一个对象!!!

thinkphp-queue队列使用

在我们写程序的时候,经常会用到队列来完成一些操作,关于队列的介绍和使用场景,注意事项可以看我的这个文章你不知道的队列使用技巧

在tp里面使用队列

安装

tp框架提供了一个扩展包,叫做think-queue。我们先来安装这个扩展包。

composer require topthink/think-queue

配置消息队列

等待安装完成之后,我们需要进行配置,消息队列的消息存放在哪里,可以配置成redis。

配置在你的config/queue.php里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

'default' => 'redis', //默认是sync,改成redis

'connections' => [
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => env('queue.host', '127.0.0.1'),
'port' => env('queue.port', 6379),
'password' => env('queue.password', ''),
'select' => 0, // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接
],
],

创建消息

配置完成以后我们就可以开始使用了。

在我们的controller里面把一个消息推送到队列里面。这里我们定义一个队列名称叫做message,定义一个处理队列消息的消费者类app\common\queue\consumer。然后调用Queue门面的push方法,把消费者,队列名称,数据传入进去就可以了。这个时候就会把数据放到message这个队列里面。然后消费者取出数据进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
namespace app\api\controller;

use app\BaseController;
use think\facade\Queue;

class Test extends BaseController
{

protected $consumer = 'app\common\queue\consumer'; //消费者类

protected $queue = 'message'; //队列名称

public function test() {
if ($this->request->isPost()) {
//要推送到队列里面的数据
$jobData = [];
$jobData["a"] = 'a';
$jobData['b'] = 'b';

$res = Queue::push($this->consumer, $data, $this->queue);
return json([]);
}
}
}

消费消息

我们接下来实现我们上面定义的消费者。来处理我们的逻辑。

消费者app\common\queue\consumer类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?php
namespace app\common\queue;

use think\queue\Job;

class consumer {
/**
* fire方法是消息队列默认调用的方法
* @param Job $job 当前的任务对象
* @param array|mixed $data 发布任务时自定义的数据
*/
public function fire(Job $job,$data)
{
// 有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone = $this->check($data);
if(!$isJobStillNeedToBeDone){
$job->delete(); //删除任务
return;
}

//执行任务
$isJobDone = $this->doJob($data);

if ($isJobDone) {
// 如果任务执行成功, 记得删除任务
$job->delete();
}else{
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
$job->delete();
}
}
}

/**
* 有些消息在到达消费者时,可能已经不再需要执行了
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
*/
private function check($data){
return true;
}

/**
* 根据消息中的数据进行实际的业务处理...
*/
private function doJob($data)
{
dump($data);
return true;
}
}

这样我们就完成了代码的逻辑,也就是发布消息,消费消息。

接下来我们启动这个队列。

启动队列

启动队列有两种方式。

  • work
  • listen

work方式启动。这种方式是单进程运行。如果你更新了代码需要手动重启队列。

php think queue:work –queue message //我们刚刚定义的队列名称

listen方式启动。这种方式是master-worker模型。一个master主进程来监听,当请求进来了启动一个work子进程来运行上面的work方式启动。

php think queue:listen –queue message

我更推荐listen方式来运行。这种方式更新代码后也不需要手动重启。

thinkphp-queue队列导致MySQL server has gone away

虽然队列一时爽,不过还是有缺点的,比如当队列运行时间长了会报错 MySQL server has gone away

原因是使用work模式运行时间长了以后没有释放mysql数据库的链接,导致时间长了以后被mysql server端判断超时切断了链接。

可以改用listen模式运行,这样每次都是启动一个新的work进程来运行程序,每次都会新链接数据库。

可以使用tp的断线重连功能。修改配置文件config/database.php

1
2
3
4
5
6
7

// 数据库连接配置信息
'connections' => [
'mysql' => [
// 是否需要断线重连
'break_reconnect' => true,
],

php-mpdf扩展包中文乱码问题

mpdf是一个可以把html网页转换成pdf文件的扩展包。一开始使用的时候,发现中文乱码了。。在网上查了半天,好多方法都不管用。

最后,在他的文档里面找到了问题原因。

想要输出中文,有两个参数至关重要!!!

  • autoLangToFont 这个值一定要设置为true才可以
  • autoScriptToLang 这个值也一定要设置为true才可以

只要上面两个设置为true,那么你的中文就可以正常输出了。相信我,不能正常输出你来打我。

看一下mpdf文档上面的描述。

mpdf

mpdf

可以看到默认值是false,所以我们使用的时候需要改成true。

设置这两个值也很简单。

1
2
3
4
5
6
7
8
9
10
11
12

use Mpdf\Mpdf;

function test() {
$pdf = new Mpdf;
$pdf->autoLangToFont = true;
$pdf->autoScriptToLang = true;

$pdf->writeHTML('<h1>123</h1>');

return $pdf->output('./test.pdf', 'D');
}

其实,mpdf的文档最开始是有错误的,他的文档中写的默认值是true而不是现在的false。不过从他的源码上可以看到他的默认值其实是false

源码位置:vendor/mpdf/mpdf/src/Config/ConfigVariables.php里面。
这个文件里面是很多变量的默认值,在这里面搜索可以看到这两个值是false。

1
2
3
4
5
6
7
8

// AUTOMATIC FONT SELECTION
// Based on script and/or language
// mPDF 6.0 (similar to previously using function SetAutoFont() )
'autoScriptToLang' => false,

// mPDF 6.0 (similar to old useLang)
'autoLangToFont' => false,

我给他们的github上面提了一个issue,他们才把文档改过来了。

mpdf

最后附上mpdf官方文档:

http://mpdf.github.io/fonts-languages/fonts-in-mpdf-7-x.html

我给他们提的issue:

https://github.com/mpdf/mpdf.github.io/issues/141

thinkphp-tp6使用chunk分块操作数据的坑

有的时候我们会遇到需要定时操作数据的需求,比如定时更新所有用户的权益,徽章等等。这个时候你不能一次性取出所有数据来进行操作,因为数据量太大了,我们一次取出全部,先不说mysql数据库会很慢,就算取出来传给你,网络开销也很大。这时候你通过网络接收到数据以后,会把这些数据放到一个变量里面。这个变量是存在内存中的,如果过大还会导致内存溢出,内存不足的问题。

所以我们就需要分页取出数据来进行操作,比如每次取出100条,操作完了再取出下100条。而tp框架提供了一个方便的chunk方法来供我们使用,免去了我们需要手动limit分页的麻烦。

我之前使用过laravel的chunk,以为两个差不多,看了文档也觉得差不多。下图是tp6文档的描述。

tp

其实单表这么写也没有什么问题,不过一旦你使用了连表查询,就出现问题了。。而他的文档并没有说连表的问题。

虽然他的文档有这么一段也说明了主键和排序的问题。

tp

但是,没想到连表的时候是必须,注意,必须!!!传主键,不然他不知道是哪个表的主键。而laravel就没有这个问题。。

我当时写的时候去找了他的源码,才看到这个问题,因为我正常写完后一直报错。。

看一下他的源码。源码位置在./vendor/topthink/think-orm/src/db/query.php里面的chunk方法。

tp

tp

从这里可以看到他有4个参数。

  • count 每次处理的数量
  • callback 处理的回调函数
  • column 处理的字段名 默认 null
  • order 字段排序 默认asc

前两个我们必传,后面的可选。

他的第二行代码,如果你传了第三个参数,那么使用你传的,不然调用getPk这个函数。这个函数在源码里面也有,就是获取主键。假设你不传,你的主键是id,那么column这个变量就是id。

1
$column  = $column ?: $this->getPk();

接下来的代码你会发现你的column参数,还可以传一个数组。如果是一个数组,那么他在这里不使用这个参数。

如果你传的不是一个数组,那么看有没有.也就是连不连表。因为连表你传的是a.id。如果连表那么explode分割成数组[a,id]的形式。

如果你传的就是id那么直接赋值给变量key

1
2
3
4
5
6
7
8
9
10
11
12
13

if (is_array($column)) {
$times = 1;
$query = $this->options($options)->page($times, $count);
} else {
$query = $this->options($options)->limit($count);

if (strpos($column, '.')) {
[$alias, $key] = explode('.', $column);
} else {
$key = $column;
}
}

接下来就是真正获取数据,然后调用回调函数,再重复获取数据的过程了。

1
$resultSet = $query->order($column, $order)->select();

这里可以看到,我们传数组,那么数组就会直接给order函数,如果是连表的主键a.id,那么就会把[a,id]给order函数,如果是单表,那么默认id主键给order函数。