python, python-3.x, python-asyncio, python-aiofiles

Stopping asyncio program using file input


What specific code needs to change in the Python 3.12 example below so that the program myReader.py is successfully halted every time the program myWriter.py writes the line "Stop, damnit!" to sourceFile.txt?

THE PROBLEM:

The problem is that myReader.py only sometimes stops when the line "Stop, damnit!" is written to sourceFile.txt.

One workaround is to have myWriter.py write "Stop, damnit!" to sourceFile.txt again and again, which eventually causes myReader.py to halt. The drawback is that myWriter.py has to keep writing the same line for an unpredictable length of time. We have tested repeating it every second for 15 minutes, but in some situations myWriter.py might need to keep writing "Stop, damnit!" every second for 30 minutes, and in others for only one or two minutes.

The cause seems to be that the API calls made by myReader.py take variable amounts of time to return, so the backlog of work can sometimes grow arbitrarily long. It also seems that the myReader.py loop cannot see the "Stop, damnit!" line until the many outstanding asynchronous API-call tasks have completed.

Ideally, myReader.py would detect and respond to a single occurrence of "Stop, damnit!" instead of requiring the line to be written many times.

WRITER PROGRAM:

The myWriter.py program writes many things, but the relevant part of myWriter.py that writes the stop command is:

import time
#Repeat 900 times to test output. Sleep for 1 second between each.
for i in range(900):
  writeToFile("Stop, damnit!")
  time.sleep(1)
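
Here writeToFile simply appends a newline-terminated line to sourceFile.txt; its exact implementation is omitted, but it can be assumed to look roughly like this:

def writeToFile(text):
  # Assumed implementation: append the text as one line to the watched file.
  with open('/path/to/sourceFile.txt', 'a') as f:
    f.write(text + "\n")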

READER PROGRAM:

The relevant portion of myReader.py is as follows:

import os
import platform
import asyncio
import aiofiles

BATCH_SIZE = 10

def get_source_file_path():
  if platform.system() == 'Windows':
    return 'C:\\path\\to\\sourceFile.txt'
  else:
    return '/path/to/sourceFile.txt'

async def send_to_api(linesBuffer):
  success = runAPI(linesBuffer)
  return success

async def read_source_file():
  source_file_path = get_source_file_path()
  counter = 0
  print("Reading source file...")
  print("source_file_path: ", source_file_path)
  #Detect the size of the file located at source_file_path and store it in the variable file_size.
  file_size = os.path.getsize(source_file_path)
  print("file_size: ", file_size)
  taskCountList = []

  background_tasks = set()

  async with aiofiles.open(source_file_path, 'r') as source_file:
    await source_file.seek(0, os.SEEK_END)
    linesBuffer = []
    while True:
      # Always make sure that file_size is the current size:
      line = await source_file.readline()
      new_file_size = os.path.getsize(source_file_path)
      if new_file_size < file_size:
        print("The file has been truncated.")
        print("old file_size: ", file_size)
        print("new_file_size: ", new_file_size)
        await source_file.seek(0, os.SEEK_SET)
        file_size = new_file_size
        # Allocate a new list instead of clearing the current one
        linesBuffer = []
        counter = 0
        continue
      line = await source_file.readline()
      if line:
        new_line = str(counter) + " line: " + line
        print(new_line)
        linesBuffer.append(new_line)
        print("len(linesBuffer): ", len(linesBuffer))
        if len(linesBuffer) == BATCH_SIZE:
          print("sending to api...")
          task = asyncio.create_task(send_to_api(linesBuffer))
          background_tasks.add(task)
          task.add_done_callback(background_tasks.discard)
          pendingTasks = len(background_tasks)
          taskCountList.append(pendingTasks)
          print("")
          print("pendingTasks: ", pendingTasks)
          print("")
          # Do not clear the buffer; allocate a new one:
          linesBuffer = []
          counter += 1
          print("counter: ", counter)
        #detect whether or not the present line is the last line in the file.
        # If it is the last line in the file, then write whatever batch
        # we have even if it is not complete.
        if "Stop, damnit!" in line:
          #Print the next line 30 times to simulate a large file.
          for i in range(30):
            print("LAST LINE IN FILE FOUND.")
            #sleep for 1 second to simulate a large file.
            await asyncio.sleep(1)
          #Omitting other stuff for brevity.
          break
      else:
          await asyncio.sleep(0.1)

async def main():
  await read_source_file()

if __name__ == '__main__':
  asyncio.run(main())

Solution

  • Things to know:

    Your while loop calls source_file.readline() twice per iteration: once at the top of the loop and again after the truncation check. The first call returns a line, but its result is immediately overwritten; the second call then returns an empty string because there is nothing left to read. So when "Stop, damnit!" is written once, the first call reads it and throws it away, the second call gets an empty string, and the stop check never sees the message.

    You can verify this by modifying your writer to write writeToFile("Stop, damnit!\nStop, damnit!") (maybe \r\n on Windows). This way you put two lines into the file at once, so the second call to readline actually reads something and the stop check actually sees the message.

    Edit:

    Here are a couple of examples to show how it's behaving. Key changes:

    BATCH_SIZE = 1
    async def send_to_api(linesBuffer):
      print('processed lines', linesBuffer)
      return True
    

    With myWriter.py as:

    import time
    
    def writeToFile(text):
        with open("sourceFile.txt", "a") as f:
            f.write(text+"\n")
    
    for i in range(1,11):
        writeToFile(f"message {i}")
        time.sleep(0.2)
    
    writeToFile("Stop, damnit!\nStop, damnit!") # To make it stop
    

    we get output:

    processed lines ['0 line: message 5\n']
    processed lines ['1 line: message 8\n']
    processed lines ['2 line: Stop, damnit!\n']
    

    The majority of the messages are dropped.

    With myWriter.py as:

    def writeToFile(text):
        with open("sourceFile.txt", "a") as f:
            f.write(text+"\n")
    
    messages = []
    for i in range(1,11):
        messages.append(f"message {i}")
    writeToFile('\n'.join(messages))
    
    writeToFile("Stop, damnit!\nStop, damnit!")
    

    we get:

    processed lines ['0 line: message 2\n']
    processed lines ['1 line: message 4\n']
    processed lines ['2 line: message 6\n']
    processed lines ['3 line: message 8\n']
    processed lines ['4 line: message 10\n']
    processed lines ['5 line: Stop, damnit!\n']
    

    Here we see every other message dropped, since the first readline call in each iteration always discards what it read.
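
    The fix, then, is to read only once per iteration, so every line (including "Stop, damnit!") passes through the batching and stop checks. Below is a minimal sketch of the corrected loop, assuming the surrounding setup (file_size, counter, background_tasks, BATCH_SIZE, send_to_api) stays as in the question's read_source_file:

    async with aiofiles.open(source_file_path, 'r') as source_file:
        await source_file.seek(0, os.SEEK_END)
        linesBuffer = []
        while True:
            # Read exactly once per iteration; no second readline call.
            line = await source_file.readline()

            # Check for truncation before processing the line.
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                await source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer = []
                counter = 0
                continue

            if not line:
                # Nothing new yet; wait and poll again.
                await asyncio.sleep(0.1)
                continue

            new_line = str(counter) + " line: " + line
            linesBuffer.append(new_line)
            if len(linesBuffer) == BATCH_SIZE:
                task = asyncio.create_task(send_to_api(linesBuffer))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
                linesBuffer = []
                counter += 1

            if "Stop, damnit!" in line:
                break

    With this change, a single writeToFile("Stop, damnit!") should be enough to stop the reader, because that line is no longer consumed by a throwaway readline call.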