資源簡介
該demo是C#中怎樣使用kafka的demo,將demo中的Program.cs中的配置server的IP地址改成本機,即可運行

代碼片段和文件信息
using?System;
using?System.Text;
using?System.Collections.Generic;
using?Confluent.Kafka;
using?Confluent.Kafka.Serialization;
namespace?Kafka.Demo
{
????class?Program
????{
????????static?void?Main(string[]?args)
????????{
????????????if?(string.IsNullOrEmpty(args[0]))
????????????{
????????????????Console.WriteLine(“consume>cmd?[test-consume]“);
????????????????Console.WriteLine(“produce>cmd?[test-produce]“);
????????????}
????????????Console.WriteLine(args[0]);
????????????if?(args[0].Equals(“test-consume“))
????????????{
????????????????Consume(“testconn“);
????????????}
????????????else?if?(!string.IsNullOrEmpty(args[0]))
????????????{
????????????????Consume(args[0]);
????????????}
????????????if?(args[0].Equals(“test-produce“))
????????????{
????????????????Produce(“testconn“);
????????????}
????????}
????????static?void?Produce(string?myTopic)
????????{
????????????var?config?=?new?Dictionaryject>
????????????????{
????????????????????{?“bootstrap.servers“?“10.37.36.96:909210.37.36.97:909210.37.36.103:9092“?}
????????????????};
????????????using?(var?producer?=?new?Producer(config?null?new?StringSerializer(Encoding.UTF8)))
????????????{
????????????????var?dr?=?producer.ProduceAsync(myTopic?null?“test?message?text“).Result;
????????????????Console.WriteLine($“Delivered?‘{dr.Value}‘?to:?{dr.TopicPartitionOffset}“);
????????????}
????????}
????????static?void?Consume(string?myTopic)
????????{
????????????var?count?=?0;
????????????bool?canceled?=?false;
????????????var?conf?=?new?Dictionaryject>
????????????{
????????????????{?“group.id“?“test-consumer-group“?}
????????????????{?“bootstrap.servers“?“10.37.36.96:909210.37.36.97:909210.37.36.103:9092“?}
????????????????{?“auto.commit.interval.ms“?5000?}
????????????????{?“auto.offset.reset“?“earliest“?}
????????????};
????????????using?(var?consumer?=?new?Consumer(conf?null?new?StringDeserializer(Encoding.UTF8)))
????????????{
????????????????consumer.OnMessage?+=?(_?msg)
??????????????????=>
????????????????{
????????????????????count++;
????????????????????Console.WriteLine($“{msg.Topic}.{msg.Partition}.{msg.Offset}“);
????????????????????Console.WriteLine($“Value:{msg.Value}“);
????????????????????//Console.WriteLine($“Read?‘{msg.Value}‘?from:?{msg.TopicPartitionOffset}“);
????????????????};
????????????????consumer.onerror?+=?(_?error)
??????????????????=>
????????????????{
????????????????????canceled?=?true;
????????????????????Console.WriteLine($“Error:?{error}“);
????????????????};
????????????????consumer.OnConsumeError?+=?(_?msg)
??????????????????=>
????????????????{
????????????????????canceled?=?true;
????????????????????Console.WriteLine($“Consume?error?({msg.TopicPartitionOffset}):?{msg.Error}“);
????????????????};
????????????????//consumer.Subscribe(“my-topic“);
????????????????consumer.Subscribe(myTopic);
??
?屬性????????????大小?????日期????時間???名稱
-----------?---------??----------?-----??----
?????目錄???????????0??2018-08-24?13:57??Kafka.Demo\
?????目錄???????????0??2018-08-24?13:57??Kafka.Demo\.vscode\
?????文件????????1206??2018-08-24?13:57??Kafka.Demo\.vscode\launch.json
?????文件?????????322??2018-08-24?13:57??Kafka.Demo\.vscode\tasks.json
?????目錄???????????0??2018-08-24?13:56??Kafka.Demo\bin\
?????目錄???????????0??2018-08-24?13:56??Kafka.Demo\bin\Debug\
?????目錄???????????0??2018-08-24?14:02??Kafka.Demo\bin\Debug\netcoreapp2.0\
?????文件???????12900??2018-08-24?14:02??Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.deps.json
?????文件????????7680??2018-08-24?15:24??Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.dll
?????文件????????1132??2018-08-24?15:24??Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.pdb
?????文件?????????244??2018-08-24?14:02??Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.runtimeconfig.dev.json
?????文件?????????154??2018-08-24?14:02??Kafka.Demo\bin\Debug\netcoreapp2.0\Kafka.Demo.runtimeconfig.json
?????文件?????????272??2018-08-24?13:58??Kafka.Demo\Kafka.Demo.csproj
?????目錄???????????0??2018-08-24?14:02??Kafka.Demo\obj\
?????目錄???????????0??2018-08-24?13:56??Kafka.Demo\obj\Debug\
?????目錄???????????0??2018-08-24?14:02??Kafka.Demo\obj\Debug\netcoreapp2.0\
?????文件????????1121??2018-08-24?13:57??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.AssemblyInfo.cs
?????文件??????????42??2018-08-24?13:57??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.AssemblyInfoInputs.cache
?????文件??????????42??2018-08-24?14:02??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.csproj.CoreCompileInputs.cache
?????文件????????1145??2018-08-24?14:02??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.csproj.FileListAbsolute.txt
?????文件??????493934??2018-08-24?13:59??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.csprojResolveAssemblyReference.cache
?????文件????????7680??2018-08-24?15:24??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.dll
?????文件????????1132??2018-08-24?15:24??Kafka.Demo\obj\Debug\netcoreapp2.0\Kafka.Demo.pdb
?????文件?????????149??2018-08-24?14:02??Kafka.Demo\obj\Kafka.Demo.csproj.nuget.cache
?????文件????????1588??2018-08-24?14:02??Kafka.Demo\obj\Kafka.Demo.csproj.nuget.g.props
?????文件?????????981??2018-08-24?13:56??Kafka.Demo\obj\Kafka.Demo.csproj.nuget.g.targets
?????文件??????101477??2018-08-24?14:02??Kafka.Demo\obj\project.assets.json
?????文件????????3180??2018-08-24?15:53??Kafka.Demo\Program.cs
評論
共有 條評論