Skip to content

flight

Arrow Flight utilities.

MultiEndpointStream

MultiEndpointStream(endpoints, initial_client)

Bases: AbstractContextManager

Multi-threaded Arrow Flight endpoint stream iterator context manager

Given a list of endpoints, connect to all of them in parallel and stream data from them all interleaved.

Source code in `arrakis/flight.py` (lines 217–231)
def __init__(
    self,
    endpoints: list[flight.FlightEndpoint],
    initial_client: flight.FlightClient,
):
    """Initialize with a list of endpoints and a reusable Flight client.

    Parameters
    ----------
    endpoints : list[flight.FlightEndpoint]
        The endpoints to stream from. One worker thread is allotted per
        endpoint. An empty list is accepted and produces an empty stream.
    initial_client : flight.FlightClient
        A reusable Flight client.

    """
    self.endpoints = endpoints
    self.initial_client = initial_client
    # (data, endpoint) items are read from here by __iter__; presumably
    # filled by the workers running _execute_endpoint
    self.q: queue.SimpleQueue = queue.SimpleQueue()
    # set by __iter__ when a worker fails and by close()
    self.quit_event: threading.Event = threading.Event()
    # one worker per endpoint so every stream can run concurrently.
    # ThreadPoolExecutor rejects max_workers=0, so keep at least one worker
    # and let an empty endpoint list construct cleanly.
    self.executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=max(1, len(self.endpoints)),
    )
    # per-endpoint completion flags, keyed by the serialized endpoint
    self.threads_done = {endpoint.serialize(): False for endpoint in endpoints}
    # populated by __iter__ once the workers have been submitted
    self.futures: list[concurrent.futures.Future] | None = None

__iter__

__iter__(timeout=constants.DEFAULT_QUEUE_TIMEOUT)

Execute the streams and yield the results

Yielded results are a tuple of the data chunk, and the endpoint it came from.

The timeout is expected to be a timedelta object.

Source code in `arrakis/flight.py` (lines 259–293)
def __iter__(
    self,
    timeout: timedelta = constants.DEFAULT_QUEUE_TIMEOUT,
) -> Generator[
    flight.FlightStreamReader
    | tuple[flight.FlightStreamReader, flight.FlightEndpoint],
    None,
    None,
]:
    """Run every endpoint stream concurrently and yield their data.

    Each endpoint is submitted to the thread pool. Items are taken off the
    shared queue as they arrive and yielded as ``(data, endpoint)`` tuples,
    so chunks from different endpoints come out interleaved. Iteration ends
    once every endpoint has delivered its end-of-stream marker.

    Parameters
    ----------
    timeout : timedelta
        How long each queue poll may block before the workers' state is
        re-checked.

    """
    self.futures = [
        self.executor.submit(self._execute_endpoint, ep) for ep in self.endpoints
    ]

    while not all(self.threads_done.values()):
        try:
            chunk, source = self.q.get(block=True, timeout=timeout.total_seconds())
        except queue.Empty:
            # nothing arrived within the timeout; fall through to the check below
            pass
        else:
            if chunk is not EOS:
                yield chunk, source
            else:
                # EOS marks the end of one endpoint's stream
                self.threads_done[source.serialize()] = True
        # if any worker has finished with an exception, raise the quit flag
        failed = [fut for fut in self.futures if fut.done() and fut.exception()]
        if failed:
            self.quit_event.set()

close

close()

close all streams

