C# 并行队列 生产者消费者demo

csharp

浏览数:257

2019-1-7

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            //Console.WriteLine("Hello World!");

            for (int i = 0; i < 100; i++)
            {
                Task.Run(() => {
                    Producer();
                });
            }
            

            Execute();

            Console.ReadKey();
        }        public static void Execute()
        {
            //调用Invoke,使得生产者任务和消费者任务并行执行
            //Producer方法和Customer方法在Invoke中的参数顺序任意,不论何种顺序都会获得正确的结果
            //Parallel.Invoke(() => Customer(), () => Customer(), () => Customer(), () => Customer(),
            //    () => Customer(), () => Customer(), () => Customer(), () => Customer(), () => Customer(),
            //    () => Customer());

            Parallel.For(0, 50, i =>
            {
                Customer();
                //Task.Run(() => Customer());
            });
            

            Console.WriteLine(string.Join(",", customerColl));
        }

        //生成者集合
        private static BlockingCollection<string> producerColl = new BlockingCollection<string>();
        //消费者集合
        private static BlockingCollection<string> customerColl = new BlockingCollection<string>();

        private static Random random = new Random();
        public static void Producer()
        {
            //循环将数据加入生成者集合
            for (int i = 1; i < 100; i++)
            {

                Thread.Sleep(random.Next(1, 100));
                var value = $"{Thread.CurrentThread.ManagedThreadId}_{i}";
                producerColl.Add(value);

                Console.WriteLine($"生产: {value} Thread :{Thread.CurrentThread.ManagedThreadId}");
            }

            //设置信号,表明不在向生产者集合中加入新数据
            //可以设置更加复杂的通知形式,比如数据量达到一定值且其中的数据满足某一条件时就设置完成添加
            // producerColl.CompleteAdding();
        }

        public static void Customer()
        {
            //调用IsCompleted方法,判断生成者集合是否在添加数据,是否还有未"消费"的数据
            //注意不要使用IsAddingCompleted,IsAddingCompleted只表明集合标记为已完成添加,而不能说明其为空
            //而IsCompleted为ture时,那么IsAddingCompleted为ture且集合为空
            while (!producerColl.IsCompleted)
            {
                //调用Take或TryTake "消费"数据,消费一个,移除一个
                //TryAdd的好处是提供超时机制
                // customerColl.Add(string.Format("消费:{0}", producerColl.Take()));
                Thread.Sleep(random.Next(100, 500));
                Console.WriteLine($"消费: {producerColl.Take()} Thread: {Thread.CurrentThread.ManagedThreadId}");

            }
        }
    }
}