百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

通过生产者消费者问题学习多线程(生产者消费者进程)

yuyutoo 2025-04-07 20:59 11 浏览 0 评论

生产者消费者问题,也称有限缓冲问题,是多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。



要理解生产消费者问题,首先应弄清PV操作与信号量。信号量的值与相应资源的使用情况有关。当它的值大于0时,表示当前可用资源的数量;当它的值小于0时,其绝对值表示等待使用该资源的线程个数,信号量的值仅能由PV操作来改变。一般来说,信号量S >= 0时,S表示可用资源的数量。执行一次P操作意味着请求分配一个单位资源,因此S的值减1;当S < 0时,表示已经没有可用资源,请求者必须等待别的进程释放该类资源,它才能运行下去。而执行一个V操作意味着释放一个单位资源,因此S的值加1;若S < 0,表示有某些进程正在等待该资源,因此要唤醒一个等待状态的进程,使之运行下去。

正确的实现生产者消费者问题,一般需要三个信号量:

  • empty,表示缓冲区中的空闲空间,即还可以容纳多少元素,初始值为缓冲区大小。
  • full,表示缓冲区中已经有多少元素,初始值为0。
  • mutex,互斥信号量,初始值为1。对缓冲区的访问是排它的,即某一时刻只能有一个生产者向缓冲区里放元素,或一个消费者从中取元素。

生产者

消费者

P(empty)
P(mutex)
向缓冲区增加元素
V(mutex)
V(full)

P(empty)
P(mutex)
向缓冲区增加元素
V(mutex)
V(full)

使用内置锁

生产者、消费者的问题,其实就是线程间同步的问题,在java中,是通过synchronizedwaitnotifyAll来实现的。

public class Buffer {
    private List list = new LinkedList();
    private int size;

    public Buffer(int size) {
        this.size = size;
    }

    public void add(V v) throws InterruptedException {
        synchronized(this) {
            while (list.size() == size) {
                this.wait();
            }


            list.add(v);
            this.notifyAll();
        }
    }

    public V take() throws InterruptedException {
        synchronized(this) {
            while (list.size() == 0) {
                this.wait();
            }

            V v = list.get(0);
            list.remove(v);
            this.notifyAll();

            return v;
        }
    }
}

add方法中对list.size() == size的判断相当于P(empty)操作,当缓冲区中没有空闲位置的时候通过调用wait方法强制阻塞当前线程同时释放this对象的锁,使得别的线程(消费者)有机会获得锁去取走元素。

list.size() == size的判断放进while中判断而不是if来判断是因为可能有多个生产者,当它们被唤醒后只有其中一个能真正向缓冲区增加元素,其它生产者会因为竞争失败再次处于阻塞状态;如果只有一个生产者线程,是可以使用if替换while的。

当生产者向缓冲区中增加元素后,通过this.notifyAll()方法通知消者费线程到缓冲区中去取元素。这里需要特别说明,在this对象上等待的线程,即可能是等待从缓冲区取的消费者线程,也可能是等待向缓冲区放的生产者。当调用this.notifyAll方法时,所有这些等待的线程都会被唤醒,某种程度上来讲,这种性能的损耗很大,第一:唤醒生产者线程完全没有意义;第二:被唤醒的消费者线程中只有一个能真正得到执行。另外,不难看出如果有多个生产者与消费者线程的情况下,使用notify()方法代替notifyAll()方法有可能发生活锁的情况。

条件通知


调用notifyAll()方法的代价很高,其实我们是可以减少它的调用次数的。想想add方法中的notifyAll()方法的本意是想通知阻塞的消费者线程去缓冲区取元素,如果在生产者向缓冲区里增加元素时,缓冲区并不是空的,意味着此刻并不会有消费者因为缓冲区为空而等待,也就无需进行通知了。

采用条件通知的方式重新实现的add()方法如下。

public void add(V v) throws InterruptedException {
    synchronized(this) {
        int currentSize = list.size();
        while (currentSize == size) {
            this.wait();
        }

        list.add(v);

        if (0 == currentSize) {
            this.notifyAll();
        }
    }
}

使用显示锁

使用内置锁有一个明显的缺陷,在像生产者、消费者这类问题上,多个线程可能在同一个条件队列上等待不同的条件谓词,如生产者向缓冲区增加元素后调用notifyAll()方法时,其它的生产者线程也会被唤醒。

另一方面,诸多被唤醒的消费者线程中也只有一个能真正得到执行。使用Lock与Condition可以使得等待特定条件谓词的线程处在同一个条件队列,从根本上解决了内置锁的缺陷。

使用显示锁重新实现的Buffer如下:

public class Buffer {
    private List list = new LinkedList();
    private Lock lock = new ReentrantLock();
    private Condition full = lock.newCondition();
    private Condition empty= lock.newCondition();

    private int size;

    public Buffer(int size) {
        this.size = size;
    }

