【C】高并发内存池设计

2

高并发内存池设计

高并发下传统方式的弊端

在传统C语言中,我们使用malloc、calloc、realloc、free来进行内存的申请分配与释放,函数原型如下。C++中则是new、delete。

  • void *malloc(size_t size);

    • malloc在内存的动态存储区中分配了一块长度为size字节的连续区域返回该区域的首地址。
  • void *calloc(size_t nmemb, size_t size);

    • 与malloc相似,参数size为申请地址的单位元素长度,nmemb为元素个数,即在内存中申请,nmemb*size个字节大小的连续地址空间,内存会初始化为0。
  • void realloc(void ptr, size_t size);

    • 给一个已经分配了地址的指针重新分配空间,参数ptr为原有的空间地址,size是重新申请的地址长度,若为NULL,它就等同于malloc。
  • void free(void *ptr);


弊端

  • 弊端1:高并发时较小内存块的使用,导致系统调用频繁,降低了系统的执行效率。

image-20220825160713001


  • 弊端2: 频繁使用时增加了系统内存的碎片,降低内存使用效率。

  • 内存碎片-已经被分配出去(能明确指出属于哪个进程)却不能被利用的内存空间。

    • 产生根源:
    1. 内存分配必须起始于可被4、8、16整除(视处理器体系结构而定)的地址。
    2. MMU的分页机制的限制,操作系统按页给空间。

    image-20220825160739926


  • 弊端3:没有垃圾回收机制,容易造成内存泄漏,导致内存枯竭。

  • 示例1:malloc后没有free

  • void log_error(char *reason) { 
      char *p1; 
      p1 = malloc(100); 
      sprintf(p1,"The f1 error occurred because of '%s'.", reason); 
      log(p1); 
    }
  • 示例2:打开文件同样需要申请内存资源,fopen后没有fclose。

  • int getkey(char *filename) { 
      FILE *fp; 
      int key; 
      fp = fopen(filename, "r");
      fscanf(fp, "%d", &key); 
      //fclose(fp);
      return key; 
    }
    

  • 弊端4:内存分配与释放的逻辑在程序中相隔较远时,从而降低了程序的稳定性。

例如:由于相隔久远,误将非malloc的变量进行free。


解决弊端

系统层——内存管理组件的选择

  • 解决方案:使用高性能内存管理组件Tcmalloc、Jemalloc,替换Glibc Ptmalloc,以解决优化效率和碎片问题。

  • 内存管理组件的选择:

image-20220825161118003


应用层——内存池技术

什么是内存池技术?

  • 就是真正使用内存之前,先申请分配一定数量的、大小相等(一般情况下)的内存块留作备用。当有新的内存需求时,就从内存池中分出一部分内存块,若剩余的内存块不够了,就再继续申请新的内存, 统一对程序所使用的内存进行分配和回收。

  • 这样做的一个显著优点是,使得内存分配的效率得到很大提升。


  • 个人理解:什么是内存池 & 内存池的用处。
    • 减少频繁的系统调用以减少时间开销,一次性申请一块大内存,然后给需要的程序进程分配,不够了就再要。

image-20220825161808834


内存池如何解决弊端?

  • 高并发时系统调用频繁,降低了系统的执行效率。
    • 内存池提前预先分配大块内存,统一释放,极大的减少了malloc和free等函数的调用。
  • 频繁使用时增加了系统内存的碎片,降低了内存使用效率。
    • 内存池每次请求分配大小适度的内存块,避免了碎片的产生。
  • 没有垃圾回收机制,容易造成内存泄漏。
    • 在声明周期结束后统一释放内存,完全避免了内存泄漏的产生。
  • 内存分配与释放的逻辑在程序中相隔较远时,降低了程序的稳定性。
    • 在声明周期结束后统一释内存,避免重复释放指针或释放空指针等情况。

高并发时内存池如何实现?

  • 高并发——是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计保证系统能够同时并行处理很多请求。
  • 高并发的特点:
    • 响应时间短
    • 吞吐量大
    • 每秒响应请求数QPS
    • 并发用户数高
  • 内存池设计考虑
    • 设计逻辑应该尽量简单,避免不同请求之间相互影响,尽量降低不同模块之间的耦合。
    • 内存池生存时间应该尽可能短,与请求或者连接具有相同的周期,减少碎片堆积和内存泄漏。

实现思路——分而治之

  • 对于每个请求或者连接都会建立相应相应的内存池,建立好内存池之后,我们可以直接从内存池中申请所需要的内存,不用去管内存的释放,当内存池使用完成之后一次性销毁内存池。
  • 区分大小内存块的申请和释放,大于池尺寸的定义为大内存块,使用单独的大内存块链表保存,即时分配和释放;小于等于池尺寸的定义为小内存块,直接从预先配的内存块中提取,不够就扩充池中的内存,在生命周期内对小块内存不做释放,直到最后统一销毁。

Nginx内存池结构设计

  • 主要结构图:

image-20220826154546795

image-20220826162649720

  • ngx_pool_t(内存池头结点)结构示意图,图中没有示意出large后面链接的大内存块。

image-20220826162845945


部分源码解析

  • 注意:部分Nginx源代码可能有所删减,这里仅分析有关内存池的部分内容。

  • 下面文字中,小内存即一般内存。

  • nginx内存池流程,首先创建一个头结点,头结点不同于其它普通内存块,除了待分配内存区,还存储着当前内存池的其它相关信息。后续内存不够需要扩充,将扩充出来的内存块链接到内存池头结点后,扩充内块分为大内存块和一般内存块,分别链接到不同的位置,大内存链表和小内存链表。由申请内存时不够自动个扩充。

  • 申请内存时,首先判断申请的大小,如果超过当前内存池设定的最大申请大小,则进行大内存申请,反之则进行小内存申请。

    • 申请大内存时,首先申请好所需要的空间,然后在内存池头结点的大内存节点中,寻找有没有可重用的结点,如果有就将刚创建好的内存链接上去。反之创一个新节点,将创建好的内存链接上去,插入到大内存链表中。
    • 申请小内存时,首先在头节点后链接的其它小内存块中寻找,是否有有可用空间的小内存块。有则使用,无则重新申请,同样链接到内存池头节点的小内存块链表中。
  • 重置内存池,将内存池的大块内存释放,然后重置每个小块内存。

主要数据结构:

  • 内存池头的结构体:ngx_pool_s
struct ngx_pool_s {
    ngx_pool_data_t       d;        // 内存池当前的数据区指针的结构体
    size_t                max;      // 当前数据块最大可分配的内存大小(Bytes)
    ngx_pool_t           *current;  // 当前正在使用的数据块的指针
    ngx_pool_large_t     *large;    // pool 中指向大数据块的指针(大数据快是指 size > max 的数据块)
};
//别名
typedef struct ngx_pool_s            ngx_pool_t;
  • 小内存块的结构体: ngx_pool_data_t
typedef struct {
    u_char               *last;     // 保存当前数据块中内存分配指针的当前位置。每次Nginx程序从内存池中申请内存时,从该指针保存的位置开始划分出请求的内存大小,并更新该指针到新的位置。
    u_char               *end;      // 保存内存块的结束位置
    ngx_pool_t           *next;     // 内存池由多块内存块组成,指向下一个数据块的位置。
    ngx_uint_t            failed;   // 当前数据块内存不足引起分配失败的次数
} ngx_pool_data_t;
  • 大内存块的结构体:ngx_pool_large_s
struct ngx_pool_large_s {
    ngx_pool_large_t     *next;     // 指向下一块大内存块的指针
    void                 *alloc;    // 大内存块的起始地址
};
//别名
typedef struct ngx_pool_large_s  ngx_pool_large_t;
  • 创建内存池:ngx_create_pool
ngx_pool_t *
ngx_create_pool(size_t size)
{
    ngx_pool_t  *p;

    //开辟空间
    p = ngx_memalign(NGX_POOL_ALIGNMENT, size);
    if (p == NULL) {
        return NULL;
    }

    p->d.last = (u_char *) p + sizeof(ngx_pool_t);//指向实际要分配内存的位置,也就是跳过了前面那些
    p->d.end = (u_char *) p + size;//指向末端

    p->d.next = NULL;
    p->d.failed = 0;

    size = size - sizeof(ngx_pool_t);
    p->max = (size < NGX_MAX_ALLOC_FROM_POOL) ? size : NGX_MAX_ALLOC_FROM_POOL;//检测剩余空间

    p->current = p;//正在工作(分配)的内存块

    p->large = NULL;

    return p;
}
  • 其中负责开辟空间的函数ngx_memalign,其实就是将malloc进行了简单的封装。这里为了省事,将Nginx源代码简化。
