See also the Dynamic Distributed Repositories Using Shards and Federation Setup document which provides details on setting up distributed repositories.

A database in AllegroGraph is typically implemented as a single repository, running in a single instance of AllegroGraph. This is simple to set up and operate, but problems arise when the size of data in the repository nears or exceeds the resources of the server on which it resides. Problems can also arise when the size of data fits well within the specs of the database server, but the query patterns across that data stress the system.

When the demands of data or query patterns outpace the ability of a server to keep up, there are two ways to attempt to grow your system: vertical or horizontal scaling.

With vertical scaling, you simply increase the capacity of the server on which AllegroGraph is running. Increasing CPU power or the amount of RAM in a server can work for modest size data sets, but you may soon run into the limitations of the available hardware, or the cost to purchase high end hardware may become prohibitive.

AllegroGraph provides an alternative solution called FedShard™: a cluster of database servers, and horizontal scaling through sharding. An AllegroGraph cluster is a set of AllegroGraph installations across a defined set of machines. A distributed repository is a logical database comprised of one or more repositories spread across one or more of the AllegroGraph nodes in that cluster. A distributed repository has a required partition key that is used when importing statements. When adding statements to your repository, the partition key is used to determine on which shard each statement will be placed.

By carefully choosing your partition key, it is usually possible to distribute your data across the shards in ways that support the query patterns of your application.
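
To make the role of the partition key concrete, here is a minimal Python sketch. It assumes the key is the graph part of each statement and that a shard is picked by hashing the key value. The hashing scheme and the names `shard_for` and `distribute` are hypothetical illustrations, not AllegroGraph's actual placement algorithm, which this document does not describe.

```python
import hashlib
from collections import defaultdict


def shard_for(key_value, shard_count):
    """Map a partition-key value to a shard index (assumed hashing scheme)."""
    digest = hashlib.sha256(key_value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


def distribute(quads, shard_count):
    """Group (subject, predicate, object, graph) statements by shard,
    using the graph part as the partition key."""
    shards = defaultdict(list)
    for s, p, o, g in quads:
        shards[shard_for(g, shard_count)].append((s, p, o, g))
    return shards


quads = [
    ("ex:p1", "ex:name", "Ann", "ex:patient1"),
    ("ex:p1", "ex:birth", "1960-04-27", "ex:patient1"),
    ("ex:p2", "ex:name", "Bob", "ex:patient2"),
]
shards = distribute(quads, 4)
for shard_id in sorted(shards):
    print(shard_id, shards[shard_id])
```

Whatever the real placement rule is, the property that matters for queries is the one this sketch preserves: statements with the same key value always land on the same shard.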

SPARQL queries issued against FedShard™ are run in modified form in parallel on all the shards that make up the distributed repository. The results from each shard are then combined to create the overall query result. While a query is executing on one shard it has access to the knowledge base, but it is isolated from all other shards. See the SPARQL section for examples.

This tutorial walks you through how to define and install to a cluster, how to define a distributed repository, and how various utilities can be used to manipulate AllegroGraph clusters and distributed repositories.

Defining Distributed Repositories

You must define the structure of a distributed repository before you can create it. The definition format is shown here.

A distributed repository is stored in shards, and each shard is stored on an AllegroGraph server. Each server may house one or more shards.

In the definition of the repo you specify the servers and how to access them (i.e. user name and password) and in which catalog you want the shards stored.

You specify how many shards should be stored on each server. A large server with lots of memory and CPU cores can house many shards whereas you'll only want a few shards on a small server.

The definition file also specifies if shards should be replicated using multi-master replication (MMR).

If the shards should be federated with knowledge bases the definition will note that.

You put the definition in a text file and then run

  agtool fedshard define file.txt 

and that definition is stored on the server.

Installing and starting servers

In the implementation described by this document, the servers are assumed to already be installed and running. Therefore one can easily define and create distributed repositories on existing servers without disrupting existing uses of those servers.

(In the old implementation of distributed repositories all of the servers with shards were installed, setup and controlled from one place based on the definitions found in the agcluster.cfg file.)

Creating the Shards of a Distributed Repository

This is the definition of a distributed repo named fedshard:bigDB that has four shards and each shard is federated with the knowledge base kb-for-patients.

Later we will use the agtool generate functionality to create sample data for this repository.

   repo bigDB  
   key  part  
   secondary-key graph  
   user test  
   password xyzzy  
   scheme http  
   port 10035  
   host 127.1  
   shards-per-server 4  
   host 127.1  
   repo kb-for-patients  

Adding data to a distributed repository

With the above definition in the file bigdb.def we execute

  % agtool fedshard define bigdb.def 

and now the definition of fedshard:bigDB is known to the server. The repository consisting of four shards has not been created yet but the server now knows how to create all of the shards of the distributed repo.

We can't create fedshard:bigDB until the repo kb-for-patients exists since each shard must be federated with kb-for-patients. Here we show how to create empty repos of a given name but alternatively you can just use the agtool generate commands below to both create the repo and fill it.

  % agtool repos create kb-for-patients  
  % agtool repos create fedshard:bigDB  

Next we generate sample data for the repository (which is stored in the shards) and for the knowledge base

  % agtool generate patient-only fedshard:bigDB 10000  
  Generating data for 10,000 patient-onlys...  
  Operation completed in 0h1m34s.  
  Added 10,000 patient-onlys. 105.73258 patient-onlys/sec.  
  Added 4,712,806 triples. 49829.715 triples/sec  
  % agtool generate patient-kb kb-for-patients 10000  
  Generating data for 10,000 patient-kbs...  
  Operation completed in 793ms.  
  Added 10,000 patient-kbs. 12598.996 patient-kbs/sec.  
  Added 19,342 triples. 24368.979 triples/sec  

We can see how well the triples are split among the shards. It's intentional that the first shard have fewer triples since the first shard can never be split.

  % agtool fedshard list --count bigDB  
  Name: bigDB  
  Key: part  
  Secondary Key: graph  
  | Server ID | Scheme |  Host |  Port | User | Catalog | Shard Count |  
  |         0 |   http | 127.1 | 10035 | test |    root |           4 |  
  | Shard ID | Server ID |               Repo |   Count |  
  |        0 |         0 | shard-bigDB-s0-sh0 |    5808 |  
  |        1 |         0 | shard-bigDB-s0-sh1 | 1614834 |  
  |        2 |         0 | shard-bigDB-s0-sh2 | 1553791 |  
  |        3 |         0 | shard-bigDB-s0-sh3 | 1538373 |  
  |            Repo | Scheme |  Host |  Port | User | Catalog |  
  | kb-for-patients |   http | 127.1 | 10035 | test |    root |  

In the first agtool generate call, patient-only indicates that we want sample patient data, and the intended destination is the repository fedshard:bigDB. The value 10000 tells the generator how many patients we want data for. In the run shown above, 10,000 patients produced 4,712,806 triples, or roughly 470 triples per patient, which keeps the number of triples generated under the 5,000,000 triple free limit.

When generating data for a partitioned DB, the generate command divides the job into one subtask per shard, each generating a portion of the patients. For example, if bigDB consisted of 10 shards, there would be 10 subtasks, each generating 1000 patients. Once the command completes, each shard in our DB will be populated with data that we can query.
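
The division of work can be sketched in Python as below. This is only an illustration of splitting a patient count into one subtask per shard. The function name `split_patients` and the handling of a remainder are assumptions, and the sketch says nothing about which shard the generated triples finally land on, since that is decided by the partition key.

```python
def split_patients(total_patients, shard_count):
    """Return the number of patients each subtask generates.

    Assumption: any remainder is spread over the first subtasks.
    """
    base, extra = divmod(total_patients, shard_count)
    return [base + (1 if i < extra else 0) for i in range(shard_count)]


# The example from the text: 10,000 patients over 10 shards.
print(split_patients(10000, 10))
# The four-shard repository defined in this tutorial.
print(split_patients(10000, 4))
```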

The generate patient-kb call puts about 19,000 triples in the kb-for-patients repo.

In the next section we have some examples of distributed SPARQL queries. The patient data just created is used by queries 6 and on (queries 1-5 illustrate other details about distributed SPARQL queries).

Running SPARQL

The execution of a SPARQL query on a distributed repository involves the following:

  1. The user issues a SPARQL query against any of the AllegroGraph servers that are in the cluster. This server acts as the query coordinator responsible for the remaining steps.

  2. The coordinator rewrites the aggregate expressions in the SPARQL query.

  3. The coordinator sends each shard the subquery to execute.

  4. Each shard runs the subquery in isolation from other shards, and returns its results to the coordinator.

  5. The coordinator combines the subquery results from each shard in order to end up with the overall query result.

  6. The coordinator returns the overall result to the user.

Step 2, the rewriting of queries, involves rewriting the aggregation expressions at the outer level of the query. Each aggregate calculation is split into an initial calculation done by each shard, followed by one final calculation done by the coordinator after receiving the shard results. If there is no aggregation, the query is sent as-is to the shards. The following examples illustrate this.

The first five queries do not use the patient data generated above. They are for illustration only. To use the patient data, start with query 6.

Example Query 1

If the user issues this query to find the people born on a certain date:

select ?personId ?name {  
  ?person <ex:personId> ?personId .  
  ?person <ex:birth> ?birth .  
  ?person <ex:name> ?name .  
  filter (?birth = '1960-04-27')  
} order by ?personId 

then each shard executes that query, sending an ordered list of (?personId ?name) pairs to the coordinator. This shard query execution and sorting happens in parallel.

The coordinator combines the sorted results it receives from each shard as if it executed the following pseudo query:

select ?personId ?name {  
  ?personId ?name  # all sorted rows received from each shard  
} order by ?personId 

which means that the sorted results per shard get merged together into the final ordered result. Because the result from each shard is already sorted, the final sorting does not take much effort.
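
The merge step can be pictured with a short Python sketch. It assumes each shard returns a list of (personId, name) rows already sorted by personId. The sample rows and the function name `merge_sorted` are made up for illustration, and this is not the coordinator's actual implementation.

```python
import heapq


def merge_sorted(shard_results):
    """Merge per-shard result lists that are each sorted by personId."""
    return list(heapq.merge(*shard_results, key=lambda row: row[0]))


# Hypothetical sorted results from three shards.
shard_results = [
    [(101, "Ann"), (205, "Carl")],
    [(150, "Bea")],
    [(120, "Dan"), (300, "Eve")],
]
merged = merge_sorted(shard_results)
for person_id, name in merged:
    print(person_id, name)
```

A k-way merge like this only compares the current head row of each shard, which is why the final ordering is cheap compared with sorting all rows from scratch.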

Example Query 2

If the user issues this query to the distributed repository:

select (count(distinct *) as ?count) {  
  ?s <ex:pred> ?o  
} 

then each shard executes the subquery:

select distinct * {  
  ?s <ex:pred> ?o  
} 

which returns the distinct rows per shard. This means each shard will already remove duplicates from its results, which is done in parallel.

The coordinator receives the distinct rows per shard. As the shards have evaluated the subquery in isolation from each other, it can happen that two shards return the same row. Those duplicate rows are filtered out by the coordinator. What it does is the pseudo-query:

select (count(distinct *) as ?count) {  
  ?s ?o  # all rows returned by all shards  
} 
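
As a rough Python illustration of this coordinator step, assume each shard has already returned its own distinct (?s, ?o) rows. The sample rows and the name `count_distinct` are hypothetical.

```python
def count_distinct(shard_rows):
    """Count distinct rows across shards; the same row may come from two shards."""
    seen = set()
    for rows in shard_rows:
        seen.update(rows)
    return len(seen)


# Hypothetical per-shard results; ("ex:b", "2") is returned by both shards.
shard_rows = [
    {("ex:a", "1"), ("ex:b", "2")},
    {("ex:b", "2"), ("ex:c", "3")},
]
print(count_distinct(shard_rows))
```

The coordinator has to look at every row from every shard here, so the cost grows with the number of distinct rows rather than with the number of shards.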

Example Query 3

For this user query:

select (count(*) as ?count) {  
  ?s <ex:pred> ?o  
} 

each shard will execute:

select (count(*) as ?subcount) {  
  ?s <ex:pred> ?o  
} 

which returns the number of matching triples in that shard. This calculation is run in parallel on every shard.

The coordinator will then compute the final aggregation value as:

select (sum(?subcount) as ?count) {  
  ?subcount  # one value returned per shard  
} 

which will be very fast as each shard only supplies one value to sum.

Example Query 4

For this user query:

select ?s (count(*) as ?count) {  
  ?s <ex:pred> ?o  
} group by ?s 

each shard will execute in parallel:

select ?s (count(*) as ?subcount) {  
  ?s <ex:pred> ?o  
} group by ?s 

and the coordinator will execute the pseudo-query:

select ?s (sum(?subcount) as ?count) {  
  ?s ?subcount  # all rows returned by all shards  
} group by ?s 

in order to calculate the total counts per group. The final summing will be fast, as for any ?s there are at most as many values as there are shards to sum.
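
The split into per-shard subcounts and a final sum can be sketched in Python as follows. The per-shard dictionaries stand in for the (?s, ?subcount) rows each shard would return. The sample data and the name `combine_subcounts` are hypothetical.

```python
from collections import Counter


def combine_subcounts(shard_subcounts):
    """Sum the per-shard subcounts for each group key ?s."""
    total = Counter()
    for subcounts in shard_subcounts:
        total.update(subcounts)  # Counter.update adds counts per key
    return dict(total)


# Hypothetical rows from two shards: {?s: ?subcount}.
shard_subcounts = [
    {"ex:a": 2, "ex:b": 1},
    {"ex:a": 3},
]
print(combine_subcounts(shard_subcounts))
```

The ungrouped count of Example Query 3 is the special case with a single group, where the coordinator sums one number per shard.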

Example Query 5

This user query finds the pairs of persons who share the same name:

select ?person ?person2 {  
  ?person <ex:name> ?name .  
  ?person2 <ex:name> ?name .  
  filter (?person != ?person2)  
} 

This query will be executed as-is on each shard.

The coordinator will do:

select ?person ?person2 {  
  ?person ?person2  # each row returned by each shard  
} 

If one shard contains two or more persons with the same name, all pairs among them will be returned by that shard.

However, if there are two persons in two different shards with the same name, they won't be found by this query as there is no communication between shards.

Also, if there is a pair (person1 person2) returned by shard 1, and (person3 person4) returned by shard 2, and they all have the same name, then this will not be recognized by the coordinator. The two pairs mentioned will be returned to the user, but not the pair (person1 person3) or the other combinations between the pairs.
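
The effect of shard isolation on this query can be reproduced with a small Python sketch. It models each shard as a list of (person, name) rows and compares evaluating the pair query per shard with evaluating it over all data at once. The data and the name `same_name_pairs` are made up for illustration.

```python
from collections import defaultdict
from itertools import permutations


def same_name_pairs(people):
    """Return all ordered pairs of different persons sharing a name."""
    by_name = defaultdict(list)
    for person, name in people:
        by_name[name].append(person)
    return {
        pair
        for group in by_name.values()
        for pair in permutations(group, 2)
    }


shard1 = [("person1", "Kim"), ("person2", "Kim")]
shard2 = [("person3", "Kim"), ("person4", "Kim")]

# Each shard evaluates the query in isolation; the coordinator only unions rows.
per_shard = same_name_pairs(shard1) | same_name_pairs(shard2)
# Evaluating over all data in one place finds the cross-shard pairs too.
global_pairs = same_name_pairs(shard1 + shard2)

print(len(per_shard), len(global_pairs))
print(sorted(global_pairs - per_shard))
```

The difference between the two sets is exactly the set of pairs whose members live on different shards.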

It is thus important to choose the partition key in line with the kind of queries that will be run, to get the desired results given the isolated query execution per shard.

(It is also possible to create a federated repository in which the query is executed in one place, and the query results would include all pairs of persons with the same name, not just those pairs of persons in the same shard.)

Example Query 6

This is a query you can run against generated patient data. It computes the frequency of each diagnosis.

prefix fr: <>  
select (count(?node) as ?count) ?node where {  
  ?EmergencyEncounter fr:diagnosis ?PatientDiagnosis .  
  ?diagnosis rdfs:label ?node .  
  ?PatientDiagnosis fr:symValue ?diagnosis .  
}  
group by ?node  
order by desc(?count) 

As described for Example Query 4, each shard will calculate subcounts per ?node in parallel, and the coordinator will efficiently sum those subcounts to arrive at the total count per ?node.

Example Query 7

This query calculates the number of people who suffer from Lumbago:

prefix fr: <>  
select (count(distinct ?Person) as ?count) where {  
  ?Person fr:claim ?OutpatientClaim .  
  ?OutpatientClaim fr:diagnosis ?PatientDiagnosis .  
  ?PatientDiagnosis fr:symValue ?diagnosis .  
  ?diagnosis rdfs:label "Lumbago" .  
} 

As described for Example Query 2, each shard sends its distinct list of patients, and the coordinator removes any remaining duplicates returned by different shards. Removing these duplicates can be a significant amount of work for the coordinator.

Example Query 8

The partitioning of the patient data is such that all triples related to one patient end up in the same shard. That means that the user as query writer can assume that patients in any shard are all distinct from patients in other shards.

That hints at a way to make Example Query 7 more efficient. Namely, instead of having the coordinator receive all distinct patients from each shard, let each shard calculate its number of distinct patients, and let the shard return just that number:

prefix fr: <>  
select (sum(?count) as ?total) {  
  select (count(distinct ?Person) as ?count) where {  
    ?Person fr:claim ?OutpatientClaim .  
    ?OutpatientClaim fr:diagnosis ?PatientDiagnosis .  
    ?PatientDiagnosis <> ?diagnosis .  
    ?diagnosis rdfs:label "Lumbago" .  
  }  
} 

Each shard receives the subquery:

select (count(distinct ?Person) as ?subcount) where {  
  ?Person fr:claim ?OutpatientClaim .  
  ?OutpatientClaim fr:diagnosis ?PatientDiagnosis .  
  ?PatientDiagnosis <> ?diagnosis .  
  ?diagnosis rdfs:label "Lumbago" .  
} 

The coordinator will finally execute:

select (sum(?subcount) as ?total) {  
  ?subcount  # one value received from each shard  
} 

It is important to realize that Query 7 and Query 8 are equivalent only because the partitioning guarantees that every patient is counted by exactly one shard. If this were not the case, and one patient's claims were stored in multiple shards, a patient could be counted multiple times, so Query 8 might return a larger value than Query 7.
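
A small Python sketch makes the difference visible. Each shard is modelled as the set of patients it would report for the Lumbago pattern. The function names `query7` and `query8` and the sample data are illustrative only.

```python
def query7(shards):
    """Coordinator de-duplicates patients across shards, then counts."""
    return len(set().union(*shards))


def query8(shards):
    """Each shard counts its own distinct patients; the coordinator sums."""
    return sum(len(set(shard)) for shard in shards)


# Partitioning keeps every patient on exactly one shard.
well_partitioned = [{"patient1", "patient2"}, {"patient3"}]
# patient2 has matching claims on both shards.
badly_partitioned = [{"patient1", "patient2"}, {"patient2", "patient3"}]

print(query7(well_partitioned), query8(well_partitioned))
print(query7(badly_partitioned), query8(badly_partitioned))
```

With the well-partitioned data both approaches agree, while in the second case the summed subcounts count patient2 twice.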

SPARQL on a Distributed versus Federated Repository

As can be seen from the queries above, when using a FedShard™ distributed repository it is important to choose the partition key in line with the expected queries that will be run, to get the desired results given the isolated query execution per shard.

It can happen that, due to the chosen partition key, a query returns undesired results. For example we might wish to run Example Query 5 and receive all pairs, globally, instead of just the pairs found in each shard. For that case there is the option to create a federated repository in addition to the distributed repository, with the federated repository containing the same shards and knowledge base.

A SPARQL query issued against a federated repository will run in one place, and during its execution the query engine retrieves the relevant triple data (to match the query patterns) from the shards and knowledge base. As there is a single query execution with access to all triple data the query results will be global, whereas distributed repositories have the shard isolation during query execution which limits the results. If Example Query 5 were run on that federated store, it would return all pairs of persons with the same name, instead of only pairs found in each of the shards.

The query execution on a federated repository involves transmission of triple data from the shards and knowledge base, and there is no parallelism. For that reason queries generally perform better on a distributed repository. However if the shard isolation is undesired for certain queries, that might be an acceptable trade-off.