pytest: Implement fifo based communication #23279

Closed
3 tasks
karthiknadig opened this issue Apr 22, 2024 · 4 comments
Labels: area-internal, area-testing, feature-request, needs PR, on-testplan

@karthiknadig
Member

Why do we need to communicate like this?
Pytest reports tests as it discovers or runs them. We moved to this model so that the user gets feedback on what is going on. For repos with more than 10k tests, discovery can sometimes take a long time, with no indication of what is happening. We used socket communication to send test status back to update the UI.

Notes for anyone who wants to understand why this is complicated:

  1. Stdio: pytest prints a lot of content to stdout. Parsing useful information out of stdout and stderr is error prone, and it can break easily with plugins.
  2. File output: pytest has the option to write to files, but that also means you don't get live feedback. Another issue is that plugins sometimes do not follow convention, like the markdown test plugin. This again leads to error-prone parsing and no live test results.
  3. TCP sockets: Plugins like the one mentioned here can block socket communication. Sockets can also be blocked by firewalls.
  4. Named pipes: On Windows this creates actual named pipes; on Linux/macOS, Node currently only has UDS sockets. FIFOs are not available out of the box. We are working on providing a FIFO-based solution for Unix, but there are implementation complications, as FIFOs are handled differently by each flavor of Linux. We also need to make sure that this works when you run tests in parallel.

Things to consider:

  • Should work with parallel runs
  • Should work even if sockets are disabled
  • Should report error when fifo is not available
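
To make the three considerations above concrete, here is a minimal Python sketch of a FIFO channel on Unix. It is an illustration under assumptions, not the extension's implementation: `create_fifo`, `send`, `receive`, and `demo` are hypothetical names, and the payload format is made up. Each run gets its own private directory so parallel runs do not share a FIFO, no socket is involved, and a missing `os.mkfifo` is reported as an error.

```python
import os
import tempfile
import threading


def create_fifo(prefix="pytest-results-"):
    """Create a FIFO in a private temp directory and return its path (hypothetical helper)."""
    if not hasattr(os, "mkfifo"):
        # os.mkfifo only exists on Unix; report the problem instead of failing silently.
        raise RuntimeError("FIFOs are not available on this platform")
    # mkdtemp gives every run its own directory, so parallel runs never share a FIFO.
    directory = tempfile.mkdtemp(prefix=prefix)
    path = os.path.join(directory, "results.fifo")
    try:
        os.mkfifo(path, 0o600)
    except OSError as exc:
        os.rmdir(directory)
        raise RuntimeError(f"could not create FIFO: {exc}") from exc
    return path


def send(path, payload):
    # Opening a FIFO for writing blocks until a reader opens the other end.
    with open(path, "wb") as fifo:
        fifo.write(payload)


def receive(path):
    # read() returns once every writer has closed the FIFO (EOF).
    with open(path, "rb") as fifo:
        return fifo.read()


def demo():
    path = create_fifo()
    try:
        # The writer thread stands in for the pytest process in this sketch.
        writer = threading.Thread(target=send, args=(path, b'{"test": "passed"}\n'))
        writer.start()
        data = receive(path)
        writer.join()
        return data
    finally:
        os.remove(path)
        os.rmdir(os.path.dirname(path))
```

In a real setup the writer would be the pytest process and the reader the extension side; the thread here only keeps the sketch self-contained.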
@tongfa

tongfa commented Aug 24, 2024

Reading point 1) caused me to question why the same data can't be sent over a stream similar to STDOUT / STDERR. I do understand the wisdom of not mixing data that's currently sent over the socket with STDOUT/STDERR.

I do think the fifo approach could work, but it does incur the burden of dealing with OS differences and creating something on disk.

I created a POC snippet of code to present yet another option that I think is appealing because:

  • there are fewer OS differences to deal with
  • it's pipe based, so doesn't require creating a file or socket.

The idea is to create an extra pipe (file descriptor) that the child process can write to in addition to STDERR / STDOUT. This would be reserved for just the data that is currently communicated over a socket. A program run in the subprocess would need to be told a file descriptor number to write data to, instead of to a socket.
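
For the child side of this idea, a rough sketch of writing to an inherited descriptor could look like the following. The names `send_payload` and `demo`, and the JSON-lines payload format, are assumptions made for illustration; how the descriptor number reaches the child (argument, environment variable) is left open, and `fd_text` merely stands in for that value.

```python
import json
import os


def send_payload(fd, payload):
    """Write one JSON line to an already-open file descriptor (hypothetical helper)."""
    data = (json.dumps(payload) + "\n").encode("utf-8")
    view = memoryview(data)
    # os.write may write fewer bytes than requested, so loop until all are sent.
    while view:
        written = os.write(fd, view)
        view = view[written:]


def demo():
    read_fd, write_fd = os.pipe()
    # Stand-in for the descriptor number the launcher would hand to the child as text.
    fd_text = str(write_fd)
    send_payload(int(fd_text), {"test": "test_a", "outcome": "passed"})
    os.close(write_fd)  # closing the write end lets the reader see EOF
    with os.fdopen(read_fd, "rb") as reader:
        return reader.read()
```

The pipe in `demo` lives in a single process only so the sketch runs on its own; in the proposal the write end would be inherited by the subprocess.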

I initially tried to use a process.communicate approach. But I could not convince myself this approach could safely read large amounts of data from a pipe along with STDERR / STDOUT. All three streams need to be read simultaneously so as not to let a full one block the io loop. I didn't find a way to do this with process.communicate, but I didn't look too hard either. So I ended up going with asyncio.
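
As a point of comparison, the same "read all three streams at once" requirement can also be sketched with plain threads instead of asyncio. `Popen.communicate` only drains stdout and stderr, so the extra descriptor needs its own reader. This is an assumption-laden sketch, not a recommendation: `CHILD`, `drain`, and `run_with_extra_fd` are hypothetical names, and `pass_fds` is POSIX only.

```python
import os
import subprocess
import sys
import threading

# Hypothetical child: writes more than a typical 64 KiB pipe buffer to each stream.
CHILD = """
import os, sys
fd = int(sys.argv[1])
for _ in range(20000):
    print("out")
    print("err", file=sys.stderr)
    os.write(fd, b"0123456789")
"""


def drain(stream, chunks):
    # read1() returns whatever is available and b"" only at EOF.
    for chunk in iter(lambda: stream.read1(4096), b""):
        chunks.append(chunk)


def run_with_extra_fd(child_code):
    read_fd, write_fd = os.pipe()
    process = subprocess.Popen(
        [sys.executable, "-c", child_code, str(write_fd)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        pass_fds=(write_fd,),  # POSIX only
    )
    os.close(write_fd)  # the parent keeps only the read end, so EOF arrives when the child exits
    streams = {
        "stdout": process.stdout,
        "stderr": process.stderr,
        "extra": os.fdopen(read_fd, "rb"),
    }
    chunks = {name: [] for name in streams}
    # One thread per stream, so a full pipe on one stream cannot stall the others.
    threads = [
        threading.Thread(target=drain, args=(stream, chunks[name]))
        for name, stream in streams.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    for stream in streams.values():
        stream.close()
    return process.wait(), {name: b"".join(parts) for name, parts in chunks.items()}
```

Calling `run_with_extra_fd(CHILD)` returns the exit code and the bytes collected from each stream.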

Here is the POC snippet:

import asyncio
import os
from concurrent.futures import ThreadPoolExecutor

# This solution uses ThreadPoolExecutor to handle the synchronous read
#  operation from the extra file descriptor in a separate thread,
#  allowing it to be awaited in the asyncio event loop.
# without this I was getting an error something like: can't await on bytes
executor = ThreadPoolExecutor(1)


async def read_stream(stream, prefix):
    while True:
        data = await stream.read(1024)  # Read chunks of 1024 bytes
        if not data:
            break
        print(f"{prefix}: {data.decode().strip()}")


async def read_fd(fd, prefix):
    loop = asyncio.get_running_loop()
    with os.fdopen(fd, 'rb') as file:
        while True:
            # see ThreadPoolExecutor note above.
            # Reading a small amount of data (128) just to demo this loop is working. 
            data = await loop.run_in_executor(executor, file.read, 128)
            if not data:
                break
            print(f"{prefix}: {data.decode().strip()}")


async def run_subprocess_with_extra_fd():

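    # this pipe is used as the extra file descriptor, it conceptually replaces the socket.
    # The read end is left in blocking mode: read_fd() reads it in a worker thread, and a
    # non-blocking buffered read returns None when no data is ready, which would end the
    # read loop early and close the pipe while the child is still writing.
    extra_fd_read, extra_fd_write = os.pipe()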

    # this mocks a command that would be modified to write to
    #  a specified file descriptor instead of a socket
    # With very high counts (3000) I saw exit codes -13 / 143. -13 means killed by
    # SIGPIPE and 143 is 128 + SIGTERM (15); neither is SIGKILL. SIGPIPE suggests
    # the reading side closed the pipe before the writer finished.
    line_count = 30
    command = [
        'bash',
        '-c',
        'echo "Hello, stdout"; '
        'echo "Hello, stderr" >&2; '
        f'echo "Hello, extra_fd" >&{extra_fd_write}; '
        # write a decent amount of dummy data to demonstrate this approach
        # reads from all streams simultaneously so they don't block
        f'for i in {{1..{line_count}}}; do '
        '    echo abcdefghij ; '
        '    echo ABCDEFGHIJK >&2; '
        f'    echo 0123456789 >&{extra_fd_write}; '
        'done; '
    ]

    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        pass_fds=(extra_fd_write,)
    )

    os.close(extra_fd_write)  # Close the write end of the pipe in the parent process

    stdout_task = asyncio.create_task(read_stream(process.stdout, "STDOUT"))
    stderr_task = asyncio.create_task(read_stream(process.stderr, "STDERR"))
    extra_fd_task = asyncio.create_task(read_fd(extra_fd_read, "EXTRA_FD"))

    await asyncio.gather(stdout_task, stderr_task, extra_fd_task)

    await process.wait()
    print(f"Process exited with code {process.returncode}")

if __name__ == "__main__":
    asyncio.run(run_subprocess_with_extra_fd())

Please let me know what you think. thank you!

@karthiknadig
Member Author

This should also work, but we need to launch the process from TypeScript. I agree FIFO has the problem of OS specific implementation. So, the extra pipe approach might actually work better. We will need to replace the calls in namedpipes.ts with the extra pipe approach.

@tongfa

tongfa commented Aug 26, 2024

I might see a problem. The fact that the read and write sides of the pipe need to exist in two different processes that do not have simple process ancestry is something I did not anticipate. My POC code demonstrates one process creating the pipe and then the write side of that pipe is made available to a subprocess.

Given that the process that generates the data is launched elsewhere, I think the pipe would have to be created in a common ancestor of both the python process that reads the data and the typescript process that spawns the process that generates the data. And (this is the kicker) it would have to happen before any of the subprocesses that need the file descriptors (both the read and write sides) are created, since the file descriptors are passed at process creation time by the OS. Further, if that common ancestor process is typescript / node, I'm not sure what facilities there are to mimic what python's os.pipe() call does.
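
The ordering constraint can be illustrated with a small Python-only sketch, with Python standing in for whatever the common ancestor would really be. `WRITER`, `READER`, and `relay_through_ancestor` are hypothetical names; the point is only that the pipe exists before either child is spawned and that each child inherits exactly one end (POSIX only, via `pass_fds`).

```python
import os
import subprocess
import sys

# Hypothetical children: one writes to its inherited fd, the other reads from its own.
WRITER = "import os, sys; os.write(int(sys.argv[1]), b'result: passed')"
READER = (
    "import os, sys\n"
    "with os.fdopen(int(sys.argv[1]), 'rb') as f:\n"
    "    sys.stdout.write(f.read().decode())\n"
)


def relay_through_ancestor():
    # The pipe must exist before either child is created.
    read_fd, write_fd = os.pipe()
    reader = subprocess.Popen(
        [sys.executable, "-c", READER, str(read_fd)],
        stdout=subprocess.PIPE,
        pass_fds=(read_fd,),  # the reader inherits only the read end
    )
    writer = subprocess.Popen(
        [sys.executable, "-c", WRITER, str(write_fd)],
        pass_fds=(write_fd,),  # the writer inherits only the write end
    )
    # The ancestor closes its copies so the reader sees EOF once the writer exits.
    os.close(read_fd)
    os.close(write_fd)
    writer.wait()
    output, _ = reader.communicate()
    return output
```

Whether Node offers an equivalent way to create an anonymous pipe and hand each end to a different child is exactly the open question raised above; this sketch does not answer it.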

I'll recommend more POC level investigation into the extra pipe approach before committing to it.

eleanorjboyd added a commit that referenced this issue Sep 5, 2024
first step in work on
#23279

---------

Co-authored-by: Karthik Nadig <kanadig@microsoft.com>
anthonykim1 pushed a commit to anthonykim1/vscode-python that referenced this issue Sep 13, 2024
first step in work on
microsoft#23279

---------

Co-authored-by: Karthik Nadig <kanadig@microsoft.com>
@karthiknadig karthiknadig removed their assignment Dec 2, 2024
@eleanorjboyd
Member

closed by d1d125a
