Running AG on AWS EC2¶
Federating multiple repositories located on the same server (as shown in Example 16: Federated repositories) can be useful for organizing data, but to truly explore the scalability potential of federated repositories we should look at a system consisting of multiple machines. This example illustrates that by instantiating multiple AllegroGraph virtual machines on AWS, joining all servers into a single federated repository, and issuing a simple query.
Warning
This example involves instantiating multiple AWS resources. These are not free and you will be charged by Amazon. While the cost should not exceed a few dollars, we are not responsible for any charges you might incur while running this tutorial.
To allow our script to interact with AWS we will need to install an additional dependency - boto3.
pip install boto3
We will also need to configure credentials to allow the script to
access the Amazon account that it should use. This can be done in
multiple ways. One possibility is to create a file
named ~/.aws/credentials
with the following contents:
[default]
aws_access_key_id = YOUR_KEY
aws_secret_access_key = YOUR_SECRET
The key and key id can be obtained from the web interface of AWS.
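Alternatively, credentials can be passed to boto3 directly in code by creating a session object. A minimal sketch (the key values are placeholders; the rest of this tutorial assumes the credentials file and the default session instead):
import boto3

# Placeholder credentials - in practice read them from a safe location
# rather than hard-coding them in the script.
session = boto3.Session(aws_access_key_id='YOUR_KEY',
                        aws_secret_access_key='YOUR_SECRET',
                        region_name='us-west-2')
ec2 = session.resource('ec2')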
We’re now ready to begin writing the actual script. Let’s start with some imports:
import boto3
import atexit
import time
We will need the boto3
module to interact with AWS. We also want
to make sure that any resources we have provisioned will be properly
discarded when the script exits, even if it exits because of an
error. The atexit
module provides a convenient way of doing that.
Warning
While the mechanism of releasing resources used in this tutorial is reasonably robust, it might still fail in rare circumstances (e.g. if your computer loses power). It is important to manually check your AWS account for any stray resources after executing this tutorial to avoid unnecessary costs.
Now we will set a few configuration constants:
REGION = 'us-west-2'
AMI = 'ami-5616d82e'
TYPE = 't2.medium'
These describe the kind of servers that we will be provisioning, as
well as the AWS region in which our resources will reside. AMI
should be the image id of an AllegroGraph image appropriate for the
chosen region. Image identifiers can be found here. The
actual value shown above refers to the AllegroGraph 6.3.0 HVM image
for the us-west-2
region. TYPE
is the ‘instance type’ and
determines the amount of compute resources (RAM and CPU) that will be
available for each of our servers. The list of instance types can be
found here.
We will create a specified number of servers plus an additional one to manage the federation. The number below is the number of federated instances (i.e. it does not include the management instance).
INSTANCE_COUNT = 2
It is desirable to limit connections to the system that we are creating so that only your machine will be able to access it. If you have a static IP address, fill it in below. If not, leave the value as it is.
MY_IP = '0.0.0.0/0'
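If your address is static but you do not know it offhand, one option is to look it up programmatically and narrow the rule down to a single-address CIDR block. A small sketch using the checkip.amazonaws.com lookup service (any similar service would do, and this only helps while your address stays the same):
from urllib.request import urlopen

# Fetch our current public IP and turn it into a /32 CIDR block so
# that only this machine can reach the servers.
MY_IP = urlopen('https://checkip.amazonaws.com').read().decode().strip() + '/32'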
The next set of constants describes various aspects of the infrastructure that we are about to provision. There should be no need to adjust any values here.
VPC_CIDR = '10.0.0.0/16'
PUBLIC_CIDR = '10.0.0.0/24'
PRIVATE_CIDR = '10.0.1.0/24'
KEY_NAME = 'AG-TUTORIAL-KEYPAIR'
PROFILE_NAME = 'agprofile'
USER = 'ec2-user'
The first three values describe the structure of the virtual network
that will contain our machines. The network will consist of two
subnets - a public one which will contain the management instance and
a private one which will contain all other servers. Only machines in
the public subnet will be directly accessible over the
Internet. KEY_NAME
is a name that will be assigned to the
keypair that we will generate and use to connect to our
servers. PROFILE_NAME
is the name for an instance profile
that we will create for our AG servers. USER
is the system user on
the machines we want to connect to. Since AllegroGraph’s images are
based on Amazon Linux, that username must be set to ec2-user.
We will now create handles for Amazon services that we want to use.
ec2 = boto3.resource('ec2', region_name=REGION)
ec2_client = boto3.client('ec2', region_name=REGION)
iam = boto3.resource('iam', region_name=REGION)
ssm = boto3.client('ssm', region_name=REGION)
These come in two variants, called resources
and
clients
. Clients provide low-level access to the Amazon API, while
resources are a high-level abstraction built over clients. Using
resources is slightly easier and thus preferable, but some operations
can only be performed with clients. In our case ec2_client
and
ssm
objects are clients for the respective AWS services, while
other handles are resources.
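To make the distinction concrete, here is how one could list the VPCs in the region using each interface (just an illustration, not part of the tutorial script):
# Resource interface: iterate over high-level Vpc objects.
for vpc in ec2.vpcs.all():
    print(vpc.id, vpc.cidr_block)

# Client interface: work with the raw response dictionaries.
for vpc in ec2_client.describe_vpcs()['Vpcs']:
    print(vpc['VpcId'], vpc['CidrBlock'])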
The first thing that we will create is an IAM role that will be assigned to our machines. This is necessary to allow the use of SSM, which we will need to execute scripts.
ssm_role = iam.create_role(RoleName='ssm',
AssumeRolePolicyDocument="""{
"Version":"2012-10-17",
"Statement":[
{
"Effect":"Allow",
"Principal":{
"Service": ["ssm.amazonaws.com", "ec2.amazonaws.com"]
},
"Action":"sts:AssumeRole"
}]}""")
atexit.register(ssm_role.delete)
The role created above can be assumed by EC2 instances and allows
access to SSM. We have also installed an atexit
handler to make
sure that the role is deleted once we are done. We will do the same
thing for every other AWS resource that we create.
To make all instances that have assumed the role defined above accessible to SSM we need to connect that role to the appropriate policy document.
ssm_arn = 'arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM'
ssm_role.attach_policy(PolicyArn=ssm_arn)
atexit.register(ssm_role.detach_policy, PolicyArn=ssm_arn)
Again, we have installed an atexit handler to undo the association. Without that we would not be able to remove the role itself.
Now we create an instance profile that we will use to launch instances using our new role.
instance_profile = iam.create_instance_profile(
InstanceProfileName=PROFILE_NAME
)
atexit.register(instance_profile.delete)
instance_profile.add_role(RoleName=ssm_role.name)
atexit.register(instance_profile.remove_role,
RoleName=ssm_role.name)
Now it is time to create the virtual network infrastructure for our system.
vpc = ec2.create_vpc(CidrBlock=VPC_CIDR)
atexit.register(vpc.delete)
public_subnet = vpc.create_subnet(CidrBlock=PUBLIC_CIDR)
atexit.register(public_subnet.delete)
private_subnet = vpc.create_subnet(CidrBlock=PRIVATE_CIDR)
atexit.register(private_subnet.delete)
We have two subnets - one for things that should be accessible from the Internet and one for all other machines. To make the public subnet work we need to add an Internet gateway to it.
internet_gateway = ec2.create_internet_gateway()
atexit.register(internet_gateway.delete)
internet_gateway.attach_to_vpc(VpcId=vpc.vpc_id)
atexit.register(internet_gateway.detach_from_vpc,
VpcId=vpc.vpc_id)
As usual, we have to install atexit
handlers to undo all
operations. This is important not only for operations that create
resources, but also for those that add associations, since AWS will
not allow us to remove a resource with existing associations.
Now we need to define a route table for the public subnet. The route table tells our instances to use the Internet gateway that we have just created to communicate with the Internet.
public_route_table = vpc.create_route_table()
atexit.register(public_route_table.delete)
public_route_table.create_route(
DestinationCidrBlock='0.0.0.0/0',
GatewayId=internet_gateway.internet_gateway_id)
public_rt_assoc = public_route_table.associate_with_subnet(
SubnetId=public_subnet.id)
atexit.register(public_rt_assoc.delete)
Machines in the private subnet should not be accessible from the Internet, but must still be able to access external resources. To facilitate that we will create a NAT gateway in the private subnet. A NAT gateway must have a public IP address, so we will get one first.
nat_eip = ec2_client.allocate_address(Domain='vpc')
atexit.register(ec2_client.release_address,
AllocationId=nat_eip['AllocationId'])
Before we actually create the gateway, we must ensure that we will be able to decommission it in a safe manner. The issue is that deleting a NAT gateway is not instantaneous and we cannot remove other resources (the VPC and subnet) until it is gone. So we will define a function that periodically checks the status of our gateway and blocks until it has been fully removed.
def wait_for_nat_gateway_termination(nat):
for repeat in range(40):
time.sleep(10)
response = ec2_client.describe_nat_gateways(
NatGatewayIds=[nat['NatGateway']['NatGatewayId']])
if not response.get('NatGateways', False):
return
if response['NatGateways'][0]['State'] in ('deleted', 'failed'):
return
raise Exception('NAT gateway refuses to go away')
boto3 uses so-called waiters to automate the process of waiting for a state change of an AWS resource. Unfortunately there is no waiter that checks for the termination of a NAT gateway, so we had to write our own. We will use boto3 waiters in other parts of the code.
We are now ready to create the gateway. Notice that the gateway itself must be a part of the public subnet, even though it is meant to be used by the private subnet.
nat = ec2_client.create_nat_gateway(
AllocationId=nat_eip['AllocationId'],
SubnetId=public_subnet.id
)
atexit.register(wait_for_nat_gateway_termination, nat)
atexit.register(ec2_client.delete_nat_gateway,
NatGatewayId=nat['NatGateway']['NatGatewayId'])
Notice two atexit
handlers - one issues the delete command, the
other one waits for its completion. The handlers are executed in
reverse registration order, so the ‘waiting’ handler is registered
first. We will see this pattern again in the future.
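The ordering guarantee comes from atexit itself, which runs handlers in last-in, first-out order. A tiny standalone sketch of the pattern, with print statements standing in for the real AWS calls:
import atexit

def wait_for_deletion():
    print('waiting until the resource is gone')   # runs second

def issue_delete():
    print('issuing the delete request')           # runs first

# Registered first, therefore executed last.
atexit.register(wait_for_deletion)
# Registered last, therefore executed first.
atexit.register(issue_delete)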
Now we will wait until the NAT gateway is functional. This might take a while. Fortunately this time we can take advantage of a boto3 waiter:
ec2_client.get_waiter('nat_gateway_available').wait(
NatGatewayIds=[nat['NatGateway']['NatGatewayId']])
Now we need a route table for the private subnet.
private_route_table = vpc.create_route_table()
atexit.register(private_route_table.delete)
private_route_table.create_route(
DestinationCidrBlock='0.0.0.0/0',
NatGatewayId=nat['NatGateway']['NatGatewayId'])
private_rt_assoc = private_route_table.associate_with_subnet(
SubnetId=private_subnet.id)
atexit.register(private_rt_assoc.delete)
The next pair of resources to create will be the security groups. These determine what is allowed to connect to what over the network. We will want different rules for private and public subnets.
public_security_group = vpc.create_security_group(
GroupName="ag-http-global",
Description="Allow SSH + HTTP connections to AG")
atexit.register(public_security_group.delete)
public_ip_permissions = [{
'IpProtocol': 'TCP',
'FromPort': 10035,
'ToPort': 10035,
'IpRanges': [{'CidrIp': MY_IP}]
}, {
'IpProtocol': 'TCP',
'FromPort': 16000,
'ToPort': 17000,
'IpRanges': [{'CidrIp': MY_IP}]
}, {
'IpProtocol': 'TCP',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [{'CidrIp': MY_IP}]
}]
public_security_group.authorize_ingress(
IpPermissions=public_ip_permissions)
We allow SSH connections and HTTP connections to AG (both the frontend port (10035) and the session ports (16000-17000)) from the address we defined above. Note that if you have not changed the default value of MY_IP then the system will be accessible from anywhere, which is a security risk.
Private instances will accept all connections, but only from machines within our virtual network.
private_security_group = vpc.create_security_group(
GroupName="all-internal",
Description="Allow all access from our network")
atexit.register(private_security_group.delete)
private_ip_permissions = [{
'IpProtocol': '-1',
'UserIdGroupPairs': [{'GroupId': private_security_group.id},
{'GroupId': public_security_group.id}]
}]
private_security_group.authorize_ingress(
IpPermissions=private_ip_permissions)
Now it is time to generate a keypair. This is simply the SSH key that we will use to control access to our machines.
key_pair = ec2.create_key_pair(KeyName=KEY_NAME)
atexit.register(key_pair.delete)
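This tutorial only uses SSM to run commands on the servers, but if you want to SSH into the management instance yourself you will need the private key, which is only available on the object returned by create_key_pair. A sketch of saving it (the file name is arbitrary):
import os

# key_material holds the private key; it cannot be retrieved later,
# so write it out now if manual SSH access is desired.
with open('ag-tutorial.pem', 'w') as f:
    f.write(key_pair.key_material)
os.chmod('ag-tutorial.pem', 0o600)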
As mentioned above, we will want to use SSM to execute scripts on our servers. To do that we have to ensure that an SSM agent is installed on each machine. We can do this with the following script.
user_data = """#!/bin/bash
cd /tmp
sudo yum install -y https://s3.amazonaws.com/ec2-downloads-windows/SSMAgent/latest/linux_amd64/amazon-ssm-agent.rpm
sudo start amazon-ssm-agent"""
The script itself will be executed with cloud-init.
Now it is finally time to create our machines. Let us start with the management node.
public_instance = ec2.create_instances(
ImageId=AMI, InstanceType=TYPE,
UserData=user_data,
MinCount=1, MaxCount=1,
KeyName=KEY_NAME,
SecurityGroupIds=[
public_security_group.id,
],
IamInstanceProfile={
"Arn": instance_profile.arn
},
SubnetId=public_subnet.id)[0]
atexit.register(
ec2_client.get_waiter('instance_terminated').wait,
InstanceIds=[public_instance.id])
atexit.register(public_instance.terminate)
We pass our SSM installation script in the UserData
parameter. Notice how the atexit
handlers wait for the instance to
be fully deleted before proceeding to shutdown other systems.
Instances in the private subnet can be created in a similar way.
private_instances = ec2.create_instances(
ImageId=AMI, InstanceType=TYPE,
MinCount=INSTANCE_COUNT, MaxCount=INSTANCE_COUNT,
UserData=user_data,
KeyName=KEY_NAME,
SecurityGroupIds=[
private_security_group.id,
],
IamInstanceProfile={
"Arn": instance_profile.arn
},
SubnetId=private_subnet.id)
private_ids = [i.id for i in private_instances]
all_ids = [public_instance.id] + private_ids
atexit.register(
ec2_client.get_waiter('instance_terminated').wait,
InstanceIds=private_ids)
for instance in private_instances:
atexit.register(instance.terminate)
We gather the ids of all our instances in two lists. Now let us wait until all our instances are operational.
ec2_client.get_waiter('instance_status_ok').wait(
InstanceIds=all_ids)
We will need one more public IP address for the management instance, since we will connect to it from outside the VPC.
eip = ec2_client.allocate_address(Domain='vpc')
atexit.register(ec2_client.release_address,
AllocationId=eip['AllocationId'])
eip_assoc = ec2_client.associate_address(
AllocationId=eip['AllocationId'],
InstanceId=public_instance.id)
atexit.register(ec2_client.disassociate_address,
AssociationId=eip_assoc['AssociationId'])
We have installed the SSM agent on our machines, but it takes a moment for it to become operational. We will define yet another wait function to suspend the execution until then.
def wait_for_ssm_agents(instance_ids):
for repeat in range(40):
time.sleep(10)
response = ssm.describe_instance_information(
InstanceInformationFilterList=[{
'key': 'InstanceIds',
'valueSet': instance_ids
}])
num_responses = len(response.get(
'InstanceInformationList', []))
if num_responses != len(instance_ids):
# It is normal for an instance to not show up
# even if requested explicitly.
continue
for instance in response.get('InstanceInformationList'):
if instance['PingStatus'] != 'Online':
break
else:
return
raise Exception('Timed out waiting for SSM agents to activate.')
wait_for_ssm_agents(all_ids)
We will need one more function to wait for an SSM command to complete
its execution (unfortunately there is no built-in waiter for this in
boto3
).
def wait_for_ssm_command(command):
for repeat in range(40):
time.sleep(10)
response = ssm.list_commands(
CommandId=command['Command']['CommandId'])
if not response.get('Commands', False):
return
if response['Commands'][0]['Status'] in \
('Success', 'Cancelled', 'Failed', 'TimedOut'):
return
raise Exception(
'Timed out waiting for an SSM command to finish.')
Before we proceed, we should wait until all AG servers have started (at this point we know that the machines are up, but it takes a moment for AG itself to start processing requests). In a production system we would probably achieve this by using a dedicated monitoring solution or some other advanced mechanism, but to keep this tutorial simple we will just use SSM commands to poll the AG port until it starts responding.
wait_for_ag = [
'until curl -f http://127.0.0.1:10035/version; do sleep 2; done'
]
cmd = ssm.send_command(
InstanceIds=all_ids,
DocumentName='AWS-RunShellScript',
Parameters={'commands': wait_for_ag},
TimeoutSeconds=120)
wait_for_ssm_command(cmd)
We will now use SSM to send a script that will load sample data
into our instances. To keep things simple we will add just a single
triple per instance. That triple will include the index of the
instance (a number between 0
and INSTANCE_COUNT - 1
).
script = [
'ID=$(curl http://169.254.169.254/latest/meta-data/ami-launch-index)',
'curl -X PUT -u test:xyzzy http://127.0.0.1:10035/repositories/r',
' '.join(['curl -X PUT -u test:xyzzy http://127.0.0.1:10035/repositories/r/statement',
'--data-urlencode "subj=<ex://instance${ID}>"',
'--data-urlencode "pred=<ex://id>"',
'--data-urlencode "obj=\\"${ID}\\"^^<http://www.w3.org/2001/XMLSchema#integer>"'])
]
cmd = ssm.send_command(InstanceIds=private_ids,
DocumentName='AWS-RunShellScript',
Parameters={'commands': script})
wait_for_ssm_command(cmd)
Note that the waiter functions defined above are not particularly robust in their error handling. In a production system consisting of a larger number of instances a more elaborate mechanism for error detection and handling (retries) would have to be implemented.
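One possible direction, sketched below but not used in this tutorial, is a single generic polling helper that takes a predicate, retries transient API errors and times out in one place:
def poll(check, attempts=40, interval=10):
    # Call check() until it returns True; transient errors are
    # retried instead of aborting the teardown. This is only a
    # sketch of how the ad-hoc loops above could be generalized.
    for _ in range(attempts):
        time.sleep(interval)
        try:
            if check():
                return
        except Exception as error:
            print('Transient error while polling:', error)
    raise Exception('Timed out while polling.')

# For example, the NAT gateway wait could be expressed as:
# poll(lambda: ec2_client.describe_nat_gateways(
#          NatGatewayIds=[nat['NatGateway']['NatGatewayId']]
#      )['NatGateways'][0]['State'] in ('deleted', 'failed'))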
Now we are ready to create the federation by connecting to the management machine. Notice that we use the default credentials for the AllegroGraph AMI.
from franz.openrdf.sail.allegrographserver import AllegroGraphServer
server = AllegroGraphServer(host=eip['PublicIp'],
user='test', password='xyzzy')
conn = server.openSession(
'+'.join('<http://test:xyzzy@%s/repositories/r>'
% i.private_ip_address
for i in private_instances))
And now we can issue a query:
from franz.openrdf.query.query import QueryLanguage
query = conn.prepareTupleQuery(QueryLanguage.SPARQL, """
SELECT (AVG(?o) as ?avg) { ?s ?p ?o }""")
query.evaluate(output=True)
This should print the average instance id, which should equal (INSTANCE_COUNT - 1) / 2. With the default INSTANCE_COUNT of 2 the instances hold the ids 0 and 1, so the expected average is 0.5.
-------
| avg |
=======
| 0.5 |
-------
That is all - the script is now done and will start tearing down the whole infrastructure.
Warning
Remember to check your AWS account for any leftover resources after running the tutorial to avoid unnecessary costs.
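One quick way to look for leftovers from Python, as a sketch (it only covers instances and NAT gateways; the AWS console gives the complete picture):
import boto3

ec2_client = boto3.client('ec2', region_name='us-west-2')

# Any instance that is not already terminated may still incur costs.
for reservation in ec2_client.describe_instances()['Reservations']:
    for instance in reservation['Instances']:
        if instance['State']['Name'] != 'terminated':
            print('Instance still present:',
                  instance['InstanceId'], instance['State']['Name'])

# NAT gateways and their addresses are billed per hour as well.
for nat in ec2_client.describe_nat_gateways()['NatGateways']:
    if nat['State'] not in ('deleted', 'failed'):
        print('NAT gateway still present:', nat['NatGatewayId'])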