معرفی و آموزش Apache Kafka

Apache Kafka یک سیستم ارسال پیام انتشار-اشتراک و یک صف قویست، که می تواند حجم بالایی از داده ها را در اختیار داشته باشد و شما را قادر می سازد تا پیام ها را از یک نقطه به نقطه ی دیگر منتقل کنید. کافکا مناسب استفاده برای هر دو پیام های آفلاین و آنلاین است. پیام های کافکا بر روی دیسک منتشر می شوند و در خوشه های مشخص همانند سازی می شوند . 

معرفی Apache Kafka

در Big Data، حجم عظیمی از داده ­ها استفاده می­شود. با توجه به این داده ­ها ، ما با دو چالش اصلی روبه ­رو هستیم. چالش اول نحوه جمع ­آوری این حجم زیاد از داده ­ها و چالش دوم تجزیه وتحلیل این داده­ های جمع آوری شده می­ باشد. برای غلبه بر این چالش ­ها، شما به یک سیستم پیام­رسانی نیاز دارید.

Apache Kafka برای سیستم ­های توسعه یافته با توانایی گذردهی بالا، طراحی شده است. کافکا تمایل دارد جایگزین مناسبی برای پیام­رسان­ های سنتی شود. در مقایسه با سایر سیستم ­های پیام­رسانی، کافکا از توان گذردهی بهتر، تقسیم­ بندی داخلی، تکثیر و تحمل خطای ذاتی برخوردار است، که موجب مناسب بودن آن برای برنامه­ های پردازش پیام در مقیاس بزرگ می­شود.

سیستم پیام ­رسانی چیست؟

یک سیستم پیام­رسانی مسئول انتقال داده ­ها از یک برنامه به برنامه دیگر می­باشد، بنابراین برنامه ­ها می­توانند بدون نگرانی درمورد نحوه­ ی اشتراک آن­ها روی داده­ ها متمرکز شوند. اساس پیام ­رسانی توسع ه­یافته، مفهوم صف­  بندی قابل اعتماد پیام است. پیام ها به طور غیرهمزمان بین برنامه ­های مشتری و سیستم پیام ­رسانی قرار می گیرند. دو نوع الگوی پیام رسانی وجود دارد؛ یکی نقطه به نقطه و دیگری سیستم پیام رسانی انتشار- اشتراک (pub-sub).
بسیاری از الگوهای پیام رسانی از سیستم pub-sub پیروی می­کنند.

سیستم پیام رسانی نقطه به نقطه

در یک سیستم نقطه به نقطه، پیام ها در یک صف انتشار می­ یابند. یک یا چند مصرف­ کننده می­توانند از پیام ­های موجود در صف استفاده کنند، اما یک پیام خاص فقط توسط یک مصرف­ کننده قابل استفاده است. هنگامی که یک پیام از صف توسط یک مشتری خوانده شود، پیام از آن صف ناپدید می­شود. نمونه ­ی بارز این سیستم یک سیستم پردازش سفارش است، که در آن هر سفارش توسط یک پردازنده سفارش پردازش می­شود­، اما پردازنده ­های چند منظوره­ی سفارش می­توانند همزمان چند سفارش را پردازش کنند. نمودار زیر ساختار این سیستم ­ها را نشان می­دهد.

سیستم­ پیام رسان نقطه به نقطه

سیستم­ پیام ­رسان نقطه به نقطه

سیستم پیام رسانی انتشار و اشتراک (publish-subscribe)

در سیستم انتشار و اشتراک، پیام­ها در یک موضوع انتشار می ­یابند. برخلاف سیستم نقطه به نقطه، مصرف ­کنندگان می توانند مشترک یک یا چند موضوع شوند و از تمام پیام­ های موجود در آن موضوع استفاده کنند. در این سیستم، به تولید کنندگان پیام ناشر (publisher) و به مصرف­ کنندگان پیام مشترک (subscriber) گفته می­شود. نمونه­ای از این سیستم ­ها در زندگی واقعی، تلویزیون دیش (Dish TV) است، که کانال­ های مختلفی از جمله ورزش، فیلم، موسیقی و … را منتشر می­کند و هرکسی می تواند در مجموعه کانال­ های موردنظر خود مشترک شود و هر زمان که کانال­ های موردنظر در دسترس باشند از آنها استفاده کنند.

سیستم پیام رسانی انتشار و اشتراک (publish-subscribe)

Apache Kafka چیست؟

Apache Kafka یک سیستم ارسال پیام انتشار-اشتراک و یک صف قویست، که می­تواند حجم بالایی از داده­ ها را در اختیار داشته باشد و شما را قادر می سازد تا پیام ها را از یک نقطه به نقطه­ ی دیگر منتقل کنید. کافکا مناسب استفاده برای هر دو پیام­ های آفلاین و آنلاین است. پیام های کافکا بر روی دیسک منتشر می­شوند و در خوشه ­های مشخص همانند سازی می شوند تا از هدر رفتن داده جلوگیری شود. کافکا در بالای سرویس هماهنگ سازی ZooKeeper ساخته شده است. این سیستم به خوبی با Apache Storm  و Spark Apache برای تجزیه و تحلیل جریان داده ­ها در لحظه، ادغام شده است.

مزیت­های Apache Kafka

در زیر چند مزیت Apache Kafka آورده شده است:

• قابلیت اطمینان – کافکا توزیع، تقسیم و تکثیر شده است و دارای تلرانس خطا می­ باشد.

• مقیاس پذیری – سیستم پیام رسانی کافکا به راحتی و بدون خرابی مقیاس می ­کند.

• دوام – Apache Kafka از سیستم ثبت توسعه یافته، استفاده می­کند و این به این معنی است که پیام ها در سریع­ترین زمان ممکن روی دیسک منتشر می­شوند، از این رو دوام دارند.

• عملکرد – کافکا توانایی گذردهی بالایی هم برای انتشار و هم برای ارسال پیام دارد. این توانایی، عملکرد پایدار را حفظ می کند حتی بسیاری از سل پیام ها نیز ذخیره می شوند.

کافکا بسیار سریع است و خرابی و از دست رفتن اطلاعات صفر را تضمین می کند.

موارد استفاده­ ی Apache Kafka

Apache Kafka را می توان در بسیاری از موارد استفاده کرد. برخی از آنها در زیر فهرست شده اند:

• اندازه ­گیری­ ها – Apache Kafka اغلب برای داده­ های نظارت عملیاتی استفاده می­شود. این شامل جمع ­آوری آماری از برنامه­ های توزیع شده برای تولید متمرکز داده های عملیاتی است.

• Log Aggregation Solution – از کافکا می توان در یک سازمان برای جمع ­آوری اطلاعات مربوط به چندین سرویس و در دسترس قرار دادن آن­ها، در قالب استاندارد، برای مصرف­ کنندگان مختلف، استفاده کرد.

• پردازش جریان – چارچوب­ های محبوب مانند Storm و Spark Streaming، داده ­ها را از یک موضوع می­خوانند، آن را پردازش می­کنند ، و داده­های پردازش­ شده را برای موضوع جدید ارسال می­ کنند که در دسترس کاربران و برنامه های کاربردی قرار می گیرد. دوام قوی کافکا نیز در زمینه پردازش جریان بسیار مفید است.

نیاز به Apache Kafka

Apache Kafka یک پلتفرم یک پارچه برای پردازش کلیه اطلاعات در لحظه می­باشد. Apache Kafka از ارسال پیام با تاخیر کم پشتیبانی می­کند و در صورت بروز خرابی دستگاه، تلرانس خطا را تضمین می­کند. کافکا توانایی اداره­ ی تعداد زیاد مصرف­کنندگان متنوع را داراست و بسیار سریع است، به طوری که در ثانیه ۲ میلیون انتشار دیتا را انجام می دهد. کافکا تمام داده ها را بر روی دیسک منتشر می­کند، که در واقع به این معنی است که همه نوشته ها به حافظه نهان صفحه OS (RAM) منتقل می شوند. این امر انتقال اطلاعات از حافظه نهان صفحه به سوکت شبکه را بسیار کارآمد می کند.

اصول Apache Kafka

قبل از عمیق شدن در مبحث Apache Kafka ، باید با اصطلاحات اصلی مانند موضوعات (topics)، کارگزاران (brokers)، تولیدکنندگان (producers) و مصرف­کنندگان (consumers) آشنا شوید. نمودار زیر اصطلاحات اصلی را نشان می­دهد و جدول، اجزای نمودار را با جزئیات توضیح می­دهد.

نمودار اصطلاحات اصلی

در نمودار فوق، یک موضوع (topic) در سه بخش تنظیم شده است. بخش۱ دارای دو عامل offset، ۰ و ۱ می­باشد. بخش۲، چهار عامل offset، ۰، ۱، ۲ و ۳ دارد و بخش ۳ تنها یک عامل ­offset، صفر دارد. شناسه­ ی replica همان شناسه­ ی سرور میزبان است.

فرض کنید، اگر ضریب تکثیر موضوع روی ۳ تنظیم شود،Apache Kafka  ۳ replica یکسان از هر بخش ایجاد می­کند و آن­ها را در خوشه قرار می­دهد تا برای همه ­ی عملیات­ های مربوط به آن در دسترس باشد. برای تعادل بار در خوشه، هر کارگزار (broker) یک یا چند قسمت از آن بخش­ها را ذخیره می­کند. تولیدکنندگان و مصرف­کنندگان متعدد می­توانند همزمان پیام­ها را منتشر و بازیابی کنند.

 جدول اجزای Apache Kafka  

