|
fqdu3v2uqgq640129117307.gif (3.61 KB, 下載次數(shù): 1)
下載附件
保存到相冊(cè)
fqdu3v2uqgq640129117307.gif
3 天前 上傳
關(guān)注、星標(biāo)公眾號(hào),直達(dá)精彩內(nèi)容
一 問(wèn)題背景 最近部門時(shí)刻準(zhǔn)備刪庫(kù)跑路的小兄弟調(diào)試CAN通訊的時(shí)候,遇到個(gè)MCU死機(jī)的問(wèn)題,而負(fù)責(zé)這塊代碼的哥們已經(jīng)跑路好幾年了,而這個(gè)設(shè)備也已經(jīng)用了好幾年了,也沒(méi)有反饋過(guò)什么問(wèn)題。但是在他那里測(cè)試MCU 100%會(huì)死機(jī),進(jìn)入hardfault,一時(shí)間CPU都被干燒了,祖?zhèn)鞯拇a這么不靠譜嗎....
最終問(wèn)題定位在kfifo這部分出了問(wèn)題,而kfifo是linux kernal里面非常經(jīng)典的一段代碼,應(yīng)該不至于吧,對(duì)此進(jìn)行了分析驗(yàn)證。
二 什么是kfifo 釋義摘自:https://blog.csdn.net/linyt/article/details/53355355 講解的很詳細(xì),如有侵權(quán),聯(lián)系刪除~
kfifo是內(nèi)核里面的一個(gè)First In First Out數(shù)據(jù)結(jié)構(gòu),它采用環(huán)形循環(huán)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn),提供一個(gè)無(wú)邊界的字節(jié)流服務(wù),并且使用并行無(wú)鎖編程技術(shù),即當(dāng)它用于只有一個(gè)入隊(duì)線程和一個(gè)出隊(duì)線程的場(chǎng)情時(shí),兩個(gè)線程可以并發(fā)操作,而不需要任何加鎖行為,就可以保證kfifo的線程安全。
kfifo代碼既然肩負(fù)著這么多特性,那我們先一敝它的代碼:
2.1 kfifo數(shù)據(jù)結(jié)構(gòu) struct kfifo {
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
spinlock_t *lock; /* protects concurrent modifications */
};
這是kfifo的數(shù)據(jù)結(jié)構(gòu),kfifo主要提供了兩個(gè)操作,__kfifo_put(入隊(duì)操作)和__kfifo_get(出隊(duì)操作)。 它的各個(gè)數(shù)據(jù)成員如下:
buffer: 用于存放數(shù)據(jù)的緩存
size: buffer空間的大小,在初化時(shí),將它向上擴(kuò)展成2的冪
lock: 如果使用不能保證任何時(shí)間最多只有一個(gè)讀線程和寫線程,需要使用該lock實(shí)施同步。
in, out: 和buffer一起構(gòu)成一個(gè)循環(huán)隊(duì)列。 in指向buffer中隊(duì)頭,而且out指向buffer中的隊(duì)尾,它的結(jié)構(gòu)如示圖如下:
+--------------------------------------------------------------+
| || |
+--------------------------------------------------------------+
^ ^ ^
| | |
out in size
當(dāng)然,內(nèi)核開(kāi)發(fā)者使用了一種更好的技術(shù)處理了in, out和buffer的關(guān)系,我們將在下面進(jìn)行詳細(xì)分析。
2.2 kfifo功能描述 kfifo提供如下對(duì)外功能規(guī)格:
1 只支持一個(gè)讀者和一個(gè)讀者并發(fā)操作;2 無(wú)阻塞的讀寫操作,如果空間不夠,則返回實(shí)際訪問(wèn)空間;kfifo_alloc 分配kfifo內(nèi)存和初始化工作
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
*/
if (size & (size - 1)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
ret = kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
return ret;
}
這里值得一提的是,kfifo->size的值總是在調(diào)用者傳進(jìn)來(lái)的size參數(shù)的基礎(chǔ)上向2的冪擴(kuò)展,這是內(nèi)核一貫的做法。這樣的好處不言而喻——對(duì)kfifo->size取模運(yùn)算可以轉(zhuǎn)化為與運(yùn)算,如下:
kfifo->in % kfifo->size
可以轉(zhuǎn)化為
kfifo->in & (kfifo->size – 1)
在kfifo_alloc函數(shù)中,使用size & (size – 1)來(lái)判斷size 是否為2冪,如果條件為真,則表示size不是2的冪,然后調(diào)用roundup_pow_of_two將之向上擴(kuò)展為2的冪。
這都是常用的技巧,只不過(guò)大家沒(méi)有將它們結(jié)合起來(lái)使用而已,下面要分析的__kfifo_put和__kfifo_get則是將kfifo->size的特點(diǎn)發(fā)揮到了極致。
__kfifo_put和__kfifo_get巧妙的入隊(duì)和出隊(duì)
_kfifo_put是入隊(duì)操作,它先將數(shù)據(jù)放入buffer里面,最后才修改in參數(shù);__kfifo_get是出隊(duì)操作,它先將數(shù)據(jù)從buffer中移走,最后才修改out。你會(huì)發(fā)現(xiàn)in和out兩者各司其職。
下面是__kfifo_put和__kfifo_get的代碼
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensure that we sample the fifo->out index -before- we
* start putting bytes into the kfifo.
*/
smp_mb();
/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
/*
* Ensure that we add the bytes to the kfifo -before-
* we update the fifo->in index.
*/
smp_wmb();
fifo->in += len;
return len;
}
奇怪嗎?代碼完全是線性結(jié)構(gòu),沒(méi)有任何if-else分支來(lái)判斷是否有足夠的空間存放數(shù)據(jù)。內(nèi)核在這里的代碼非常簡(jiǎn)潔,沒(méi)有一行多余的代碼。
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
這個(gè)表達(dá)式計(jì)算當(dāng)前寫入的空間,換成人可理解的語(yǔ)言就是:
l = kfifo可寫空間和預(yù)期寫入空間的最小值
使用min宏來(lái)代if-else分支
__kfifo_get也應(yīng)用了同樣技巧,代碼如下:
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
/*
* Ensure that we sample the fifo->in index -before- we
* start removing bytes from the kfifo.
*/
smp_rmb();
/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);
/*
* Ensure that we remove the bytes from the kfifo -before-
* we update the fifo->out index.
*/
smp_mb();
fifo->out += len;
return len;
}
原來(lái),kfifo每次入隊(duì)或出隊(duì),kfifo->in或kfifo->out只是簡(jiǎn)單地kfifo->in/kfifo->out += len,并沒(méi)有對(duì)kfifo->size 進(jìn)行取模運(yùn)算。因此kfifo->in和kfifo->out總是一直增大,直到unsigned in最大值時(shí),又會(huì)繞回到0這一起始端。但始終滿足:
kfifo->in - kfifo->out size
即使kfifo->in回繞到了0的那一端,這個(gè)性質(zhì)仍然是保持的。
對(duì)于給定的kfifo:
數(shù)據(jù)空間長(zhǎng)度為:kfifo->in - kfifo->out
而剩余空間(可寫入空間)長(zhǎng)度為:kfifo->size - (kfifo->in - kfifo->out)
盡管kfifo->in和kfofo->out一直超過(guò)kfifo->size進(jìn)行增長(zhǎng),但它對(duì)應(yīng)在kfifo->buffer空間的下標(biāo)卻是如下:
kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))
kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
往kfifo里面寫一塊數(shù)據(jù)時(shí),數(shù)據(jù)空間、寫入空間和kfifo->size的關(guān)系如果滿足:
kfifo->in % size + len > size
那就要做寫拆分了,見(jiàn)下圖:
kfifo_put(寫)空間開(kāi)始地址
|
\_/
|XXXXXXXXXX
XXXXXXXX|
+--------------------------------------------------------------+
| || |
+--------------------------------------------------------------+
^ ^ ^
| | |
out%size in%size size
^
|
寫空間結(jié)束地址
第一塊當(dāng)然是: [kfifo->in % kfifo->size, kfifo->size]
第二塊當(dāng)然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
下面是代碼,細(xì)細(xì)體味吧:
/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
對(duì)于kfifo_get過(guò)程,也是類似的,請(qǐng)各位自行分析。
kfifo_get和kfifo_put無(wú)鎖并發(fā)操作
計(jì)算機(jī)科學(xué)家已經(jīng)證明,當(dāng)只有一個(gè)讀經(jīng)程和一個(gè)寫線程并發(fā)操作時(shí),不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無(wú)鎖編程技術(shù),以提高kernel的并發(fā)。
kfifo使用in和out兩個(gè)指針來(lái)描述寫入和讀取游標(biāo),對(duì)于寫入操作,只更新in指針,而讀取操作,只更新out指針,可謂井水不犯河水,示意圖如下:
||
+--------------------------------------------------------------+
| || |
+--------------------------------------------------------------+
||
^ ^ ^
| | |
out in size
為了避免讀者看到寫者預(yù)計(jì)寫入,但實(shí)際沒(méi)有寫入數(shù)據(jù)的空間,寫者必須保證以下的寫入順序:
往[kfifo->in, kfifo->in + len]空間寫入數(shù)據(jù)
更新kfifo->in指針為 kfifo->in + len
在操作1完成時(shí),讀者是還沒(méi)有看到寫入的信息的,因?yàn)閗fifo->in沒(méi)有變化,認(rèn)為讀者還沒(méi)有開(kāi)始寫操作,只有更新kfifo->in之后,讀者才能看到。
那么如何保證1必須在2之前完成,秘密就是使用內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb(),來(lái)保證對(duì)方觀察到的內(nèi)存操作順序。
三 kfifo在工程中的移植應(yīng)用上面已經(jīng)詳細(xì)介紹了kfifo的實(shí)現(xiàn)原理以及讓人驚嘆的代碼簡(jiǎn)潔之道,接下來(lái)在MCU工程中實(shí)際去應(yīng)用,看看效果如何。
直接上代碼:
kfifo.h
#ifndef __KFIFO_H
#define __KFIFO_H
#include "stdint.h"
#include "stdlib.h"
#include "string.h"
#include "stdio.h"
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
#define MIN(a, b) (((a) (b)) ? (a) : (b))
#define is_power_of_2(x) ((x) != 0 && (((x) & ((x)-1)) == 0))
typedef struct {
unsigned char *buffer; /* the buffer holding the data*/
unsigned int size; /* the size of the allocated buffer*/
unsigned int in; /* data is added at offset (in % size)*/
unsigned int out; /* data is extracted from off. (out % size)*/
} kfifo;
/**
* @brief CAN Rx message structure definition
*/
#define CAN_FIFO_SIZE (2 * 1024)
typedef struct {
uint8_t FifoBuf1[CAN_FIFO_SIZE];
// uint8_t FifoBuf2[CAN_FIFO_SIZE];
uint8_t FpStep;
} CAN_STA_TYPE;
/* USER CODE BEGIN Prototypes */
typedef struct {
uint32_t StdId;
uint32_t ExtId;
uint32_t IDE;
uint32_t RTR;
uint32_t DLC;
uint8_t Data[8];
uint32_t Timestamp;
uint32_t FilterMatchIndex;
} MyCanRxMsgTypeDef;
extern kfifo gFifoReg1;
extern kfifo gFifoReg2;
void kfifo_init(kfifo *fifo, unsigned char *buffer, unsigned int size);
unsigned int kfifo_put(kfifo *fifo, unsigned char *buffer, unsigned int len);
unsigned int kfifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len);
unsigned int can_fifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len);
void kfifo_reset(kfifo *fifo);
unsigned int kfifo_len(kfifo *fifo);
#endif
kfifo.c
#include "kfifo.h"
kfifo gFifoReg1;
kfifo gFifoReg2;
/*
* kfifo初始化
*/
void kfifo_init(kfifo *fifo, unsigned char *buffer, unsigned int size)
{
if (!is_power_of_2(size))
{
return;
}
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0u;
memset(fifo->buffer, 0, size);
}
/*
* __kfifo_put - puts some data into the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most 'len' bytes from the 'buffer' into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int kfifo_put(kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned int l = 0;
len = MIN(len, fifo->size - fifo->in + fifo->out);
/* first put the data starting from fifo->in to buffer end*/
l = MIN(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer*/
memcpy(fifo->buffer, buffer + l, len - l);
fifo->in += len;
return len;
}
/*
* __kfifo_get - gets some data from the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most 'len' bytes from the FIFO into the
* 'buffer' and returns the number of copied bytes.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int kfifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned int l = 0;
len = MIN(len, fifo->in - fifo->out);
/* first get the data from fifo->out until the end of the buffer*/
l = MIN(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer*/
memcpy(buffer + l, fifo->buffer, len - l);
fifo->out += len;
return len;
}
unsigned int can_fifo_get(kfifo *fifo, unsigned char *buffer, unsigned int len)
{
//__disable_irq();
len = kfifo_get(fifo, buffer, len);
//__enable_irq();
return len;
}
/*
* __kfifo_reset - removes the entire FIFO contents, no locking version
* @fifo: the fifo to be emptied.
*/
void kfifo_reset(kfifo *fifo)
{
fifo->in = fifo->out = 0;
}
/*
* __kfifo_len - returns the number of bytes available in the FIFO, no locking version
* @fifo: the fifo to be used.
*/
unsigned int kfifo_len(kfifo *fifo)
{
return fifo->in - fifo->out;
}
因?yàn)槭菃尉程讀寫,所以沒(méi)有進(jìn)行加鎖操作。
回到問(wèn)題背景所說(shuō)的,之前用的時(shí)候一直沒(méi)問(wèn)題,現(xiàn)在測(cè)試100%把CPU干冒煙,為何會(huì)有如此大的差別?
回顧了下測(cè)試環(huán)境,“跑路的工程師”調(diào)試的時(shí)候使用的上位機(jī)是自定義的數(shù)據(jù)發(fā)送間隔時(shí)間,對(duì)與kfifo來(lái)說(shuō),發(fā)個(gè)間隔時(shí)間較大,kfifo進(jìn)出次數(shù)不會(huì)差太多,kfifo是“反應(yīng)的過(guò)來(lái)”的,實(shí)際使用過(guò)的時(shí)候,上位機(jī)替換成了APP,數(shù)據(jù)發(fā)送間隔更大,所以沒(méi)有出問(wèn)題。
現(xiàn)在,“準(zhǔn)備刪庫(kù)跑路的工程師”,用的是固定間隔的上位機(jī),而且數(shù)據(jù)發(fā)送頻率很高,接收使用的CAN中斷接收,發(fā)送的時(shí)候是使用的任務(wù)輪詢,任務(wù)運(yùn)行周期是100ms,創(chuàng)建的buffer緩沖為2048字節(jié),發(fā)送一會(huì)兒就會(huì)死機(jī),100%復(fù)現(xiàn),增大buffer,不過(guò)是死的時(shí)間晚了點(diǎn),雖遲但到~~~
void can_nbbus_entry(void *parameter)
{
nbbus_can_init();
while (1)
{
nbbus_poll(&nbbus_can, NB_BUS_BATDIAG_ID);
rt_thread_mdelay(100);
}
}
于是對(duì)數(shù)據(jù)幀進(jìn)行了分析,can接收到的原始數(shù)據(jù)幀是沒(méi)問(wèn)題的,前面沒(méi)出問(wèn)題的數(shù)據(jù)一直沒(méi)問(wèn)題,MCU崩潰的時(shí)候,CAN接收到的原始數(shù)據(jù)依然是沒(méi)問(wèn)題的,原始數(shù)據(jù)既然沒(méi)問(wèn)題,問(wèn)題應(yīng)該出在后面解析轉(zhuǎn)發(fā)流程了,分析最終定位在數(shù)據(jù)幀出現(xiàn)了錯(cuò)位,CAN數(shù)據(jù)幀的數(shù)據(jù)出現(xiàn)了移位,導(dǎo)致can數(shù)據(jù)長(zhǎng)度字節(jié)取了一個(gè)比較大的數(shù),數(shù)據(jù)拷貝的時(shí)候出現(xiàn)了越界,擦出了內(nèi)存中的一些數(shù)據(jù),進(jìn)入了hardfault
那么為什么CAN數(shù)據(jù)幀出現(xiàn)了錯(cuò)位呢,并且似乎出現(xiàn)在某一固定時(shí)刻?
can數(shù)據(jù)接收數(shù)據(jù)如下,can的結(jié)構(gòu)體有36字節(jié),每次kfifo_put傳入的len為36字節(jié),剩余空間與36字節(jié)進(jìn)行比較
typedef struct
{
uint32_t StdId;
uint32_t ExtId;
uint32_t IDE;
uint32_t RTR;
uint32_t DLC;
uint8_t Data[8];
uint32_t Timestamp;
uint32_t FilterMatchIndex;
} MyCanRxMsgTypeDef;
void HAL_CAN_RxFifo1MsgPendingCallback(CAN_HandleTypeDef *hcan)
{
HAL_StatusTypeDef HAL_RetVal;
if (hcan == &hcan1)
{
HAL_RetVal = HAL_CAN_GetRxMessage(hcan, CAN_RX_FIFO1, (CAN_RxHeaderTypeDef *)&gCanRxMsg11, gCanRxMsg11.Data);
if ( HAL_OK == HAL_RetVal)
{
kfifo_put(&gFifoReg1, (uint8_t *)&gCanRxMsg11, sizeof(gCanRxMsg11));
__HAL_CAN_ENABLE_IT (hcan, CAN_IT_RX_FIFO1_MSG_PENDING);
}
}
}
接下來(lái)著重分析下數(shù)據(jù)是如何產(chǎn)生錯(cuò)亂的,前面說(shuō)到定義了2048字節(jié)的buffer緩存空間,對(duì)存儲(chǔ)空間結(jié)構(gòu)分組如下: |
|