Apache Kafka یک سیستم ارسال پیام انتشار-اشتراک و یک صف قویست، که می تواند حجم بالایی از داده ها را در اختیار داشته باشد و شما را قادر می سازد تا پیام ها را از یک نقطه به نقطه ی دیگر منتقل کنید. کافکا مناسب استفاده برای هر دو پیام های آفلاین و آنلاین است. پیام های کافکا بر روی دیسک منتشر می شوند و در خوشه های مشخص همانند سازی می شوند .
معرفی Apache Kafka
در Big Data، حجم عظیمی از داده ها استفاده میشود. با توجه به این داده ها ، ما با دو چالش اصلی روبه رو هستیم. چالش اول نحوه جمع آوری این حجم زیاد از داده ها و چالش دوم تجزیه وتحلیل این داده های جمع آوری شده می باشد. برای غلبه بر این چالش ها، شما به یک سیستم پیامرسانی نیاز دارید.
Apache Kafka برای سیستم های توسعه یافته با توانایی گذردهی بالا، طراحی شده است. کافکا تمایل دارد جایگزین مناسبی برای پیامرسان های سنتی شود. در مقایسه با سایر سیستم های پیامرسانی، کافکا از توان گذردهی بهتر، تقسیم بندی داخلی، تکثیر و تحمل خطای ذاتی برخوردار است، که موجب مناسب بودن آن برای برنامه های پردازش پیام در مقیاس بزرگ میشود.
سیستم پیام رسانی چیست؟
یک سیستم پیامرسانی مسئول انتقال داده ها از یک برنامه به برنامه دیگر میباشد، بنابراین برنامه ها میتوانند بدون نگرانی درمورد نحوه ی اشتراک آنها روی داده ها متمرکز شوند. اساس پیام رسانی توسع هیافته، مفهوم صف بندی قابل اعتماد پیام است. پیام ها به طور غیرهمزمان بین برنامه های مشتری و سیستم پیام رسانی قرار می گیرند. دو نوع الگوی پیام رسانی وجود دارد؛ یکی نقطه به نقطه و دیگری سیستم پیام رسانی انتشار- اشتراک (pub-sub).
بسیاری از الگوهای پیام رسانی از سیستم pub-sub پیروی میکنند.
سیستم پیام رسانی نقطه به نقطه
در یک سیستم نقطه به نقطه، پیام ها در یک صف انتشار می یابند. یک یا چند مصرف کننده میتوانند از پیام های موجود در صف استفاده کنند، اما یک پیام خاص فقط توسط یک مصرف کننده قابل استفاده است. هنگامی که یک پیام از صف توسط یک مشتری خوانده شود، پیام از آن صف ناپدید میشود. نمونه ی بارز این سیستم یک سیستم پردازش سفارش است، که در آن هر سفارش توسط یک پردازنده سفارش پردازش میشود، اما پردازنده های چند منظورهی سفارش میتوانند همزمان چند سفارش را پردازش کنند. نمودار زیر ساختار این سیستم ها را نشان میدهد.
سیستم پیام رسان نقطه به نقطه
سیستم پیام رسانی انتشار و اشتراک (publish-subscribe)
در سیستم انتشار و اشتراک، پیامها در یک موضوع انتشار می یابند. برخلاف سیستم نقطه به نقطه، مصرف کنندگان می توانند مشترک یک یا چند موضوع شوند و از تمام پیام های موجود در آن موضوع استفاده کنند. در این سیستم، به تولید کنندگان پیام ناشر (publisher) و به مصرف کنندگان پیام مشترک (subscriber) گفته میشود. نمونهای از این سیستم ها در زندگی واقعی، تلویزیون دیش (Dish TV) است، که کانال های مختلفی از جمله ورزش، فیلم، موسیقی و … را منتشر میکند و هرکسی می تواند در مجموعه کانال های موردنظر خود مشترک شود و هر زمان که کانال های موردنظر در دسترس باشند از آنها استفاده کنند.
Apache Kafka چیست؟
Apache Kafka یک سیستم ارسال پیام انتشار-اشتراک و یک صف قویست، که میتواند حجم بالایی از داده ها را در اختیار داشته باشد و شما را قادر می سازد تا پیام ها را از یک نقطه به نقطه ی دیگر منتقل کنید. کافکا مناسب استفاده برای هر دو پیام های آفلاین و آنلاین است. پیام های کافکا بر روی دیسک منتشر میشوند و در خوشه های مشخص همانند سازی می شوند تا از هدر رفتن داده جلوگیری شود. کافکا در بالای سرویس هماهنگ سازی ZooKeeper ساخته شده است. این سیستم به خوبی با Apache Storm و Spark Apache برای تجزیه و تحلیل جریان داده ها در لحظه، ادغام شده است.
مزیتهای Apache Kafka
در زیر چند مزیت Apache Kafka آورده شده است:
• قابلیت اطمینان – کافکا توزیع، تقسیم و تکثیر شده است و دارای تلرانس خطا می باشد.
• مقیاس پذیری – سیستم پیام رسانی کافکا به راحتی و بدون خرابی مقیاس می کند.
• دوام – Apache Kafka از سیستم ثبت توسعه یافته، استفاده میکند و این به این معنی است که پیام ها در سریعترین زمان ممکن روی دیسک منتشر میشوند، از این رو دوام دارند.
• عملکرد – کافکا توانایی گذردهی بالایی هم برای انتشار و هم برای ارسال پیام دارد. این توانایی، عملکرد پایدار را حفظ می کند حتی بسیاری از سل پیام ها نیز ذخیره می شوند.
کافکا بسیار سریع است و خرابی و از دست رفتن اطلاعات صفر را تضمین می کند.
موارد استفاده ی Apache Kafka
Apache Kafka را می توان در بسیاری از موارد استفاده کرد. برخی از آنها در زیر فهرست شده اند:
• اندازه گیری ها – Apache Kafka اغلب برای داده های نظارت عملیاتی استفاده میشود. این شامل جمع آوری آماری از برنامه های توزیع شده برای تولید متمرکز داده های عملیاتی است.
• Log Aggregation Solution – از کافکا می توان در یک سازمان برای جمع آوری اطلاعات مربوط به چندین سرویس و در دسترس قرار دادن آنها، در قالب استاندارد، برای مصرف کنندگان مختلف، استفاده کرد.
• پردازش جریان – چارچوب های محبوب مانند Storm و Spark Streaming، داده ها را از یک موضوع میخوانند، آن را پردازش میکنند ، و دادههای پردازش شده را برای موضوع جدید ارسال می کنند که در دسترس کاربران و برنامه های کاربردی قرار می گیرد. دوام قوی کافکا نیز در زمینه پردازش جریان بسیار مفید است.
نیاز به Apache Kafka
Apache Kafka یک پلتفرم یک پارچه برای پردازش کلیه اطلاعات در لحظه میباشد. Apache Kafka از ارسال پیام با تاخیر کم پشتیبانی میکند و در صورت بروز خرابی دستگاه، تلرانس خطا را تضمین میکند. کافکا توانایی اداره ی تعداد زیاد مصرفکنندگان متنوع را داراست و بسیار سریع است، به طوری که در ثانیه ۲ میلیون انتشار دیتا را انجام می دهد. کافکا تمام داده ها را بر روی دیسک منتشر میکند، که در واقع به این معنی است که همه نوشته ها به حافظه نهان صفحه OS (RAM) منتقل می شوند. این امر انتقال اطلاعات از حافظه نهان صفحه به سوکت شبکه را بسیار کارآمد می کند.
اصول Apache Kafka
قبل از عمیق شدن در مبحث Apache Kafka ، باید با اصطلاحات اصلی مانند موضوعات (topics)، کارگزاران (brokers)، تولیدکنندگان (producers) و مصرفکنندگان (consumers) آشنا شوید. نمودار زیر اصطلاحات اصلی را نشان میدهد و جدول، اجزای نمودار را با جزئیات توضیح میدهد.
در نمودار فوق، یک موضوع (topic) در سه بخش تنظیم شده است. بخش۱ دارای دو عامل offset، ۰ و ۱ میباشد. بخش۲، چهار عامل offset، ۰، ۱، ۲ و ۳ دارد و بخش ۳ تنها یک عامل offset، صفر دارد. شناسه ی replica همان شناسه ی سرور میزبان است.
فرض کنید، اگر ضریب تکثیر موضوع روی ۳ تنظیم شود،Apache Kafka ۳ replica یکسان از هر بخش ایجاد میکند و آنها را در خوشه قرار میدهد تا برای همه ی عملیات های مربوط به آن در دسترس باشد. برای تعادل بار در خوشه، هر کارگزار (broker) یک یا چند قسمت از آن بخشها را ذخیره میکند. تولیدکنندگان و مصرفکنندگان متعدد میتوانند همزمان پیامها را منتشر و بازیابی کنند.
جدول اجزای Apache Kafka
ردیف | اجزاء و توضیحات |
---|---|
۱ | موضوع (topic)به جریانی از پیامهای متعلق به یک دسته خاص، موضوع (topic) گفته میشود. دادهها در موضوع ها (topics) ذخیره می شوند.topicها به بخش ها (partitions) تقسیم می شوند. برای هر موضوع، کافکا یک mini-mum از یک پارتیشن را نگه میدارد. هر بخش این چنینی، پیام ها را در یک ترتیب درخواستی تغییرناپذیر نگه می دارد؛ این بخش به عنوان مجموعه ای از فایل های این ترتیب با اندازههای برابر اجرا میشود. |
۲ | بخش (partition)هر موضوع میتواند چند بخش داشته باشد، بنابراین می تواند یک مقدار دلخواه از داده را اداره کند. |
۳ | Offset بخشهر پیام تقسیم شده یک شناسه ی ترتیبی منحصربفرد، به نام offset دارد. |
۴ | Replicaهای هر بخشReplicaها به عنوان backup هر بخش، هرگز داده ها را نمی خوانند یا نمی نویسند و از آنها برای جلوگیری از، از دست دادن داده استفاده میشود. |
۵ | کارگزاران (Brokers)کارگزاران در Apache Kafka ، سیستمهای ساده ای هستند که وظیفه حفظ داده های منتشر شده را دارند. هر کارگزار میتواند در هر موضوع (topic) چند بخش (partition) داشته باشد یا کلا نداشته باشد. فرض کنید، اگر یک موضوع دارای n بخش باشد و به تعداد n، کارگزار در دسترس باشد، به هر کارگزار یک بخش تعلق میگیرد. فرض کنید اگر یک موضوع دارای n بخش باشد و بیش از n کارگزار (n+m) دردسترس باشد، به n کارگزار اول یک بخش تعلق گرفته و به m کارگزار بعدی هیچ بخشی در آن موضوع خاص ارائه نخواهد شد. فرض کنید اگر یک موضوع دارای n بخش باشد و کمتر از n کارگزار (n-m) دردسترس باشد، به هر کارگزار یک یا چند بخش تعلق میگیرد. این سناریو به دلیل عدم تقسیم بار ناعادلانه در بین کارگزاران توصیه نمی شود. |
۶ | Kafka Cluster (خوشه)در Apache Kafka داشتن بیش از یک کارگزار به عنوان خوشه کافکا نامیده میشود. خوشه ی کافکا بدون خرابی قابل گسترش است. از این خوشه ها برای مدیریت پایداری و تکثیر داده های پیام استفاده میشود. |
۷ | تولیدکنندهتولیدکنندگان درواقع ناشران پیام های یک یا چند موضوع Apache Kafka هستند. تولید کنندگان داده ها را به کارگزاران کافکا می فرستند. هر بار که یک تولیدکننده، پیامی را به یک کارگزار ارسال میکند، کارگزار به سادگی پیام را به آخرین پرونده ی بخش اضافه میکند. در واقع، پیام به یک بخش اضافه میشود. تولیدکننده همچنین میتواند به بخشهای مورد انتخاب خود نیز پیام ارسال کند. |
۸ | مصرفکنندهمصرف کنندگان داده های کارگزاران را میخوانند. مصرف کنندگان در یک یا چند موضوع مشترک میشوند و با گرفتن داده ها از کارگزاران، پیام های منتشر شده را استفاده میکنند. |
۹ | رهبررهبر گرهی مسئول خواندن و نوشتن همه ی بخشهای مشخص شده است. هر بخش دارای یک سرور است که به عنوان یک رهبر عمل میکند. |
۱۰ | دنبال کنندهگره ایست که از دستورالعمل های رهبر پیروی میکند. اگر رهبر شکست بخورد، یکی از دنبال کنندگان به طور خودکار رهبر جدید خواهد شد. یک دنبال کننده به عنوان مصرف کننده عادی عمل کرده و با کشیدن پیامها به سمت خود ذخیره ی داده های خود را به روز میکند. |
Apache Kafka – معماری خوشه
در Apache Kafka داشتن بیش از یک کارگزار به عنوان خوشه کافکا نامیده میشود. خوشه ی کافکا بدون خرابی قابل گسترش است. از این خوشه ها برای مدیریت پایداری و تکثیر داده های پیام استفاده میشود. در این بخش به تشریح معماری خوشه میپردازیم.
تصویر زیر نمودار خوشه Apache Kafka را نشان میدهد.
نمودار خوشه Apache Kafka
جدول زیر هریک از مولفه های نشان داده شده در نمودار بالا را تشریح میکند.
جدول اجزای خوشه Apache Kafka
ردیف | اجزا و توضیحات |
---|---|
۱ | Broker یا کارگزارمعمولا خوشه ی Apache Kafka برای حفظ تعادل بار دیتا از چندین کارگزار تشکیل شده است. کارگزاران کافکا بدون تابعیت هستند، بنابراین از ZooKeeper برای حفظ تابعیت خوشه خود استفاده می کنند. یک نمونه کارگزار کافکا میتواند صدها هزار خواندن و نوشتن در هر ثانیه را کنترل کند و میتواند سل پیام های خود را بدون ایجاد اختلال در عملکرد، کنترل کنند. انتخاب رهبر کارگزار کافکا توسط ZooKeeper انجام می شود |
۲ | ZooKeeperZooKeeper برای مدیریت و هماهنگی کارگزار Apache Kafka استفاده میشود. خدمات ZooKeeper به طور عمده برای آگاه ساختن تولیدکننده و مصرفکننده در مورد حضور هر کارگزار جدید یا عدم موفقیت آن در سیستم Apache Kafka استفاده میشود. از این طریق، تولیدکننده و مصرفکننده در مورد هماهنگی کار خود با هر کارگزار تصمیم میگیرند. |
۳ | Producers یا تولیدکنندهتولید کنندگان داده ها را به سمت کارگزاران ارسال میکنند. با شروع کار کارگزار جدید، همه تولید کنندگان آن را جستجو کرده و به طور خودکار پیامی را برای آن ارسال میکنند. تولیدکننده در Apache Kafka بدون انتظار برای تایید از طرف کارگزار، به همان سرعتی که کارگزار بتواند رسیدگی کند، پیام ها را ارسال میکند. |
۴ | Consumers یا مصرف کنندهاز آنجا که کارگزاران Apache Kafka بدون تابعیت هستند، مصرف کننده باید با استفاده از offset بخش مورد استفاده، تعداد پیام های مصرف شده را حفظ کند. اگر مصرف کننده پیام خاصی را تایید کند، به این معنی است که مصرف کننده تمام پیام های قبلی را استفاده کرده است. برای در دسترس بودن پیام های موردنظر برای مصرف، مصرف کننده به صورت غیر همزمان درخواست ارسال پیام را به کارگزار صادر می کند. مصرف کنندگان میتوانند به سادگی با تهیه یک مقدار offset، به هر نقطه از یک بخش برگردند یا از آن گذر کنند. ارزش offst مصرف کننده توسط ZooKeeper اطلاع داده میشود. |
Apache Kafka – جریان کار
Kafka هر دو سیستم پیام رسانی pub-sub و صف را به صورت سریع، قابل اعتماد، بادوام و همراه با تلرانس خطا و خرابی صفر فراهم میکند. در هر دو مورد، تولیدکنندگان به سادگی پیام را به یک موضوع ارسال میکنند و مصرفکننده میتواند بسته به نیاز خود، هر نوع سیستم پیام رسانی را انتخاب کند. نحوهی انتخاب سیستم پیامرسانی توسط مصرفکننده در ادامه توضیح داده شده است.
مباحثی که تاکنون مطرح شده در مورد مفاهیم اصلی Apache Kafka بوده است، در این قسمت به بررسی جریان کار Apache Kafka میپردازیم.
Apache Kafka مجموعه ای از موضوعات (topics) است که به یک یا چند بخش تقسیم شده است. یک بخش در Apache Kafka یک دنباله خطی از پیام هاست که در آن هر پیام توسط اندیس آن (تحت عنوان offset) شناخته میشود. تمام داده های موجود در یک خوشه Apache Kafka، اتحادیه ی مجزای بخش ها هستند. پیامهای دریافتی در انتهای یک بخش نوشته شده و پیام ها به طور متوالی توسط مصرف کنندگان خوانده میشوند. دوام سیستم با تکثیر پیام ها به کارگزاران مختلف فراهم میگردد.
Apache Kafka هر دو سیستم پیام رسانی pub-sub و صف را به صورت سریع، قابل اعتماد، بادوام و همراه با تلرانس خطا و خرابی صفر فراهم میکند. در هر دو مورد، تولیدکنندگان به سادگی پیام را به یک موضوع (topic) ارسال میکنند و مصرف کننده میتواند بسته به نیاز خود، هر نوع سیستم پیامرسانی را انتخاب کند. نحوه ی انتخاب سیستم پیامرسانی توسط مصرف کننده در ادامه توضیح داده شده است.
جریان کار سیستم پیامرسانی pub-sub در Apache Kafka
در زیر مراحل جریان کار سیستم پیامرسانی Pub-Sub در Apache Kafka آورده شده است:
- تولید کنندگان در فواصل منظم به موضوع، پیام ارسال میکنند.
- کارگزار کافکا تمام پیامها را در بخشهایی که برای موضوع خاص موردنظر تنظیم شده است، ذخیره میکند. از این طریق تقسیم مساوی پیام ها بین بخشها تضمین میشود. اگر تولیدکننده دو پیام ارسال کند و دو بخش موجود باشد، کافکا یک پیام را در بخش اول و پیام دوم را در بخش دوم ذخیره میکند.
- مشتری در یک موضوع خاص مشترک میشود.
- هنگامی که مصرف کننده در یک موضوع مشترک شد، کافکا offset فعلی موضوع را در اختیار مصرف کننده قرار میدهد و همچنین در مجموعه Zookeeper ، offset را ذخیره میکند.
- مصرف کننده از کافکا در فواصل زمانی منظم (مثلا هر ۱۰۰ Ms) برای پیامهای جدید درخواست میکند.
- هنگامی که کافکا پیامهای تولیدکنندگان را دریافت کرد، آنها را برای مصرفکنندگان ارسال میکند.
- مصرف کننده پیام را دریافت کرده و آن را پردازش میکند.
- پس از پردازش پیامها، مصرف کننده یک تاییدیه برای کارگزار کافکا ارسال میکند.
- کافکا به محض دریافت تأییدیه، offset را به مقدار جدید تغییر داده و آن را در Zookeeper به روز میکند. از آنجا که offsetها در Zookeeper نگهداری میشوند، مصرف کننده میتواند پیام بعدی را به درستی، حتی در حین در دسترس نبودن سرور، بخواند.
• جریان فوق تا زمانی که مصرف کننده درخواست را متوقف کند تکرار خواهد شد.
• مصرف کننده این انتخاب را دارد که در هر زمان از پیام های یک موضوع عبور کرده و تمام پیام های بعدی را بخواند.
جریان کار سیستم پیام رسانی صف (گروه مصرف کننده) در Apache Kafka
در سیستم پیامرسانی صف در Apache Kafka به جای یک مصرف کننده ی واحد، گروهی از مصرف کنندگان که دارای شناسه گروه یکسان هستند، در یک موضوع مشترک میشوند. به زبان ساده، مصرف کنندگانی که در یک موضوع با شناسه گروه یکسان، مشترک میشوند، به عنوان یک گروه واحد در نظر گرفته شده و پیام ها در میان آنها به اشتراک گذاشته میشود. در زیر گردش کار واقعی این سیستم مورد بررسی واقع شده است:
- تولید کنندگان در فواصل زمانی مشخص پیامها را به یک موضوع ارسال میکنند.
- مانند سیستم قبل، کافکا تمامی پیام ها را در یک بخش مشخص از آن موضوع خاص، ذخیره میکند.
- یک مصرف کنندهی واحد مشترک یک موضوع خاص میشود، فرضا موضوع با شناسه ی Topic-01 با شناسه گروه Group-1.
- تا زمانی که مصرف کنندهی جدید به موضوع، با شناسه ی Topic-01 با شناسه گروه Group-1، مشترک شود کافکا به همان روش سیستم Pub-Sub با مصرف کنندگان تعامل میکند.
- به محض ورود مصرف کننده ی جدید، کافکا عملیات خود را برای به اشتراک گذاشتن حالت تغییر داده و داده ها را بین دو مصرف کننده به اشتراک میگذارد. این اشتراک گذاری ادامه خواهد یافت تا زمانی که تعداد مشترکین به تعداد بخش تنظیم شده برای آن موضوع خاص برسد.
- به محض گذشتن تعداد مصرف کنندگان از تعداد بخش ها، تا زمانی که مصرف کنندگان موجود اشتراک خود را لغو کنند مصرف کنندگان جدید هیچ پیامی دریافت نخواهند کرد. این سناریو به وجود می آید زیرا به هر مصرف کننده در کافکا حداقل یک بخش اختصاص داده میشود و به محض اینکه همه بخشها به مصرف کنندگان موجود اختصاص داده شود، مصرف کنندگان جدید باید منتظر بمانند.
- این مشخصه به عنوان گروه مصرف کننده نیز خوانده میشود.
نقش ZooKeeper در Apache Kafka
وابستگی اساسی Apache Kafka ، Apache Zookeeper می باشد که یک سرویس پیکربندی و همگام سازی توزیع شده می باشد. این سرویس به عنوان رابط هماهنگی بین کارگزاران کافکا و مصرف کنندگان فعالیت می کند. سرورهای کافکا اطلاعات را از طریق یک خوشه Zookeeper به اشتراک میگذارند. کافکا ابرداده های اساسی مانند اطلاعات مربوط به موضوعات (topics)، کارگزاران، offsetهای مصرف کنندگان (صف خوان) و غیره را در Zookeeper ذخیره میکند.
از آنجا که تمام اطلاعات ضروری در Zookeeper ذخیره میشوند و آن به طور معمول این دادهها را در مجموعه خود تکثیر میکند، عدم موفقیت کارگزار کافکا یا Zookeeper بر وضعیت(state) خوشه کافکا تاثیر نمیگذارد. هنگامی که Zookeeper دوباره شروع به کار کند، کافکا وضعیت (state) را احیا خواهد کرد. این فعالیتها خرابی صفر را برای کافکا به ارمغان میآورند. در صورت عدم موفقیت رهبر، انتخاب رهبر بین کارگزاران کافکا نیز با استفاده از Zookeeper انجام می شود.
Apache Kafka – مراحل نصب
مرحله ی ۱ – تأیید نصب جاوا
برای نصب Apache Kafka ، با فرض این که نرمافزار جاوا را روی دستگاه خود نصب کرده باشید، فقط با استفاده از دستور زیر آن را تأیید کنید.
$ java -version
اگر جاوا با موفقیت در دستگاه شما نصب شده باشد، می توانید نسخه ی نصب شده را مشاهده کنید.
مرحله ی ۱.۱ – دانلود JDK
در صورتی که نرم افزار جاوا را دانلود نکرده اید، لطفاً با مراجعه به لینک زیر، آخرین نسخه JDK را بارگیری کنید.
هم اکنون آخرین نسخه JDK 8u 60، با شماره فایل((jdk-8u60-linux-x64.tar.gz)) است. لطفا فایل را در دستگاه خود دانلود کنید.
مرحله ی ۱.۲ – استخراج فایل ها
بطور کلی فایل های دانلود شده در پوشه ی دانلودها ذخیره می شوند، آن را تأیید کرده و با استفاده از دستورات زیر، برنامه ی نصب tar را استخراج کنید.
$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz
مرحله ی ۱.۳ – به Opt Directory بروید
برای اینکه جاوا در دسترس همه کاربران قرار گیرد، محتوای جاوا استخراج شده را به پوشه usr/local/java/ انتقال دهید.
$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/
مرحله ی ۱.۴ – تنظیم path
برای تنظیم path و متغیرهای JAVA_HOME ، دستورات زیر را به پرونده ~/.bashrc اضافه کنید.
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin
اکنون تمام تغییرات را در سیستم عامل جاری اعمال کنید.
$ source ~/.bashrc
مرحله ی ۱.۵ – گزینه های جاوا
برای تغییر گزینه های جاوا از دستور زیر استفاده کنید.
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
مرحله ی ۱.۶ – اکنون با استفاده از دستور تایید (java -version) توضیح داده شده در مرحله ی ۱، جاوا را تایید کنید.
مرحله ی ۲ – نصب چارچوب ZooKeeper
مرحله ی ۲.۱ – دانلود ZooKeeper
برای نصب چارچوب ZooKeeper روی دستگاه خود، به لینک زیر مراجعه کرده و آخرین نسخه ZooKeeper را دانلود کنید.
هم اکنون، آخرین نسخه ZooKeeper 3.4.6 (ZooKeeper-3.4.6.tar.gz) است.
مرحله ی ۲.۲ – استخراج فایل tar
با استفاده از دستور زیر فایل tar را استخراج کنید.
$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data
مرحله ی ۲.۳ – ایجاد فایل Configuration
پرونده پیکربندی را با نام conf / zoo.cfg با استفاده از دستور vi conf / zoo.cfg و کلیه پارامترهای زیر باز کنید تا به عنوان نقطه شروع تنظیم شود.
$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
هنگامی که فایل configuration با موفقیت ذخیره شد و دوباره به ورودی بازگشتید، می توانید سرور zookeeper را شروع کنید.
مرحله ی ۲.۴ – شروع کار با سرور ZooKeeper
$ bin/zkServer.sh start
پس از اجرای دستور فوق، مطابق شکل زیر پاسخی دریافت خواهید کرد:
$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED
مرحله ی ۲.۵ – شروع CLI
$ bin/zkCli.sh
بعد از تایپ دستور فوق، به سرور zookeeper وصل می شوید و پاسخ زیر را دریافت خواهید کرد:
Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]
مرحله ی ۲.۶ – متوقف کردن سرور zookeeper
پس از اتصال سرور و اتمام عملیات، می توانید سرورzookeeper را با دستور زیر متوقف کنید:
$ bin/zkServer.sh stop
اکنون با موفقیت Java و ZooKeeper را روی دستگاه خود نصب کرده اید. در ادامه مراحل نصبApache Kafka را ببینید.
مرحله ی ۳ – نصب Apache Kafka
برای نصب Apache Kafka مراحل زیر را دنبال کنید.
مرحله ی ۳.۱ – دانلود Apache Kafka
برای نصب Apache Kafka بر روی دستگاه خود، روی لینک زیر کلیک کنید
از طریق لینک فوق، آخرین نسخه یعنی kafka_2.11_0.9.0.0.tgz روی دستگاه شما دانلود می شود.
مرحله ی ۳.۲ – استخراج فایل tar
فایل tar را با استفاده از دستور زیر استخراج کنید
$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0
اکنون آخرین نسخه Apache Kafka روی دستگاه شما دانلود شده است.
مرحله ی ۳.۳ – شروع سرور
می توانید سرور را با استفاده از دستور زیر شروع کنید
$ bin/kafka-server-start.sh config/server.properties
بعد از شروع سرور، پاسخ زیر را روی صفحه خود مشاهده خواهید کرد:
$ bin/kafka-server-start.sh config/server.properties
[2016-01-02 15:37:30,410] INFO KafkaConfig values:
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
…………………………………………….
…………………………………………….
مرحله ی ۴ – متوقف کردن سرور
پس از اتمام عملیات، می توانید سرور را با استفاده از دستور زیر متوقف کنید:
$ bin/kafka-server-stop.sh config/server.properties
Apache Kafka – عملیات اساسی
در این بخش به شرح برخی عملیات اساسی در Apache Kafka پرداخته ایم.
ابتدا اجازه دهید اجرای پیکربندی single node-single broker در Apache Kafka را شروع کنیم و سپس تنظیمات خود را به پیکربندی single node-multiple brokers منتقل کنیم.
امیدوارم تاکنون Java ، ZooKeeper و Apache Kafka را روی دستگاه خود نصب کرده باشید. قبل از رفتن به تنظیمات Kafka Cluster ، ابتدا باید ZooKeeper خود را شروع کنید؛ زیرا Kafka Cluster از ZooKeeper استفاده می کند.
شروع ZooKeeper در Apache Kafka
یک ورودی جدید باز کرده و دستور زیر را داخل آن تایپ کنید:
bin/zookeeper-server-start.sh config/zookeeper.properties
برای شروع کارگزار Broker) Apache Kafka) ، دستور زیر را تایپ کنید:
bin/kafka-server-start.sh config/server.properties
بعد از شروع کارگزار Apache Kafka ، دستورات jps را در ورودی ZooKeeper تایپ کنید، سپس پاسخ زیر را مشاهده می کنید:
821 QuorumPeerMain
928 Kafka
931 Jps
اکنون می توانید دو Daemon را در حال اجرا روی ورودی ببینید که QuorumPeerMain به عنوان Daemon برای ZooKeeper و دیگری Daemon برای Kafka است.
پیکربندی single node-single broker در Apache Kafka
در این پیکربندی یک نمونه تک از ZooKeeper و شناسه ی کارگزار دارید. مراحل پیکربندی آن در زیر آمده است:
ایجاد یک موضوع (topic) در Apache Kafka – کافکا یک ابزار خط فرمان با نام kafka-topics.sh را برای ایجاد موضوعات بر روی سرور فراهم می کند. ورودی جدید را باز کنید و مثال زیر را تایپ کنید:
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic Hello-Kafka
در مثال بالا، یک موضوع به نام Hello-Kafka با یک بخش واحد و یک فاکتور replica ایجاد شد. خروجی ایجاد شده فوق مشابه خروجی زیر خواهد بود:
Output − Created topic Hello-Kafka
پس از ایجاد موضوع، می توانید اعلان را در پنجره ورودی کارگزار کافکا ببینید و گزارش مربوط به موضوع ایجاد شده در “/tmp/kafka-logs/“ در فایل config/server.properties دریافت کنید.
لیست موضوعات
برای به دست آوردن لیستی از مباحث در سرور Apache Kafka ، می توانید از دستور زیر استفاده کنید:
Syntax
bin/kafka-topics.sh --list --zookeeper localhost:2181
Output
Hello-Kafka
از آنجا که ما یک موضوع ایجاد کرده ایم، فقط موضوع Hello-Kafka در لیست دریافتی ذکر شده است. درصورت ایجاد موضوعات بیشتر، لیست نام آن موضوعات را در خروجی دریافت خواهید کرد.
شروع ارسال پیام توسط تولیدکننده
Syntax
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
از syntax فوق، دو پارامتر اصلی برای مشتری خط فرمان تولیدکننده مورد نیاز است:
۱. لیست کارگزاران: لیست کارگزارانی که می خواهیم پیام ها را برای آنها ارسال کنیم. در این حالت ما فقط یک کارگزار داریم. پرونده Config / server.properties حاوی شناسه درگاه کارگزار است، از آن جا که می دانیم کارگزار ما درگاه 9092 را گوش می دهد، بنابراین می توانید مستقیما آن را تمییز دهید.
۲. نام موضوع: در اینجا مثالی برای نام موضوع آمده است:
Example
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
تولیدکننده منتظر ورودی از stdin بوده و آن را برای خوشه Apache Kafka منتشر می کند. به طور پیش فرض، هر خط جدید به عنوان یک پیام جدید منتشر می شود و سپس خصوصیات پیش فرض تولیدکننده در فایل config/producer.properties ذخیره می شود. اکنون می توانید همانطور که در زیر نشان داده شده، چند خط پیام را در ورودی تایپ کنید.
Output
$ bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Hello-Kafka[2016-01-16 13:50:45,931]
WARN property topic is not valid (kafka.utils.Verifia-bleProperties)
Hello
My first message
My second message
شروع دریافت پیام ها از سوی مصرف کننده
مشابه تولیدکننده، ویژگی های پیش فرض مصرف کننده در فایل config/consumer.proper-ties مشخص شده است. یک ورودی جدید باز کرده و syntax زیر را برای استفاده از پیام تایپ کنید.
Syntax
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name
--from-beginning
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka
--from-beginning
Output
Hello
My first message
My second message
سرانجام، شما می توانید پیام هایی را از ورودی تولیدکننده وارد کرده و در ورودی مصرف کننده مشاهده کنید. از هم اکنون، شما درک خوبی در مورد single node cluster با یک کارگزار واحد دارید. بگذارید اکنون به سراغ پیکربندی چندین کارگزار برویم.
پیکربندی Single Node-Multiple Brokers در Apache Kafka
قبل از این که به سراغ تنظیمات حالت چند کارگزار بروید، سرور ZooKeeper خود را فعال کنید. سپس مراحل زیر را طی کنید:
چندین کارگزار کافکا ایجاد کنید: ما یک نمونه کارگزار کافکا در حال حاضر در config/server.properties داریم. حال ما به چندین نمونه کارگزار نیاز داریم، بنابراین فایل server.prop-erties موجود را در دو فایل config جدید کپی کرده و آن را به عنوان server-one.properties و server-two.prop-erties تغییر نام دهید. سپس هر دو فایل جدید را ویرایش کرده و تغییرات زیر را اختصاص دهید:
config/server-one.properties :
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The port the socket server listens on
port=9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
config/server-two.properties :
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The port the socket server listens on
port=9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
شروع کار کارگزاران: پس از ایجاد تغییرات در سه سرور، سه پایانه جدید را باز کنید تا کارگزاران یک به یک شروع شوند.
Broker1
bin/kafka-server-start.sh config/server.properties
Broker2
bin/kafka-server-start.sh config/server-one.properties
Broker3
bin/kafka-server-start.sh config/server-two.properties
حالا ما سه کارگزار مختلف داریم که روی دستگاه کار می کنند. برای بررسی تمام Daemon ها jps را در ورودی ZooKeeper تایپ کنید ، سپس پاسخ را مشاهده خواهید کرد.
ایجاد یک موضوع
بگذارید مقدار فاکتور replication را به عنوان سه مورد برای این موضوع اختصاص دهیم، زیرا ما سه کارگزار مختلف داریم. اگر دو کارگزار دارید ، مقدار replica اختصاص داده شده دو عدد خواهد بود.
Syntax
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic topic-name
Example
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3
-partitions 1 --topic Multibrokerapplication
Output
created topic “Multibrokerapplication”
از دستور توصیف برای بررسی اینکه کدام کارگزار مشغول گوش دادن به موضوع ایجاد شده فعلی است، مانند حالت زیر استفاده می شود:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Output
bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1
ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0
Replicas:0,2,1 Isr:0,2,1
از خروجی بالا می توان نتیجه گرفت که خط اول خلاصه ای از همه بخش ها، نام موضوع، تعداد بخش ها و فاکتور replication را که قبلاً انتخاب کرده ایم، نشان می دهد. در خط دوم ، هر گره رهبر یک قسمت از بخش ها که، به طور تصادفی انتخاب شده، خواهد بود.
در مثال ما، می بینیم که اولین کارگزار ما (با شناسه ی کارگزاری صفر) رهبر است. سپس Replicaهای صفر، ۱ و ۲ بدین معنی است که همه ی کارگزاران موضوع را همانطور تکرار می کنند که در آخر Isr است، Isr مجموعه Replicaهای همگام سازی است.
آغاز ارسال پیام توسط تولیدکننده
This procedure remains the same as in the single broker setup.
این رویه همانند پیکربندی قبلی است.
Example
bin/kafka-console-producer.sh --broker-list localhost:9092
--topic Multibrokerapplication
Output
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
[2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties)
This is single node-multi broker demo
This is the second message
آغاز دریافت پیام توسط مصرف کننده
این رویه همانند پیکربندی قبلی است.
Example
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion --from-beginning
Output
bin/kafka-console-consumer.sh --zookeeper localhost:2181
—topic Multibrokerapplica-tion —from-beginning
This is single node-multi broker demo
This is the second message
عملیات اساسی بر روی موضوع
در این فصل ما در مورد انواع عملیات اساسی بر روی موضوع در Apache Kafka ، بحث خواهیم کرد.
اصلاح یک موضوع
همانطور که قبلا چگونگی ایجاد یک موضوع را در Kafka Cluster گفته شد، اکنون با استفاده از دستور زیر به اصلاح یک موضوع ایجاد شده را می پردازیم:
Syntax
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name
--parti-tions count
Example
We have already created a topic “Hello-Kafka” with single partition count and one replica factor.
Now using “alter” command we have changed the partition count.
bin/kafka-topics.sh --zookeeper localhost:2181
--alter --topic Hello-kafka --parti-tions 2
Output
WARNING: If partitions are increased for a topic that has a key,
the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
حذف یک موضوع
برای حذف یک موضوع در Apache Kafka می تواند از کدهای زیر استفاده کنید:
Syntax
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
Example
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
Output
> Topic Hello-kafka marked for deletion
Apache Kafka – مثال ساده از تولیدکننده
در این بخش می خواهیم با استفاده از یک سرویس دهنده جاوا، برنامه ای برای انتشار و استفاده از پیام ها ایجاد کنیم. مشتری تولیدکننده Apache Kafka از API زیر تشکیل شده است:
API تولیدکننده Apache Kafka
در این بخش مهم ترین مجموعه API تولیدکننده Apache Kafka را توضیح خواهیم داد. قسمت مرکزی API تولیدکننده کافکا کلاس تولیدکننده کافکا است. کلاس تولیدکننده کافکا گزینه ای را برای اتصال یک کارگزار کافکا در ساختار خود، با روش های زیر فراهم می کند.
• کلاس تولیدکننده ک Apache Kafka روش ارسالی برای فرستادن پیام به صورت غیرهمزمان به یک موضوع را فراهم می کند. کد این روش به شرح زیر است:
producer.send(new ProducerRecord<byte[],byte[]>(topic,
partition, key1, value1) , callback);
• رکورد تولیدکننده – تولیدکننده مجموعه ای از رکوردها را منتظر ارسال اند ، مدیریت می کند.
• Callback – پس از تأیید رکورد توسط سرور، یک کاربر Callbackای را به اجرا در می آورد.
• کلاس تولیدکنند ه ی کافکا یک flush method را برای اطمینان از اتمام ارسال پیام های قبلی، فراهم می کند. Syntaxاین روش به شرح زیر است :
public void flush()
• کلاس تولیدکنند ه ی Apache Kafka روش partitionFor را فراهم می کند، که به دریافت ابرداده های مربوط به موضوعی خاص در یک بخش کمک می کند. این می تواند برای بخش بندی های سفارشی استفاده شود. کد دستوری این روش به شرح زیر است:
public Map metrics()
این دستور، نقشه ی محاسبات داخلی را، که توسط تولید کننده نگهداری می شود ، برمی گرداند.
• Public void close () – کلاس تولیدکنند ه ی کافکا بلوک های این روش را تا زمانی که تمام درخواست-های قبلی تکمیل شوند، فراهم می کند.
API تولید کننده
قسمت مرکزی API تولیدکننده Apache Kafka کلاس تولیدکننده کافکا است. کلاس تولیدکننده کافکا گزینه ای را برای اتصال یک کارگزار کافکا در ساختار خود، با روش های زیر فراهم می کند.
کلاس تولید کننده
کلاس تولیدکننده Apache Kafka روش ارسالی برای فرستادن پیام به یک یا چند موضوع فراهم می-کند. کد این روش به شرح زیر است:
public void send(KeyedMessaget<k,v> message)
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);
دو نوع تولیدکننده وجود دارد: Sync و Async
پیکربندی API برای تولیدکننده Sync تغییری نخواهد کرد. تفاوت بین این دو تولیدکننده در این است که یک تولیدکننده Sync پیام ها را مستقیماً اما در پسزمینه ارسال میکند. زمانی که نیاز به توان گذردهی بالاتری داشته باشید تولید کننده Async بهتر است. در نسخههای قبلی مانند ۰.۸، یک تولیدکننده async برای ثبت خطای گیرنده در عملیات send()، callbackای ندارد. این امکان تنها در نسخه فعلی ۰.۹ در دسترس است.
public void close()
کلاس تولیدکننده این روش را برای بستن تمام راههای ارتباطی بین تولیدکنندگان و کارگزاران، فراهم میکند.
تنظیمات پیکربندی
برای فهم بهتر تنظیمات اصلی پیکربندی API تولیدکننده Apache Kafka در جدول زیر آمده است:
ردیف | تنظیمات پیکربندی و توضیحات آن |
۱ | شناسه ی مشتریApplicationتولیدکننده را مشخص میکند. |
۲ | نوع تولیدکنندهSync یا async |
۳ | Acksتنظیمات acks، معیارهای تحت درخواست تولیدکننده را کاملا کنترل میکنند. |
۴ | تلاش مجدداگر درخواست تولیدکننده با شکست مواجه شود، به صورت خودکار با مقدار مشخص جدیدی مجددا درخواست میشود. |
۵ | bootstrap.serversلیست راه اندازی کارگزاران. |
۶ | linger.msاگر میخواهید تعداد درخواست ها را کاهش دهید، میتوانید از linger.ms استفاده کنید. |
۷ | key.serializerکلید رابط سریال سازی. |
۸ | value.serializerمقدار رابط سریال سازی. |
۹ | batch.sizeاندازهی بافر |
۱۰ | buffer.memoryمقدار کل حافظه موجود برای تولیدکننده را، برای بافر کنترل میکند. |
ProducerRecord API
ProducerRecord یک جفت کلید/مقدار است که به خوشهی کافکا ارسال میشود. سازندهی کلاس ProducerRecord برای ایجاد یک رکورد با جفتهای بخش، کلید و مقدار با استفاده از دستور زیر استفاده میشود:
public ProducerRecord (string topic, int partition, k key, v value)
• topic – نام موضوع تعریف شده توسط کاربر که برای ضبط اضافه خواهد شد.
• partition – تعداد بخش
• key – کلیدی که در ضبط درج می شود.
• value – محتوای رکورد
public ProducerRecord (string topic, k key, v value)
سازنده کلاس ProducerRecord برای ایجاد یک رکورد با کلید ، جفت ارزش و بدون بخش استفاده می شود.
• topic – برای اختصاص رکوردها موضوعی ایجاد کنید.
• key – کلید برای ضبط.
• value – محتوای رکورد.
public ProducerRecord (string topic, v value)
کلاس ProducerRecord بدون بخش و کلید یک رکورد ایجاد می کند.
• topic – ایجاد یک موضوع.
• value – محتوای رکورد.
روش های کلاس ProducerRecord در جدول زیر ذکر شده است:
ردیف | روشهای کلاس و توضیحات آن |
۱ | public string topic()موضوع به رکورد اضافه میشود. |
۲ | public K key()کلید در رکورد گنجانده میشود. در صورت عدم وجود چنین کلیدی، مقدار null بازگردانده خواهد شد. |
۳ | public V value()ضبط محتوا |
۴ | partition()بخش برای گزارش شمرده میشوند. |
SimpleProducer application
قبل از ایجاد برنامه ابتدا ZooKeeper و کارگزار Apache Kafka را راه اندازی کنید و سپس موضوع دلخواه خود را در کارگزار کافکا با استفاده از دستور ایجاد موضوع، ایجاد کنید. پس از آن یک کلاس جاوا به نام SimpleProducer.java ایجاد کنید و کد زیر را تایپ کنید.
//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer”
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name”);
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully”);
producer.close();
}
}
Compilation – برنامه با استفاده از دستور زیر قابل گردآوری است.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution – برنامه با استفاده از دستور زیر قابل اجرا است.
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
Output
Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10
مثال ساده مصرف کننده
باتوجه به اینکه تولید کننده ای برای ارسال پیام به خوشه Apache Kafka ایجاد کرده ایم، هم اکنون اقدام به ایجاد یک مصرف کننده برای استفاده از پیام ها، می کنیم. API مصرف کننده ی کافکا برای مصرف پیام های خوشه کافکا استفاده می شود. سازنده کلاس KafkaConsumer در زیر تعریف شده است:
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs – نقشه پیکربندی های مصرف کننده را برمی گرداند.
کلاس KafkaConsumer دارای روش های قابل توجهی است که در جدول زیر ذکر شده اند:
ردیف | روشها و توضیحات |
۱ | public java.util.Set<TopicPar-tition> assignment()مجموعه بخشهایی را که به تازگی توسط مصرفکننده اختصاص داده شده را، دریافت میکند. |
۲ | public string subscription()برای دریافت مدوام بخشهای اختصاص داده شده، مشترک لیست موضوعات مشخص شده میشود. |
۳ | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)برای دریافت مدوام بخشهای اختصاص داده شده، مشترک لیست موضوعات مشخص شده میشود. |
۴ | public void unsubscribe()اشتراک موضوعات لیست مشخص شدهای از بخشها را لغو میکند. |
۵ | public void sub-scribe(java.util.List<java.lang.String> topics)برای دریافت مدوام بخشهای اختصاص داده شده، مشترک لیست موضوعات مشخص شده میشود. اگر لیست خالی باشد مانند حالت روش ۴ عمل میکند. |
۶ | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)الگوی استدلال به الگوی اشتراک در قالب بیان معمول اشاره دارد و آرگومان شنونده از الگوی اشتراک، اعلان دریافت میکند. |
۷ | public void as-sign(java.util.List<TopicParti-tion> partitions)به صورت دستی لیستی از بخشها را به مصرفکننده اختصاص میدهد. |
۸ | poll()برای موضوعات یا بخشهای مشخص شده با استفاده از یکی از API های مشترک یا اختصاص داده شده، داده را بارگیری میکند. اگر قبل از اجرای این روش بر روی دادهها مشترک موضوعات نباشند ، خروجی اعلام خطا خواهد بود. |
۹ | public void commitSync()Commit offsets را در آخرین poll() برای همه فهرستهای اشتراکی موضوعات و بخشها بازگردانده میشود. همین عمل برای commitAsyn() اعمال میشود. |
۱۰ | public void seek(TopicPartition partition, long offset)مقدار offset فعلی را که مصرفکننده با استفاده از روش poll() بعدی استفاده خواهد کرد ، بارگیری میکند. |
۱۱ | public void resume()بخش متوقف شده را مجددا فعال میکند. |
۱۲ | public void wakeup()مصرفکننده را فعال میکند. |
ConsumerRecord API
ConsumerRecord API برای دریافت رکوردها از خوشه Apache Kafka استفاده می شود. این API شامل یک نام موضوع، شماره بخشی که از آن رکورد دریافت می شود و یک offset که به یک رکورد در یک بخش Apache Kafka اشاره می کند، می باشد. کلاس ConsumerRecord برای ایجاد یک رکورد مصرف کننده با نام موضوع خاص، تعداد بخش ها و جفت های <کلید ، مقدار> استفاده می شود که دارای دستور زیر است:
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
• topic – نام موضوع برای رکورد مصرف کننده که از خوشه کافکا دریافت شده است.
• partition – بخش برای موضوع.
• key – کلید ضبط ، در صورت وجود هیچ کلیدی، null بازگردانده خواهد شد.
• value – محتوای رکورد.
ConsumerRecord API
ConsumerRecords API به عنوان ظرفی برای ConsumerRecord عمل می کند. این API برای نگه داشتن لیست ConsumerRecord در هر بخش برای هر موضوع خاص استفاده می شود. سازنده ی این API در زیر تعریف شده است:
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
• TopicPartition − Return a map of partition for a particular topic.
• Records − Return list of ConsumerRecord.
ConsumerRecords class has the following methods defined.
• TopicPartition – نقشه بخش را برای یک موضوع خاص برمی گرداند.
• سوابق – لیستی از ConsumerRecord را بازمی گرداند.
کلاس ConsumerRecords دارای روش هایی است که در زیر جدول تعریف شده است:
ردیف | روشها و توضیحات |
۱ | public int count()تعداد کل رکوردهای همهی موضوعات را برمیگرداند. |
۲ | public Set partitions()در این مجموعه رکوردها، مجموعهی بخشهای دارای داده بازگردانده میشود. در صورت عدم وجود داده، مجموعه خالی خواهد بود. |
۳ | public Iterator iterator()Iterator شما را قادر میسازد تا از طریق در مجموعهای از گرفتن یا از بین بردن عناصر، چرخ بزنید. |
۴ | public List records()لیستی از رکوردهای مربوط به بخشهای موردنظر را میدهد. |
تنظیمات پیکربندی
تنظیمات پیکربندی برای API مصرفکننده در جدول زیر ذکر شده است:
ردیف | تنظیمات و توضیحات |
۱ | bootstrap.serversلیست راهاندازی کارگزاران |
۲ | group.idیک مصرفکننده را به یک گروه اختصاص میدهد. |
۳ | enable.auto.commitدر صورت صحیح بودن مقدار offsetها به صورت خودکار عمل commit را انجام خواهد داد. |
۴ | auto.commit.interval.msزمانهای بین به روزرسانی offsetهای استفاده در ZooKeeper را برمیگرداند. |
۵ | session.timeout.msمشخص میکند که کافکا، چند میلی ثانیه قبل از منصرف شدن و ادامه مصرف پیام، منتظر پاسخ ZooKeeper به یک درخواست (خواندن یا نوشتن) خواهد بود. |
SimpleConsumer Application
مراحل این بخش با مراحل producer application یکسان است. ابتدا ZooKeeper و کارگزار Kafka خود را شروع کنید. سپس با کلاس جاوا بنام SimpleConsumer.java یک برنامه SimpleConsumer ایجاد کنید و کد زیر را تایپ کنید:
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer
<String, String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName))
//print the topic name
System.out.println("Subscribed to topic " + topicName);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = con-sumer.poll(100);
for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records.
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
Compilation – برنامه با استفاده از دستور زیر قابل گردآوری است.
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
Execution – برنامه با استفاده از دستور زیر قابل اجرا است.
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
Input – CLI سازنده را باز کنید و پیام هایی را برای موضوع ارسال کنید. می توانید ورودی ساده ای مانند ‘Hello Consumer’ را قرار دهید.
Output
Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer
Apache Kafka – مثال گروه مصرف کنندگان
گروه مصرف کنندگان یک از دو مصرف multi-threaded یا multi-machine از موضوعات کافکا، می باشد.
گروه مصرف کننده
• مصرف کنندگان می توانند با استفاده از یک شناسه گروه یکسان به گروه بپیوندند.
• حداکثر میزان موازی بودن یک گروه به تعداد مصرف کنندگان گروه ربط دارد نه به تعداد بخش ها.
• Apache Kafka، بخش های یک موضوع را به یک مصرف کننده از گروه اختصاص می دهد ، بدین ترتیب هر بخش تنها توسط یک مصرف کننده در گروه مصرف می شود.
• Apache Kafka تضمین می کند که یک پیام تنها توسط یک مصرف کننده در گروه خوانده می شود.
• مصرف کنندگان می توانند پیام را به ترتیب ذخیره شدن در لیست مربوطه مشاهده کنند.
تعادل مجدد یک مشتری
اضافه کردن مراحل یا موضوعات بیشتر باعث می شود تا کافکا دوباره تعادل برقرار کند. اگر هر مصرف کننده یا کارگزاری نتواند به ZooKeeper اعلام موجودیت کند، می تواند از طریق خوشه ی کافکا دوباره پیکربندی شود. در طی این تعادل مجدد، Apache Kafka بخش های موجود را به مصارف موجود اختصاص می دهد، احتمالا بخش را به فرآیند دیگری منتقل می کند.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
if(args.length < 2){
System.out.println("Usage: consumer <topic> <groupname>");
return;
}
String topic = args[0].toString();
String group = args[1].toString();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", group);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.deserializer",
"org.apache.kafka.common.serializa-tion.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
int i = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
}
}
گردآوری
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
اجرا
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
ConsumerGroup <topic-name> my-group
Here we have created a sample group name as my-group with two consumers. Similarly, you can create your group and number of consumers in the group.
در اینجا ما یک گروه نمونه به نام my-group با دو مصرف کننده ایجاد کرده ایم. به طور مشابه، می-توانید گروه و تعداد مصرف کنندگان خود را در گروه ایجاد کنید.
ورودی
CLI تولیدکننده را باز کرده و مانند نمونه ی زیر پیام ارسال کنید:
Test consumer group 01
Test consumer group 02
خروجی اولین فرآیند
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01
خروجی دومین فرآیند
Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
امیدوارم با استفاده از نسخه ی نمایشی Java ، SimpleConsumer و ConsumeGroup را درک کرده باشید. اکنون شما ایده ای در مورد نحوه ارسال و دریافت پیام با استفاده از سرویس دهنده جاوا دارید. بگذارید در فصل بعد ادغام apache kafka با فناوری های big data را ادامه دهیم.
Apache Kafka – ادغام با storm
در این بخش به شرح ادغام Apache Kafka با Apache Storm می پردازیم.
درباره ی storm
Storm در ابتدا توسط Nathan Marz و تیم او در BackType ایجاد شد. در مدت زمان کوتاهی، Apache Storm به یک استاندارد برای سیستم پردازش توزیع شده در لحظه تبدیل شد که به شما امکان پردازش حجم عظیمی از داده ها را می دهد. Storm بسیار سریع است و یک benchmark آن را با بیش از یک میلیون tuple در ثانیه به ازای هر گره پردازش می کند. Apache Storm به طور مداوم اجرا می شود و از داده های منابع پیکربندی شده(Spouts) استفاده می کند و داده ها را از خط پردازش (Bolts) عبور می دهد. ترکیب Spouts و Bolts یک توپولوژی می سازد.
جریان انتزاعی
Spout یک منبع جریان است. به عنوان مثال ، یک spout می تواند موضوعات مربوط به Apache Kafka را بخواند و آن ها را به عنوان یک جریان منتشر کند. یک bolt جریان ورودی را مصرف می-کند، پردازش می کند و احتمالا جریان های جدیدی را منتشر می کند. Bolt ها می توانند از طریق توابع در حال اجرا، فیلتر نوارها، انجام جریان های ادغام شده، پیوستن جریان ها، تعامل با بانک های اطلاعاتی و موارد دیگر، هرکاری انجام دهند. هر گره در یک توپولوژی Storm به صورت موازی اجرا می شود. توپولوژی به طور نامحدود اجرا می شود تا زمانی که آن را فسخ کنید. Storm به طور خودکار دوباره وظایف ناموفق را تنظیم می کند. علاوه بر این، Storm عدم از دست دادن اطلاعات را تضمین می کند، حتی اگر دستگاه ها خراب شوند و پیام ها کاهش یابند.
Let us go through the Kafka-Storm integration API’s in detail. There are three main classes to integrate Kafka with Storm. They are as follows –
سه کلاس اصلی برای ادغام Apache Kafka و Storm وجود دارد. آنها به شرح زیر است:
BrokerHosts – ZkHosts – StaticHosts
BrokerHosts یک رابط است و ZkHosts و StaticHosts دو پیاده سازی اصلی آن هستند. از ZkHosts برای ردیابی پویا کارگزاران کافکا با حفظ جزئیات در ZooKeeper استفاده می شود، درحالی که از StaticHosts برای تنظیم دستی یا آماری کارگزاران Apache Kafka و جزئیات آن استفاده می شود. ZkHosts راهی ساده و سریع برای دسترسی به کارگزار Apache Kafka است.
دستور ZkHosts به شرح زیر است:
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
جایی که brokerZkStr میزبان ZooKeeper است و brokerZkPath مسیر ZooKeeper برای حفظ جزئیات کارگزار کافکا است.
KafkaConfig API
این API برای تعریف تنظیمات پیکربندی برای خوشه Apache Kafka استفاده می شود. دستور Kafka Config به شرح زیر است:
public KafkaConfig(BrokerHosts hosts, string topic)
Host – BrokerHost ها می توانند ZkHosts یا StaticHosts باشند.
Topic – نام موضوع.
SpoutConfig API
Spoutconfig فرمت گسترده ی KafkaConfig است که از اطلاعات اضافی ZooKeeper پشتیبانی می-کند.
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
• host – BrokerHosts می تواند اجرای هرگونه رابط BrokerHosts باشد.
• topic – نام موضوع.
• zkRoot – مسیر ریشه ZooKeeper .
• id – spout حالت offsetهای مصرفی آن در Zookeeper را ذخیره می کند. شناسه باید به صورت منحصربه فرد spout شما را مشخص کند.
SchemeAsMultiScheme
SchemeAsMultiScheme رابطی است که نحوه ی چگونگی تبدیل ByteBuffer مصرفی از کافکا را به storm tuple نشان می دهد. این از MultiScheme گرفته شده است و اجرای کلاس Scheme را می پذیرد. اجراهای کلاس Scheme بسیار زیادی موجود است و یکی از این آن ها StringScheme است که بایت را به عنوان یک رشته ی ساده تجزیه می کند. همچنین نامگذاری قسمت خروجی شما را کنترل می کند. دستور آن به شرح زیر است:
public SchemeAsMultiScheme(Scheme scheme)
• scheme – ByteBuffer مصرفی از کافکا
KafkaSpout API
KafkaSpout پیاده سازی spout ماست، که با Storm ادغام می شود. این پیام ها را از موضوع Apache Kafka دریافت می کند و آن را به عنوان tuple در محیط Storm منتشر می کند. KafkaSpout جزئیات پیکربندی خود را از SpoutConfig دریافت می کند. در زیر یک نمونه کد برای ایجاد یک KafkaSpout ساده قرار گرفته است:
// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts,
topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
ایجادBolt
Bolt مؤلفه ای است که به tupleها را به عنوان ورودی از دریافت می کند ، آن را پردازش می کند و به عنوان خروجی tupleهای جدیدی تولید می کند. Boltها رابط IRichBolt را پیاده سازی می کنند. در این برنامه از دو Bolt، WordSplitter-Bolt و WordCounterBolt برای انجام عملیات استفاده می شود.
رابط IRichBolt روش های زیر را دارد:
•Prepare – محیطی را برای اجرای bolt در اختیار شما قرار می دهد. مجریان، این روش را برای آغازدهی به bolt اجرا می کنند.
• Execute – یک دسته از ورودی را پردازش می کند.
• Cleanup – وقتی یک bolt درحال خاموش شدن باشد، وصل می شود.
• declareOutputFields – طرح خروجی tuple را اعلام می کند.
بگذارید SplitBolt.java را ایجاد کنیم، که منطق تقسیم یک جمله به کلمات و CountBolt.java را ایجاد می کند، که آن را برای جدا کردن کلمات منحصر به فرد و شمارش تکرار آن اجرا می کند.
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word: words) {
word = word.trim();
if(!word.isEmpty()) {
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public void cleanup() {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
ارسال به توپولوژی
توپولوژی Storm اساسا یک ساختار صرفه جویی است. کلاس TopologyBuilder روش های ساده و راحتی را برای ایجاد توپولوژی های پیچیده ارائه می دهد. کلاس TopologyBuilder روش هایی برای تنظیم spout (setSpout) و تنظیم bolt (setBolt) دارد. درنهایت، TopologyBuilder برای ایجاد توپولوژی، روش createTopology را داراست. روش های shuffleGrouping و fieldsGrouping به تنظیم جریان گروه بندی برای spout و boltها کمک می کند.
Local Cluster – برای اهداف توسعه، ما می وانیم با استفاده از شی LocalCluster یک خوشه محلی ایجاد کنیم و سپس توپولوژی را با استفاده از روش submitTopology از کلاس LocalCluster ارسال کنیم.
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.ZkHosts;
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig));
builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown();
}
}
قبل از کامپایل برنامه، ادغام Apache Kafka و Apache Storm به کتابخانه ZooKeeper client java نیاز دارد. ویرایشگر نسخه ۲.۹.۱ از Apache Storm نسخه ۰.۹.۵ پشتیبانی می کند (که در این آموزش از آن ها استفاده می کنیم). فایل های زیر را بارگیری کرده و آن را در java class path قرار دهید.
• curator-client-2.9.1.jar
• curator-framework-2.9.1.jar
پس از درج فایل های موردنیاز، برنامه را با استفاده از دستور زیر کامپایل کنید:
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java
اجرا
CLI تولیدکننده ی Apache Kafka را آغاز کنید(توضیح داده شده در فصل قبل)، موضوع جدیدی به نام my-first-topic ایجاد کرده و چند نمونه پیام را مطابق شکل زیر ارسال کنید:
hello
kafka
storm
spark
test message
another test message
حال با استفاده از فرمان زیر برنامه را اجرا کنید:
java -cp “/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample
خروجی مثال به شکل زیر خواهد بود:
storm : 1
test : 2
spark : 1
another : 1
kafka : 1
hello : 1
message : 2
Apache Kafka – ادغام با spark
در این بخش به شرح ادغام Apache Kafka با Spark Streaming API پرداخته شده است.
درباره ی spark
Spark Streaming API پردازش داده های درحال جریان را مقیاس پذیر، با توان گذردهی بالا و حمل پذیری بالا در برابر خطا، می سازد. داده ها را می توان از منابع زیادی مانند Apache Kafka ، فلوم، توییتر و غیره استفاده کرد و می توان با استفاده از الگوریتم های پیچیده مانند توابع سطح بالا مانند map ، reduce ، join و window پردازششان کرد. سرانجام، داده های پردازش شده را می توان به فایل های سیستم، پایگاه داده ها و داشبوردهای جاری منتقل کرد. مجموعه داده های توزیع شده انعطاف پذیر (RDD) یک ساختار داده بنیادی Spark است که مجموعه ای از objectهای توزیع شده ی تغییرناپذیر است. هر مجموعه داده در RDD به بخش های منطقی تقسیم می شود که ممکن است در گره های مختلف خوشه محاسبه شود.
ادغام Apache Kafka با Spark
Apache Kafka یک بستر پیام رسانی و ادغام بالقوه برای جریان Spark است. Apache Kafka به عنوان قطب اصلی جریان داده های در لحظه عمل می کند و با استفاده از الگوریتم های پیچیده در Spark Streaming پردازش می شود. پس از پردازش داده ها ، Spark Streaming می تواند نتایجی را برای موضوع دیگری از Apache Kafka یا فروشگاه HDFS، پایگاه داده ها یا داشبوردها منتشر کند. نمودار زیر جریان مفهومی را نشان می دهد:
نمودار جریان مفهومی spark
اکنون جزئیات API Kafka-Spark را شرح می دهیم:
SparkConf API
این API نشان دهنده ی پیکربندی یک برنامه Spark است که برای تنظیم پارامترهای مختلف Spark به عنوان جفت ارزش -کلید استفاده می شود.
کلاس SparkConf روشهای زیر را دارد:
• set(string key, string value) – متغیرهای پیکربندی را تنظیم می کند
• remove(string key) – کلید را از پیکربندی خارج می کند
• setAppName(string name) – نام برنامه را برای برنامه شما تنظیم می کند
• get(string key) – کلید را دریافت می کند
StreamingContext API
این API اصلی ترین نقطه ورود برای عملکرد Spark است. SparkContext نشان دهنده ی اتصال به یک خوشه ی Spark است و می تواند برای ایجاد RDD ها ، accumulatorها و broadcast variableها، روی خوشه مورد استفاده قرار گیرد. دستور آن در زیر مشخص شده است:
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
- Master – URL خوشه برای اتصال به (e.g. mesos://host:port, spark://host:port, local[4]).
- appName – نام کار شما، برای نمایش در cluster web UI.
- batchDuration – فاصلهی زمانیای که جریان دادهها بین دستهها تقسیم میشود.
- Master – URL خوشه برای اتصال به (e.g. mesos://host:port, spark://host:port, local[4]).
- appName – نام کار شما، برای نمایش در cluster web UI.
- batchDuration – فاصلهی زمانیای که جریان دادهها بین دستهها تقسیم میشود.
public StreamingContext(SparkConf conf, Duration batchDuration)
با ایجاد تنظیمات لازم برای SparkContext جدید ، یک StreamingContext ایجاد کنید
• conf – پارامترهای spark
• batchDuration – بازه زمانی که در آن داده های جریان به دسته ها تقسیم می شوند
KafkaUtils API
API KafkaUtils برای اتصال خوشه Apache Kafka به جریان Spark استفاده می شود. این API دارای روش قابل توجهی است که دستور زیر را ایجاد می کند:
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
روش فوق نشان داده شده است برای ایجاد یک جریان ورودی که پیام ها را از کارگزاران Apache Kafka بیرون می کشد.
• ssc – جسم StreamingContext.
• zkQuorum – quorum Zookeeper
• groupId – شناسه گروه برای مصرف کننده.
• topics – نقشه ای از موضوعات را برای استفاده بازمی گرداند.
• storeLevel – سطح ذخیره سازی برای ذخیره اشیای دریافتی استفاده می شود.
KafkaUtils API یک روش دیگر createDirectStream را دارد، که برای ایجاد یک جریان ورودی که بطور مستقیم پیام ها را از کارگزاران کافکا بدون استفاده از گیرنده بیرون می کشد، استفاده می شود. این جریان می تواند تضمین کند که هر پیام از Apache Kafka دقیقا یک بار در تبادلات قرار خواهد گرفت.
برنامه ی نمونه در Scala انجام می شود. برای کامپایل برنامه، لطفاًsbt ، ابزار ایجاد scala را بارگیری و نصب کنید. کد برنامه اصلی در زیر ارائه شده است:
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Build Script
ادغام spark-kafka به spark، جریان spark و spark Kafka integration jar بستگی دارد. یک فایل جدید build.sbt ایجاد کنید و جزئیات برنامه و وابستگی آن را مشخص کنید. sbt در ضمن ایجاد برنامه، jar لازم را بارگیری می کند.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Compilation / Packaging
دستور زیر را برای کامپایل و بسته بندی فایل jar برنامه اجرا کنید. برای اجرای برنامه باید پرونده jar را در spark console ارسال کنیم.
Submiting to Spark
مانند بخش های گذشته CLI تولیدکننده را آغاز کرده، مبحث جدیدی با عنوان my-first- topic ایجاد کنید و مانند زیر پیام های نمونه ای ارسال کنید.
Another spark test message
دستور زیر را برای ارسال برنامه به spark console اجرا کنید:
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
نمونه خروجی این برنامه در زیر نشان داده شده است:
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
Apache Kafka – twitter
پیش از این، ما شاهد ادغام Storm و Spark با Apache Kafka بودیم. در هر دو سناریو، ما یک تولیدکننده ی کافکا (با استفاده از CLI) برای ارسال پیام به اکوسیستم Apache Kafka ایجاد کردیم. سپس، شبکه Storm و Spark پیام ها را با استفاده از مصرف کننده ی Apache Kafka می خوانند و آن ها را به ترتیب وارد اکوسیستم Storm و Spark می کنند. بنابراین ، عملاً باید تولید کننده ی Apache Kafka را با ویژگی های زیر ایجاد کنیم:
• فیدهای توییتر را با استفاده از“Twitter Streaming API” بخواند،
• فیدها را پردازش کند،
• هشتگ ها را استخراج کند و آن ها را به Apache Kafka بفرستد.
پس از دریافت هشتگ ها توسط Apache Kafka ، ادغام Storm / Spark اطلاعات را دریافت کرده و آن را به اکوسیستم Storm / Spark می فرستد.
Twitter Streaming API
“Twitter Streaming API” در هر زبان برنامه نویسی ای قابل دسترسی است. “twitter4j” یک کتابخانه جاوا غیر رسمی و منبعی آزاد است که یک ماژول مبتنی بر جاوا را برای دسترسی آسان به “Twitter Streaming API” فراهم می کند. “twitter4j” یک چارچوب مبتنی بر شنونده را برای دسترسی به توییت ها فراهم می کند. برای دسترسی به “Twitter Streaming API”، باید وارد حساب کاربری توسعه دهنده ی توییتر شوید و جزئیات تأیید اعتبار OAuth زیر را بدست آورید.
• Customerkey
• CustomerSecret
• AccessToken
• AccessTookenSecret
پس از ایجاد حساب توسعه دهنده، فایل های jar “twitter4j”را بارگیری کرده و آن را در مسیر کلاس جاوا قرار دهید.
کد دستوری کامل سازنده توییتر کافکا (KafkaTwitterProducer.java) در زیر ذکر شده است:
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.*;
import twitter4j.conf.*;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaTwitterProducer {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Sta-tus>(1000);
if(args.length < 5){
System.out.println(
"Usage: KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret> <twitter-access-token>
<twitter-access-token-secret>
<topic-name> <twitter-search-keywords>");
return;
}
String consumerKey = args[0].toString();
String consumerSecret = args[1].toString();
String accessToken = args[2].toString();
String accessTokenSecret = args[3].toString();
String topicName = args[4].toString();
String[] arguments = args.clone();
String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true)
.setOAuthConsumerKey(consumerKey)
.setOAuthConsumerSecret(consumerSecret)
.setOAuthAccessToken(accessToken)
.setOAuthAccessTokenSecret(accessTokenSecret);
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();
StatusListener listener = new StatusListener() {
@Override
public void onStatus(Status status) {
queue.offer(status);
// System.out.println("@" + status.getUser().getScreenName()
+ " - " + status.getText());
// System.out.println("@" + status.getUser().getScreen-Name());
/*for(URLEntity urle : status.getURLEntities()) {
System.out.println(urle.getDisplayURL());
}*/
/*for(HashtagEntity hashtage : status.getHashtagEntities()) {
System.out.println(hashtage.getText());
}*/
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {
// System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
@Override
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
// System.out.println("Got track limitation notice:" +
num-berOfLimitedStatuses);
}
@Override
public void onScrubGeo(long userId, long upToStatusId) {
// System.out.println("Got scrub_geo event userId:" + userId +
"upToStatusId:" + upToStatusId);
}
@Override
public void onStallWarning(StallWarning warning) {
// System.out.println("Got stall warning:" + warning);
}
@Override
public void onException(Exception ex) {
ex.printStackTrace();
}
};
twitterStream.addListener(listener);
FilterQuery query = new FilterQuery().track(keyWords);
twitterStream.filter(query);
Thread.sleep(5000);
//Add Kafka producer config settings
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int i = 0;
int j = 0;
while(i < 10) {
Status ret = queue.poll();
if (ret == null) {
Thread.sleep(100);
i++;
}else {
for(HashtagEntity hashtage : ret.getHashtagEntities()) {
System.out.println("Hashtag: " + hashtage.getText());
producer.send(new ProducerRecord<String, String>(
top-icName, Integer.toString(j++), hashtage.getText()));
}
}
}
producer.close();
Thread.sleep(5000);
twitterStream.shutdown();
}
}
Compilation
برنامه را با استفاده از دستور زیر کامپایل کنید:
javac -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:. KafkaTwitterProducer.java
Execution
دو کنسول را باز کنید. برنامه کامپایل شده فوق را مطابق شکل زیر در یک کنسول اجرا کنید:
java -cp “/path/to/kafka/libs/*”:”/path/to/twitter4j/lib/*”:
. KafkaTwitterProducer <twitter-consumer-key>
<twitter-consumer-secret>
<twitter-access-token>
<twitter-ac-cess-token-secret>
my-first-topic food
هر یک از برنامه های Spark / Storm را که در فصل قبل توضیح داده شده است، در یکی دیگر از پنجره ها توضیح دهید. نکته اصلی قابل ذکر این است که موضوع مورد استفاده در هر دو مورد باید یکسان باشد. در اینجا، ما از “my-first-topic” به عنوان نام موضوع استفاده کرده ایم.
Output
خروجی این برنامه به کلمات کلیدی و فید فعلی توییتر بستگی دارد. خروجی نمونه در زیر مشخص شده است (ادغام storm).
food : 1
foodie : 2
burger : 1
. . .
Apache Kafka – ابزارها
ابزار Apache Kafka تحت عنوان org.apache.kafka.tools بسته بندی شده است. این ابزارها به ابزارهای سیستم و ابزارهای تکثیر طبقه بندی می شوند.
ابزار های سیستم در Apache Kafka
ابزارهای سیستم را می توان از خط فرمان با استفاده از run class script اجرا کرد، که دستور آن به شرح زیر است:
bin/kafka-run-class.sh package.class - - options
برخی از این ابزارها در زیر آورده شده اند:
• Kafka Migration Tool – این ابزار برای انتقال یک کارگزار از نسخه ی به نسخه ی دیگر استفاده می شود.
• Mirror Maker – این ابزار برای منعکس کردن یک خوشه ی Apache Kafka در خوشه ی دیگر کاربرد دارد.
• Consumer Offset Checker – این ابزار گروه مصرف کننده، موضوع، بخش ها، offset ها، logsize، مالک مجموعه ی مشخصی از موضوعات و گروه مصرف کننده را نمایش می دهد.
ابزارهای تکثیر در Apache Kafka
ابزارهای تکثیر در Apache Kafka شامل ابزارهای طراحی سطح بالا می باشند. هدف از افزودن این ابزارها ارائه ی دوام و در دسترس بودن بیشتر است. برخی از ابزارهای تکثیر در زیر ذکر شده است :
• Create Topic Tool – این ابزار، موضوع با تعداد مشخصی بخش و فاکتور تکثیر ایجاد می کند و از طرح پیش فرض کافکا برای انجام replica assignment استفاده می کند.
• List Topic Tool – این ابزار اطلاعات مربوط به لیست مشخصی از موضوعات را فهرست می کند. اگر هیچ موضوعی در خط فرمان ارائه نشده باشد، ابزار برای گرفتن همه ی موضوعات و لیست اطلاعات مربوط به آنها، از Zookeeper گزارش می گیرد. زمینه هایی که ابزار نمایش می دهد نام موضوع، بخش، رهبر ، replicaها و غیره است.
• Add Partition Tool – در ایجاد یک موضوع، باید تعداد بخش های موضوع مشخص شود. سپس ممکن است با افزایش حجم موضوع بخش های بیشتری برای آن مورد نیاز باشد. این ابزار به اضافه کردن بخش های بیشتر برای یک موضوع خاص کمک می کند و همچنین به بخش های اضافه شده قابلیت replica assignment دستی اعطا می کند.
Apache Kafka – برنامه ها
Apache Kafka از بسیاری از بهترین اپلیکیشن های صنعتی امروزی پشتیبانی می کند. ما در این فصل به اختصار به برخی از قابل توجه ترین اپلیکیشن های کافکا خواهیم پرداخت:
Twitter
twitter یک سرویس شبکه های اجتماعی آنلاین است که بستری را برای ارسال و دریافت tweetهای کاربران فراهم می کند. کاربران ثبت نام شده می توانند توییت ها را بخوانند و آن ها را ارسال کنند، اما کاربران ثبت نام نشده فقط می توانند توییت ها را بخوانند. توییتر از Storm-Kafka به عنوان بخشی از زیرساخت های پردازش جریان آن استفاده می کند.
LinkedIn
Apache Kafka در LinkedIn برای داده های جریان فعالیت و معیارهای عملیاتی استفاده می شود. سیستم پیام رسان Kafka با استفاده از محصولات مختلف مانند LinkedIn Newsfeed ، برای استفاده از پیام رسانی آنلاین و سیستم های تحلیل آفلاین مانند Hadoop، به این برنامه کمک می کند. قابلیت دوام قوی کافکا نیز یکی از عوامل اصلی ارتباط آن با LinkedIn است.
Netflix
Netflix یک رسانه اینترنتی تقاضامحور در آمریکاست که از کافکا برای سیستم نظارت در لحظه و پردازش رویدادها استفاده می کند.
Mozilla
Mozilla یک نرم افزار مرورگر رایگان است که در سال 1998 توسط اعضای Netscape ایجاد شده است. کافکا به زودی جایگزین بخشی از سیستم تولید فعلی موزیلا را برای جمع آوری داده های عملکرد و داده های مربوط به میزان استفاده از مرورگر توسط کاربر نهایی برای پروژه هایی مانند Telemetry ، Test Pilot و غیره، خواهد شد.
Oracle
Oracle از محصول Enterprise Service Bus خود با نام OSB (Oracle Service Bus) اتصال به بومی را به کافکا فراهم می کند که به توسعه دهندگان این امکان را می دهد که برای اجرای خطوط داده ی مرحله ای، از قابلیت های میانجی داخلی OSB استفاده کنند.