ردیفاجزاء و توضیحات
۱موضوع (topic)به جریانی از پیام­های متعلق به یک دسته خاص، موضوع (topic) گفته می­شود. داده­ها در موضوع ­ها (topics) ذخیره می شوند.topicها به بخش­ ها (partitions) تقسیم می شوند. برای هر موضوع، کافکا یک mini-mum  از یک پارتیشن را نگه می­دارد. هر بخش این چنینی، پیام­ ها را در یک ترتیب درخواستی تغییرناپذیر نگه ­می ­دارد؛ این بخش به عنوان مجموعه ای از فایل ­های این ترتیب با اندازه­های برابر اجرا می­شود.
۲بخش (partition)هر موضوع می­تواند چند بخش داشته باشد، بنابراین می تواند یک مقدار دلخواه از داده را اداره کند.
۳Offset بخشهر پیام تقسیم شده یک شناسه ­ی ترتیبی منحصربفرد، به نام offset دارد.
۴Replicaهای هر بخشReplicaها به عنوان backup هر بخش، هرگز داده­ ها را نمی­ خوانند یا نمی­ نویسند و از آن­ها برای جلوگیری از، از دست دادن داده استفاده می­شود.
۵کارگزاران (Brokers)کارگزاران در Apache Kafka ، سیستم­های ساده­ ای هستند که وظیفه حفظ داده ­های منتشر شده را دارند. هر کارگزار می­تواند در هر موضوع (topic) چند بخش (partition) داشته باشد یا کلا نداشته باشد. فرض کنید، اگر یک موضوع دارای n بخش باشد و به تعداد n، کارگزار در دسترس باشد، به هر کارگزار یک بخش تعلق می­گیرد. فرض کنید اگر یک موضوع دارای n بخش باشد و بیش از n کارگزار (n+m) دردسترس باشد، به n کارگزار اول یک بخش تعلق گرفته و به m کارگزار بعدی هیچ بخشی در آن موضوع خاص ارائه نخواهد شد. فرض کنید اگر یک موضوع دارای n بخش باشد و کمتر از n کارگزار (n-m) دردسترس باشد، به هر کارگزار یک یا چند بخش تعلق می­گیرد. این سناریو به دلیل عدم تقسیم بار ناعادلانه در بین کارگزاران توصیه نمی شود.
۶Kafka Cluster (خوشه)در Apache Kafka  داشتن بیش از یک کارگزار به عنوان خوشه کافکا نامیده می­شود. خوشه ­ی کافکا بدون خرابی قابل گسترش است. از این خوشه ها برای مدیریت پایداری و تکثیر داده­ های پیام استفاده می­شود.
۷تولیدکنندهتولیدکنندگان درواقع ناشران پیام­ های یک یا چند موضوع Apache Kafka هستند. تولید کنندگان داد­ه­ ها را به کارگزاران کافکا می ­فرستند. هر بار که یک تولید­کننده­، پیامی را به یک کارگزار ارسال می­کند، کارگزار به سادگی پیام را به آخرین پرونده ­ی بخش اضافه می­کند. در واقع، پیام به یک بخش اضافه می­شود. تولیدکننده همچنین می­تواند به بخش­های مورد انتخاب خود نیز پیام ارسال کند.
۸مصرف­کنندهمصرف­ کنندگان داده­ های کارگزاران را می­خوانند. مصرف ­کنندگان در یک یا چند موضوع مشترک می­شوند و با گرفتن داده ­ها از کارگزاران، پیام های منتشر شده را استفاده می­کنند.
۹رهبررهبر گره­­ی مسئول خواندن و نوشتن همه­ ی بخش­های مشخص شده است. هر بخش دارای یک سرور است که به عنوان یک رهبر عمل می­کند.
۱۰دنبال­ کنندهگره­ ایست که از دستورالعمل­ های رهبر پیروی می­کند. اگر رهبر شکست بخورد، یکی از دنبال­ کنندگان به طور خودکار رهبر جدید خواهد شد. یک دنبال­ کننده به عنوان مصرف­ کننده عادی عمل کرده و با کشیدن پیام­ها به سمت خود ذخیره­ ی داده ­های خود را به روز می­کند.

Apache Kafka – معماری خوشه

در Apache Kafka  داشتن بیش از یک کارگزار به عنوان خوشه کافکا نامیده می­شود. خوشه ­ی کافکا بدون خرابی قابل گسترش است. از این خوشه ها برای مدیریت پایداری و تکثیر داده ­های پیام استفاده می­شود. در این بخش به تشریح معماری خوشه می­پردازیم.      

تصویر زیر نمودار خوشه Apache Kafka را نشان می­دهد.

نمودار خوشه Apache Kafka

نمودار خوشه Apache Kafka

جدول زیر هریک از مولفه ­های نشان داده شده در نمودار بالا را تشریح می­کند.

جدول اجزای خوشه Apache Kafka

ردیفاجزا و توضیحات
۱Broker یا کارگزارمعمولا خوشه­ ی Apache Kafka برای حفظ تعادل بار دیتا از چندین کارگزار تشکیل شده است. کارگزاران کافکا بدون تابعیت هستند، بنابراین از ZooKeeper برای حفظ تابعیت خوشه خود استفاده می­ کنند. یک نمونه کارگزار کافکا می­تواند صدها هزار خواندن و نوشتن در هر ثانیه را کنترل کند و می­تواند سل پیام ­های خود را بدون ایجاد اختلال در عملکرد، کنترل کنند. انتخاب رهبر کارگزار کافکا توسط ZooKeeper انجام می شود
۲ZooKeeperZooKeeper برای مدیریت و هماهنگی کارگزار Apache Kafka استفاده می­شود. خدمات ZooKeeper به طور عمده برای آگاه ساختن تولید­کننده و مصرف­کننده در مورد حضور هر کارگزار جدید یا عدم موفقیت آن در سیستم Apache Kafka استفاده می­شود. از این طریق، تولید­کننده و مصرف­کننده در مورد هماهنگی کار خود با هر کارگزار تصمیم می­گیرند.
۳Producers یا تولید­کنندهتولید کنندگان داده ­ها را به سمت کارگزاران ارسال می­کنند. با شروع کار کارگزار جدید، همه تولید کنندگان آن را جستجو کرده و به ­طور خودکار پیامی را برای آن ارسال می­کنند. تولیدکننده در Apache Kafka بدون انتظار برای تایید از طرف کارگزار، به همان سرعتی که کارگزار بتواند رسیدگی کند، پیام ها را ارسال می­کند.
۴Consumers یا مصرف­ کنندهاز آنجا که کارگزاران Apache Kafka بدون تابعیت هستند، مصرف ­کننده باید با استفاده از offset بخش مورد استفاده، تعداد پیام­ های مصرف شده را حفظ کند. اگر مصرف­ کننده پیام خاصی را تایید کند، به این معنی است که مصرف­ کننده تمام پیام­ های قبلی را استفاده کرده است. برای در دسترس بودن پیام ­های موردنظر برای مصرف، مصرف­ کننده به­ صورت غیر همزمان درخواست ارسال پیام را به کارگزار صادر می کند. مصرف­ کنندگان می­توانند به سادگی با تهیه یک مقدار offset، به هر نقطه از یک بخش برگردند یا از آن گذر کنند. ارزش offst مصرف­ کننده توسط ZooKeeper اطلاع داده می­شود.

 Apache Kafka – جریان کار

Kafka هر دو سیستم پیام رسانی pub-sub و صف را به صورت سریع، قابل اعتماد، بادوام و همراه با تلرانس خطا و خرابی صفر فراهم می­کند. در هر دو مورد، تولیدکنندگان به سادگی پیام را به یک موضوع  ارسال می­کنند و مصرف­کننده می­تواند بسته به نیاز خود، هر نوع سیستم پیام ­رسانی را انتخاب کند. نحوه­ی انتخاب سیستم پیام­رسانی توسط مصرف­کننده در ادامه توضیح داده شده است.

مباحثی که تاکنون مطرح شده در مورد مفاهیم اصلی Apache Kafka بوده است، در این قسمت به بررسی جریان کار Apache Kafka  می­پردازیم.

Apache Kafka مجموعه ­ای از موضوعات (topics) است که به یک یا چند بخش تقسیم شده است. یک بخش در Apache Kafka یک دنباله خطی از پیام هاست که در آن هر پیام توسط اندیس آن (تحت عنوان offset) شناخته می­شود. تمام داده­ های موجود در یک خوشه Apache Kafka، اتحادیه ­ی مجزای بخش ­ها هستند. پیام­های دریافتی در انتهای یک بخش نوشته شده و پیام ­ها به طور متوالی توسط مصرف­ کنندگان خوانده می­شوند. دوام سیستم با تکثیر پیام ­ها به کارگزاران مختلف فراهم می­گردد.

Apache Kafka هر دو سیستم پیام رسانی pub-sub و صف را به صورت سریع، قابل اعتماد، بادوام و همراه با تلرانس خطا و خرابی صفر فراهم می­کند. در هر دو مورد، تولیدکنندگان به سادگی پیام را به یک موضوع (topic) ارسال می­کنند و مصرف ­کننده می­تواند بسته به نیاز خود، هر نوع سیستم پیام­رسانی را انتخاب کند. نحوه ­ی انتخاب سیستم پیام­رسانی توسط مصرف ­کننده در ادامه توضیح داده شده ­است.

جریان کار سیستم پیام­رسانی pub-sub در Apache Kafka

در زیر مراحل جریان کار سیستم پیام­رسانی Pub-Sub در Apache Kafka  آورده شده است:

  • تولید کنندگان در فواصل منظم به موضوع، پیام ارسال می­کنند.
  • کارگزار کافکا تمام پیام­ها را در بخش­هایی که برای موضوع خاص موردنظر تنظیم شده است، ذخیره می­کند. از این طریق تقسیم مساوی پیام­ ها بین بخش­ها تضمین می­شود. اگر تولید­کننده دو پیام ارسال کند و دو بخش موجود باشد، کافکا یک پیام را در بخش اول و پیام دوم را در بخش دوم ذخیره می­کند.
  • مشتری در یک موضوع خاص مشترک می­شود.
  • هنگامی که مصرف­ کننده در یک موضوع مشترک شد، کافکا offset فعلی موضوع را در اختیار مصرف ­کننده قرار می­دهد و همچنین در مجموعه Zookeeper ، offset را ذخیره می­کند.
  • مصرف کننده از کافکا در فواصل زمانی منظم (مثلا هر ۱۰۰ Ms) برای پیام­های جدید درخواست می­کند.
  • هنگامی که کافکا پیام­های تولیدکنندگان را دریافت کرد، آن­ها را برای مصرف­کنندگان ارسال می­کند.
  • مصرف­ کننده پیام را دریافت کرده و آن را پردازش می­کند.
  • پس از پردازش پیام­ها، مصرف­ کننده یک تاییدیه برای کارگزار کافکا ارسال می­کند.
  • کافکا به محض دریافت تأییدیه، offset را به مقدار جدید تغییر داده و آن را در Zookeeper به روز می­کند. از آنجا که offsetها در Zookeeper نگهداری می­شوند، مصرف­ کننده می­تواند پیام بعدی را به درستی، حتی در حین در دسترس نبودن سرور، بخواند.

