Development

Python SDK

You may have already learned that a3m comes with two executables: a3m and a3md. These are command-line interfaces that wrap a number of Python abstractions, which are also available to software developers who want to build applications that embed or communicate with a3m.

a3m.server.runner.create_server() is a function that helps you create your own instance of a3m.server.runner.Server, the gRPC server.

Use a3m.server.rpc.client.Client to communicate with it. a3m.cli.client.wrapper.ClientWrapper is a context manager that makes it easier to access both an embedded server and its client instance.

For more details, see: https://gist.github.com/sevein/2e5cf115c153df1cfc24f0f9d67f6d2a.

Warning

These APIs are still unstable; expect changes!

The following is an example of a web application that uses the development kit to embed a3m and make it available to web clients.

#!/usr/bin/env python
"""HTTP gateway for a3m.

When executed, this module starts a simple HTTP server that submits transfers
to an embedded a3m server instance on every GET request received.

Usage::

    $ pip install a3m
    $ ./webapp.py
    $ curl 127.0.0.1:33892  # Replace 33892 with the port printed by webapp.py.

"""

from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer
from threading import Thread

import grpc

from a3m.cli.common import init_django

init_django()  # This will not be needed in the future.
from a3m.server.rpc.client import Client
from a3m.server.runner import create_server


class RequestHandler(BaseHTTPRequestHandler):
    """Submit a transfer to the embedded a3m server on every GET request."""

    def do_GET(self):
        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        try:
            resp = a3mc.submit(
                name="Test",
                url="https://github.com/artefactual/archivematica-sampledata/raw/master/SampleTransfers/ZippedDirectoryTransfers/DemoTransferCSV.zip",
            )
        except Exception as err:
            self.wfile.write(f"Error: {err}".encode())
        else:
            self.wfile.write(f"Transfer submitted! {resp.id}".encode())


# Create the embedded a3m server, bound to a free local port.
a3md = create_server(
    bind_address="127.0.0.1:0",
    server_credentials=grpc.local_server_credentials(
        grpc.LocalConnectionType.LOCAL_TCP
    ),
    max_concurrent_packages=1,
    batch_size=125,
    queue_workers=3,
    grpc_workers=3,
)
a3md_thread = Thread(target=a3md.start)
a3md_thread.start()

# Create the client before the web server starts so that request handlers
# never run before a3mc is defined.
a3mc = Client(grpc.insecure_channel(f"127.0.0.1:{a3md.grpc_port}"))

httpd = ThreadingHTTPServer(("127.0.0.1", 0), RequestHandler)
httpd_thread = Thread(target=httpd.serve_forever)
httpd_thread.start()
print(f"Web server listening on port {httpd.server_port}/tcp.")

# Block until the a3m server thread exits.
a3md_thread.join()

gRPC API

Whether you are embedding a3m or communicating with remote instances, the gRPC API is the underlying communication layer, and you should be able to use it from any of the languages supported by the gRPC stack.

gRPC uses Protocol Buffers as the Interface Definition Language (IDL) for describing both the service interface and the structure of the payload messages.

So far, the whole definition of messages and services fits in a single file that we share below. Writing a custom client isn’t hard because the stubs are generated automatically. Alternatively, you can use a client such as grpcurl, which dynamically browses our service schema.

Find the generated documentation of the a3m API at buf.build/artefactual/a3m.

Reference

a3m.server.runner.create_server(bind_address, server_credentials, max_concurrent_packages, batch_size, queue_workers, grpc_workers, debug=False)

Create an a3m server that is ready to use.

It bootstraps the local resources it needs, such as the database, the local processing directory, and the pool of threads. It wraps a3m.server.runner.Server.

class a3m.server.runner.Server(bind_address: str, server_credentials: ServerCredentials | None, workflow: Workflow, max_concurrent_packages: int, batch_size: int, queue_executor: ThreadPoolExecutor, grpc_executor: ThreadPoolExecutor, debug: bool = False)

a3m server.

It runs the gRPC API server and the workflow engine, each on its own pool of threads. It accepts an a3m.server.workflow.Workflow, which can be customized as needed.
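To illustrate what "independent pools of threads" means here, the following is a minimal standard-library sketch, not a3m's own code. It assumes that worker counts like the queue_workers and grpc_workers arguments of create_server() end up as two separate ThreadPoolExecutor instances like the queue_executor and grpc_executor parameters of Server. The helper names make_executors and current_thread_name are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def make_executors(queue_workers, grpc_workers):
    """Build two separate pools (hypothetical helper, not part of a3m)."""
    # Each pool has its own threads, so a busy workflow queue cannot
    # exhaust the threads that answer gRPC requests, and vice versa.
    queue_executor = ThreadPoolExecutor(
        max_workers=queue_workers, thread_name_prefix="queue"
    )
    grpc_executor = ThreadPoolExecutor(
        max_workers=grpc_workers, thread_name_prefix="grpc"
    )
    return queue_executor, grpc_executor


def current_thread_name():
    """Report which thread ran the task."""
    return threading.current_thread().name


queue_executor, grpc_executor = make_executors(queue_workers=3, grpc_workers=3)
try:
    # Run one task on each pool and record the worker thread that handled it.
    queue_name = queue_executor.submit(current_thread_name).result()
    grpc_name = grpc_executor.submit(current_thread_name).result()
    print(queue_name, grpc_name)
finally:
    queue_executor.shutdown()
    grpc_executor.shutdown()
```

The thread name prefixes are only there to make the separation visible; whether a3m names its threads this way is not something the reference above states.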

class a3m.server.rpc.client.Client(channel: Channel, rpc_timeout: int | None = 30, wait_for_ready: bool = False)

a3m gRPC API client.
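As a rough sketch of how the client could be used against a server that is already running, the helpers below build a Client from the constructor signature shown above and call submit() the same way the web application example does. The function names connect and submit_transfer are hypothetical, and the sketch assumes the server accepts insecure channels. The init_django() call mirrors the web application example and may not be required for a pure client.

```python
def connect(address, rpc_timeout=30):
    """Return a Client for the a3m server at address (hypothetical helper)."""
    # Imports are local so this module can be loaded without a3m installed.
    import grpc
    from a3m.cli.common import init_django

    init_django()  # Mirrors the web application example; may become unnecessary.
    from a3m.server.rpc.client import Client

    # Assumption: the server accepts plaintext (insecure) connections.
    channel = grpc.insecure_channel(address)
    return Client(channel, rpc_timeout=rpc_timeout, wait_for_ready=True)


def submit_transfer(client, name, url):
    """Submit a transfer and return the identifier reported by the server."""
    resp = client.submit(name=name, url=url)
    return resp.id
```

With a server reachable at a hypothetical address such as 127.0.0.1:7000, submit_transfer(connect("127.0.0.1:7000"), "Test", url) would return the identifier of the submitted transfer. Keeping submit_transfer separate from connect makes it easy to exercise with a stand-in client in tests.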

class a3m.cli.client.wrapper.ClientWrapper(address=None, wait_for_ready=False)

A context manager that provides an a3m client or client-server instance.

Use address to indicate the location of the a3m server. When address is undefined, this wrapper launches an embedded server and sets up the client accordingly. The resources it uses are cleaned up automatically on exit.
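The behaviour described above (connect to the given address, or launch an embedded server when none is given, and clean up on exit) can be illustrated with a toy context manager. This is a sketch of the pattern only, not ClientWrapper's implementation: the class name EmbeddedOrRemote is hypothetical, and a plain HTTP server from the standard library stands in for the embedded a3m server.

```python
from http.server import BaseHTTPRequestHandler
from http.server import ThreadingHTTPServer
from threading import Thread


class EmbeddedOrRemote:
    """Toy stand-in for the pattern (hypothetical, not a3m's code)."""

    def __init__(self, address=None):
        self.address = address
        self.embedded = address is None
        self._server = None
        self._thread = None

    def __enter__(self):
        if self.embedded:
            # No address given: start an embedded server on a free local port.
            self._server = ThreadingHTTPServer(
                ("127.0.0.1", 0), BaseHTTPRequestHandler
            )
            self._thread = Thread(target=self._server.serve_forever, daemon=True)
            self._thread.start()
            self.address = f"127.0.0.1:{self._server.server_port}"
        return self

    def __exit__(self, exc_type, exc, tb):
        if self._server is not None:
            self._server.shutdown()  # Stop serve_forever().
            self._server.server_close()  # Release the listening socket.
            self._thread.join()
        return False  # Do not swallow exceptions raised inside the block.


with EmbeddedOrRemote() as wrapper:
    embedded_address = wrapper.address
    print(f"Embedded server at {embedded_address}")
```

Passing an address, as in EmbeddedOrRemote("example.org:7000"), skips the embedded server entirely, so there is nothing to clean up on exit.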