Apache Ignite C++ Documentation

The apacheignite-cpp Developer Hub

Welcome to the apacheignite-cpp developer hub. You'll find comprehensive guides and documentation to help you start working with apacheignite-cpp as quickly as possible, as well as support if you get stuck. Let's jump right in!

Get Started    

Continuous Queries

Continuously obtain real-time query results.

Continuous Queries

Continuous queries enable you to listen to data modifications occurring on Ignite caches. Once a continuous query is started, you will get notified of all the data changes that fall into your query filter if any.

Continuous queries functionality is available via ContinuousQuery class that is elaborated below.

Initial Query

Whenever a continuous query is prepared for execution, you have an option to specify an initial query that will be executed before the continuous query gets registered in the cluster and before you start to receive the updates.

The initial query can be set with Cache.QueryContinuous(Query) method and can be of any query type: Scan, SQL, or TEXT.

Remote Filter

This filter is executed on primary and backup nodes for a given key, and evaluates whether an update should be propagated as an event to the query's local listener.

If the filter returns true, then the local listener will be notified. Otherwise, the notification will be skipped. Updates filtering on specific primary and backup nodes, on which they occur, allows reducing unnecessary network traffic between primary/backup nodes and local listeners executed on the application side.

Below you can see an example of a custom filter:

// User-defined filter class.
template<typename K, typename V>
struct RangeFilter : CacheEntryEventFilter<K, V>
{
    RangeFilter() :
      rangeBegin(0), rangeEnd(0) { }

    RangeFilter(const K& from, const K& to) :
      rangeBegin(from), rangeEnd(to) { }

    virtual ~RangeFilter() { }

    // Event callback. Should be defined for any filter.
    virtual bool Process(const CacheEntryEvent<K, V>& event)
    {
        return event.GetKey() >= rangeBegin && event.GetKey() < rangeEnd;
    }

    // Beginning of the range.
    K rangeBegin;

    // End of the range.
    K rangeEnd;
};

A remote filter instance can be set via a constructor of the ContinuousQuery class.

Since the filter implementation can be executed on a random cluster node, make sure that the filter is registered using the IgniteBinding::RegisterCacheEntryEventFilter() method on all the nodes. This can be done either by calling Ignite::GetBinding() or using IGNITE_EXPORTED_CALL void IgniteModuleInit(ignite::IgniteBindingContext&) method that is called upon a node startup:

// This callback called by Ignite on node startup and could be
// used to register code, that needs to be called remotely.
IGNITE_EXPORTED_CALL void IgniteModuleInit(ignite::IgniteBindingContext& context)
{
    IgniteBinding binding = context.GetBingding();

    binding.RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >();
}

// Alternatively you can register it manually.
// Note, that you should only register every user class once, so choose one method.
void SomeUserFunction()
{
  //...
  Ignite node = Ignition::Get("SomeNode");
  IgniteBinding binding = node.GetBingding();
  binding.RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >();
  //...
}

Local Listener

When a cache gets modified (an entry is inserted, updated or deleted), an event related to the update will be sent the continuous query's local listener so that your application can react accordingly.

Here is how the listener might look like:

// User-defined listener class.
template<typename K, typename V>
class Listener : public CacheEntryEventListener<K, V>
{
public:
    Listener() { }

    // Callback that is executed locally when an notification is received.
    virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num)
    {
        for (uint32_t i = 0; i < num; ++i)
            std::cout << "key=" << evts[i].GetKey() 
                      << ", val=" << evts[i].GetValue()
                      << std::endl;
    }
};

The local listener can be set via the ContinuousQuery.SetListener(Reference<CacheEntryEventListener<K, V>>) method or passed to the ContinuousQuery constructor, as shown below:

// Creating a listener.
Listener<int32_t, std::string> lsnr;

// Creating a filter. We are only insterested in entries with
// keys in range [5, 10), i.e. {5, 6, 7, 8, 9}.
RangeFilter<int32_t, std::string> filter(5, 10);

// Getting the cache.
Cache<int32_t, std::string> cache =
  ignite.GetCache<int32_t, std::string>("mycache");

// Instantiating a continuous query. Passing a copy of the listener.
ContinuousQuery<int32_t, std::string> qry(
  MakeReferenceFromCopy(lsnr), MakeReferenceFromCopy(filter));

// Setting an optional initial query.
// The initial query will return all the entries that are in the cache.
ContinuousQueryHandle<int32_t, std::string> handle =
  cache.QueryContinuous(qry, ScanQuery());

QueryCursor<int32_t, std::string> cursor = handle.GetInitialQueryCursor();

// Iterating over the initial's query result set.
while (cursor.HasNext())
{
  CacheEntry<int32_t, std::string> e = cursor.GetNext();

  std::cout << "key=" << e.GetKey() 
    << ", val=" << e.GetValue()
    << std::endl;
}

// Adding a few more cache entries.
// As a result, the local listener above will be called.
for (int32_t i = 0; i < 15; ++i)
{
  std::stringstream converter;
  converter << i;

  cache.Put(i, converter.str());
}

For details on ignite::Reference class, refer to the Objects Lifetime documentation section.

Events Delivery Guarantees

Continuous query implementation guarantees exactly once delivery of an event to the client's local listener.

It's feasible since every backup node(s) maintains an update queue in addition to the primary node. If the primary node crashes or a topology is changed for some other reason, then every backup node flushes the content of its internal queue to the client, making sure that there will be no event that was not delivered to the client's local listener.

To avoid duplicate notifications, in cases when all backup nodes flush their queues to the client, Ignite manages a per-partition update counter. Once an entry in some partition is updated, a counter for this partition is incremented on both primary and backups. The value of this counter is also sent along with the event notification to the client, which also maintains the copy of this mapping. If the client receives an update with the counter less than in its local map, this update is treated as a duplicate and discarded.

Once the client confirms that an event is received, the primary and backup nodes remove the record for this event from their backup queues.

Example

A complete example demonstrating the usage of continuous queries is delivered as a part of every Apache Ignite distribution and named continuous_query_example.cpp. The example is available in GitHub.