    public void add(V v) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == size) {
                full.await();
            }

            list.add(v);
            empty.signal();
        } finally {
            lock.unlock();
        }
    }

    public V take() throws InterruptedException {
        lock.lock();
        try {
            while (list.isEmpty()) {
                empty.await();
            }

            V v = list.get(0);
            list.remove(v);
            full.signal();

            return v;
        } finally {
            lock.unlock();
        }
    }
}

缓冲区是否已满和缓冲区是否为空分别由fullempty两个条件谓词表示,因而在唤醒的时候具有更强的针对性,而且只需要唤醒其中一个等待的线程(使用signal而不是signalAll),在竞争激烈的情况下可以减少很多无效的线程上下文切换与加锁操作从而避免性能损耗。

内置锁与显示锁


Lock对象还提供了tryLocklockInterruptibly等方法,支持加锁时设置超时和响应中断等能力,比内置锁灵活很多。在jdk1.5中,显示锁的性能是明显好于内置锁的;在jdk1.6以后的版本使用了改进的算法管理内置锁,两者之间的性能差异几乎可以忽略。

内置锁虽然在灵活性方面差了些,但是有一个明显的优势:在线程转储中可以看到哪些调用帧中获得了哪些锁,对于分析死锁等问题时带来的便利是显示锁无法比拟的。另外,未来提升内置锁性能的可能性应该会更高,因为它是JVM的内置属性,除非需要处理一些内置锁无法满足的问题,否则还是应该优先使用内置锁。

4. 使用concurrent包中的工具类

Semaphore

jdk1.5引入了Semaphore类,表示信号量,accqure方法相当于P操作,release方法相当于V操作。在掌握了生产者、消费者的解题流程后,使用Semaphore来实现Buffer也是很简单的事,使用信号量实现的Buffer如下:

public class Buffer {
    private volatile List list = new LinkedList();
    private Semaphore mutex = new Semaphore(1);
    private Semaphore full;
    private Semaphore empty = new Semaphore(0);

    public Buffer(int size) {
        full  = new Semaphore(size);
    }

    public void add(V v) throws InterruptedException {
        full.acquire();

        try {
            mutex.acquire();
            list.add(v);
        } catch(InterruptedException e) {
            full.release();
        } finally {
            mutex.release();
        }

        empty.release();
    }

    public V take() throws InterruptedException {
        empty.acquire();

        try {
            mutex.acquire();
            V v = list.get(0);
            list.remove(v);

            return v;
        } catch(InterruptedException e) {
            empty.release();
            throw e;
        } finally {
            mutex.release();
            full.release();
        }
    }
}

当然了,这种方式实现的Buffer无论是性能,又或者可读性方面都不比前两种方式好,仅当作练习的话那还是很经典的一个例子的。

BlockingQueue

jdk1.5中新引入的BlockingQueue,它是专门为解决生产者、消费者问题而生的,它定义了四组方法以及五种不同的实现:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue
  • PriorityBlockingQueue
  • SynchronousQueue

使用LockFree算法实现

在竞争的情况下系统的性能会因为加锁产生上下文切换与调度延迟而降低,而非竞争的情况下多余的加锁操作本身也会消耗掉一部分性能。

Compare And Swap

CAS 指的是现代 CPU 广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。简单介绍一下这个指令的操作过程:首先,CPU 会将内存中将要被更改的数据与期望的值做比较。然后,当这两个值相等时,CPU 才会将内存中的数值替换为新的值。否则便不做操作。最后,CPU 会将旧的数值返回。这一系列的操作是原子的。它们虽然看似复杂,但却是 Java 5 并发机制优于原有锁机制的根本。简单来说,CAS 的含义是“我认为原有的值应该是什么,如果是,则将原有的值更新为新值,否则不做修改,并告诉我原来的值是多少”。


java.util.concurrent.atomic包提供了大部分数据类型的原子封装,在原有数据类型的基础上,提供了原子性的操作方法,保证了线程安全。下面是采用CAS方式实现的Buffer:

public class Buffer {
    private static class Node {
        E item;
        AtomicReference<Node> next;

        Node(E item, Node next) {
            this.item = item;
            this.next = new AtomicReference<Node>(next);
        }
    }

    private final Node dummy = new Node(null, null);
    private final AtomicReference<Node> head = new AtomicReference<Node>(dummy);
    private final AtomicReference<Node> tail = new AtomicReference<Node>(dummy);

    public boolean add(E e) {
        Node newNode = new Node(e, null);
        while (true) {
            Node t = tail.get();
            Node residue = t.next.get();

            if (t == tail.get()) {
                if (residue == null) {
                    if (t.next.compareAndSet(null, newNode)) {
                        tail.compareAndSet(t, newNode);
                        return true;
                    }
                } else {
                    tail.compareAndSet(t, residue);
                }
            }
        }
    }

