NET中解决KafKa多线程发送多主题的问题

  一般在KafKa消费程序中消费可以设置多个主题,那在同一程序中需要向KafKa发送不同主题的消息,如异常需要发到异常主题,正常的发送到正常的主题,这时候就需要实例化多个主题,然后逐个发送。

  在NET中用RdKafka组件来做消息处理,在Nuget中引用。

  在程序中初始化Producer,并创建多个Topic

private string comtopic = "topic1";

        private string errtopic = "topic2";

        private string kfkip = "192.168.80.32:9092";

        Topic topic = null;

        Topic errTopic = null;


        public ExcuteFlow()

        {

            try

            {

                Producer producer = new Producer(kfkip);

                topic = producer.Topic(comtopic);

                errTopic = producer.Topic(errtopic);

            }

            catch (RdKafkaException ex)

            {

                LogHelper.Error("KafKa初始化KafKa异常 ", ex);

            }

            catch (Exception ex)

            {

                LogHelper.Error("KafKa初始化异常", ex);

            }


        }

在程序中发送其中一个主题:

          try

            {


                if (topic != null)

                {

                    byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));

                    Task<DeliveryReport> deliveryReport = topic.Produce(datas);

                    var unused = deliveryReport.ContinueWith(task =>

                    {

                        LogHelper.Info("内容:{flowCommond.ID} 发送到分区:{task.Result.Partition}, Offset 为: {task.Result.Offset}");

                    });

                }

                else

                {

                    throw new Exception("发送消息到KafKa topic 为空");

                }

            }

            catch (RdKafkaException ex)

            {

                LogHelper.Error("发送消息到KafKa  KafKa异常", ex);

            }

            catch (Exception ex)

            {

                LogHelper.Error("发送消息到KafKa异常", ex);

            }

 flowCommond为要发送的对象内容,格式化为Json字符串再发送。

  另一个主题一样处理。

   这里实现一个线程里面发送多个主题,那下面实现多个线程中如何发送多个主题。

  多线程中如果每个线程都new Producer(kfkip) 一次,那KafKa的连接很快会被占满。

  那这里就用单例模式来解决这个问题,每次要用到Producer时检查一下是否已经存在Producer实例,若存在则直接用不用再生成。

/// <summary>

    /// 单例模式的实现

    /// </summary>

    public class SingleProduct : Producer

    {

        // 定义一个静态变量来保存类的实例

        private static SingleProduct uniqueInstance;

        // 定义一个标识确保线程同步

        private static readonly object locker = new object();

        // 定义私有构造函数,使外界不能创建该类实例

        private SingleProduct(string brokerList) : base(brokerList)

        {

        }


        /// <summary>

        /// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点

        /// </summary>

        /// <returns></returns>

        public static SingleProduct GetInstance()

        {

            // 当第一个线程运行到这里时,此时会对locker对象 "加锁",

            // 当第二个线程运行该方法时,首先检测到locker对象为"加锁"状态,该线程就会挂起等待第一个线程解锁

            // lock语句运行完之后(即线程运行完之后)会对该对象"解锁"

            if (uniqueInstance == null)

            {

                lock (locker)

                {

                    // 如果类的实例不存在则创建,否则直接返回

                    if (uniqueInstance == null)

                    {

                        string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];


                        try

                        {

                            uniqueInstance = new SingleProduct(kfkip);

                            LogHelper.Error("单例模式 实例化 SingleProduct");

                        }

                        catch (RdKafkaException ex)

                        {

                            LogHelper.Error("单例模式 KafKa初始化KafKa异常 ", ex);

                        }

                        catch (Exception ex)

                        {

                            LogHelper.Error("单例模式 KafKa初始化异常", ex);

                        }

                    }

                }

            }


            return uniqueInstance;

        }

    }

然后在初始化的代码中替换Producer producer = new Producer(kfkip);为 Producer producer = SingleProduct.GetInstance();

  OK!以上就完成了多线程多主题的消息发送。

相关文章:

  • 消息队列 Kafka 的基本知识及 .NET Core 客户端

  • .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(一)

  • .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)