//将Nginx源代码简化
#define ngx_memalign(alignment, size)  ngx_alloc(size)

//开辟空间,将malloc进行简单的封装。
void *
ngx_alloc(size_t size)
{
    void  *p;

    p = malloc(size);
    if (p == NULL) {
        fprintf(stderr,"malloc(%zu) failed", size);
    }

    if(debug) fprintf(stderr, "malloc: %p:%zu", p, size);

    return p;
}
  • 从内存池中申请空间:ngx_palloc
void *
ngx_palloc(ngx_pool_t *pool, size_t size)
{
#if !(NGX_DEBUG_PALLOC)
    if (size <= pool->max) {
        return ngx_palloc_small(pool, size, 1);//如果要分配小内存块-从池子中取
    }
#endif
    return ngx_palloc_large(pool, size);//如果要分配大内存块-扩展
}
  • 申请小内存空间:ngx_palloc_small
//从内存池头结点中所挂的一块一块普通内存块中搜索看是否有容量够的。
static inline void *
ngx_palloc_small(ngx_pool_t *pool, size_t size, ngx_uint_t align){
    u_char      *m;
    ngx_pool_t  *p;
    p = pool->current;
    do {
        m = p->d.last;
        if (align){//是否边界对齐
            //对齐到为是对齐数的倍数
            m = ngx_align_ptr(m, NGX_ALIGNMENT);
        }
        //剩余内存是否够用
        if ((size_t) (p->d.end - m) >= size) {
            p->d.last = m + size;
            return m;
        }    
        p = p->d.next;//切换到下一块进行剩余容量判断
    } while (p);
    //实在不行了,重新创建内存块。
    return ngx_palloc_block(pool, size);
}
  • 创建普通内存块:ngx_palloc_block
static void *
ngx_palloc_block(ngx_pool_t *pool, size_t size){
    u_char      *m;
    size_t       psize;
    ngx_pool_t  *p, *new;

    psize = (size_t) (pool->d.end - (u_char *) pool);

    m = ngx_memalign(NGX_POOL_ALIGNMENT, psize);
    if (m == NULL) {
        return NULL;
    }

    new = (ngx_pool_t *) m;

    new->d.end = m + psize;
    new->d.next = NULL;
    new->d.failed = 0;

    m += sizeof(ngx_pool_data_t);
    m = ngx_align_ptr(m, NGX_ALIGNMENT);
    new->d.last = m + size;

    //将内存池中链接的所有结点都遍历一遍,将它们的尝试次数+1,大于4的直接淘汰。
    for (p = pool->current; p->d.next; p = p->d.next) {
        if (p->d.failed++ > 4) {
            pool->current = p->d.next;// >4淘汰掉
        }
    }
    p->d.next = new;
    return m;
}
  • 申请大空间(大内存块):ngx_palloc_large
static void *
ngx_palloc_large(ngx_pool_t *pool, size_t size){
    void              *p;
    ngx_uint_t         n;
    ngx_pool_large_t  *large;

    p = ngx_alloc(size);//申请一块新的够用大空间
    if (p == NULL) {
        return NULL;
    }

    n = 0;
    //先找有没有之前不用的闲置大内存。
    for (large = pool->large; large; large = large->next) {//遍历池子的大内存链表
        if (large->alloc == NULL) {
            large->alloc = p;
            return p;
        }

        if (n++ > 3) {//就尝试找4次
            break;
        }
    }

    //创建一个大内存结点
    large = ngx_palloc_small(pool, sizeof(ngx_pool_large_t), 1);//第一次肯定会直接执行这个
    if (large == NULL) {
        ngx_free(p);
        return NULL;
    }

    //类似链表头插法
    large->alloc = p;
    large->next = pool->large;
    pool->large = large;

    return p;
}
  • 销毁内存池:ngx_destroy_pool
//销毁内存池
void
ngx_destroy_pool(ngx_pool_t *pool)
{
    ngx_pool_t          *p, *n;
    ngx_pool_large_t    *l;

#if (NGX_DEBUG)
    for (l = pool->large; l; l = l->next) {
        fprintf(stderr,"free: %p", l->alloc);
    }
    for (p = pool, n = pool->d.next; /* void */; p = n, n = n->d.next) {
        fprintf(stderr,"free: %p, unused: %zu", p, p->d.end - p->d.last);
        if (n == NULL) {
            break;
        }
    }
#endif
    //真正的销毁处理流程
    for (l = pool->large; l; l = l->next) {//处理大块
        if (l->alloc) {
            ngx_free(l->alloc);
        }
    }

    //小块
    for (p = pool, n = pool->d.next; /* void */; p = n, n = n->d.next) {
        ngx_free(p);
        if (n == NULL) {
            break;
        }
    }
}
  • 清理大内存块:ngx_pfree
ngx_int_t
ngx_pfree(ngx_pool_t *pool, void *p)
{
    ngx_pool_large_t  *l;
    //遍历寻找目标-大内存块p
    for (l = pool->large; l; l = l->next) {
        if (p == l->alloc) {
            fprintf(stderr,"free: %p", l->alloc);
            ngx_free(l->alloc);
            l->alloc = NULL;

            return NGX_OK;
        }
    }

    return NGX_DECLINED;
}
  • 清理内存块:ngx_free
//ngx_free 为free的别名
#define ngx_free         free
  • 重置内存池:ngx_reset_pool
void
ngx_reset_pool(ngx_pool_t *pool)
{
    ngx_pool_t        *p;
    ngx_pool_large_t  *l;

    //释放大块
    for (l = pool->large; l; l = l->next) {
        if (l->alloc) {
            ngx_free(l->alloc);
        }
    }

    //重置每个小块
    for (p = pool; p; p = p->d.next) {
        p->d.last = (u_char *) p + sizeof(ngx_pool_t);
        p->d.failed = 0;
    }

    pool->current = pool;
    //pool->chain = NULL;
    pool->large = NULL;
}

示例:

#include "mem_core.h"

#define BLOCK_SIZE  16   //每次分配内存块大小

#define MEM_POOL_SIZE (1024 * 4) //内存池每块大小

int main(int argc, char **argv){
    int i = 0, k = 0;
    int use_free = 0;

    ngx_pagesize = getpagesize();

    if(argc >= 2){//不使用线程池
        use_free = 1;
        printf("use malloc/free\n");      
    } else {//使用线程池
        printf("use mempool.\n");
    }

    if(!use_free){
        char * ptr = NULL;

        for(k = 0; k< 1024 * 500; k++){
            ngx_pool_t * mem_pool = ngx_create_pool(MEM_POOL_SIZE);//创建

            for(i = 0; i < 1024 ; i++){
                ptr = ngx_palloc(mem_pool,BLOCK_SIZE);//申请

                if(!ptr) fprintf(stderr,"ngx_palloc failed. \n");
                else {//模拟使用
                    *ptr = '\0';
                    *(ptr + BLOCK_SIZE -1) = '\0';
                }
            }
            ngx_destroy_pool(mem_pool);
        }
    } else {
        char * ptr[1024];
        for(k = 0; k< 1024 * 500; k++){
            for(i = 0; i < 1024 ; i++){
                ptr[i] = malloc(BLOCK_SIZE);
                if(!ptr[i]) fprintf(stderr,"malloc failed. reason:%s\n",strerror(errno));
                else{
                    *ptr[i] = '\0';
                    *(ptr[i] +  BLOCK_SIZE - 1) = '\0';
                }
            }
            for(i = 0; i < 1024 ; i++){
                if(ptr[i]) free(ptr[i]);
            }
        }

    }
    return 0;
}

对比:

image-20220826183834952


补充

  • 在C语言中void类型的指针可以直接赋值给其它指针类型, 但是在C++中不行,必须将void*强制转换为对应类型再赋值。

    • 例如:malloc默认返回void*,这允许用于分配任何类型。
    • C语言中的malloc函数的返回值就是一个void *型指针,我们可以把它直接赋给一个其他类型的指针,但从安全的编程风格角度以及兼容性上讲,最好还是将返回的指针强制转换为所需的类型。
  • 参考文章:

  • 在C语言风格的字符串中,手动添加一个'\0',用printf打印输出时会截至到第一个\0,也就是遇到\0停止,但是实际的大小并不会改变。如下代码所示:

#include <stdio.h>

int main(void){
    char c[] = "123";
    printf("%d\n",sizeof(c));//4
    c[1] = '\0';
    printf("%d\n",sizeof(c));//4
    printf("%s\n",c);//1
    return 0;
}