Introduction

See also the Distributed Repositories Using Shards and Federation Setup document which provides details on setting up distributed repositories.

A database in AllegroGraph is typically implemented as a single repository, running in a single instance of AllegroGraph. This is simple to set up and operate, but problems arise when the size of data in the repository nears or exceeds the resources of the server on which it resides. Problems can also arise when the size of data fits well within the specs of the database server, but the query patterns across that data stress the system.

When the demands of data or query patterns outpace the ability of a server to keep up, there are two ways to attempt to grow your system: vertical or horizontal scaling.

With vertical scaling, you simply increase the capacity of the server on which AllegroGraph is running. Increasing CPU power or the amount of RAM in a server can work for modest size data sets, but you may soon run into the limitations of the available hardware, or the cost to purchase high end hardware may become prohibitive.

AllegroGraph provides an alternative solution called FedShard™: a cluster of database servers, and horizontal scaling through sharding. An AllegroGraph cluster is a set of AllegroGraph installations across a defined set of machines. A distributed repository is a logical database comprised of one or more repositories spread across one or more of the AllegroGraph nodes in that cluster. A distributed repository has a required partition key that is used when importing statements. When adding statements to your repository, the partition key is used to determine on which shard each statement will be placed.

By carefully choosing your partition key, it is usually possible to distribute your data across the shards in a way that supports the query patterns of your application.
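
As a rough illustration of how a partition key can route statements, the sketch below hashes the value of the chosen triple part and maps it to a shard index. The hashing scheme, the function name `shard_for`, and the sample data are assumptions made for illustration only; this document does not describe how AllegroGraph actually maps key values to shards.

```python
import hashlib

def shard_for(key_value, shard_count):
    """Map a partition key value to a shard index (illustrative hashing only)."""
    digest = hashlib.sha256(key_value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count

# Quads of (subject, predicate, object, graph); here the graph is the partition key.
quads = [
    ("ex:claim1", "ex:amount", "120", "ex:patient1"),
    ("ex:claim2", "ex:amount", "80", "ex:patient1"),
    ("ex:claim3", "ex:amount", "45", "ex:patient2"),
]

# Group the quads by the shard their key value maps to.
shards = {}
for s, p, o, g in quads:
    shards.setdefault(shard_for(g, 9), []).append((s, p, o, g))
```

Whatever the real mapping is, the property that matters is the one shown here: statements with the same key value always land on the same shard.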

SPARQL queries issued against FedShard™ are run in modified form in parallel on all the shards that make up the distributed repository. The query results of the shards are then combined to create the overall query result. While a query is executing on one shard it has access to the knowledge base, but it is isolated from all other shards. See the SPARQL section for examples.

This tutorial walks you through how to define and install to a cluster, how to define a distributed repository, and how various utilities can be used to manipulate AllegroGraph clusters and distributed repositories.

Cluster Definition File

To support operation over a cluster of servers, AllegroGraph requires a Cluster Definition file. It is used to identify the servers on which AllegroGraph will be managed, as well as individual repositories, distributed repositories, and replica sets over which you may wish to operate. A replica set is a multi-master replication cluster, which is not the subject of this tutorial and so will not be described further.

The first thing to do when defining a Cluster Definition File is to declare all of the servers that are part of the AllegroGraph cluster. For this tutorial we will use the following lines to define our cluster. It consists of two lines that declare inheritable values (Port and Scheme), and a logical group called `my-servers' that consists of three servers (aghost1, aghost2 and aghost3). We save these lines to a file named agcluster.cfg.

Port 10035  
Scheme http  
 
group my-servers  
  server aghost1.franz.com aghost1  
  server aghost2.franz.com aghost2  
  server aghost3.franz.com aghost3  

Based on the above configuration, we have defined a cluster of three AllegroGraph servers. A server is an AllegroGraph server (as opposed to a machine), and is uniquely identified by a hostname and port. A scheme (http or https) must be specified for each server. The scheme specifies the communication protocol to be used. Each server can be assigned a label for easy reference later in the configuration, as well as with some command-line tools. Since servers will often run on the same port (but on different machines) using the same scheme and often with the same authorization, it is possible to declare some values that will be inherited by subsequent declarations. The Port and Scheme directives here serve that purpose. The above declarations indicate that all servers in the cluster will be running on port 10035 via HTTP. As with the server labels, the name of a group can be used later in the configuration to refer to all of the servers that belong to it, and can also be used with some command-line tools.
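
To make the inheritance of Port and Scheme concrete, here is a minimal sketch that reads the lines above and attaches the inherited values to each server. It is not the parser AllegroGraph uses: the function name `parse_servers` is hypothetical, it handles only the directives shown in this tutorial, and falling back to the hostname when no label is given is an assumption.

```python
CONFIG = """\
Port 10035
Scheme http

group my-servers
  server aghost1.franz.com aghost1
  server aghost2.franz.com aghost2
  server aghost3.franz.com aghost3
"""

def parse_servers(text):
    """Return {group name: [server dicts]} for the few directives used here."""
    inherited = {}   # Port and Scheme values seen so far
    groups = {}
    current = None   # server list of the group being read
    for line in text.splitlines():
        words = line.split()
        if not words:
            continue
        directive, args = words[0], words[1:]
        if directive in ("Port", "Scheme"):
            inherited[directive.lower()] = args[0]
        elif directive == "group":
            current = groups.setdefault(args[0], [])
        elif directive == "server" and current is not None:
            host = args[0]
            # Assumption: with no label given, fall back to the hostname.
            label = args[1] if len(args) > 1 else host
            current.append({"host": host, "label": label, **inherited})
    return groups

servers = parse_servers(CONFIG)["my-servers"]
```

Each entry in `servers` carries the port and scheme declared once at the top of the file, which is the effect the inheritable directives are meant to have.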

NOTE: If a Port directive is included in the agcluster.cfg file, it must match the corresponding entry in the agraph.cfg used by each AllegroGraph server. That is, agraph.cfg must also contain this line:

Port 10035  

Now that our cluster has been defined, we can define a distributed repository.

Distributed Repositories

Distributed Repositories store triples partitioned into shards across an AllegroGraph cluster as defined by the cluster definition file.

The data must be partitioned such that each query can run independently on each shard without needing to transmit data between them. For example, a health insurance company could partition its claim data on patient ID so that any given patient will have all of their triples in a single shard. In this case, queries that are about the properties of patients could be run in parallel. Note, however, that this particular partitioning strategy would not allow queries about the relationships between patients because that might require transmitting data between the shards in the repository.
Note too that some triples, such as those associating billing codes with diseases, must be replicated on each server.

To use a distributed repository, it must first be defined. Distributed repositories are a collection of shards that are grouped together into an entity we call, simply, a db. DBs, in the cluster context, are declared by adding a definition to the cluster definition file, agcluster.cfg.

When defining a DB, there are two ways to define its shards: the easy way and the complicated way.

With the easy approach, you list the servers on which shards will reside, and how many shards you want per server. AllegroGraph then automatically names the set of repositories that will comprise those shards. You can specify servers via the server directive, or groups of servers using the include directive.

In this example, we declare that the DB bigDB will have shards on the servers contained within the group my-servers and, based on the shardsPerServer directive, that there are three shards per server. Access to the servers will be authenticated with the given user (test) and password (xyzzy).
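
With the three servers of my-servers and three shards per server, bigDB ends up with nine shards in total. The short sketch below only enumerates (server, index) placements to show that arithmetic; it makes no claim about the repository names AllegroGraph generates for the shards.

```python
from itertools import product

servers = ["aghost1.franz.com", "aghost2.franz.com", "aghost3.franz.com"]
shards_per_server = 3

# One (server, local shard index) pair per shard of the DB.
placements = list(product(servers, range(shards_per_server)))
shard_count = len(placements)
```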

A distributed DB also has a partition key, which is either a triple part (subject, predicate, object, or graph), or a named triple attribute. Every triple added to the DB will be stored in a particular shard based on this key.

The partition key is required and cannot be changed once the DB has been created. When the partition key is an attribute, every triple must carry that attribute, or an error will be raised when the triple is added to the repository. This missing partition key error can only occur when the partition key is an attribute.

The following definition uses the easy approach and specifies that the partition key will be the `graph' part of each triple:

Port 10035  
Scheme http  
 
group my-servers  
  server aghost1.franz.com  
  server aghost2.franz.com  
  server aghost3.franz.com  
 
db bigDB  
  key part         graph  
  user             test  
  password         xyzzy  
  shardsPerServer  3  
  include          my-servers  
  kb               aghost1.franz.com/kb-for-patients  
 
bindir /home/agraph/ag-7.0.2/bin 

The kb aghost1.franz.com/kb-for-patients defines the knowledge base repo. It will be merged with each shard when processing SPARQL queries. We will populate it below when we populate the bigDB patient database. See below for a version of this example where the kb repos are copied to each server. kb directives are discussed in the section More on the KB repositories in the Distributed Repositories Using Shards and Federation Setup document.

For more complicated declarations, you can explicitly declare individual shards using the repo directive. You can mix and match server and shardsPerServer declarations with repo directives and AllegroGraph will attempt to generate repository names that do not conflict. If it is unable to do so, an error will be issued at the time the Cluster Definition File is parsed.

We've added a bindir directive to specify where we'll install AllegroGraph on each server. We decided that AllegroGraph will be installed at /home/agraph/ag-7.0.2, so its bin directory, which contains the programs we'll need to run to control agraph, is /home/agraph/ag-7.0.2/bin. The bindir directive is optional. You can instead specify that information on the install-agraph command line, but if you do, you cannot then use the agraph-control program found in the same directory as the install-agraph program to control the cluster, since that agraph-control needs the bindir directive to locate the installed AllegroGraph on each server.

If the agcluster.cfg file you pass to install-agraph does not include a bindir directive, you will need to run the agraph-control program found in the installed AllegroGraph's bin directory, as it will know where to find the bindir.

Thus we suggest that you include a bindir in the agcluster.cfg file, as it makes it easier to use both install-agraph and agraph-control. As you work on configuring your distributed repositories, you'll use both programs multiple times.

Note that multiple DBs can be added to the agcluster.cfg file.

The declaration of a distributed repository in agcluster.cfg does not cause the distributed repository or any of its shards to be created. The declaration simply informs the server that a certain name refers to a distributed repository defined in a certain way. Armed with this information the server will take any request regarding this repository name (such as create, open, delete, or triple-count) to refer to the distributed repository as defined in the declaration in agcluster.cfg.

The server reads the agcluster.cfg file when it starts. The install-agraph command we'll discuss below allows you to distribute a new agcluster.cfg file to all servers and then cause those servers to read the new agcluster.cfg file and update their knowledge of distributed DBs.

You should never change the definition of an existing distributed repository. Once created, the structure of a distributed repository is fixed, and if you change the definition after the repository has been created, the server will no longer be able to access all parts of the repository.

Installing AllegroGraph on a Cluster

There are two ways of installing AllegroGraph:

  1. getting a distribution in tar format, extracting it and running install-agraph
  2. getting an RPM version, installing it and running configure-agraph.

In this document we will focus on method number 1. This is the easiest way to install the same version of AllegroGraph on a whole set of machines and to start and stop all servers. Also you can propagate cluster definition changes to all servers automatically.

Distributed repositories can be defined if you use the RPM method but you will need to do a number of steps manually.

The first step to installing AllegroGraph is to grab the tarball from the Franz download page and extract it. Inside the extracted directory is the installation utility install-agraph.

You'll want to keep this extracted directory around as you'll be using it to control and configure the cluster for the lifetime of your cluster.

Second, build your agcluster.cfg and agraph.cfg files and save them to disk. You'll need to specify the name of the directory where AllegroGraph will be installed. You must use the same directory name on all machines since the agraph.cfg file specifies the location of the AllegroGraph installation and the same agraph.cfg file will be used for every AllegroGraph server. Therefore you must specify a directory name that's available on all machines on which a server will be installed.

Each server must have its own installation directory. Do not use a directory on a network filesystem and have each server use the exact same directory. That configuration is not supported.

If you do not specify a pre-built agraph.cfg on the install-agraph command-line, you will be prompted with a set of questions, and a very basic agraph.cfg will be generated for you. We recommend building your own agraph.cfg as the configure-agraph script does not prompt for a number of directives which you might need, such as SSLPort or additional catalogs.

Here is a sample agraph.cfg:

# AllegroGraph configuration file  
Port 10035  
SuperUser super:xyzzy  
 
BaseDir /home/agraph/ag-7.0.2/  
 
SettingsDirectory data/settings  
LogDir log  
PidFile data/agraph.pid  
InstanceTimeout 604800  
 
<RootCatalog>  
 Main data/rootcatalog  
</RootCatalog>  
 
<SystemCatalog>  
 Main data/systemcatalog  
 InstanceTimeout 10  
</SystemCatalog>  
 
[License information added here] 

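Third, make sure your hosts are all up and running and are configured to give the installer the access it checks for, which includes passwordless ssh and sudo and write permission on the staging and installation directories.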

A typical local invocation of install-agraph looks as follows:

install-agraph --agraph-config agraph.cfg  /home/agraph/ag-7.0.2 

The destination directory /home/agraph/ag-7.0.2 is where the AllegroGraph server will be installed. The specified directory must match the directory information specified in the agraph.cfg file.

A cluster installation looks like this:

install-agraph --cluster-config ./agcluster.cfg --agraph-config ./agraph.cfg 

For clustered installations, a final destination-directory argument is optional and will only be used if there are servers defined in the agcluster.cfg file with no bindir directive associated with them. Since we include a bindir in our sample agcluster.cfg file, we don't need to include a destination-directory argument on the command-line shown above.

Various temporary files are created during the installation process. These are stored by default in /tmp on the various host machines. If /tmp does not have sufficient space, the installation will fail. The --staging-dir argument to install-agraph allows specifying a directory to use as the temporary directory. Its value must be an absolute pathname naming a directory accessible on every host in the cluster. The temporary files are removed at the end of the installation process.

The bindir directory in our agcluster.cfg file is /home/agraph/ag-7.0.2/bin. Let us select /home/agraph/ag-install-tmp/ as the staging directory. It will be created if it does not exist. Here is the modified call to install-agraph:

install-agraph --staging-dir /home/agraph/ag-install-tmp/ --cluster-config ./agcluster.cfg --agraph-config ./agraph.cfg  

As the installation runs, it will check that it has the proper access to each host (passwordless ssh and sudo, write permissions, etc). Furthermore, it will check for an existing AllegroGraph installation either based on any bindir directive associated with the server being installed to, or in the default-installation-dir (/home/agraph/ag-7.0.2). If an AllegroGraph installation already exists on a node, the installation will be skipped on that node only.

After installation, the files specified by the --agraph-config and --cluster-config arguments are copied to lib/agraph.cfg and lib/agcluster.cfg, respectively, in the installation directory on each node. A bindir directive is added to the agcluster.cfg file if not already present. This occurs whether or not the installation is skipped. This behavior makes it easy to add new nodes to the cluster or update config files without performing a full install to all nodes. Simply update the config files and run the exact same command again.

Once the agcluster.cfg file is copied to each server, the server, if it is running, is told to reload that agcluster.cfg file and update its set of distributed repository definitions.

Here is a sample run:

$ install-agraph --cluster ./agcluster.cfg --agraph-config ./agraph.cfg /disk1/ag  
Checking for passwordless ssh to all servers...ok.  
Checking for write permission on staging-dir /tmp/agtmp-7.0.0-2017-10-06-09-27-03... ok.  
Checking for write permission on installation-dir for each server... ok.  
Building plan for cluster installation...  
Checking for existing binary installations...  
Copying binaries for 3 new AllegroGraph installations.  
Copying AllegroGraph Server config file ./agraph.cfg to cluster.  
Copying AllegroGraph Cluster config file ./tmp/agraph-7.0.0/tmp_agclustera173210934071 to cluster.  
Removing staged installations from all hosts.  
Installation complete. 

The presence of agcluster.cfg in an installation enables other tools to also support operations on the cluster. If you deploy AllegroGraph to a cluster via other means, you can manually install an agcluster.cfg to gain the same extended feature set.

For manual tarball installations, agcluster.cfg should be added to the lib/ subdirectory of the installation dir.

For RPM installations the agcluster.cfg should be placed in /etc/agraph/.

Now that installation is complete, it is time to start AllegroGraph.

Controlling AllegroGraph across a Cluster

The agraph-control utility is used to start and stop AllegroGraph on a single server. By adding the --cluster or --cluster-config argument, it will also work against the entire cluster.

The agraph-control program is found in the extracted tarball directory, the same directory as install-agraph and it's convenient to use it from there since you can switch between running install-agraph and agraph-control easily. If you use agraph-control in this directory you'll need to pass the --cluster-config argument specifying an agcluster.cfg file with a bindir directive (this is the same agcluster.cfg file given to install-agraph).

The agraph-control program is copied (if not already present) to the AllegroGraph installation directory on each server by install-agraph. If you use agraph-control in the AllegroGraph installation directory you need only specify --cluster and agraph-control will be able to find the agcluster.cfg file and then figure out from the 'bindir' value what is needed to start and stop AllegroGraph.

If agraph-control is not able to find the agcluster.cfg file, specify its location with the --cluster-config argument.

Similarly, agraph-control in the AllegroGraph installation directory can usually find the server's agraph.cfg file (needed to operate on a single server rather than all servers in a cluster). The --config argument can be used to specify the location of that file if agraph-control cannot find it.

In the examples below, we assume agraph-control can find, as needed, the agcluster.cfg file and the agraph.cfg file and so the examples do not use the --cluster-config or --config arguments.

To start AllegroGraph on all nodes:

$ agraph-control --cluster start  
Checking for passwordless ssh to all hosts...ok.  
Cluster definition has 3 hosts  
Issuing `start' command to all nodes... Done. 

The start command checks to see if AllegroGraph is already running, and if it is, does not try to start it again. This allows the start command to work properly on clusters where some nodes are running, but multiple stopped/crashed/new nodes now need to be started.

To stop AllegroGraph on only the local node:

$ agraph-control stop  
Stopping agraph (26547): ....  Stopped 

The status argument to agraph-control reports on the status of servers in the cluster:

$ agraph-control --cluster status  
Checking for passwordless ssh to all servers...ok.  
2 servers are UP: aghost2 aghost3  
1 server is DOWN: aghost1 

Now we stop the rest of the AllegroGraph nodes.

$ agraph-control --cluster stop  
Checking for passwordless ssh to all servers...ok.  
Issuing `stop' command to all nodes... Done. 

Now the status should show everything is shut down:

$ agraph-control --cluster status  
Checking for passwordless ssh to all servers...ok.  
All servers are DOWN. 

Creating the Shards of a Distributed Repository

Now that the distributed repository has been defined, and AllegroGraph started, the next step is to create the shards that comprise the DB.

Every server that's mentioned in the agcluster.cfg file has been started and each has read in the agcluster.cfg file. All servers thus know the definition of all of the distributed repositories defined in the agcluster.cfg file.

Given our agcluster.cfg file above, all of the following name the distributed repository bigDB:

http://aghost1.franz.com:10035/repositories/bigDB  
http://aghost2.franz.com:10035/repositories/bigDB  
http://aghost3.franz.com:10035/repositories/bigDB 

You can also use the more concise server spec syntax. Here are three more ways to name bigDB:

aghost1.franz.com/bigDB  
aghost2.franz.com/bigDB  
aghost3.franz.com/bigDB 

If you want to do anything with the distributed repository (such as create, delete, augment or query) you must choose one of the servers. Each server is able to handle distributing the needed operation to all of the shards.
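
The concise and long forms above differ only in the scheme, port, and `/repositories/` path segment. The following sketch expands the concise form into the long one. The function name `repo_url` is hypothetical, the defaults are this tutorial's Port and Scheme values, and only the plain `host/repo` form is handled; any other server spec variants are out of scope here.

```python
def repo_url(spec, scheme="http", port=10035):
    """Expand 'host/repo' into the long repository URL form shown above."""
    host, repo = spec.split("/", 1)
    return f"{scheme}://{host}:{port}/repositories/{repo}"

# Every server in the cluster yields a valid name for the same distributed repository.
urls = [repo_url(f"aghost{n}.franz.com/bigDB") for n in (1, 2, 3)]
```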

The agtool create-db command can be used to create repositories on the local node, or across an entire cluster. Its interface is as follows:

Usage: agtool create-db [ --cluster-config AGCLUSTER.CFG-FILENAME ]  
                        [ --supersede ]  [ --quiet ] [--reload]  
                        [ -v|--verbose ] [ --password PASSWORD ]  
                        [ -u|--user USER ]  
                        [ --host HOST ]  
                        DBNAME  
 
create-db will create one or more repositories depending on the  
value of the DBNAME argument.  The --cluster-config argument  
is only used when creating replica sets and is not used  
to create a distributed repository.  Only the *agcluster.cfg*  
read by the AllegroGraph server is used to define a  
distributed repository.  
 
Options:  
  --cluster-config AGCLUSTER.CFG-FILENAME    Specifies the agcluster.cfg file.  
                                             This is only used when creating  
                                             a replica set.  
  --supersede                                If there is an existing repository,  
                                             delete it first  
  --reload                                   Direct the AllegroGraph server to  
                                             reload the agcluster.cfg file that  
                                             it read on startup to learn of  
                                             the current distributed repository  
                                             definitions.  
 
Other options:  
  --quiet                                    Reduce output (default: no)  
  -v, --verbose                              Include more output  
 
Repository options:  
  --host HOST                                Host where the repository lives  
                                             (default: localhost)  
  --password PASSWORD                        Password for the repository; use  
                                             with --user (default: password)  
  -u USER, --user USER                       User name for the repository; use  
                                             with --password (default: username)  

We first create the knowledge base (kb-for-patients) repository. In this example the knowledge base repo lives on one server only. See below for a version of this example where the kb repos are copied to each server.

Run this command on aghost1:

agtool create-db kb-for-patients 

This repo will exist on aghost1 only.

Then we want to create all of the shards in our distributed DB, which we gave the label bigDB in our agcluster.cfg file. So, we invoke the following command-line (still on aghost1):

agtool create-db bigDB 

When this command completes, all shards for bigDB will have been created, and we can now undertake the process of populating them.

You need not use agtool create-db to create the distributed repository. Any method you've already been using in programs to create a normal repository in AllegroGraph can be used to create a distributed repository in the exact same way.
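
As one example of a programmatic route, the sketch below builds an HTTP PUT request against the repository URL, which is how an ordinary repository can be created over AllegroGraph's HTTP interface. Treat it as a sketch under assumptions: the helper name `create_repo_request` is hypothetical, the credentials are the test/xyzzy pair from the sample agcluster.cfg, and the request is only constructed, not sent.

```python
import base64
import urllib.request

def create_repo_request(host, repo, user, password, port=10035, scheme="http"):
    """Build (but do not send) a PUT request asking a server to create a repository."""
    url = f"{scheme}://{host}:{port}/repositories/{repo}"
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url, method="PUT")
    request.add_header("Authorization", f"Basic {token}")
    return request

request = create_repo_request("aghost1.franz.com", "bigDB", "test", "xyzzy")
# Passing `request` to urllib.request.urlopen would send it to a running server.
```

Because bigDB is declared in agcluster.cfg, the server receiving such a request treats the name as the distributed repository rather than as a single local one.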

Adding data to a distributed repository

All of the ways of adding data to a single repository (such as agtool load or the various AllegroGraph client libraries) will also work for a distributed repository. For distributed repositories using attribute-based partitioning, all incoming triples must have the attribute specified by the db definition in the cluster configuration file.

We also provide a sample data generator using agtool generate. For this tutorial we generate sample data representing health insurance patients, and populate our db with it using the following command:

$ agtool generate patient-only aghost1.franz.com/bigDB 10000  
Generating data for 10,000 patient-onlys...  
Operation completed in 0h3m57s.  
Added 10,000 patient-onlys. 42.072144 patient-onlys/sec.  
Added 4,712,806 triples. 19827.785 triples/sec 

We also populate the knowledge base repo kb-for-patients:

$ agtool generate patient-kb aghost1.franz.com/kb-for-patients 10000  
Generating data for 10,000 patient-kbs...  
Operation completed in 0h0m1s.  
Added 10,000 patient-kbs. 9886.312 patient-kbs/sec.  
Added 19,342 triples. 19122.104 triples/sec 

In the first agtool generate call, patient-only indicates that we desire sample patient data, and the intended destination is the endpoint aghost1.franz.com/bigDB. The value 10000 instructs the generator how many patients we want data for. Each patient data set is made up of several hundred triples (roughly 470 on average in the run above, which added 4,712,806 triples for 10,000 patients). 10,000 keeps the number of triples generated under the 5,000,000 triple free limit.

When generating data for a partitioned DB, the generate command divides the job into one subtask per shard, each generating a portion of the patients. For example, if bigDB were made up of 10 shards, there would be 10 subtasks, each generating 1000 patients. Once the command completes, each shard in our DB will be populated with data that we can query.
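
The division into subtasks can be sketched as follows. The function name `split_evenly` is hypothetical, and the way a remainder is spread over the first few shards is an assumption; the text above only states that each subtask generates a portion of the patients.

```python
def split_evenly(total, shard_count):
    """Split `total` items into `shard_count` portions that differ by at most one."""
    base, extra = divmod(total, shard_count)
    return [base + 1 if i < extra else base for i in range(shard_count)]

ten_shards = split_evenly(10000, 10)   # the 10-shard example from the text
nine_shards = split_evenly(10000, 9)   # 3 servers x 3 shards, as in our bigDB definition
```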

The generate patient-kb call puts about 19,000 triples (19,342 in the run above) in the kb-for-patients repo on aghost1.

Running SPARQL

The execution of a SPARQL query on a distributed repository involves the following:

  1. The user issues a SPARQL query against any of the AllegroGraph servers that are in the cluster. This server acts as the query coordinator responsible for the remaining steps.

  2. The coordinator rewrites the aggregate expressions in the SPARQL query.

  3. The coordinator sends each shard the subquery to execute.

  4. Each shard runs the subquery in isolation from other shards, and returns its results to the coordinator.

  5. The coordinator combines the subquery results from each shard in order to end up with the overall query result.

  6. The coordinator returns the overall result to the user.

Step 2, the rewriting of queries, involves rewriting the aggregation expressions at the outer level of the query. Each aggregate calculation is split into an initial calculation done by each shard, followed by one final calculation done by the coordinator after receiving the shard results. If there is no aggregation, the query is sent as-is to the shards. The following examples illustrate this.

Example Query 1

If the user issues this query to find the people born on a certain date:

select ?personId ?name {  
  ?person <ex:personId> ?personId .  
  ?person <ex:birth> ?birth .  
  ?person <ex:name> ?name .  
  filter (?birth = '1960-04-27')  
} order by ?personId 

then each shard executes that query, sending an ordered list of (?personId ?name) pairs to the coordinator. This shard query execution and sorting happens in parallel.

The coordinator combines the sorted results it receives from each shard as if it executed the following pseudo query:

select ?personId ?name {  
  ?personId ?name  # all sorted rows received from each shard  
} order by ?personId 

which means that the sorted results per shard get merged together into the final ordered result. Because the result from each shard is already sorted, the final sorting does not take much effort.
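
The merge step can be illustrated with a k-way merge of already sorted lists. This is a sketch of the idea, not the coordinator's actual implementation; the rows and integer person IDs are made up for the example.

```python
import heapq

# Each shard returns its (personId, name) rows already sorted by personId.
shard_results = [
    [(3, "Ann"), (17, "Bob")],
    [(5, "Cid"), (9, "Dee")],
    [(1, "Eve")],
]

# A k-way merge only compares the current head of each list,
# so no full re-sort of the combined rows is needed.
merged = list(heapq.merge(*shard_results, key=lambda row: row[0]))
```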

Example Query 2

If the user issues this query to the distributed repository:

select (count(distinct *) as ?count) {  
  ?s <ex:pred> ?o  
} 

then each shard executes the subquery:

select distinct * {  
  ?s <ex:pred> ?o  
} 

which returns the distinct rows per shard. This means each shard will already remove duplicates from its results, which is done in parallel.

The coordinator receives the distinct rows per shard. As the shards have evaluated the subquery in isolation from each other, it can happen that two shards return the same row. Those duplicate rows are filtered out by the coordinator. In effect, it runs the pseudo-query:

select (count(distinct *) as ?count) {  
  ?s ?o  # all rows returned by all shards  
} 
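
In set terms, the coordinator takes the union of the per-shard distinct rows and counts it. The sketch below uses made-up (?s, ?o) rows to show why simply adding up the per-shard row counts would overcount.

```python
# Distinct (?s, ?o) rows returned by each shard; some rows occur on several shards.
shard_rows = [
    {("ex:a", "1"), ("ex:b", "2")},
    {("ex:b", "2"), ("ex:c", "3")},
    {("ex:a", "1")},
]

distinct_rows = set().union(*shard_rows)   # coordinator-side duplicate removal
count = len(distinct_rows)
naive = sum(len(rows) for rows in shard_rows)   # would count duplicates twice
```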

Example Query 3

For this user query:

select (count(*) as ?count) {  
  ?s <ex:pred> ?o  
} 

each shard will execute:

select (count(*) as ?subcount) {  
  ?s <ex:pred> ?o  
} 

which returns the number of matching triples in that shard. This calculation is run in parallel on every shard.

The coordinator will then compute the final aggregation value as:

select (sum(?subcount) as ?count) {  
  ?subcount  # one value returned per shard  
} 

which will be very fast as each shard only supplies one value to sum.
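
The split between shard-side counting and coordinator-side summing can be sketched as a small scatter/gather routine. Everything here is an illustrative model: shards are plain lists of triples, and `run_distributed` and `count_pred` are hypothetical names, not AllegroGraph functions.

```python
from concurrent.futures import ThreadPoolExecutor

def run_distributed(shards, subquery, combine):
    """Run `subquery` on every shard in parallel, then combine the partial results."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        partials = list(pool.map(subquery, shards))
    return combine(partials)

# Each shard is modeled as a list of (subject, predicate, object) triples.
shards = [
    [("ex:a", "ex:pred", "1"), ("ex:a", "ex:other", "2")],
    [("ex:b", "ex:pred", "3")],
    [("ex:c", "ex:pred", "4"), ("ex:d", "ex:pred", "5")],
]

def count_pred(shard):
    # Shard-side step: count(*) over the triples matching ?s <ex:pred> ?o.
    return sum(1 for _, p, _ in shard if p == "ex:pred")

# Coordinator-side step: sum one subcount per shard.
total = run_distributed(shards, count_pred, sum)
```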

Example Query 4

For this user query:

select ?s (count(*) as ?count) {  
  ?s <ex:pred> ?o  
} group by ?s 

each shard will execute in parallel:

select ?s (count(*) as ?subcount) {  
  ?s <ex:pred> ?o  
} group by ?s 

and the coordinator will execute the pseudo-query:

select ?s (sum(?subcount) as ?count) {  
  ?s ?subcount  # all rows returned by all shards  
} group by ?s 

in order to calculate the total counts per group. The final summing will be fast, as for any ?s there are at most as many values as there are shards to sum.
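
The grouped variant works the same way, except that the coordinator sums subcounts per group. A minimal sketch with made-up subcounts:

```python
from collections import Counter

# Per-shard results of the grouped subquery: ?s mapped to ?subcount.
shard_subcounts = [
    {"ex:a": 2, "ex:b": 1},
    {"ex:a": 1},
    {"ex:b": 4, "ex:c": 1},
]

# Coordinator-side step: add up the subcounts of each group.
totals = Counter()
for subcounts in shard_subcounts:
    totals.update(subcounts)
```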

Example Query 5

This user query finds the pairs of persons who share the same name:

select ?person ?person2 {  
  ?person <ex:name> ?name .  
  ?person2 <ex:name> ?name .  
  filter (?person != ?person2)  
} 

This query will be executed as-is on each shard.

The coordinator will do:

select ?person ?person2 {  
  ?person ?person2  # each row returned by each shard  
} 

If one shard contains two or more persons with the same name, all pairs among them will be returned by that shard.

However, if there are two persons in two different shards with the same name, they won't be found by this query as there is no communication between shards.

Also, if there is a pair (person1 person2) returned by shard 1, and (person3 person4) returned by shard 2, and they all have the same name, then this will not be recognized by the coordinator. The two pairs mentioned will be returned to the user, but not the pair (person1 person3) or the other combinations between the pairs.

It is thus important to choose the partition key in line with the kind of queries that will be run, to get the desired results given the isolated query execution per shard.
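
The difference between per-shard evaluation and evaluation over all the data can be seen in a small model of this query. The person and name values are made up, and `same_name_pairs` is a hypothetical stand-in for the query itself.

```python
from itertools import permutations

def same_name_pairs(people):
    """Ordered pairs of distinct persons sharing a name, mirroring the query's filter."""
    return {
        (a, b)
        for (a, name_a), (b, name_b) in permutations(people, 2)
        if name_a == name_b
    }

shard1 = [("person1", "Kim"), ("person2", "Kim")]
shard2 = [("person3", "Kim"), ("person4", "Kim")]

# What the distributed repository returns: each shard evaluated in isolation.
sharded = same_name_pairs(shard1) | same_name_pairs(shard2)
# What a single repository holding all the data would return.
single_repo = same_name_pairs(shard1 + shard2)
```

Here `sharded` holds 4 pairs while `single_repo` holds 12; the 8 missing pairs are exactly those that cross the shard boundary.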

(It is also possible to create a federated repository in which the query is executed in one place, and the query results would include all pairs of persons with the same name, not just those pairs of persons in the same shard.)

Example Query 6

This is a query you can run against generated patient data. It computes the frequency of each diagnosis.

prefix fr: <http://franz.com/>  
 
select (count(?node) as ?count) ?node where  
{  
  ?EmergencyEncounter fr:diagnosis ?PatientDiagnosis .  
  ?diagnosis rdfs:label ?node .  
  ?PatientDiagnosis fr:symValue ?diagnosis .  
}  
group by ?node  
order by desc(?count) 

As described for Example Query 4, each shard will calculate subcounts per ?node in parallel, and the coordinator will efficiently sum those subcounts to arrive at the total count per ?node.

Example Query 7

This query calculates the number of people who suffer from Lumbago:

prefix fr: <http://franz.com/>  
 
select (count(distinct ?Person) as ?count) where  
{  
  ?Person fr:claim ?OutpatientClaim .  
  ?OutpatientClaim fr:diagnosis ?PatientDiagnosis .  
  ?PatientDiagnosis fr:symValue ?diagnosis .  
  ?diagnosis rdfs:label "Lumbago" .  
} 

As described for Example Query 2, each shard sends its distinct list of patients, and the coordinator removes any remaining duplicates returned by different shards. Removing these duplicates can be a lot of work for the coordinator when many patients match.

Example Query 8

The partitioning of the patient data is such that all triples related to one patient end up in the same shard. That means that the user as query writer can assume that patients in any shard are all distinct from patients in other shards.

That hints at a way to make Example Query 7 more efficient. Namely, instead of having the coordinator receive all distinct patients from each shard, let each shard calculate its number of distinct patients, and let the shard return just that number:

prefix fr: <http://franz.com/>  
 
select (sum(?count) as ?total) {  
  {  
    select (count(distinct ?Person) as ?count) where  
    {  
      ?Person fr:claim ?OutpatientClaim .  
      ?OutpatientClaim fr:diagnosis ?PatientDiagnosis .  
      ?PatientDiagnosis <http://franz.com/symValue> ?diagnosis .  
      ?diagnosis rdfs:label "Lumbago" .  
    }  
  }  
} 

Each shard receives the subquery:

select (count(distinct ?Person) as ?subcount) where  
{  
  ?Person fr:claim ?OutpatientClaim .  
  ?OutpatientClaim fr:diagnosis ?PatientDiagnosis .  
  ?PatientDiagnosis <http://franz.com/symValue> ?diagnosis .  
  ?diagnosis rdfs:label "Lumbago" .  
} 

The coordinator will finally execute:

select (sum(?subcount) as ?total) {  
  ?subcount  # one value received from each shard  
} 

It is important to realize that Query 7 and Query 8 are equivalent only because the partitioning guarantees that every patient is counted by exactly one shard. If this were not the case, and one patient's claims were stored in multiple shards, a patient could be counted multiple times, so Query 8 might return a larger value than Query 7.
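The difference between the two queries can be checked with a small simulation. This is a sketch in plain Python under stated assumptions: each shard is modeled as a hypothetical list of matching patient identifiers, and `query7` and `query8` only mimic how the results are combined.

```python
def query7(shards):
    """Each shard returns its distinct patients; the coordinator removes
    duplicates across shards and counts what remains."""
    distinct = set()
    for patients in shards:
        distinct |= set(patients)
    return len(distinct)

def query8(shards):
    """Each shard returns only its distinct count; the coordinator sums."""
    return sum(len(set(patients)) for patients in shards)

# Partitioned by patient: no patient appears in more than one shard.
partitioned = [["p1", "p2", "p1"], ["p3"]]
print(query7(partitioned), query8(partitioned))  # 3 3

# Hypothetical bad partitioning: patient p1 has claims in both shards.
split = [["p1", "p2"], ["p1", "p3"]]
print(query7(split), query8(split))  # 3 4
```

In the second case p1 is counted once by each shard, so the summed subcounts overstate the number of distinct patients.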

SPARQL on a Distributed versus Federated Repository

As can be seen from the queries above, when using a FedShard™ distributed repository it is important to choose the partition key in line with the expected queries that will be run, to get the desired results given the isolated query execution per shard.

It can happen that, due to the chosen partition key, a query returns undesired results. For example, we might wish to run Example Query 5 and receive all pairs, globally, instead of just the pairs found in each shard. For that case there is the option to create a federated repository in addition to the distributed repository, with the federated repository containing the same shards and knowledge base.

A SPARQL query issued against a federated repository will run in one place, and during its execution the query engine retrieves the relevant triple data (to match the query patterns) from the shards and knowledge base. As there is a single query execution with access to all triple data, the query results will be global, whereas on a distributed repository the shard isolation during query execution limits the results. If Example Query 5 were run on that federated repository, it would return all pairs of persons with the same name, instead of only pairs found in each of the shards.

The query execution on a federated repository involves transmission of triple data from the shards and knowledge base, and there is no parallelism. For that reason queries generally perform better on a distributed repository. However, if the shard isolation is undesired for certain queries, the slower federated execution might be an acceptable trade-off.

The same example but with replicated kb repos

Recall the agcluster.cfg file we have been using:

Port 10035  
Scheme http  
 
group my-servers  
  server aghost1.franz.com  
  server aghost2.franz.com  
  server aghost3.franz.com  
 
db bigDB  
  key part         graph  
  user             test  
  password         xyzzy  
  shardsPerServer  3  
  include          my-servers  
  kb               aghost1.franz.com/kb-for-patients 

Shards are distributed over three servers (aghost1.franz.com, aghost2.franz.com, and aghost3.franz.com) but the knowledge base kb repo is on just one, aghost1.franz.com. This means that SPARQL queries on shards on aghost2.franz.com and aghost3.franz.com will be federated with aghost1.franz.com/kb-for-patients over the network. There can be a significant performance hit when running a query on a federated repo where repos in the federation are on different servers because of the network communication overhead.

The solution is to have a copy of the kb repo on each server. This does incur a space cost: in our case, three copies of the kb repo use three times as much space as one. But trading space for speed is a common technique, and kb repos can be much smaller than the shards (as they are in our patient example).

AllegroGraph has a mechanism for copying repos onto several servers and ensuring that they always have the same contents (modulo the network overhead when changes are made): multi-master replication, described in the Multi-master Replication document.

Let us make the changes necessary to achieve this. First here is a revised agcluster.cfg file:

Port 10035  
Scheme http  
 
group my-servers  
  server aghost1.franz.com  
  server aghost2.franz.com  
  server aghost3.franz.com  
 
db bigDB  
  key part         graph  
  user             test  
  password         xyzzy  
  shardsPerServer  3  
  include          my-servers  
  kb               localhost/kb-for-patients 

We have only changed the last line, from

  kb               aghost1.franz.com/kb-for-patients 

to

  kb               localhost/kb-for-patients 

In fact, if no server is specified in a kb directive, the KB is shard-local, meaning it inherits the host/port/scheme of the shard with which it will be federated:

  kb               kb-for-patients 

This is equivalent to localhost when the scheme and port have default values (http and 10035) which we assume they do in our examples.
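To see what the change buys us, we can count how many shards must reach the kb repo over the network under each form of the kb directive. The sketch below is plain Python and only models the rule described above in a simplified way; `kb_host_for_shard` and `remote_kb_shards` are hypothetical helpers, not part of AllegroGraph, and default scheme and port are assumed.

```python
servers = ["aghost1.franz.com", "aghost2.franz.com", "aghost3.franz.com"]
shards_per_server = 3

def kb_host_for_shard(kb_spec, shard_host):
    """Return the host on which a shard finds the kb repo.

    Simplified model: a spec with no host part, or with host 'localhost',
    is shard-local and resolves to the shard's own server.
    """
    host, sep, _repo = kb_spec.rpartition("/")
    if not sep or host == "localhost":
        return shard_host
    return host

def remote_kb_shards(kb_spec):
    """Count the shards whose kb repo lives on a different server."""
    return sum(
        shards_per_server
        for server in servers
        if kb_host_for_shard(kb_spec, server) != server
    )

print(remote_kb_shards("aghost1.franz.com/kb-for-patients"))  # 6
print(remote_kb_shards("localhost/kb-for-patients"))          # 0
print(remote_kb_shards("kb-for-patients"))                    # 0
```

With the original file, six of the nine shards federate with a kb repo on another server. With either shard-local form, none do.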

A later step, after installing and starting the distributed cluster, is to create the kb repo. The instruction above was

We first create the knowledge base (kb-for-patients)  
repository. Run this command on `aghost1`:  
 
agtool create-db kb-for-patients 

We do not run that command. Instead, we create the repo as the controlling instance of a multi-master cluster and replicate it on the other servers with the following steps.

First, create the repo kb-for-patients on aghost1 as a multi-master cluster instance. Run this command on aghost1, using the agtool on that server:

agtool repl create-cluster http://super:xyzzy@aghost1.franz.com/repositories/kb-for-patients kb-replica

kb-for-patients is the repo name and kb-replica is the (optional) instance name. We give a complete repository spec for clarity. The port defaults to 10035 and that is the port being used.

Now we replicate kb-for-patients on aghost2 and aghost3:

agtool repl grow-cluster \  
     http://super:xyzzy@aghost1.franz.com/repositories/kb-for-patients \  
     http://super:xyzzy@aghost2.franz.com/repositories/kb-for-patients  
 
agtool repl grow-cluster \  
     http://super:xyzzy@aghost1.franz.com/repositories/kb-for-patients \  
     http://super:xyzzy@aghost3.franz.com/repositories/kb-for-patients 
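With more servers, typing one grow-cluster command per host becomes tedious. As a sketch, the commands can be generated from a list of hosts. The helper below is hypothetical and plain Python: it only builds the same command lines shown above and does not run them. The user name, password, and repo name are the ones used in this example.

```python
def grow_cluster_commands(controller, new_hosts,
                          repo="kb-for-patients",
                          user="super", password="xyzzy"):
    """Build one 'agtool repl grow-cluster' argument list per new host."""
    def spec(host):
        # Full repository spec, in the same form as in the commands above.
        return f"http://{user}:{password}@{host}/repositories/{repo}"

    return [
        ["agtool", "repl", "grow-cluster", spec(controller), spec(host)]
        for host in new_hosts
    ]

commands = grow_cluster_commands(
    "aghost1.franz.com",
    ["aghost2.franz.com", "aghost3.franz.com"],
)
for cmd in commands:
    print(" ".join(cmd))
```

Each argument list could then be passed to `subprocess.run` on a machine where agtool is installed.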

And we are done. When we populate kb-for-patients with the command (run on aghost1)

agtool generate patient-kb kb-for-patients 10000 

as done above, the data will be replicated in the kb-for-patients instances on aghost2 and aghost3. SPARQL queries on the distributed database will no longer be slowed by network overhead, because every shard now finds a copy of the kb repo on its own server. The rest of the example continues just as above.