原文地址:http://www.cnblogs.com/zhangs1986/p/7285525.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/324196.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaFX图表(六)之条形图

翻译自 JavaFX - 条形图 条形图用于表示使用矩形条的分组数据。这些条的长度描绘了这些值。条形图中的条形可以垂直或水平绘制。 以下是条形图&#xff0c;比较各种汽车品牌。 在JavaFX中&#xff0c;条形图由名为BarChart的类表示。该类属于包javafx.scene.chart。通过实例…

孩子大了真是不好管了

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注公众号&#xff1a;雄雄的小课堂。今天的这篇文章完全是有感而发。我有两个弟弟&#xff0c;老二目前工作较稳定&#xff0c;暂且不表&#xff0c;主要想说一下老三。

IdentityServer4 配置负载均衡

如果使用 IdentityServer4 做授权服务的负载均衡&#xff0c;默认情况下是不可以的&#xff0c;比如有两个授权服务站点&#xff0c;一个资源服务绑定其中一个授权服务&#xff08;Authority配置&#xff09;&#xff0c;如果通过另外一个授权服务获取access_token&#xff0c;…

JavaFX图表(七)之散点图

翻译自 JavaFX - 散点图 散点图是一种图形&#xff0c;它使用在笛卡尔平面中绘制的两个变量的值。它通常用于找出两个变量之间的关系。 以下是在面积和重量之间绘制的散点图。 在JavaFX中&#xff0c;Scatter图表由名为ScatterChart的类表示。该类属于包javafx.scene.chart。…

来自一位家长的电话

【文本不推荐看&#xff0c;只当做个人反思记录】大家好&#xff0c;我是雄雄&#xff0c;欢迎关注本公众号【雄雄的小课堂】。最近&#xff0c;本人的个人站上线&#xff0c;点击文末左下角的“阅读原文&#xff08;http://muxiongxiong.cn&#xff09;”即可浏览&#xff0c;…

Visual Studio 2017 15.3 预览版发布,接近最终版

从Visual Studio 2017 15.3预览版的发布时间表中可以看出&#xff0c;Microsoft似乎马上要发布这一版本的正式版。过去几周对VS2017 15.3的改动主要集中在问题修复上&#xff0c;开发人员可以注意到&#xff0c;最近的VS2017 15.3 Preview版本提供了对C# 7.1的支持&#xff0c;…

JavaFX图表(八)之堆积条形图

翻译自 JavaFX - 堆积条形图 StackedBarChart是BarChart的变体&#xff0c;它绘制了指示类别数据值的条形图。条形可以是垂直的或水平的&#xff0c;这取决于哪个轴是类别轴。每个系列的栏位于上一系列的顶部。 以下是堆积条形图&#xff0c;描绘了人口增长。 在JavaFX中&a…

P2698-花盆Flowerpot【单调队列】

正题 链接 https://www.luogu.org/record/show?rid7934370 大意 有n滴水&#xff0c;给出坐标&#xff0c;水每一个时间单位会往下掉一格&#xff0c;花盆可以随意摆放&#xff0c;要求在宽度最小的情况下接住的第一滴水和最后一滴水时间差超过D 解题思路 横坐标排序&am…

Redis(案例二:高并发商品首页热点数据开发实战)

热点数据 经常会被查询&#xff0c;但是不经常被修改或者删除的数据 ⾸⻚-详情⻚ 链路逻辑 检查缓存是否有 缓存不存在则查询数据库 查询结果放到缓存&#xff0c;设置过期时间 下次访问则命中缓存 代码实现 pojo —热点视频 Data NoArgsConstructor AllArgsConstructor pub…

VS2012找不到EF框架实体模型的解决方法

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注本公众号【雄雄的小课堂】。最近&#xff0c;本人的个人站上线&#xff0c;点击文末左下角的“阅读原文”即可浏览&#xff0c;欢迎浏览、点赞与留言呦~网址&#xff1a;http://www.muxiongxiong.cn前天&#xff0c;在上课时&am…