Source code in `arrakis/flight.py` (lines 302–318)
def close(self):
    """Close all streams.

    Sets the quit event, then waits on each submitted worker in order and
    re-raises the first error found. Flight errors are re-raised as the same
    subclass, with everything after " Detail:" removed from the message.
    The thread pool is shut down and ``self.futures`` is cleared on every
    path, including when an error is re-raised. A repeated ``close()``
    therefore does not raise the same error again.

    Raises
    ------
    flight.FlightError
        If a worker failed with a Flight error (same subclass, trimmed message).

    """
    self.quit_event.set()
    try:
        if self.futures is not None:
            for f in self.futures:
                # re-raise exceptions to the client, returning
                # user-friendly Flight-specific errors when relevant
                try:
                    f.result()
                except flight.FlightError as e:
                    # NOTE: this strips the original message of everything
                    # besides the original error message raised by the server
                    msg = e.args[0].partition(" Detail:")[0]
                    raise type(e)(msg, e.extra_info) from None
    except BaseException:
        # A worker failed (or the wait was interrupted). Still release the
        # pool so it does not leak, but don't block on workers that may
        # still be winding down.
        self.executor.shutdown(wait=False, cancel_futures=True)
        self.futures = None
        raise

    self.executor.shutdown(cancel_futures=True)
    self.futures = None

unpack

unpack()

Unpack stream data into individual elements

Source code in `arrakis/flight.py` (lines 297–300)
def unpack(self):
    """Yield the individual elements of every chunk in the stream.

    Iterates over this stream and flattens each chunk's ``data`` (via
    ``to_pylist()``) into single elements. The endpoint each chunk came
    from is discarded.
    """
    for batch, _source in self:
        for row in batch.data.to_pylist():
            yield row

RequestValidator

RequestValidator()

A validator for JSON-encoded requests.

Source code in `arrakis/flight.py` (lines 60–67)
def __init__(self) -> None:
    """Load the generic descriptor schema.

    Request-specific schemas are not loaded here; ``validate`` loads them
    on first use and caches them in ``self._validators``.
    """
    # per-request-type validators, populated on demand by validate()
    self._validators: dict[RequestType, jsonschema.Draft7Validator] = {}

    # load generic descriptor schema. Decode as UTF-8 explicitly;
    # read_text() without an encoding uses the platform's locale encoding.
    resource = resources.files(arrakis_schema).joinpath("descriptor.json")
    with resources.as_file(resource) as path:
        schema = json.loads(path.read_text(encoding="utf-8"))
    self._generic_validator = jsonschema.Draft7Validator(schema)

validate

validate(payload)

Validate a JSON-encoded request.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `payload` | `Request` | A dictionary with a `'request'` and an `'args'` key encoding the given Flight request. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `ValidationError` | If the request does not match the expected schema. |

Source code in `arrakis/flight.py` (lines 69–96)
def validate(self, payload: Request) -> None:
    """Validate a JSON-encoded request.

    The payload is checked against the generic descriptor schema first. It
    is then checked against the schema for its request type, which is
    loaded on first use and cached on the instance.

    Parameters
    ----------
    payload : Request
        A dictionary with a 'request' and an 'args' key encoding
        the given Flight request.

    Raises
    ------
    ValidationError
        If the request does not match the expected schema.

    """
    self._generic_validator.validate(payload)
    # NOTE(review): RequestType[...] raises KeyError for a name it doesn't
    # know. This presumably relies on descriptor.json restricting
    # "request" to valid RequestType names; confirm.
    request = RequestType[payload["request"]]

    # load schema on demand; schema files are named after the lower-cased
    # request type name
    if request not in self._validators:
        resource = resources.files(arrakis_schema).joinpath(
            f"{request.name.lower()}.json"
        )
        with resources.as_file(resource) as path:
            # decode as UTF-8 explicitly rather than the locale encoding
            schema = json.loads(path.read_text(encoding="utf-8"))
        self._validators[request] = jsonschema.Draft7Validator(schema)

    self._validators[request].validate(payload)

create_command

create_command(request_type, *, validator, **kwargs)

