What happened so far
Streaming data from Kafka to Ignite and processing it with horizontal and vertical scalability has been covered in the first four parts. At this point, we've got our processed data in the cache – so we want to either persist that data or set up some expiration policy so we don't run out of space. Since we want to make sure everything works as intended, we'll also set up monitoring.
Obviously, we need to decide what we want to do with our data once we’re finished processing it.
In case we're just interested in the live/real-time data, we might want to set up an expiration policy so we don't run out of RAM.
Apache Ignite offers a few different methods of expiration.
In our case, the data scientist might just want to see the most recent bicycle data – let’s say last 10 minutes.
As always with Apache Ignite, we have two options for setting things up: configuration via Java code or configuration via XML. Configuring in Java would mean a rebuild every time we just want to adjust a setting.
Here's an example of what your CacheConfiguration could look like:
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="dataRegionName" value="streaming_region"/>
    <property name="expiryPolicyFactory">
        <bean id="expiryPolicy" class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
            <constructor-arg>
                <bean class="javax.cache.expiry.Duration">
                    <constructor-arg value="SECONDS"/>
                    <constructor-arg value="600"/>
                </bean>
            </constructor-arg>
        </bean>
    </property>
    <property name="eagerTtl" value="true"/>
    <property name="name" value="GpsPoint"/>
    <property name="backups" value="0"/>
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="ATOMIC"/>
    <property name="copyOnRead" value="true"/>
</bean>
Basically, we tell Ignite which data region the cache should use (per region, you can define the maximum RAM usage, persistence, and so on) and which type of expiry policy it should apply.
For our case, we’re just interested in when the entry was created – thus we’re using the CreatedExpiryPolicy. From now on, any entry in the cache “GpsPoint” will be set to “expired” after ten minutes.
Setting eagerTtl to true ensures that expired entries are actively removed by a background task, rather than lingering until they're next accessed.
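If you'd rather configure this in Java, the same cache setup can be sketched roughly as follows (names match the XML above; treat this as an illustrative sketch, not project code):

```java
import java.util.concurrent.TimeUnit;

import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;

import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;

public class GpsPointCacheConfig {

    public static CacheConfiguration<String, Object> create() {
        CacheConfiguration<String, Object> cfg = new CacheConfiguration<>("GpsPoint");
        cfg.setDataRegionName("streaming_region");
        // Expire entries ten minutes after creation, matching the XML config.
        cfg.setExpiryPolicyFactory(
                CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 600)));
        cfg.setEagerTtl(true);
        cfg.setBackups(0);
        cfg.setCacheMode(CacheMode.PARTITIONED);
        cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
        cfg.setCopyOnRead(true);
        return cfg;
    }
}
```

The downside, as mentioned, is that every tweak to these values means rebuilding the project.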
By the way – if you still want to preserve the data that is expiring, that’s totally possible.
You can enable certain event types via the configuration and catch them with a continuous query. However, performance-wise, it's not recommended to enable too many event types.
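As a sketch of the catching part, here's an alternative to the continuous query: a local event listener for EVT_CACHE_OBJECT_EXPIRED. This assumes that event type has been enabled via includeEventTypes in your Ignite configuration; the cache name is taken from the example above.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;

public class ExpiredEntryListener {

    public static void register(Ignite ignite) {
        // Fires on this node whenever an entry expires in a local partition.
        ignite.events().localListen(evt -> {
            CacheEvent ce = (CacheEvent) evt;
            if ("GpsPoint".equals(ce.cacheName())) {
                // Persist or forward the entry before it's gone for good.
                System.out.println("Expired: " + ce.key());
            }
            return true; // keep the listener registered
        }, EventType.EVT_CACHE_OBJECT_EXPIRED);
    }
}
```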
Similar to the expiration policies, it's pretty easy to enable Apache Ignite's Native Persistence. As mentioned before, you can set up different data regions in Apache Ignite. Simply add
<property name="persistenceEnabled" value="true"/>
to one of them in your XML and Apache Ignite will start persisting your data.
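For reference, a data region with persistence enabled could look like this inside the IgniteConfiguration (the region name matches the cache config above; the maxSize value is illustrative):

```xml
<property name="dataStorageConfiguration">
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
        <property name="dataRegionConfigurations">
            <list>
                <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                    <property name="name" value="streaming_region"/>
                    <!-- Cap the region's RAM usage (value in bytes, here 4 GB). -->
                    <property name="maxSize" value="#{4L * 1024 * 1024 * 1024}"/>
                    <property name="persistenceEnabled" value="true"/>
                </bean>
            </list>
        </property>
    </bean>
</property>
```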
How does it work, though? The idea behind Apache Ignite's Native Persistence is that there's a seamless transition between the in-memory and the on-disk storage layer. That means hot data will be kept in memory, while older data will only be available from disk.
The developer doesn’t need to know where the data is actually stored, which is a huge advantage.
However, especially when dealing with large amounts of data, as in our project, you'll notice that the write performance is quite awful at first.
The default setting for Ignite's Write-Ahead Log (WAL) does an fsync() on each update – which means your data would survive even a complete OS crash.
When dealing with non-critical data like our bicycle data, we can get away with the BACKGROUND WAL mode, which accumulates new data over a certain time window and then flushes it as a batch. This vastly improves write speed, but a system crash would mean losing some of the most recent data.
Aside from that, it's recommended to write your WAL and the actual data to different drives (SSD > HDD, obviously). Both paths can be adjusted via the XML config. Since we're working within Docker, you'll need to mount a volume from each drive in your container and point the config at it.
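A sketch of what those WAL settings could look like in the DataStorageConfiguration (the paths are placeholders for your mounted volumes):

```xml
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
    <!-- Trade some durability for write speed: flush the WAL in the background. -->
    <property name="walMode" value="BACKGROUND"/>
    <!-- Keep WAL and data on separate drives, mounted as Docker volumes. -->
    <property name="walPath" value="/wal"/>
    <property name="walArchivePath" value="/wal/archive"/>
    <property name="storagePath" value="/persistence"/>
</bean>
```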
Furthermore, you can adjust page size and some system settings to further improve performance.
Prometheus + Grafana
We want to make sure our data is processed as expected at any given time – and while test-driven development is always a good idea, it’s also necessary to have a real-time monitoring to catch some unusual behavior.
In addition to that, we want to compare Ignite's performance with the other teams' – which means we need to measure the throughput.
We decided that we want to use two state-of-the-art techs to achieve that.
Prometheus is scraping-based – meaning your software usually exposes data that is then actively gathered by Prometheus. For our project, we'll instead push our metrics to a Pushgateway, which will be scraped by Prometheus.
Since we're working with Docker, it's rather easy to add new containers to the stack (I've added a volume for each of Grafana and Prometheus):
prometheus:
  image: prom/prometheus
  volumes:
    - ./prometheus/:/etc/prometheus/
    - prometheus_data:/prometheus
  command:
    - '--config.file=/etc/prometheus/prometheus.yml'
    - '--storage.tsdb.path=/prometheus'
    - '--web.console.libraries=/usr/share/prometheus/console_libraries'
    - '--web.console.templates=/usr/share/prometheus/consoles'
  ports:
    - 9090:9090
  links:
    - pushgateway:pushgateway
  restart: always
  container_name: prometheus

pushgateway:
  image: prom/pushgateway
  ports:
    - 9091:9091
  container_name: pushgateway
Config-wise, we only need to create a prometheus.yml which tells Prometheus to scrape from the Pushgateway:
global:
  scrape_interval: 15s     # By default, scrape targets every 15 seconds.
  evaluation_interval: 15s # By default, evaluate rules every 15 seconds.

scrape_configs:
  - job_name: 'prometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'pushgateway'
    scrape_interval: 5s
    honor_labels: true
    static_configs:
      - targets: ['pushgateway:9091']
To learn how to actively push metrics to your Pushgateway from your Java code, start here.
Remember that gathering data from operators that are executed a thousand times a second will hurt performance. Don't calculate the average run duration of a function on every call; instead, accumulate the milliseconds and the call count, and compute the average only when you actually push.
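A minimal sketch of that pattern in Java (RunDurationMetric is a hypothetical helper, not part of the Prometheus client library):

```java
import java.util.concurrent.atomic.LongAdder;

public class RunDurationMetric {
    private final LongAdder totalMillis = new LongAdder();
    private final LongAdder invocations = new LongAdder();

    // Called from the hot path: two cheap additions, no division.
    public void record(long durationMillis) {
        totalMillis.add(durationMillis);
        invocations.add(1);
    }

    // Called only once per push interval.
    public double averageMillis() {
        long n = invocations.sum();
        return n == 0 ? 0.0 : (double) totalMillis.sum() / n;
    }
}
```

LongAdder keeps contention low when many operator threads record durations concurrently, which is exactly the hot-path situation described above.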
grafana:
  image: grafana/grafana
  depends_on:
    - prometheus
  ports:
    - 3000:3000
  volumes:
    - grafana_data:/var/lib/grafana
  env_file:
    - grafana/config.monitoring
  container_name: grafana
Just like above, it only takes a few lines to add Grafana to the mix.
The config.monitoring only contains two lines in my local testing environment:
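For the official Grafana image, a typical minimal config.monitoring sets an admin password and disables self sign-up (placeholder values; adjust to your setup):

```
GF_SECURITY_ADMIN_PASSWORD=changeme
GF_USERS_ALLOW_SIGN_UP=false
```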
And that’s all it takes – well, at least to get started.
Running the cluster
After building my code and the Docker containers and preparing all volumes, I start my shell script, which executes a docker-compose up and starts the connect-standalone tasks on the Kafka Connect container – one for the GpsPoints and one for the AccelerationPoints.
You can find more details on that in part two of this series.
Once your Docker containers are running, you should be able to access your Grafana UI on localhost:3000.
After logging in with the credentials defined in your config.monitoring you’ll need to add Prometheus as your data source (prometheus:9090).
Now, you can start to build your dashboard. Prometheus has a decent intro to the querying basics available here.
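As a starting point, assuming you push a counter such as processed_points_total (a hypothetical metric name – use whatever you named yours), the per-second throughput over a five-minute window could be queried with:

```
rate(processed_points_total[5m])
```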
If you've got everything running so far – congrats, you've basically implemented everything essential for a stream processing application.
An overview of our current components:
- Emitting from a source
- Queuing in Kafka
- Consuming with Ignite
- Processing with Ignite
- Monitoring with Prometheus+Grafana
This means we’re essentially done for this tutorial.
However, there will be one more – and final – part of this series:
Replacing Kafka Connect with Apache Flume for better performance, measuring performance on different machines and comparing the results with the other teams.
~ Sven Goly