C#使用Xamarin开发可移植移动应用(2.Xamarin.Forms布局,本篇很长,注意)附源码

前言 源码地址:https://github.com/l2999019/DemoApp 可以Star一下,随意 - - 一点感想 很意外的,第一篇博文被博客园的编辑大哥置顶了.感谢. 评论也很多,褒贬不一,我还是那句话.技术是从无到有的过程,就像一个刚出生的人 不是说他有个强大的爸爸 所以就可以一出生就上天. …

构建SpringCloud项目基础框架

文章目录父项目microcloud本地模拟RPC调用common-api子模块创建dto类创建服务接口创建一个对象拷贝的工具类provider-dept-8001 子模块bootstrap.ymlapplication.ymllogback-spring.xmlSwagger配置创建MyBatisPlus配置类部门 数据库创建脚本创建Dept映射类创建IDeptDAO数据接口…

Nginx中如何配置中文域名?

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注公众号【雄雄的小课堂】。最近我的个人站上线了&#xff0c;一直在优化中&#xff0c;目前优化最多的就是后台&#xff0c;将主页面的色彩重新搭配了下&#xff0c;稍微好看点儿了&#xff0c;以下是后台界面&#xff1a;前台界…

JavaFX图表(九)之堆积面积图

翻译自 JavaFX - 堆积面积图 StackedArea Chart是区域图的变体&#xff0c;显示每个值的贡献趋势&#xff08;例如 - 加班&#xff09;。堆叠区域使每个系列相邻&#xff0c;但不与前面的系列重叠。这与区域图表形成对比&#xff0c;其中每个系列覆盖前面的系列。 以下是描绘…

ASP.NET Core MVC – 自定义 Tag Helpers

介绍 在之前的内容中&#xff0c;我们谈到了Tag Helpers&#xff0c;我们还谈到了 caching Tag Helpers和form Tag Helpers。 通过创建自定义Tag Helpers&#xff0c;我们可以扩展现有元素或创建我们自己的元素。 Tag Helper是实现ITagHelper接口类的统称。MVC为我们提供了该…

sqlserver建库建表建约束,删库删表删约束的示例总结

1.创建一个数据库catedb,保存在本地D盘下面的DB中&#xff0c;主数据文件的初始大小为5MB&#xff0c;最大为200MB&#xff0c;增长率为10%&#xff0c;日志文件的初始大小为3MB&#xff0c;无限制增长&#xff0c;其增长率为2MB&#xff0c;请使用sql语句检测master数据库中是…

JFreeChart(二)之饼图

转载自 JFreeChart饼图 在饼图中&#xff0c;每个扇区的弧长成正比它代表的数量。本章演示了如何使用JFreeChart 从一个给定的业务数据创建饼图。 业务数据 下面的例子描述了移动销售饼图。以下是不同移动品牌和销售(每天单位)列表。 S.N.手机品牌销售(天)1Iphone 5S202Sam…

mysql中如何将默认用户名root改成其他?

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注公众号【雄雄的小课堂】。最近&#xff0c;我的个人站上线啦&#xff0c;大家可以直接在浏览器的地址栏中输入&#xff1a;穆雄雄.com&#xff0c;轻轻敲击回车&#xff0c;即可直接进入……欢迎大家多多关注&#xff0c;多多留…

ASP.NET Core - Razor 页面介绍

简介 随着ASP.NET Core 2 即将来临&#xff0c;最热门的新事物是Razor页面。在之前的一篇文章中&#xff0c;我们简要介绍了ASP.NET Core Razor 页面。 Razor页面是ASP.NET Core的一个新功能&#xff0c;可以使基于页面的编程方式更容易&#xff0c;更高效。 大众的初步印象是…

P2153-晨跑【费用流,网络流,拆点】

前言 这是评测记录 正题 AC评测记录链接&#xff1a; https://www.luogu.org/record/show?rid7945350 大意 一个图&#xff0c;没错要求不能走重复的边和点。求走最多次的情况下路最短。 解题思路 每次行走就是一个流量在流&#xff0c;然后将边权设为1就可以保证边只能走…