• جریان فوق تا زمانی که مصرف ­کننده درخواست را متوقف کند تکرار خواهد شد.

• مصرف­ کننده این انتخاب را دارد که در هر زمان از پیام ­های یک موضوع عبور کرده و تمام پیام­ های بعدی را بخواند.

جریان کار سیستم پیام ­رسانی صف (گروه مصرف­ کننده) در Apache Kafka

در سیستم­ پیام­رسانی صف در Apache Kafka به جای یک مصرف­ کننده­ ی واحد، گروهی از مصرف­ کنندگان که دارای شناسه گروه یکسان هستند، در یک موضوع مشترک می­شوند. به زبان ساده، مصرف­ کنندگانی که در یک موضوع با شناسه گروه یکسان، مشترک می­شوند، به عنوان یک گروه واحد در نظر گرفته شده و پیام­ ها در میان آنها به اشتراک گذاشته می­شود. در زیر گردش کار واقعی این سیستم مورد بررسی واقع شده است:

  • تولید کنندگان در فواصل زمانی مشخص پیام­ها را به یک موضوع ارسال می­کنند.
  • مانند سیستم قبل، کافکا تمامی پیام ­ها را در یک بخش مشخص از آن موضوع خاص، ذخیره می­کند.
  • یک مصرف ­کننده­ی واحد مشترک یک موضوع خاص می­شود، فرضا موضوع با شناسه­ ی Topic-01  با شناسه گروه Group-1.
  • تا زمانی که مصرف ­کننده­ی جدید به موضوع، با شناسه ­ی Topic-01  با شناسه گروه Group-1، مشترک شود کافکا به همان روش سیستم Pub-Sub با مصرف­ کنندگان تعامل می­کند.
  • به محض ورود مصرف­ کننده­ ی جدید، کافکا عملیات خود را برای به اشتراک گذاشتن حالت تغییر داده و داده­ ها را بین دو مصرف­ کننده به اشتراک می­گذارد. این اشتراک­ گذاری ادامه خواهد یافت تا زمانی که تعداد مشترکین به تعداد بخش تنظیم شده برای آن موضوع خاص برسد.
  • به محض گذشتن تعداد مصرف­ کنندگان از تعداد بخش­ ها، تا زمانی که مصرف­ کنندگان موجود اشتراک خود را لغو کنند مصرف­ کنندگان جدید هیچ پیامی دریافت نخواهند کرد. این سناریو به وجود می­ آید زیرا به هر مصرف­ کننده در کافکا حداقل یک بخش اختصاص داده می­شود و به محض اینکه همه بخش­ها به مصرف ­کنندگان موجود اختصاص داده شود، مصرف­ کنندگان جدید باید منتظر بمانند.
  • این مشخصه به عنوان گروه مصرف­ کننده نیز خوانده­ می­شود.

 نقش ZooKeeper در Apache Kafka

وابستگی اساسی Apache Kafka ، Apache Zookeeper می­ باشد که یک سرویس پیکربندی و همگام ­سازی توزیع شده می ­باشد. این سرویس به عنوان رابط هماهنگی بین کارگزاران کافکا و مصرف­ کنندگان فعالیت می کند. سرورهای کافکا اطلاعات را از طریق یک خوشه Zookeeper به اشتراک می­گذارند. کافکا ابرداده­ های اساسی مانند اطلاعات مربوط به موضوعات (topics)، کارگزاران، offset­های مصرف­ کنندگان (صف خوان) و غیره را در Zookeeper ذخیره می­کند.

از آنجا که تمام اطلاعات ضروری در Zookeeper ذخیره می­شوند و آن به طور معمول این داده­ها را در مجموعه خود تکثیر می­کند، عدم موفقیت کارگزار کافکا یا Zookeeper بر وضعیت(state) خوشه کافکا تاثیر نمی­گذارد. هنگامی که Zookeeper  دوباره شروع به کار کند، کافکا وضعیت (state) را احیا خواهد کرد. این فعالیت­ها خرابی صفر را برای کافکا به ارمغان می­آورند. در صورت عدم موفقیت رهبر، انتخاب رهبر بین کارگزاران کافکا نیز با استفاده از Zookeeper  انجام می شود.

Apache Kafka – مراحل نصب

مرحله ی ۱ – تأیید نصب جاوا

برای نصب Apache Kafka ، با فرض این که نرم­افزار جاوا را روی دستگاه خود نصب کرده باشید، فقط با استفاده از دستور زیر آن را تأیید کنید.

$ java -version

اگر جاوا با موفقیت در دستگاه شما نصب شده باشد، می توانید نسخه ی نصب شده را مشاهده کنید.

مرحله ی ۱.۱ – دانلود JDK

در صورتی که نرم افزار جاوا را دانلود نکرده اید، لطفاً با مراجعه به لینک زیر، آخرین نسخه JDK را بارگیری کنید.

http://www.oracle.com/technetwork/java/javase/downloads/index.html

هم اکنون آخرین نسخه JDK 8u 60، با شماره فایل((jdk-8u60-linux-x64.tar.gz)) است. لطفا فایل را در دستگاه خود دانلود کنید.

مرحله ی ۱.۲ – استخراج فایل ها

بطور کلی فایل های دانلود شده در پوشه ی دانلودها ذخیره می شوند، آن را تأیید کرده و با استفاده از دستورات زیر، برنامه ی نصب tar را استخراج کنید.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

مرحله ی ۱.۳ – به Opt Directory بروید

برای اینکه جاوا در دسترس همه کاربران قرار گیرد، محتوای جاوا استخراج شده را به پوشه usr/local/java/ انتقال دهید.

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

مرحله ی ۱.۴ – تنظیم path

برای تنظیم path و متغیرهای JAVA_HOME ، دستورات زیر را به پرونده ~/.bashrc اضافه کنید.

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

اکنون تمام تغییرات را در سیستم عامل جاری اعمال کنید.

$ source ~/.bashrc

مرحله ی ۱.۵ – گزینه های جاوا
برای تغییر گزینه های جاوا از دستور زیر استفاده کنید.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

مرحله ی ۱.۶ – اکنون با استفاده از دستور تایید  (java -version) توضیح داده شده در مرحله ی ۱، جاوا را تایید کنید.

مرحله ی ۲ – نصب چارچوب ZooKeeper

مرحله ی ۲.۱ – دانلود ZooKeeper

برای نصب چارچوب ZooKeeper روی دستگاه خود، به لینک زیر مراجعه کرده و آخرین نسخه ZooKeeper را دانلود کنید.

http://zookeeper.apache.org/releases.html

هم اکنون، آخرین نسخه ZooKeeper 3.4.6 (ZooKeeper-3.4.6.tar.gz) است.


مرحله ی ۲.۲ – استخراج فایل tar


با استفاده از دستور زیر فایل tar را استخراج کنید.
 

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

مرحله ی ۲.۳ – ایجاد فایل Configuration

پرونده پیکربندی را با نام conf / zoo.cfg با استفاده از دستور vi conf / zoo.cfg  و کلیه پارامترهای زیر باز کنید تا به عنوان نقطه شروع تنظیم شود.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

هنگامی که فایل configuration با موفقیت ذخیره شد و دوباره به ورودی بازگشتید، می توانید سرور zookeeper را شروع کنید.

مرحله ی ۲.۴ – شروع کار با سرور ZooKeeper 

$ bin/zkServer.sh start

پس از اجرای دستور فوق، مطابق شکل زیر پاسخی دریافت خواهید کرد:

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

مرحله ی ۲.۵ – شروع CLI

$ bin/zkCli.sh

بعد از تایپ دستور فوق، به سرور zookeeper وصل می شوید و پاسخ زیر را دریافت خواهید کرد:

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null

[zk: localhost:2181(CONNECTED) 0]

مرحله ی ۲.۶ – متوقف کردن سرور zookeeper

پس از اتصال سرور و اتمام عملیات، می توانید سرورzookeeper را با دستور زیر متوقف کنید:

$ bin/zkServer.sh stop

اکنون با موفقیت Java و ZooKeeper را روی دستگاه خود نصب کرده اید. در ادامه مراحل نصبApache Kafka  را ببینید.

مرحله ی ۳ – نصب Apache Kafka

برای نصب Apache Kafka  مراحل زیر را دنبال کنید. 


مرحله ی ۳.۱ – دانلود Apache Kafka


برای نصب Apache Kafka بر روی دستگاه خود، روی لینک زیر کلیک کنید

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

از طریق لینک فوق، آخرین نسخه یعنی kafka_2.11_0.9.0.0.tgz  روی دستگاه شما دانلود می شود.

مرحله ی ۳.۲ – استخراج فایل tar


فایل tar را با استفاده از دستور زیر استخراج کنید
 

$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

اکنون آخرین نسخه Apache Kafka روی دستگاه شما دانلود شده است.

مرحله ی ۳.۳ – شروع سرور 

می توانید سرور را با استفاده از دستور زیر شروع کنید

$ bin/kafka-server-start.sh config/server.properties

بعد از شروع سرور، پاسخ زیر را روی صفحه خود مشاهده خواهید کرد:

$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….

مرحله ی ۴ – متوقف کردن سرور 

پس از اتمام عملیات، می توانید سرور را با استفاده از دستور زیر متوقف کنید:
 

$ bin/kafka-server-stop.sh config/server.properties

Apache Kafka – عملیات اساسی

در این بخش به شرح برخی عملیات اساسی در Apache Kafka پرداخته ایم.

ابتدا اجازه دهید اجرای پیکربندی single node-single broker در Apache Kafka را شروع کنیم و سپس تنظیمات خود را به پیکربندی single node-multiple brokers  منتقل کنیم.
امیدوارم تاکنون Java ، ZooKeeper و Apache Kafka را روی دستگاه خود نصب کرده باشید. قبل از رفتن به تنظیمات Kafka Cluster ، ابتدا باید ZooKeeper خود را شروع کنید؛ زیرا Kafka Cluster از ZooKeeper استفاده می کند.

شروع ZooKeeper در Apache Kafka

یک ورودی جدید باز کرده و دستور زیر را داخل آن تایپ کنید:

bin/zookeeper-server-start.sh config/zookeeper.properties


برای شروع کارگزار Broker) Apache Kafka) ، دستور زیر را تایپ کنید:

bin/kafka-server-start.sh config/server.properties


بعد از شروع کارگزار Apache Kafka ، دستورات jps را در ورودی ZooKeeper تایپ کنید، سپس پاسخ زیر را مشاهده می کنید:

821 QuorumPeerMain
928 Kafka
931 Jps


اکنون می توانید دو Daemon را در حال اجرا روی ورودی ببینید که QuorumPeerMain به عنوان Daemon  برای ZooKeeper  و دیگری Daemon برای Kafka  است.


پیکربندی single node-single broker در Apache Kafka

در این پیکربندی یک نمونه تک از ZooKeeper و شناسه ی کارگزار دارید. مراحل پیکربندی آن در زیر آمده است:

ایجاد یک موضوع (topic) در Apache Kafka – کافکا یک ابزار خط فرمان با نام kafka-topics.sh را برای ایجاد موضوعات بر روی سرور فراهم می کند. ورودی جدید را باز کنید و مثال زیر را تایپ کنید:

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
--partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1   
--partitions 1 --topic Hello-Kafka

در مثال بالا، یک موضوع به نام Hello-Kafka با یک بخش واحد و یک فاکتور replica ایجاد شد. خروجی ایجاد شده فوق مشابه خروجی زیر خواهد بود:

Output − Created topic Hello-Kafka

پس از ایجاد موضوع، می توانید اعلان را در پنجره ورودی کارگزار کافکا ببینید و گزارش مربوط به موضوع ایجاد شده در “/tmp/kafka-logs/“   در فایل config/server.properties  دریافت کنید.

لیست موضوعات 

برای به دست آوردن لیستی از مباحث در سرور Apache Kafka ، می توانید از دستور زیر استفاده کنید:

Syntax

bin/kafka-topics.sh --list --zookeeper localhost:2181

Output

Hello-Kafka

از آنجا که ما یک موضوع ایجاد کرده ایم، فقط موضوع Hello-Kafka در لیست دریافتی ذکر شده است. درصورت ایجاد موضوعات بیشتر،  لیست نام  آن موضوعات را در خروجی دریافت خواهید کرد.

شروع ارسال پیام توسط تولیدکننده

Syntax

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name


از syntax فوق، دو پارامتر اصلی برای مشتری خط فرمان تولیدکننده مورد نیاز است:
۱. لیست کارگزاران: لیست کارگزارانی که می خواهیم پیام ها را برای آنها ارسال کنیم. در این حالت ما فقط یک کارگزار داریم. پرونده Config / server.properties حاوی شناسه درگاه کارگزار است، از آن جا که می دانیم کارگزار ما درگاه 9092 را گوش می دهد، بنابراین می توانید مستقیما آن را تمییز دهید.

۲. نام موضوع: در اینجا مثالی برای نام موضوع آمده است:

Example

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka


تولیدکننده منتظر ورودی از stdin بوده و آن را برای خوشه Apache Kafka منتشر می کند. به طور پیش فرض، هر خط جدید به عنوان یک پیام جدید منتشر می شود و سپس خصوصیات پیش فرض تولیدکننده در فایل config/producer.properties  ذخیره می شود. اکنون می توانید همانطور که در زیر نشان داده شده، چند خط پیام را در ورودی تایپ کنید.
 

Output

$ bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Hello-Kafka[2016-01-16 13:50:45,931] 
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message

شروع دریافت پیام ها از سوی مصرف کننده

مشابه تولیدکننده، ویژگی های پیش فرض مصرف  کننده در فایل config/consumer.proper-ties مشخص شده است. یک ورودی جدید باز کرده و syntax زیر را برای استفاده از پیام تایپ کنید.

Syntax

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name 
--from-beginning

Example

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka 
--from-beginning

Output

Hello
My first message
My second message


سرانجام، شما می توانید پیام هایی را از ورودی تولیدکننده وارد کرده و در ورودی مصرف کننده مشاهده کنید. از هم اکنون، شما درک خوبی در مورد single node cluster  با یک کارگزار واحد دارید. بگذارید اکنون به سراغ پیکربندی چندین کارگزار برویم.
 

پیکربندی Single Node-Multiple Brokers در Apache Kafka

قبل از این که به سراغ تنظیمات حالت چند کارگزار بروید، سرور ZooKeeper خود را فعال کنید. سپس مراحل زیر را طی کنید:

چندین کارگزار کافکا ایجاد کنید: ما یک نمونه کارگزار کافکا در حال حاضر در config/server.properties داریم. حال ما به چندین نمونه کارگزار نیاز داریم، بنابراین فایل server.prop-erties موجود را در دو فایل config جدید کپی کرده و آن را به عنوان server-one.properties و server-two.prop-erties تغییر نام دهید. سپس هر دو فایل جدید را ویرایش کرده و تغییرات زیر را اختصاص دهید: 

config/server-one.properties :

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1


config/server-two.properties :
 

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

شروع کار کارگزاران: پس از ایجاد تغییرات در سه سرور، سه پایانه جدید را باز کنید تا کارگزاران یک به یک شروع شوند.

Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties


حالا ما سه کارگزار مختلف داریم که روی دستگاه کار می کنند. برای بررسی تمام Daemon ها jps را در ورودی ZooKeeper  تایپ کنید ، سپس پاسخ را مشاهده خواهید کرد.

ایجاد یک موضوع

بگذارید مقدار فاکتور replication را به عنوان سه مورد برای این موضوع اختصاص دهیم، زیرا ما سه کارگزار مختلف داریم. اگر دو کارگزار دارید ، مقدار replica اختصاص داده شده دو عدد خواهد بود.

Syntax

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic topic-name

Example

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 
-partitions 1 --topic Multibrokerapplication

Output
 

created topic “Multibrokerapplication”


از دستور توصیف برای بررسی اینکه کدام کارگزار مشغول گوش دادن به موضوع ایجاد شده فعلی است، مانند حالت زیر استفاده می شود:
 

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Output

bin/kafka-topics.sh --describe --zookeeper localhost:2181 
--topic Multibrokerappli-cation

Topic:Multibrokerapplication    PartitionCount:1 
ReplicationFactor:3 Configs:
   
Topic:Multibrokerapplication Partition:0 Leader:0 
Replicas:0,2,1 Isr:0,2,1

از خروجی بالا می توان نتیجه گرفت که خط اول خلاصه ای از همه بخش ها، نام موضوع، تعداد بخش ها و فاکتور replication را که قبلاً انتخاب کرده ایم، نشان می دهد. در خط دوم ، هر گره رهبر یک قسمت از بخش ها که، به طور تصادفی انتخاب شده، خواهد بود.
در مثال ما، می بینیم که اولین کارگزار ما (با شناسه ی کارگزاری صفر) رهبر است. سپس Replicaهای صفر، ۱ و ۲ بدین معنی است که همه ی کارگزاران موضوع را همانطور تکرار می کنند که در آخر Isr است، Isr مجموعه Replicaهای همگام سازی است.

آغاز ارسال پیام توسط تولیدکننده

This procedure remains the same as in the single broker setup.

این رویه همانند پیکربندی قبلی است.

Example
 

bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic Multibrokerapplication

Output
 

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message

آغاز دریافت پیام توسط مصرف کننده


این رویه همانند پیکربندی قبلی است.
 

Example
 

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion --from-beginning

Output
 

bin/kafka-console-consumer.sh --zookeeper localhost:2181 
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message

عملیات اساسی بر روی موضوع


در این فصل ما در مورد انواع عملیات اساسی بر روی موضوع در Apache Kafka ، بحث خواهیم کرد.


اصلاح یک موضوع


همانطور که قبلا چگونگی ایجاد یک موضوع را در Kafka Cluster گفته شد، اکنون با استفاده از دستور زیر به اصلاح یک موضوع ایجاد شده را می پردازیم:
 

Syntax
 

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name 
--parti-tions count

Example
 

We have already created a topic “Hello-Kafka” with single partition count and one replica factor. 
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181 
--alter --topic Hello-kafka --parti-tions 2


Output
 

WARNING: If partitions are increased for a topic that has a key, 
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

حذف یک موضوع
برای حذف یک موضوع در Apache Kafka  می تواند از کدهای زیر استفاده کنید:
 

Syntax
 

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

Example
 

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka

Output
 

> Topic Hello-kafka marked for deletion

Apache Kafka – مثال ساده از تولیدکننده

در این بخش می خواهیم با استفاده از یک سرویس دهنده جاوا، برنامه ای برای انتشار و استفاده از پیام ها ایجاد کنیم. مشتری تولیدکننده Apache Kafka از API زیر تشکیل شده است:

API  تولیدکننده Apache Kafka 

در این بخش مهم ترین مجموعه API تولیدکننده Apache Kafka را توضیح خواهیم داد. قسمت مرکزی API تولیدکننده کافکا کلاس تولیدکننده کافکا است. کلاس تولیدکننده کافکا گزینه ای را برای اتصال یک کارگزار کافکا در ساختار خود، با روش های زیر فراهم می کند.
• کلاس تولیدکننده ک Apache Kafka روش ارسالی برای فرستادن پیام به صورت غیرهمزمان به یک موضوع را فراهم می کند. کد این روش به شرح زیر است:
 

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);

• رکورد تولیدکننده – تولیدکننده مجموعه ای از رکوردها را منتظر ارسال اند ، مدیریت می کند.
• Callback – پس از تأیید رکورد توسط سرور، یک کاربر Callbackای را به اجرا در می آورد. 
• کلاس تولیدکنند ه ی کافکا یک  flush method  را برای اطمینان از اتمام ارسال پیام های قبلی، فراهم می کند.  Syntaxاین روش به شرح زیر است :
 

public void flush()


• کلاس تولیدکنند ه ی Apache Kafka روش partitionFor را فراهم می کند، که به دریافت ابرداده های مربوط به موضوعی خاص در یک بخش کمک می کند. این می تواند برای بخش بندی های سفارشی استفاده شود. کد دستوری این روش به شرح زیر است:
 

public Map metrics()


این دستور، نقشه ی محاسبات داخلی را، که توسط تولید کننده نگهداری می شود ، برمی گرداند.
• Public void close () – کلاس تولیدکنند ه ی کافکا بلوک های این روش را تا زمانی که تمام درخواست-های قبلی تکمیل شوند، فراهم می کند.

API تولید کننده

قسمت مرکزی API تولیدکننده Apache Kafka کلاس تولیدکننده کافکا است. کلاس تولیدکننده کافکا گزینه ای را برای اتصال یک کارگزار کافکا در ساختار خود، با روش های زیر فراهم می کند.


کلاس تولید کننده


کلاس تولیدکننده Apache Kafka روش ارسالی برای فرستادن پیام به یک یا چند موضوع فراهم می-کند. کد این روش به شرح زیر است: 
 

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

دو نوع تولیدکننده وجود دارد: Sync و Async

پیکربندی API برای تولیدکننده Sync تغییری نخواهد کرد. تفاوت بین این دو تولیدکننده در این است که یک تولیدکننده Sync پیام ها را مستقیماً اما در پس­زمینه ارسال می­کند. زمانی که نیاز به توان گذردهی بالاتری داشته باشید تولید کننده Async بهتر است. در نسخه­های قبلی مانند ۰.۸، یک تولیدکننده async برای ثبت خطای گیرنده در عملیات send()، callbackای ندارد. این امکان تنها در نسخه فعلی ۰.۹ در دسترس است.

public void close()

کلاس تولیدکنند­­ه­ این روش را برای بستن تمام راه­های ارتباطی بین تولیدکنندگان و کارگزاران، فراهم می­کند.

تنظیمات پیکربندی

برای فهم بهتر تنظیمات اصلی پیکربندی API تولیدکننده Apache Kafka  در جدول زیر آمده است:

ردیفتنظیمات پیکربندی و توضیحات آن
۱شناسه­ ی مشتریApplicationتولیدکننده را مشخص می­کند.
۲نوع تولیدکنندهSync  یا  async
۳Acksتنظیمات acks، معیارهای تحت درخواست تولیدکننده را کاملا کنترل می­کنند.
۴تلاش مجدداگر درخواست تولیدکننده با شکست مواجه شود، به صورت خودکار با مقدار مشخص جدیدی مجددا درخواست می­شود.
۵bootstrap.serversلیست راه اندازی کارگزاران.
۶linger.msاگر می­خواهید تعداد درخواست ها را کاهش دهید، می­توانید از linger.ms استفاده کنید.
۷key.serializerکلید رابط سریال سازی.
۸value.serializerمقدار رابط سریال سازی.
۹batch.sizeاندازه­ی بافر
۱۰buffer.memoryمقدار کل حافظه موجود برای تولیدکننده را، برای بافر کنترل می­کند.


ProducerRecord API

ProducerRecord یک جفت کلید/مقدار است که به خوشه­ی کافکا ارسال می­شود. سازنده­ی کلاس ProducerRecord برای ایجاد یک رکورد با جفت­های بخش، کلید و مقدار با استفاده از دستور زیر استفاده می­شود:

public ProducerRecord (string topic, int partition, k key, v value)


• topic – نام موضوع تعریف شده توسط کاربر که برای ضبط اضافه خواهد شد.
• partition – تعداد بخش
• key – کلیدی که در ضبط درج می شود.
• value – محتوای رکورد
 

public ProducerRecord (string topic, k key, v value)


سازنده کلاس ProducerRecord برای ایجاد یک رکورد با کلید ، جفت ارزش و بدون بخش استفاده می شود.
• topic – برای اختصاص رکوردها موضوعی ایجاد کنید.
• key – کلید برای ضبط.
• value – محتوای رکورد.
 

public ProducerRecord (string topic, v value)

کلاس ProducerRecord بدون بخش و کلید یک رکورد ایجاد می کند.
• topic – ایجاد یک موضوع.
• value – محتوای رکورد.
روش های کلاس ProducerRecord در جدول زیر ذکر شده است:

ردیفروش­های کلاس و توضیحات آن
۱public string topic()موضوع به رکورد اضافه می­شود.
۲public K key()کلید در رکورد گنجانده می­شود. در صورت عدم وجود چنین کلیدی، مقدار null بازگردانده خواهد شد.
۳public V value()ضبط محتوا
۴partition()بخش برای گزارش شمرده می­شوند.

SimpleProducer application

قبل از ایجاد برنامه ابتدا ZooKeeper و کارگزار Apache Kafka را راه اندازی کنید و سپس موضوع دلخواه خود را در کارگزار کافکا با استفاده از دستور ایجاد موضوع، ایجاد کنید. پس از آن یک کلاس جاوا به نام SimpleProducer.java ایجاد کنید و کد زیر را تایپ کنید.

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

Compilation   – برنامه با استفاده از دستور زیر قابل گردآوری است.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

Execution – برنامه با استفاده از دستور زیر قابل اجرا است.

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

Output

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

مثال ساده مصرف کننده

باتوجه به اینکه تولید کننده ای برای ارسال پیام به خوشه Apache Kafka ایجاد کرده ایم، هم اکنون اقدام به ایجاد یک مصرف کننده برای استفاده از پیام ها، می کنیم. API مصرف کننده ی کافکا برای مصرف پیام های خوشه کافکا استفاده می شود. سازنده کلاس KafkaConsumer در زیر تعریف شده است:

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)


configs – نقشه پیکربندی های مصرف کننده را برمی گرداند.
کلاس KafkaConsumer دارای روش های قابل توجهی است که در جدول زیر ذکر شده اند:

ردیفروش­ها و توضیحات
۱public java.util.Set<TopicPar-tition> assignment()مجموعه بخش­هایی را که به تازگی توسط مصرف­کننده اختصاص داده شده را، دریافت می­کند.
۲public string subscription()برای دریافت مدوام بخش­های اختصاص داده شده، مشترک لیست موضوعات مشخص شده می­شود.
۳public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)برای دریافت مدوام بخش­های اختصاص داده شده، مشترک لیست موضوعات مشخص شده می­شود.
۴public void unsubscribe()اشتراک موضوعات لیست مشخص شده­ای از بخش­ها را لغو می­کند.
۵public void sub-scribe(java.util.List<java.lang.String> topics)برای دریافت مدوام بخش­های اختصاص داده شده، مشترک لیست موضوعات مشخص شده می­شود. اگر لیست خالی باشد مانند حالت روش ۴ عمل می­کند.
۶public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)الگوی استدلال به الگوی اشتراک در قالب بیان معمول اشاره دارد و آرگومان شنونده از الگوی اشتراک، اعلان دریافت می­کند.
۷public void as-sign(java.util.List<TopicParti-tion> partitions)به صورت دستی لیستی از بخش­ها را به مصرف­کننده اختصاص می­دهد.
۸poll()برای موضوعات یا بخش­های مشخص شده با استفاده از یکی از API های مشترک یا اختصاص داده شده، داده را بارگیری می­کند. اگر قبل از اجرای این روش بر روی داده­ها مشترک موضوعات ­نباشند ، خروجی اعلام خطا خواهد بود.
۹public void commitSync()Commit offsets را در آخرین poll() برای همه فهرست­های اشتراکی موضوعات و بخش­ها بازگردانده می­شود­. همین عمل برای commitAsyn() اعمال می­شود.
۱۰public void seek(TopicPartition partition, long offset)مقدار offset فعلی را که مصرف­کننده با استفاده از روش poll() بعدی استفاده خواهد کرد ، بارگیری می­کند.
۱۱public void resume()بخش متوقف شده را مجددا فعال می­کند.
۱۲public void wakeup()مصرف­کننده را فعال می­کند.

ConsumerRecord API
ConsumerRecord API برای دریافت رکوردها از خوشه Apache Kafka استفاده می شود. این API شامل یک نام موضوع، شماره بخشی که از آن رکورد دریافت می شود و یک offset که به یک رکورد در یک بخش Apache Kafka اشاره می کند، می باشد. کلاس ConsumerRecord برای ایجاد یک رکورد مصرف کننده با نام موضوع خاص، تعداد بخش ها و جفت های <کلید ، مقدار> استفاده می شود که دارای دستور زیر است:
 

public ConsumerRecord(string topic,int partition, long offset,K key, V value)


• topic – نام موضوع برای رکورد مصرف کننده که از خوشه کافکا دریافت شده است.
• partition – بخش برای موضوع.
• key – کلید ضبط ، در صورت وجود هیچ کلیدی، null بازگردانده خواهد شد.
• value – محتوای رکورد.


ConsumerRecord API


ConsumerRecords API به عنوان ظرفی برای ConsumerRecord عمل می کند. این API برای نگه داشتن لیست ConsumerRecord در هر بخش برای هر موضوع خاص استفاده می شود. سازنده ی این API در زیر تعریف شده است:

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)

•    TopicPartition − Return a map of partition for a particular topic.
•    Records − Return list of ConsumerRecord.

ConsumerRecords class has the following methods defined.

• TopicPartition – نقشه بخش را برای یک موضوع خاص برمی گرداند.
• سوابق – لیستی از ConsumerRecord را بازمی گرداند.
کلاس ConsumerRecords دارای روش هایی است که در زیر جدول تعریف شده است:

ردیفروش­ها و توضیحات
۱public int count()تعداد کل رکوردهای همه­ی موضوعات را برمی­گرداند.
۲public Set partitions()در این مجموعه رکوردها، مجموعه­ی بخش­های دارای داده بازگردانده می­شود. در صورت عدم وجود داده، مجموعه خالی خواهد بود.
۳public Iterator iterator()Iterator شما را قادر می­سازد تا از طریق در مجموعه­­ای از  گرفتن یا از بین بردن عناصر، چرخ بزنید.
۴public List records()لیستی از رکوردهای مربوط به بخش­های موردنظر را می­دهد.

تنظیمات پیکربندی

تنظیمات پیکربندی برای API مصرف­کننده در جدول زیر ذکر شده است:

ردیفتنظیمات و توضیحات
۱bootstrap.serversلیست راه­اندازی کارگزاران
۲group.idیک مصرف­کننده را به یک گروه اختصاص می­دهد.
۳enable.auto.commitدر صورت صحیح بودن مقدار offsetها به صورت خودکار عمل commit را انجام خواهد داد.
۴auto.commit.interval.msزمان­های بین به روزرسانی offsetهای استفاده در ZooKeeper را برمی­گرداند.
۵session.timeout.msمشخص می­کند که کافکا، چند میلی ثانیه قبل از منصرف شدن و ادامه مصرف پیام، منتظر پاسخ ZooKeeper به یک درخواست (خواندن یا نوشتن) خواهد بود.

SimpleConsumer Application 


مراحل این بخش با مراحل producer application  یکسان است. ابتدا ZooKeeper و کارگزار Kafka خود را شروع کنید. سپس با کلاس جاوا بنام SimpleConsumer.java یک برنامه SimpleConsumer ایجاد کنید و کد زیر را تایپ کنید:

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

Compilation   – برنامه با استفاده از دستور زیر قابل گردآوری است.

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java


Execution – برنامه با استفاده از دستور زیر قابل اجرا است.
 

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

Input – CLI سازنده را باز کنید و پیام هایی را برای موضوع ارسال کنید. می توانید ورودی ساده ای مانند ‘Hello Consumer’ را قرار دهید.

Output

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer

Apache Kafka – مثال گروه مصرف ­کنندگان

گروه مصرف کنندگان یک از دو مصرف multi-threaded یا multi-machine  از موضوعات کافکا، می باشد.

گروه مصرف کننده

• مصرف کنندگان می توانند با استفاده از یک شناسه گروه یکسان به گروه بپیوندند.
• حداکثر میزان موازی بودن یک گروه به تعداد مصرف کنندگان گروه ربط دارد نه به تعداد بخش ها.
• Apache Kafka، بخش های یک موضوع را به یک مصرف کننده از گروه اختصاص می دهد ، بدین ترتیب هر بخش تنها توسط یک مصرف کننده در گروه مصرف می شود.
• Apache Kafka تضمین می کند که یک پیام تنها توسط یک مصرف کننده در گروه خوانده می شود.
• مصرف کنندگان می توانند پیام را به ترتیب ذخیره شدن در لیست مربوطه مشاهده کنند.

تعادل مجدد یک مشتری

اضافه کردن مراحل یا موضوعات بیشتر باعث می شود تا کافکا دوباره تعادل برقرار کند. اگر هر مصرف کننده یا کارگزاری نتواند به ZooKeeper اعلام موجودیت کند، می تواند از طریق خوشه ی کافکا دوباره پیکربندی شود. در طی این تعادل مجدد، Apache Kafka بخش های موجود را به مصارف موجود اختصاص می دهد، احتمالا بخش را به فرآیند دیگری منتقل می کند.

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serialization.ByteArraySerializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

گردآوری

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

اجرا

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group

Here we have created a sample group name as my-group with two consumers. Similarly, you can create your group and number of consumers in the group.

در اینجا ما یک گروه نمونه به نام my-group  با دو مصرف کننده ایجاد کرده ایم. به طور مشابه، می-توانید گروه و تعداد مصرف کنندگان خود را در گروه ایجاد کنید.

ورودی
CLI تولیدکننده را باز کرده و مانند نمونه ی زیر پیام ارسال کنید:
 

Test consumer group 01
Test consumer group 02

خروجی اولین فرآیند

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

خروجی دومین فرآیند

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02


امیدوارم با استفاده از نسخه ی نمایشی Java ، SimpleConsumer و ConsumeGroup را درک کرده باشید. اکنون شما ایده ای در مورد نحوه ارسال و دریافت پیام با استفاده از سرویس دهنده جاوا دارید. بگذارید در فصل بعد ادغام apache kafka با فناوری های big data را ادامه دهیم.
 

Apache Kafka – ادغام با storm

در این بخش به شرح ادغام Apache Kafka با Apache Storm می پردازیم.

درباره ی storm

Storm در ابتدا توسط Nathan Marz و تیم او در BackType ایجاد شد. در مدت زمان کوتاهی، Apache Storm به یک استاندارد برای سیستم پردازش توزیع شده در لحظه تبدیل شد که به شما امکان پردازش حجم عظیمی از داده ها را می دهد. Storm بسیار سریع است و یک benchmark آن را با بیش از یک میلیون tuple در ثانیه به ازای هر گره پردازش می کند. Apache Storm به طور مداوم اجرا می شود و از داده های منابع پیکربندی شده(Spouts)  استفاده می کند و داده ها را از خط پردازش (Bolts) عبور می دهد. ترکیب Spouts و Bolts یک توپولوژی می سازد.

جریان انتزاعی 

Spout یک منبع جریان است. به عنوان مثال ، یک spout  می تواند موضوعات مربوط به Apache Kafka را بخواند و آن ها را به عنوان یک جریان منتشر کند. یک bolt جریان ورودی را مصرف می-کند، پردازش می کند و احتمالا جریان های جدیدی را منتشر می کند. Bolt ها می توانند از طریق توابع در حال اجرا، فیلتر نوارها، انجام جریان های ادغام شده، پیوستن جریان ها، تعامل با بانک های اطلاعاتی و موارد دیگر، هرکاری انجام دهند. هر گره در یک توپولوژی Storm به صورت موازی اجرا می شود. توپولوژی به طور نامحدود اجرا می شود تا زمانی که آن را فسخ کنید. Storm به طور خودکار دوباره وظایف ناموفق را تنظیم می کند. علاوه بر این، Storm عدم از دست دادن اطلاعات را تضمین می کند، حتی اگر دستگاه ها خراب شوند و پیام ها کاهش یابند.

Let us go through the Kafka-Storm integration API’s in detail. There are three main classes to integrate Kafka with Storm. They are as follows –

سه کلاس اصلی برای ادغام Apache Kafka و Storm  وجود دارد. آنها به شرح زیر است:

BrokerHosts – ZkHosts –  StaticHosts

BrokerHosts یک رابط است و ZkHosts و StaticHosts دو پیاده سازی اصلی آن هستند. از ZkHosts برای ردیابی پویا کارگزاران کافکا با حفظ جزئیات در ZooKeeper استفاده می شود، درحالی که از StaticHosts برای تنظیم دستی یا آماری کارگزاران Apache Kafka و جزئیات آن استفاده می شود. ZkHosts راهی ساده و سریع برای دسترسی به کارگزار Apache Kafka است.

دستور ZkHosts به شرح زیر است:

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)


جایی که brokerZkStr میزبان ZooKeeper است و brokerZkPath مسیر ZooKeeper برای حفظ جزئیات کارگزار کافکا است.
 

KafkaConfig API

این API برای تعریف تنظیمات پیکربندی برای خوشه Apache Kafka استفاده می شود. دستور Kafka Config به شرح زیر است:

public KafkaConfig(BrokerHosts hosts, string topic)

Host – BrokerHost ها می توانند ZkHosts  یا  StaticHosts باشند.
Topic – نام موضوع.

SpoutConfig API

Spoutconfig فرمت گسترده ی KafkaConfig است که از اطلاعات اضافی ZooKeeper پشتیبانی می-کند.

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)


• host – BrokerHosts می تواند اجرای هرگونه رابط BrokerHosts باشد.
• topic – نام موضوع.
• zkRoot – مسیر ریشه ZooKeeper .
• id – spout حالت offsetهای مصرفی آن در Zookeeper را ذخیره می کند. شناسه باید به صورت منحصربه فرد spout شما را مشخص کند.
 

SchemeAsMultiScheme

SchemeAsMultiScheme رابطی است که نحوه ی چگونگی تبدیل ByteBuffer مصرفی از کافکا را به storm tuple نشان می دهد. این از MultiScheme گرفته شده است و اجرای کلاس Scheme را می پذیرد. اجراهای کلاس Scheme بسیار زیادی موجود است و یکی از این آن ها StringScheme است که بایت را به عنوان یک رشته ی ساده تجزیه می کند. همچنین نامگذاری قسمت خروجی شما را کنترل می کند. دستور آن به شرح زیر است:

public SchemeAsMultiScheme(Scheme scheme)

• scheme – ByteBuffer مصرفی از کافکا

KafkaSpout API

KafkaSpout پیاده سازی spout ماست، که با Storm ادغام می شود. این پیام ها را از موضوع Apache Kafka دریافت می کند و آن را به عنوان tuple در محیط Storm منتشر می کند. KafkaSpout جزئیات پیکربندی خود را از SpoutConfig دریافت می کند. در زیر یک نمونه کد برای ایجاد یک KafkaSpout ساده قرار گرفته است:

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

ایجادBolt 

Bolt مؤلفه ای است که به tupleها را به عنوان ورودی از دریافت می کند ، آن را پردازش می کند و به عنوان خروجی tupleهای جدیدی تولید می کند. Boltها رابط IRichBolt را پیاده سازی می کنند. در این برنامه از دو Bolt،  WordSplitter-Bolt و WordCounterBolt برای انجام عملیات استفاده می شود.

رابط IRichBolt روش های زیر را دارد:

•Prepare – محیطی را برای اجرای bolt در اختیار شما قرار می دهد. مجریان، این روش را برای آغازدهی به bolt اجرا می کنند.
• Execute – یک دسته از ورودی را پردازش می کند.
• Cleanup – وقتی یک bolt درحال خاموش شدن باشد، وصل می شود.
• declareOutputFields – طرح خروجی tuple را اعلام می کند.
بگذارید SplitBolt.java را ایجاد کنیم، که منطق تقسیم یک جمله به کلمات و CountBolt.java را ایجاد می کند، که آن را برای جدا کردن کلمات منحصر به فرد و شمارش تکرار آن اجرا می کند.
 

SplitBolt.java

import java.util.Map;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class SplitBolt implements IRichBolt {
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
      this.collector = collector;
   }
   
   @Override
   public void execute(Tuple input) {
      String sentence = input.getString(0);
      String[] words = sentence.split(" ");
      
      for(String word: words) {
         word = word.trim();
         
         if(!word.isEmpty()) {
            word = word.toLowerCase();
            collector.emit(new Values(word));
         }
         
      }

      collector.ack(input);
   }
   
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   @Override
   public void cleanup() {}
   
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
   
}

CountBolt.java

import java.util.Map;
import java.util.HashMap;

import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;

public class CountBolt implements IRichBolt{
   Map<String, Integer> counters;
   private OutputCollector collector;
   
   @Override
   public void prepare(Map stormConf, TopologyContext context,
   OutputCollector collector) {
      this.counters = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple input) {
      String str = input.getString(0);
      
      if(!counters.containsKey(str)){
         counters.put(str, 1);
      }else {
         Integer c = counters.get(str) +1;
         counters.put(str, c);
      }
   
      collector.ack(input);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counters.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
   
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

ارسال به توپولوژی

توپولوژی Storm اساسا یک ساختار صرفه جویی است. کلاس TopologyBuilder روش های ساده و راحتی را برای ایجاد توپولوژی های پیچیده ارائه می دهد. کلاس TopologyBuilder روش هایی برای تنظیم spout (setSpout)  و تنظیم bolt (setBolt)  دارد. درنهایت، TopologyBuilder برای ایجاد توپولوژی، روش createTopology را داراست. روش های shuffleGrouping و fieldsGrouping به تنظیم جریان گروه بندی برای spout و boltها کمک می کند.
Local Cluster  – برای اهداف توسعه، ما می وانیم با استفاده از شی LocalCluster یک خوشه محلی ایجاد کنیم و سپس توپولوژی را با استفاده از روش submitTopology از کلاس LocalCluster ارسال کنیم.
 

KafkaStormSample.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;

public class KafkaStormSample {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
      config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
      String zkConnString = "localhost:2181";
      String topic = "my-first-topic";
      BrokerHosts hosts = new ZkHosts(zkConnString);
      
      SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,    
         UUID.randomUUID().toString());
      kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
      kafkaSpoutConfig.forceFromStart = true;
      kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
      builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
      builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
         
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());

      Thread.sleep(10000);
      
      cluster.shutdown();
   }
}

قبل از کامپایل برنامه، ادغام  Apache Kafka و Apache Storm به کتابخانه ZooKeeper client java نیاز دارد. ویرایشگر نسخه ۲.۹.۱ از Apache Storm نسخه ۰.۹.۵ پشتیبانی می کند (که در این آموزش از آن ها استفاده می کنیم). فایل های زیر را بارگیری کرده و آن را در java class path قرار دهید.
•    curator-client-2.9.1.jar
•    curator-framework-2.9.1.jar
پس از درج فایل های موردنیاز، برنامه را با استفاده از دستور زیر کامپایل کنید:
 

javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java

اجرا
CLI  تولیدکننده ی Apache Kafka را آغاز کنید(توضیح داده شده در فصل قبل)، موضوع جدیدی به نام my-first-topic  ایجاد کرده و چند نمونه پیام را مطابق شکل زیر ارسال کنید:
 

hello
kafka
storm
spark
test message
another test message

حال با استفاده از فرمان زیر برنامه را اجرا کنید:

java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample


خروجی مثال به شکل زیر خواهد بود:
 

storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2

Apache Kafka – ادغام با spark

در این بخش به شرح ادغام Apache Kafka با Spark Streaming API پرداخته شده است.

درباره ی spark

Spark Streaming API پردازش داده های درحال جریان را مقیاس پذیر، با توان گذردهی بالا و حمل پذیری بالا در برابر خطا، می سازد. داده ها را می توان از منابع زیادی مانند Apache Kafka ، فلوم، توییتر و غیره استفاده کرد و می توان با استفاده از الگوریتم های پیچیده مانند توابع سطح بالا مانند map ، reduce ، join و window پردازششان کرد. سرانجام، داده های پردازش شده را می توان به فایل های سیستم، پایگاه داده ها و داشبوردهای جاری منتقل کرد. مجموعه داده های توزیع شده انعطاف پذیر (RDD)‌  یک ساختار داده بنیادی Spark است که مجموعه ای از objectهای توزیع شده ی تغییرناپذیر است. هر مجموعه داده در RDD به بخش های منطقی تقسیم می شود که ممکن است در گره های مختلف خوشه محاسبه شود.

ادغام Apache Kafka با Spark

Apache Kafka یک بستر پیام رسانی و ادغام بالقوه برای جریان Spark است. Apache Kafka  به عنوان قطب اصلی جریان داده های در لحظه عمل می کند و با استفاده از الگوریتم های پیچیده در Spark Streaming پردازش می شود. پس از پردازش داده ها ، Spark Streaming می تواند نتایجی را برای موضوع دیگری از Apache Kafka یا فروشگاه HDFS، پایگاه داده ها یا داشبوردها منتشر کند. نمودار زیر جریان مفهومی را نشان می دهد:

نمودار جریان مفهومی spark

نمودار جریان مفهومی spark

اکنون جزئیات API Kafka-Spark را شرح می دهیم:

SparkConf API

این API نشان دهنده ی پیکربندی یک برنامه Spark است که برای تنظیم پارامترهای مختلف Spark به عنوان جفت ارزش -کلید استفاده می شود.
کلاس SparkConf روشهای زیر را دارد:
 

• set(string key, string value)  – متغیرهای پیکربندی را تنظیم می کند
• remove(string key) – کلید را از پیکربندی خارج می کند
• setAppName(string name) – نام برنامه را برای برنامه شما تنظیم می کند
• get(string key)  – کلید را دریافت می کند
 

StreamingContext API

این API اصلی ترین نقطه ورود برای عملکرد Spark است. SparkContext نشان دهنده ی اتصال به یک خوشه ی Spark است و می تواند برای ایجاد RDD ها ، accumulatorها و broadcast variableها، روی خوشه مورد استفاده قرار گیرد. دستور آن در زیر مشخص شده است:

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)

  • Master – URL خوشه برای اتصال به (e.g. mesos://host:port, spark://host:port, local[4]).
  • appName – نام کار شما، برای نمایش در cluster web UI.
  • batchDuration – فاصله­ی زمانی­ای که جریان داده­ها بین دسته­ها تقسیم می­شود.
  • Master – URL خوشه برای اتصال به (e.g. mesos://host:port, spark://host:port, local[4]).
  • appName – نام کار شما، برای نمایش در cluster web UI.
  • batchDuration – فاصله­ی زمانی­ای که جریان داده­ها بین دسته­ها تقسیم می­شود.
public StreamingContext(SparkConf conf, Duration batchDuration)

   
با ایجاد تنظیمات لازم برای SparkContext جدید ، یک StreamingContext ایجاد کنید
• conf – پارامترهای spark
• batchDuration –  بازه زمانی که در آن داده های جریان به دسته ها تقسیم می شوند

KafkaUtils API

API KafkaUtils برای اتصال خوشه Apache Kafka به جریان Spark استفاده می شود. این API دارای روش قابل توجهی است که دستور زیر را ایجاد می کند:

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

روش فوق نشان داده شده است برای ایجاد یک جریان ورودی که پیام ها را از کارگزاران Apache Kafka بیرون می کشد.
• ssc –  جسم StreamingContext.
• zkQuorum – quorum Zookeeper
• groupId – شناسه گروه برای مصرف کننده.
• topics – نقشه ای از موضوعات را برای استفاده بازمی گرداند.
• storeLevel – سطح ذخیره سازی برای ذخیره اشیای دریافتی استفاده می شود.
KafkaUtils API یک روش دیگر  createDirectStream را دارد، که برای ایجاد یک جریان ورودی که بطور مستقیم پیام ها را از کارگزاران کافکا بدون استفاده از گیرنده بیرون می کشد، استفاده می شود. این جریان می تواند تضمین کند که هر پیام از Apache Kafka دقیقا یک بار در تبادلات قرار خواهد گرفت.
برنامه ی نمونه در Scala انجام می شود. برای کامپایل برنامه، لطفاًsbt ، ابزار ایجاد scala  را بارگیری و نصب کنید. کد برنامه اصلی در زیر ارائه شده است:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Build Script

ادغام spark-kafka  به spark، جریان spark و spark Kafka integration jar بستگی دارد. یک فایل جدید build.sbt ایجاد کنید و جزئیات برنامه و وابستگی آن را مشخص کنید. sbt در ضمن ایجاد برنامه، jar لازم را بارگیری می کند.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Compilation / Packaging

دستور زیر را برای کامپایل و بسته بندی فایل jar برنامه اجرا کنید. برای اجرای برنامه باید پرونده jar را در spark console  ارسال کنیم.

Submiting to Spark

مانند بخش های گذشته CLI تولیدکننده را آغاز کرده، مبحث جدیدی با عنوان my-first- topic ایجاد کنید و مانند زیر پیام های نمونه ای ارسال کنید.

Another spark test message

دستور زیر را برای ارسال برنامه به spark console اجرا کنید:

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

نمونه خروجی این برنامه در زیر نشان داده شده است:

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

Apache Kafka –  twitter 

پیش از این، ما شاهد ادغام Storm و Spark با Apache Kafka بودیم. در هر دو سناریو، ما یک تولیدکننده ی کافکا (با استفاده از CLI) برای ارسال پیام به اکوسیستم Apache Kafka ایجاد کردیم. سپس، شبکه Storm و Spark پیام ها را با استفاده از مصرف کننده ی Apache Kafka می خوانند و آن ها را به ترتیب وارد اکوسیستم Storm و Spark می کنند. بنابراین ، عملاً باید تولید کننده ی Apache Kafka را با ویژگی های زیر ایجاد کنیم:
• فیدهای توییتر را با استفاده از“Twitter Streaming API”  بخواند،
• فیدها را پردازش کند،
• هشتگ ها را استخراج کند و آن ها را به Apache Kafka بفرستد.
پس از دریافت هشتگ ها توسط Apache Kafka ، ادغام Storm / Spark اطلاعات را دریافت کرده و آن را به اکوسیستم Storm / Spark می فرستد.

Twitter Streaming API

 “Twitter Streaming API”  در هر زبان برنامه نویسی ای قابل دسترسی است. “twitter4j”  یک کتابخانه جاوا غیر رسمی و منبعی آزاد است که یک ماژول مبتنی بر جاوا را برای دسترسی آسان به “Twitter Streaming API”  فراهم می کند. “twitter4j”  یک چارچوب مبتنی بر شنونده را برای دسترسی به توییت ها فراهم می کند. برای دسترسی به “Twitter Streaming API”، باید وارد حساب کاربری توسعه دهنده ی توییتر شوید و جزئیات تأیید اعتبار OAuth زیر را بدست آورید.
• Customerkey
• CustomerSecret
• AccessToken
• AccessTookenSecret
پس از ایجاد حساب توسعه دهنده، فایل های jar “twitter4j”را بارگیری کرده و آن را در مسیر کلاس جاوا قرار دهید.
کد دستوری کامل سازنده توییتر کافکا (KafkaTwitterProducer.java)  در زیر ذکر شده است:
 

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.*;
import twitter4j.conf.*;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTwitterProducer {
   public static void main(String[] args) throws Exception {
      LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
      
      if(args.length < 5){
         System.out.println(
            "Usage: KafkaTwitterProducer <twitter-consumer-key>
            <twitter-consumer-secret> <twitter-access-token>
            <twitter-access-token-secret>
            <topic-name> <twitter-search-keywords>");
         return;
      }
      
      String consumerKey = args[0].toString();
      String consumerSecret = args[1].toString();
      String accessToken = args[2].toString();
      String accessTokenSecret = args[3].toString();
      String topicName = args[4].toString();
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);

      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
      StatusListener listener = new StatusListener() {
        
         @Override
         public void onStatus(Status status) {      
            queue.offer(status);

            // System.out.println("@" + status.getUser().getScreenName() 
               + " - " + status.getText());
            // System.out.println("@" + status.getUser().getScreen-Name());

            /*for(URLEntity urle : status.getURLEntities()) {
               System.out.println(urle.getDisplayURL());
            }*/

            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
               System.out.println(hashtage.getText());
            }*/
         }
         
         @Override
         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
            // System.out.println("Got a status deletion notice id:" 
               + statusDeletionNotice.getStatusId());
         }
         
         @Override
         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            // System.out.println("Got track limitation notice:" + 
               num-berOfLimitedStatuses);
         }

         @Override
         public void onScrubGeo(long userId, long upToStatusId) {
            // System.out.println("Got scrub_geo event userId:" + userId + 
            "upToStatusId:" + upToStatusId);
         }      
         
         @Override
         public void onStallWarning(StallWarning warning) {
            // System.out.println("Got stall warning:" + warning);
         }
         
         @Override
         public void onException(Exception ex) {
            ex.printStackTrace();
         }
      };
      twitterStream.addListener(listener);
      
      FilterQuery query = new FilterQuery().track(keyWords);
      twitterStream.filter(query);

      Thread.sleep(5000);
      
      //Add Kafka producer config settings
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("acks", "all");
      props.put("retries", 0);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      int i = 0;
      int j = 0;
      
      while(i < 10) {
         Status ret = queue.poll();
         
         if (ret == null) {
            Thread.sleep(100);
            i++;
         }else {
            for(HashtagEntity hashtage : ret.getHashtagEntities()) {
               System.out.println("Hashtag: " + hashtage.getText());
               producer.send(new ProducerRecord<String, String>(
                  top-icName, Integer.toString(j++), hashtage.getText()));
            }
         }
      }
      producer.close();
      Thread.sleep(5000);
      twitterStream.shutdown();
   }
}

Compilation

برنامه را با استفاده از دستور زیر کامپایل کنید:

javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java

Execution

دو کنسول را باز کنید. برنامه کامپایل شده فوق را مطابق شکل زیر در یک کنسول اجرا کنید:

java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food

هر یک از برنامه های Spark / Storm را که در فصل قبل توضیح داده شده است، در یکی دیگر از پنجره ها توضیح دهید. نکته اصلی قابل ذکر این است که موضوع مورد استفاده در هر دو مورد باید یکسان باشد. در اینجا، ما از “my-first-topic”  به عنوان نام موضوع استفاده کرده ایم.

Output

خروجی این برنامه به کلمات کلیدی و فید فعلی توییتر بستگی دارد. خروجی نمونه در زیر مشخص شده است (ادغام storm).

food : 1
foodie : 2
burger : 1
. . .

Apache Kafka – ابزارها

ابزار Apache Kafka تحت عنوان org.apache.kafka.tools  بسته بندی شده است. این ابزارها به ابزارهای سیستم و ابزارهای تکثیر طبقه بندی می شوند.

ابزار های سیستم در Apache Kafka

ابزارهای سیستم را می توان از خط فرمان با استفاده از run class script  اجرا کرد، که دستور آن به شرح زیر است:

bin/kafka-run-class.sh package.class - - options

برخی از این ابزارها در زیر آورده شده اند:

•    Kafka Migration Tool – این ابزار برای انتقال یک کارگزار از نسخه ی به نسخه ی دیگر استفاده می شود.
•    Mirror Maker – این ابزار برای منعکس کردن یک خوشه ی Apache Kafka در خوشه ی دیگر کاربرد دارد.
•    Consumer Offset Checker – این ابزار گروه مصرف کننده، موضوع، بخش ها، offset ها، logsize، مالک مجموعه ی مشخصی از موضوعات و گروه مصرف کننده را نمایش می دهد.
 

ابزارهای تکثیر در Apache Kafka

ابزارهای تکثیر در Apache Kafka شامل ابزارهای طراحی سطح بالا می باشند. هدف از افزودن این ابزارها ارائه ی دوام و در دسترس بودن بیشتر است. برخی از ابزارهای تکثیر در زیر ذکر شده است :
•    Create Topic Tool  – این ابزار، موضوع با تعداد مشخصی بخش و فاکتور تکثیر ایجاد می کند و از طرح پیش فرض کافکا برای انجام replica assignment  استفاده می کند.
•    List Topic Tool – این ابزار اطلاعات مربوط به لیست مشخصی از موضوعات را فهرست می کند. اگر هیچ موضوعی در خط فرمان ارائه نشده باشد، ابزار برای گرفتن همه ی موضوعات و لیست اطلاعات مربوط به آنها، از Zookeeper گزارش می گیرد. زمینه هایی که ابزار نمایش می دهد نام موضوع، بخش، رهبر ، replicaها و غیره است.
•    Add Partition Tool – در ایجاد یک موضوع، باید تعداد بخش های موضوع مشخص شود. سپس ممکن است با افزایش حجم موضوع بخش های بیشتری برای آن مورد نیاز باشد. این ابزار به اضافه کردن بخش های بیشتر برای یک موضوع خاص کمک می کند و همچنین به بخش های اضافه شده قابلیت replica assignment دستی اعطا می کند.
 

Apache Kafka – برنامه ها

Apache Kafka از بسیاری از بهترین اپلیکیشن های صنعتی امروزی پشتیبانی می کند. ما در این فصل به اختصار به برخی از قابل توجه ترین اپلیکیشن های کافکا خواهیم پرداخت:

Twitter
twitter یک سرویس شبکه های اجتماعی آنلاین است که بستری را برای ارسال و دریافت tweetهای کاربران فراهم می کند. کاربران ثبت نام شده می توانند توییت ها را بخوانند و آن ها را ارسال کنند، اما کاربران ثبت نام نشده فقط می توانند توییت ها را بخوانند. توییتر از Storm-Kafka به عنوان بخشی از زیرساخت های پردازش جریان آن استفاده می کند.
LinkedIn
Apache Kafka در LinkedIn برای داده های جریان فعالیت و معیارهای عملیاتی استفاده می شود. سیستم پیام رسان Kafka با استفاده از محصولات مختلف مانند LinkedIn Newsfeed ، برای استفاده از پیام رسانی آنلاین و سیستم های تحلیل آفلاین مانند Hadoop، به این برنامه کمک می کند. قابلیت دوام قوی کافکا نیز یکی از عوامل اصلی ارتباط آن با LinkedIn است.
Netflix
Netflix یک رسانه اینترنتی تقاضامحور در آمریکاست که از کافکا برای سیستم نظارت در لحظه و پردازش رویدادها استفاده می کند.
Mozilla
Mozilla یک نرم افزار مرورگر رایگان است که در سال 1998 توسط اعضای Netscape ایجاد شده است. کافکا به زودی جایگزین بخشی از سیستم تولید فعلی موزیلا را برای جمع آوری داده های عملکرد و داده های مربوط به میزان استفاده از مرورگر توسط کاربر نهایی برای پروژه هایی مانند Telemetry ، Test Pilot و غیره، خواهد شد.
Oracle
Oracle از محصول Enterprise Service Bus خود با نام OSB (Oracle Service Bus) اتصال به بومی را به کافکا فراهم می کند که به توسعه دهندگان این امکان را می دهد که برای اجرای خطوط داده ی مرحله ای، از قابلیت های میانجی داخلی OSB استفاده کنند.

دیدگاهتان را بنویسید