Create a Flight command containing a JSON-encoded request.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `request_type` | `RequestType` | The type of request. | *required* |
| `validator` | `RequestValidator` | A validator to validate that the command matches the expected schema. | *required* |
| `**kwargs` | `dict` | Extra arguments corresponding to the specific request. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bytes` | The JSON-encoded request. |

Raises:

| Type | Description |
| --- | --- |
| `ValidationError` | If the request does not match the expected schema. |

Source code in `arrakis/flight.py` (lines 110–140)
def create_command(
    request_type: RequestType, *, validator: RequestValidator, **kwargs
) -> bytes:
    """Build a JSON-encoded Flight command for a request.

    Parameters
    ----------
    request_type : RequestType
        The type of request; its ``name`` is stored under the ``"request"`` key.
    validator : RequestValidator
        Validator used to check the command against the expected schema
        before it is encoded.
    **kwargs : dict, optional
        Extra arguments for the specific request, stored under ``"args"``.

    Returns
    -------
    bytes
        The request serialized as JSON and encoded as UTF-8.

    Raises
    ------
    ValidationError
        If the request does not match the expected schema.

    """
    command: Request = {"request": request_type.name, "args": kwargs}
    # validate before serializing so an invalid command is never encoded
    validator.validate(command)
    serialized = json.dumps(command)
    return serialized.encode("utf-8")

create_descriptor

create_descriptor(request_type, *, validator, **kwargs)

Create a Flight descriptor given a request.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `request_type` | `RequestType` | The type of request. | *required* |
| `validator` | `RequestValidator` | A validator to validate that the command matches the expected schema. | *required* |
| `**kwargs` | `dict` | Extra arguments corresponding to the specific request. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `FlightDescriptor` | A Flight descriptor containing the request. |

Raises:

| Type | Description |
| --- | --- |
| `ValidationError` | If the request does not match the expected schema. |

Source code in `arrakis/flight.py` (lines 143–169)
def create_descriptor(
    request_type: RequestType, *, validator: RequestValidator, **kwargs
) -> flight.FlightDescriptor:
    """Wrap a validated, JSON-encoded request in a Flight command descriptor.

    Parameters
    ----------
    request_type : RequestType
        The type of request.
    validator : RequestValidator
        Validator used to check the command against the expected schema.
    **kwargs : dict, optional
        Extra arguments for the specific request.

    Returns
    -------
    flight.FlightDescriptor
        A command descriptor whose payload is the encoded request produced
        by ``create_command``.

    Raises
    ------
    ValidationError
        If the request does not match the expected schema.

    """
    return flight.FlightDescriptor.for_command(
        create_command(request_type, validator=validator, **kwargs)
    )

parse_command

parse_command(cmd, *, validator)

Parse a Flight command into a request.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cmd` | `bytes` | The JSON-encoded request. | *required* |
| `validator` | `RequestValidator` | A validator to validate that the command matches the expected schema. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `request_type` | `RequestType` | The type of request. |
| `kwargs` | `dict` | Arguments corresponding to the specific request. |

Raises:

| Type | Description |
| --- | --- |
| `JSONDecodeError` | If the command does not decode to valid JSON. |
| `ValidationError` | If the request does not match the expected schema. |

Source code in `arrakis/flight.py` (lines 172–206)
def parse_command(
    cmd: bytes, *, validator: RequestValidator
) -> tuple[RequestType, dict]:
    """Parse a Flight command into a request.

    Parameters
    ----------
    cmd : bytes
        The JSON-encoded request.
    validator : RequestValidator
        A validator to validate that the command matches the expected schema.

    Returns
    -------
    request_type : RequestType
        The type of request.
    kwargs : dict
        Arguments corresponding to the specific request.

    Raises
    ------
    JSONDecodeError
        If the command is not valid UTF-8 or does not decode to valid JSON.
    ValidationError
        If the request does not match the expected schema.

    """
    msg = "Command does not decode to valid JSON"
    try:
        text = cmd.decode("utf-8")
    except UnicodeDecodeError as e:
        # Report undecodable bytes through the documented JSONDecodeError
        # instead of leaking a UnicodeDecodeError. Strict decoding fails at
        # the first bad byte, so the prefix cmd[:e.start] decodes cleanly.
        # Its length is therefore the character offset into the replaced doc.
        pos = len(cmd[: e.start].decode("utf-8"))
        doc = cmd.decode("utf-8", errors="replace")
        raise json.JSONDecodeError(msg, doc, pos) from e

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(msg, e.doc, e.pos) from e

    validator.validate(parsed)
    return RequestType[parsed["request"]], parsed["args"]