    public E take() {
        while (true) {
            Node h = head.get();
            Node t = tail.get();
            Node first = h.next.get();

            if (h == head.get()) {
                if (h == t) {
                    if (first == null) {
                        return null;
                    } else {
                        tail.compareAndSet(t, first);
                    }
                } else if (head.compareAndSet(h, first)) {
                    E e = first.item;
                    if (e != null) {
                        first.item = null;
                        return e;
                    }
                }
            }
        }
    }
}

当竞争程度不高时,基于CAS的实现在性能上远远超过基于锁的实现,但是在竞争激烈的情况下,CAS的性能会比锁定的方式差很多,因为CAS是通过不断地重试、回退的方式处理竞争的,在竞争激烈的情况下会消耗很多的CPU资源;CAS的另一个缺点是会引发ABA问题。理解CAS方式实现的Buffer是有一定的困难的,如果您对非阻塞算法感兴趣,请关注笔者后续的文章。

相关推荐

VBA中利用Instr函数(vba int函数)

【分享成果,随喜正能量】每一个在你的生命里出现的人,都有原因,喜欢你的人给了你温暖和勇气,你喜欢的人让你学会了爱和自持,你不喜欢的人教会你宽容与尊重,不喜欢你的人让你自省与成长。。...

Insta360 Link体验:支持4K画质,一款使用场景丰富的AI云台摄像头

记者|王公逸伴随直播、线上会议需求的兴起,网络直播的需求愈发增大,8月2日,影石Insta360正式推出全新产品:Insta360Link,这是一款AI智能云台摄像头。从产品形态来说,Insta3...

VBA技术资料MF299:利用Instr进行文本查找

我给VBA的定义:VBA是个人小型自动化处理的有效工具。利用好了,可以大大提高自己的工作效率,而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套,分为初级、中级、高级三大部分,教程是对VB...

Fabric.js 拖放元素进画布 - 掘金

本文简介点赞+关注+收藏=学会了学习Fabric.js,我的建议是看文档不如看demo。本文实现的功能:将元素拖进到画布中并生成对应的图形或图片。效果如下图所示:...

Vue3为什么推荐使用ref而不是reactive

为什么推荐使用ref而不是reactivereactive本身具有很大局限性导致使用过程需要额外注意,如果忽视这些问题将对开发造成不小的麻烦;ref更像是vue2时代optionapi的data的替...

Fabric.js 样式不更新怎么办?(js更改样式)

本文简介带尬猴,我嗨德育处主任不知道你有没有遇到过在使用Fabric.js时无意中一些骚操作修改了元素的样式,但刷新画布却没更新元素样式?如果你也遇到同样的问题的话,可以尝试使用本文的方法。...

Fabric.js 修改画布交互方式到底有什么用?

本文简介点赞+关注+收藏=学会了fabric.js为我们提供了很多厉害的方法。今天要搞明白的一个东西是canvas.interactive。官方文档对canvas.interact...

Rust Web编程:第五章 在浏览器上显示内容

我们现在正处于可以构建一个Web应用程序的阶段,该应用程序可以使用不同的方法和数据管理一系列HTTP请求。这很有用,特别是当我们为微服务构建服务器时。然而,我们也希望非程序员能够与我们的应...

Fabric.js 自由绘制椭圆 - 掘金(canvas画椭圆)

本文简介点赞+关注+收藏=学会了本文讲解在Fabric.js中如何自由绘制椭圆形,如果你还不了解Fabric.js,可以查阅《Fabric.js从入门到精通》。效果如下图所示...

手把手教你实现JS手搓&quot;防抖&quot;优化代码——专业的事用专业的方法!

前言在我们前端编程中,假如我们要给后端发送请求,万一手抖多点了几次,多发送了几遍怎么办?解决方案:防抖!这种事就要交给我们专业的“防抖”先生来处理!今天,我们就来教大家手搓“防抖”...

详解虚拟DOM与Diff算法(虚拟dom一定比实际dom快吗)

vue的虚拟DOM,Diff算法,其中一些关键的地方从别处搬运了一些图进行说明(感谢制图的大佬),也包含比较详细的源码解读。...

走进 React Fiber 的世界(我走进你的世界手势舞视频)

文/阿里淘系F(x)Team-冷卉Fiber设计思想Fiber是对React核心算法的重构,facebook团队使用两年多的时间去重构React的核心算法,在React16以上...

前端新一代框架 Svelte 火了!十个场景带你简单认识它!

近几年听到的主流框架都是Vue、React、Angular,但其实有一个框架在国外非常火,用起来也是很方便,那就是...

借助DeepSeek实现了一个PDF阅读器

1、简介使用pdf.js库加载和显示PDF文件。实现了翻页、缩放功能。提供了基本的错误处理。功能特点:支持选择本地PDF文件。可以逐页查看PDF内容。支持放大缩小功能。界面简洁,易于使...

DeepSeek代码之旅1:卫星地图标记方法之——html语言的实现

最近遇到一个任务,具体功能如下:1、调用高德地图API,图层为卫星图层,根据需要标记兴趣点;2、标记完成后可以保存兴趣点,便于下次加载历史兴趣点。...

取消回复欢迎 发表评论: