Liping Zou bio photo

Liping Zou

An Android Developer

Email Twitter Instagram Github Stackoverflow

第四次实验是一道坎啊,话说当年差点没做出来,各种纠结的问题都出现了。这一次的实验涉及到前几次实验的小综合,所以要求有点高。建议一定要认真仔细多阅读几遍指导书。

实验的内容简单来说就是用信号量来实现生产者和消费者问题。

建立一个生产者进程,N个消费者进程(N>1);

用文件建立一个共享缓冲区;

生产者进程依次向缓冲区写入整数0,1,2,…,M,M>=500;

消费者进程从缓冲区读数,每次读一个,并将读出的数字从缓冲区删除,然后将本进程ID和数字输出到标准输出;

缓冲区同时最多只能保存10个数。

完成上述的五个步骤,最好是将pc.c在Ubuntu下测试成功后,在改成适合0.11的版本。记得在Ubuntu下编译的时候一定不要忘了加上-lpthread,在编译的时候要连接上Thread库才可以使用,在0.11下则不需要。在Ubuntu用gcc -o pc pc.c -lpthread -Wall就可以了。

实验的步骤如下:

(1)在ubuntu下,用系统提供的sem_open()、sem_close()、sem_wait()和sem_post()等信号量相关的系统调用编写pc.c程序。

(2)在ubuntu上编译并运行pc.c,检查运行结果。

后面为linux0.11编写信号量相关的系统调用:

(3)参考高级版本的linux内核,实现linux0.11下的山寨版信号量(4个系统调用):

sem_t *sem_open(const char *name, unsigned int value);

int sem_wait(sem_t *sem);

int sem_post(sem_t *sem);

int sem_unlink(const char *name);

(4)、参照实验2,修改相关文件和添加这些系统调用相关的代码

(5)、编译linux0.11内核

(6)、运行虚拟机

(7)、将pc.c复制到虚拟机,并编译运行,查看运行结果。

接下来,我们先来点预备知识。

生产者消费者问题的模型

1
2
3
4
5
6
7
8
9
Producer()
{
    生产一个产品item;
    P(Empty);  //空闲缓存资源
    P(Mutex);  //互斥信号量
    item放到空闲缓存中;
    V(Mutex);
    V(Full);  //产品资源
}
1
2
3
4
5
6
7
8
9
Consumer()
{
    P(Full);  
    P(Mutex);  
    从缓存区取出一个赋值给item;
    V(Mutex);
    V(Empty);
    消费产品item;
} 

其中PV操作的含义如下:

执行P操作P(S)时信号量S的值减1,若结果不为负则P(S)执行完毕,否则执行P操作的进程暂停以等待释放。

执行V操作V(S)时,S的值加1,若结果不大于0则释放一个因执行P(S)而等待的进程。

生产者消费者问题中需要用到三个信号量,其中一个互斥信号量,两个同步信号量。在进行互斥操作的时候一定要注意,P、V操作是成对出现的,先进行P操作,进入临界区,访问临界资源,在进行V操作,出临界区。互斥信号量的初值一般是1.

进行同步操作的时候,要分析清进程间的制约关系。同一信号量的P、V操作也要成对出现,但是其P、V操作分别在不同的进程中。同步信号量的初值一般与资源的数目有关。

信号量值的含义:大于零时,表示可用的临界资源数目;等于零时,表示资源正好用完了;小于零时,表示系统中因请求该资源而被阻塞的进程数目。

在有了上述的预备知识之后,我们可以开始这个实验了。

第一步,首先编写pc.c。pc.c要完成的功能在文章开始的时候就已经说明了。其中要注意的一点是,我对于缓冲区的处理是一次性将缓冲区中的十个数都读出来,然后消费一个,再将其余的九个数写回文件。感觉这种方法要简单一点,至少不需要用到那些文件指针。下面是pc.c文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#define __LIBRARY__
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#define PMAX 500
#define NUMBER 10
#define CMAX 100

_syscall2(int,sem_open,const char*, name,unsigned int,value);
_syscall1(int,sem_wait,sem_t*,sem);
_syscall1(int,sem_post,sem_t*,sem);
_syscall1(int,sem_unlink,sem_t*,sem);

    static int produce = 0;
    static int consume = 0;

