项目开发中应用并发的一二事

在多线程环境下,使用BlockingCollection以及ConcurrentQueue来消费生产者生产的资源,这是我自己写的多生产者多消费者的作法,其实也是基于单个task下的阻塞队列的IsComplete来识别的。

使用阻塞队列更简单但是内部的消费者线程比较适合使用单独的线程不适合使用线程池,而且阻塞队列为空时会阻塞消费者线程,当然阻塞线程池内的线程也没什么影响只是不推荐这么做,而且阻塞的队列的性能也没有ConcurrentQueue的性能高。

我在项目中遇到多生产者多消费者问题,多生产者没有问题,但是如何在多线程下消费生产者的资源,这就是比较麻烦了,不能仅仅通过判断数量来做,网上也找了一些资源,但是也都是给了个demo,还不全,自己想了个方法,暂时解决了,回头在研究下别人封装的基于Thread的作法。其实是在<<.NET 中的阻塞队列BlockingCollection的正确打开方式>>基础上做的,也没有什么,但是这是个好思路。后续尝试自己封装线程标志来做,不依靠FCL的阻塞队列。code如下:

ConcurrentDictionary<string, string> dic1 = new ConcurrentDictionary<string, string>();
            ConcurrentDictionary<string, string> dic2 = new ConcurrentDictionary<string, string>();
            ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
            BlockingCollection<string> blockingCollection = new BlockingCollection<string>();

            var t = new Task[50];
            Console.WriteLine("生产者开始写入数据.............\r\n");
            
            for(int i=0; i<=49; i++)
            {
                t[i] = Task.Factory.StartNew((param) =>
                {
                    Console.WriteLine("生产者中 *** 阻塞队列输入: {0}", param.ToString());
                    blockingCollection.Add(param.ToString());
                    Console.WriteLine("生产者中 *** 阻塞队列的数量是: {0}", blockingCollection.Count);

                    Console.WriteLine("生产者中 *** 字典dic1输入: {0}", param.ToString());
                    dic1.TryAdd(param.ToString(), param.ToString());
                    Console.WriteLine("生产者中 *** 字典dic1的数量是: {0}", dic1.Count);

                    Console.WriteLine("生产者中 *** 字典dic2输入: {0}", param.ToString());
                    dic2.TryAdd(param.ToString(), param.ToString());
                    Console.WriteLine("生产者中 *** 字典dic2的数量是: {0}", dic2.Count);

                    Console.WriteLine("生产者中 *** 队列输入: {0}", param.ToString());
                    queue.Enqueue(param.ToString());
                    Console.WriteLine("生产者中 *** 队列的数量: {0}", queue.Count);
                }, i);
            }
            
            //Thread.Sleep(500);
            Console.WriteLine("\r\n消费者开始读入数据.............\r\n");

            while (!blockingCollection.IsCompleted)
            {
                Task tt = Task.Factory.StartNew(() =>
                {
                    foreach (var b in blockingCollection.GetConsumingEnumerable())
                    {
                        Console.WriteLine("消费者中 *** 字典dic1的数量是: {0}", dic1.Count);
                        Console.WriteLine("消费者中 *** 字典dic2的数量是: {0}", dic2.Count);

                        Console.WriteLine("消费者中 *** 阻塞队列的数量是: {0}", blockingCollection.Count);

                        string value1 = "";
                        string value2 = "";
                        dic1.TryGetValue(b, out value1);
                        dic2.TryGetValue(b, out value2);

                        Console.WriteLine("消费者中 *** 字典dic1的键值{0}的value值是: {1}", b, value1);
                        Console.WriteLine("消费者中 *** 字典dic1的键值{0}的value值是: {1}", b, value2);
                        Console.WriteLine("消费者中 *** 队列的数量是: {0}", queue.Count);
                        Console.WriteLine("消费者中 *** 字典的数量是: {0}", dic1.Count);

                        if (queue.Count == 50)
                        {
                            blockingCollection.CompleteAdding();
                        }
                    }
                });
            }

            Console.WriteLine("是否完成添加: {0}", blockingCollection.IsCompleted);

参考:

.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列

相关推荐