int main(int argc,char* argv[])
{
    sem_t *Mutex;
    sem_t *number;
    sem_t *location;
    FILE *f;
    int i,j,k,m;
    int x,y;
    int read=-1;
    int b[10];
    pid_t pid[5];

    Mutex = (sem_t*)sem_open("Mutex",1);
    number = (sem_t*)sem_open("number",0);
    location = (sem_t*)sem_open("location",10);

    for(i=0; i<5; i++)
    {
        pid[i] = fork();
        if(pid[i]<0)
        {
            printf("error in fork!\n");
            exit(0);
        }
        else if(pid[i]==0)
        {
            printf("process%d\n",getpid());
            for(k=0; k<CMAX; k++)
            {
                sem_wait(number);
                sem_wait(Mutex);
                f = fopen("test.txt","rb");
                for(m=0; m<NUMBER; m++)
                {
                    read =-1;
                    fread(&read,sizeof(int),1,f);
                    b[m]=read;
                }
                fclose(f);
                printf("%d: %d\n",getpid(),b[0]);
                fflush(stdout);
                f = fopen("test.txt","wb");
                for(m=1; m<NUMBER; m++)
                {
                    if(b[m] >= 0)
                        fwrite(&(b[m]),sizeof(int),1,f);
                }
                consume++;
                fflush(f);
                /*for(x=0;x<1000;x++)
                    {
                        for(y=0;y<1000;y++)
                        ;
                    }*/
                fclose(f);
                sem_post(Mutex);
                sem_post(location);
            }
            exit(0);

        }
    }

    printf("father process\n");
    for(j=0; j<PMAX; j++)
    {
        sem_wait(location);
        sem_wait(Mutex);
        f= fopen("test.txt","ab+");

        read++;
        fwrite(&read,sizeof(int),1,f);
        fflush(f);
        produce++;
        /*for(x=0;x<1000;x++)
            {
                for(y=0;y<1000;y++)
                ;
            }*/
        fclose(f);
        sem_post(Mutex);
        sem_post(number);
    }
    /*for(x=0;x<1000;x++)
      {
          for(y=0;y<1000;y++)
          ;
      }*/
    wait(NULL);
    wait(NULL);
    wait(NULL);
    wait(NULL);
    wait(NULL);
    sem_unlink(Mutex);
    sem_unlink(number);
    sem_unlink(location);


    return 0;
}

将pc.c完成,只是完成了这个实验很小的一部分。较难的部分是sem.c的编写。

sem_t是信号量类型,需要自定义。下面是unistd.h文件中对于信号量类型的定义。在实现信号量的时候需要用到一个队列,用来保存当前等待的进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct queue
{
    int front;
    int rear;
    struct task_struct *task[30];
};
typedef struct queue queue;
   
struct semaphore
{
    int value;
    int flag;
    int lock;
    /*char *name;*/
    queue wait;
};
typedef struct semaphore sem_t;

接着是sem.c。这部分的难度较大,仅供参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <errno.h>
#include <linux/kernel.h>
#include <asm/segment.h>
#include <asm/system.h>
#include <linux/sched.h>
#include <unistd.h>
/*#include <signal.h>*/
#define MAX 30

char names[MAX][MAX] = {'\0'};
sem_t arr[30]= {NULL};

void MakeNull(queue *q)
{
    q->front = 0;
    q->rear = MAX - 1;
}

int add(int x)
{
    int y;
    y = (x+1)%MAX;
    return y;
}

void EnQueue(struct task_struct *curr, queue *q)
{
    if(add(add(q->rear))==q->front)
        printk("queue is full");
    else
    {
        q->rear = add(q->rear);
        q->task[q->rear]=curr;
    }
}

void DeQueue(queue *q)
{
    if(add(q->rear)==q->front)
    {
        printk("queue is empty");
    }
    else
        q->front = add(q->front);
}

struct task_struct *Front( queue* q )
{
    if(add(q->rear)==q->front)
        return NULL;
    else
        return (q->task[q->front]);
}

int sys_sem_open(const char *name, unsigned int value)
{
    char n;
    int i = 0;
    int tag;
    char MyName[MAX];

    while ((n = get_fs_byte(&name [i])) != '\0')
    {
        MyName[i] = n;
        i++;
    }
    MyName[i] = '\0';

    for (i = 0; i < MAX; i++)
    {
        tag = strcmp(MyName,names[i]);
        if (tag==0)
            return &arr[i];
    }

    for (i = 0; i < MAX; i++)
    {
        if (arr[i].flag == 0)
        {
            strcpy(names[i],MyName);
            arr[i].flag = 1;
            arr[i].value = value;
            /*printk("%d",value);*/
            MakeNull( &(arr[i].wait));
            return &arr[i];
        }
    }
    return NULL;
}

int sys_sem_wait(sem_t *sem)
{
    int value;
    cli();
    (sem->value) = (sem->value)-1;
    value = sem->value;
    if (value < 0)
    {
        EnQueue(current, &(sem->wait));
        /*sleep();*/
        current->state = TASK_UNINTERRUPTIBLE;
        schedule();
    }
    sti();
    return 0;
}

int sys_sem_post(sem_t *sem)
{
    struct task_struct* tmp;
    int value;
    cli();
    (sem->value) = (sem->value)+1;
    value = sem->value;
    /*printk("post  %d\t",sem->value);*/
    if (value <= 0)
    {
        tmp = Front(&(sem->wait));
        /*wake(tmp);*/
         if (tmp != NULL)
        (*tmp).state = TASK_RUNNING;
        DeQueue(&(sem->wait));
    }

    sti();
    return 0;
}
int sys_sem_unlink(const char *name)
{
    char n;
    int i=0,j=0,k=0;
    char aName[MAX];
    int tag;
    while ((n = get_fs_byte(name + i)) != '\0')
    {
        aName[i] = n;
        i++;
    }
    aName[i] = '\0';
    for(j=0; j<MAX; j++)
    {
        tag = strcmp(names[i],aName);
        if (tag==0)
        {
            arr[i].flag = 0;
            arr[i].value = 0;
            for(k=0;k<MAX;k++)
            {
               names[i][k] = '\0';
            }
            return 0;
        }
    }
    return -1;
}

剩下的工作就是增加系统调用在第二次实验中已经做过了,可以参考系统调用这篇文章。

这次实验可以说的就这么多了,还需要大家耐心研究研究sem.c的实现那块。

偶然发现了实验报告(仅供参考啊):

1.在pc.c中去掉所有与信号量有关的代码,再运行程序,执行效果有变化吗?为什么会这样?

答:有,消费者消费的数据将不在按照递增顺序输出,并且会出现read error。因为没有了p(empty)操作,所以当生产者已经写满 了缓冲区仍然要进入生产时,就会覆盖掉还没有被消费者取走的数据,这样当消费者进入缓冲区取出数据时,就不是正常的递增顺序了,假设生产者在缓冲区中写了400,401,402,403,404,405,406,407,408,409共10个数据,那么还继续写,就会把400变为410,这样当消费者进入取数时,就会出现399,410,401,这样的输出顺序,同样的,因为没有了p(full)操作,当缓冲区的数据都被取走了,而消费者依然进入取数,读出的数据就是无效的。而没有p(mutex)v(mutex)这个加锁和解锁的操作,使得临界区代码可能出现多进程并发访问的情况,因此会有read error的情况出现。

2.实验的设计者在第一次编写生产者——消费者程序的时候,是这么做的:

1
2
3
4
5
6
7
8
9
Producer()
{
    P(Mutex);  //互斥信号量
    生产一个产品item;
    P(Empty);  //空闲缓存资源
    item放到空闲缓存中;
    V(Full);  //产品资源
    V(Mutex);
}
1
2
3
4
5
6
7
8
9
Consumer()
{
    P(Mutex);  
    P(Full);  
    从缓存区取出一个赋值给item;
    V(Empty);
    消费产品item;
    V(Mutex);
} 

这样可行吗?如果可行,那么它和标准解法在执行效果上会有什么不同?如果不可行,那么它有什么问题使它不可行?

答:不可行。会有死锁情况的发生,当mutex = 1,并且生产者要进入生产一个产品,假设此时empty.value为0,那么会有mutex.value = 0,p(empty)后得到的value小于0,生产者进程进入等待在信号量empty的等待队列上面,并调用schedule(),可是此时并未解锁,即mutex.value值仍然为0。它们都等待在信号量mutex上面。同理,消费者进程也是如此,若mutex.value = 1,full.value = 0,在执行完p(mutex)p(full)之后,mutex.value = 0,并且将消费者进程放入等待在信号量full的等待队列上面,而此时也并未释放mutex,因此消费者和生产者进程都等待在mutex信号量上面。