Tags: python, data-science, pipeline, data-extraction, luigi

I am writing a data-processing pipeline with Luigi, but I get an error.


import os
import luigi
import pandas as pd
import requests as req
from bs4 import BeautifulSoup

class DownloadData(luigi.Task):
    """Download the gutenberg.org "top scores" page and save it as raw text.

    The page text is written to the local target ``raw_data.txt``.
    """

    def run(self):
        """Fetch the page and write its text to the output target.

        Raises:
            requests.RequestException: if the request times out, cannot be
                completed, or the server answers with an HTTP error status.
        """
        # Without a timeout, requests waits indefinitely on a stalled
        # connection and the worker would never finish this task.
        response = req.get(
            "http://www.gutenberg.org/browse/scores/top", timeout=30
        )
        # Fail on 4xx/5xx instead of saving an error page: once the output
        # file exists, complete() reports the task as done and it is not
        # run again.
        response.raise_for_status()
        with self.output().open("w") as f:
            f.write(response.text)

    def output(self):
        """Return the local file target that holds the downloaded page."""
        return luigi.LocalTarget("raw_data.txt")

    def complete(self):
        """Return True when the output file already exists on disk."""
        return os.path.exists(self.output().path)

class PrePData(luigi.Task):
    """Preprocessing step that depends on DownloadData.

    NOTE(review): this is the failing version that the question is about.
    The code is intentionally left untouched so it still matches the quoted
    error output; known defects are flagged inline.
    """

    def requires(self):
        """Declare DownloadData as the single upstream dependency."""
        return DownloadData()

    def run(self):
        """Try to collect the downloaded page into ``bs4ed_data``."""
        # NOTE(review): requires() returns the DownloadData task object, not
        # the downloaded text, so the ``contains`` call below raises
        # AttributeError. self.input() gives access to the upstream output.
        data = self.requires()
        bs4ed_data = []
        if data.contains("<!DOCTYPE html>"):
            # NOTE(review): append() is called with no argument and its
            # result is then called; list.append() without an argument
            # raises TypeError. Presumably BeautifulSoup(data, "html.parser")
            # was meant to be appended - confirm intent.
            bs4ed_data.append()(data,"html.parser")

        else:
            print("can not found any problem in this data")

        # NOTE(review): nothing is written to self.output() here, so the
        # file checked by complete() is never created by this method.
        return bs4ed_data

    def output(self):
        """Return the target for the preprocessed data."""
        # NOTE(review): luigi.local_target is a module, so calling it raises
        # "TypeError: 'module' object is not callable"; luigi.LocalTarget is
        # the target class.
        return luigi.local_target("data.txt")

    def complete(self):
        """Return True when the output file already exists on disk."""
        return os.path.exists(self.output().path)

    def on_success(self):
        """Print a confirmation message."""
        print("data preprocessing completed successfully")

    # NOTE(review): luigi is documented to call this hook with the raised
    # exception as an argument, so the signature likely needs an
    # ``exception`` parameter - confirm against the luigi Task API.
    def on_failure(self):
        """Print a failure message."""
        print("data preprocessing failed")

class RunAllTasks(luigi.WrapperTask):
    """Wrapper task that pulls in the download and preprocessing steps."""

    def requires(self):
        """Return the tasks this wrapper depends on, in pipeline order."""
        download_step = DownloadData()
        preprocess_step = PrePData()
        return [download_step, preprocess_step]

I run this Python file with the following command in my terminal:

python -m luigi --module PipeLineofETL-A RunAllTasks --local-scheduler --workers 4

and I get this error:

python -m luigi --module PipeLineofETL-A RunAllTasks --local-scheduler --workers 4
DEBUG: Checking if RunAllTasks() is complete
WARNING: Will not run RunAllTasks() or any dependencies due to error in complete() method:
Traceback (most recent call last):
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 429, in check_complete
    is_complete = check_complete_cached(task, completion_cache)
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 414, in check_complete_cached
    is_complete = task.complete()
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py", line 845, in complete
    return all(r.complete() for r in flatten(self.requires()))
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py", line 845, in <genexpr>
    return all(r.complete() for r in flatten(self.requires()))
  File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 40, in complete
    return os.path.exists(self.output().path)
  File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 37, in output
    return luigi.local_target("data.txt")
TypeError: 'module' object is not callable

INFO: Informed scheduler that task   RunAllTasks__99914b932b   has status   UNKNOWN
INFO: Done scheduling tasks
INFO: Running Worker with 4 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=1404147006, workers=4, host=tunapc, username=tuna, pid=9077) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed scheduling:
    - 1 RunAllTasks()

Did not run any tasks
This progress looks :( because there were tasks whose scheduling failed

===== Luigi Execution Summary =====

import os
import luigi
import pandas as pd
import requests as req
from bs4 import BeautifulSoup

class DownloadData(luigi.Task):
    """Download the gutenberg.org "top scores" page and save it as raw text.

    The page text is written to the local target ``raw_data.txt``.
    """

    def run(self):
        """Fetch the page and write its text to the output target.

        Raises:
            requests.RequestException: if the request times out, cannot be
                completed, or the server answers with an HTTP error status.
        """
        # Without a timeout, requests waits indefinitely on a stalled
        # connection and the worker would never finish this task.
        response = req.get(
            "http://www.gutenberg.org/browse/scores/top", timeout=30
        )
        # Fail on 4xx/5xx instead of saving an error page: once the output
        # file exists, complete() reports the task as done and it is not
        # run again.
        response.raise_for_status()
        with self.output().open("w") as f:
            f.write(response.text)

    def output(self):
        """Return the local file target that holds the downloaded page."""
        return luigi.LocalTarget("raw_data.txt")

    def complete(self):
        """Return True when the output file already exists on disk."""
        return os.path.exists(self.output().path)

class PrePData(luigi.Task):
    """Preprocessing step that depends on DownloadData.

    NOTE(review): this is the failing version that the question is about.
    The code is intentionally left untouched so it still matches the quoted
    error output; known defects are flagged inline. This variant defines no
    output() and no complete(), which is what luigi warns about ("Task
    PrePData() without outputs has no custom complete() method").
    """

    def requires(self):
        """Declare DownloadData as the single upstream dependency."""
        return DownloadData()

    def run(self):
        """Try to collect the downloaded page into ``bs4ed_data``."""
        # NOTE(review): requires() returns the DownloadData task object, not
        # the downloaded text, so the ``contains`` call below raises
        # "AttributeError: 'DownloadData' object has no attribute
        # 'contains'". self.input() gives access to the upstream output.
        data = self.requires()
        bs4ed_data = []
        if data.contains("<!DOCTYPE html>"):
            # NOTE(review): append() is called with no argument and its
            # result is then called; list.append() without an argument
            # raises TypeError. Presumably BeautifulSoup(data, "html.parser")
            # was meant to be appended - confirm intent.
            bs4ed_data.append()(data,"html.parser")

        else:
            print("can not found any problem in this data")

        # NOTE(review): the result is only returned, not written to any
        # target - presumably it should be persisted; verify intent.
        return bs4ed_data


class RunAllTasks(luigi.WrapperTask):
    """Wrapper task that pulls in the download and preprocessing steps."""

    def requires(self):
        """Return the tasks this wrapper depends on, in pipeline order."""
        download_step = DownloadData()
        preprocess_step = PrePData()
        return [download_step, preprocess_step]

I run the same command in the terminal and I get this error:

DEBUG: Checking if RunAllTasks() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py:845: UserWarning: Task PrePData() without outputs has no custom complete() method
  return all(r.complete() for r in flatten(self.requires()))
DEBUG: Checking if DownloadData() is complete
DEBUG: Checking if PrePData() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py:414: UserWarning: Task PrePData() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   RunAllTasks__99914b932b   has status   PENDING
INFO: Informed scheduler that task   PrePData__99914b932b   has status   PENDING
INFO: Informed scheduler that task   DownloadData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 4 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: PrePData__99914b932b is currently run by worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617)
INFO: [pid 10624] Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) running   PrePData()
ERROR: [pid 10624] Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) failed    PrePData()
Traceback (most recent call last):
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 198, in run
    new_deps = self._run_get_new_deps()
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 138, in _run_get_new_deps
    task_gen = self.task.run()
  File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 28, in run
    if data.contains("<!DOCTYPE html>"):
AttributeError: 'DownloadData' object has no attribute 'contains'
INFO: Informed scheduler that task   PrePData__99914b932b   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: There are 2 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 DownloadData()
* 1 failed:
    - 1 PrePData()
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 RunAllTasks()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

When I added a call to the output() method on DownloadData in the requires() function, I get this error:

DEBUG: Checking if RunAllTasks() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py:845: UserWarning: Task PrePData() without outputs has no custom complete() method
  return all(r.complete() for r in flatten(self.requires()))
DEBUG: Checking if DownloadData() is complete
DEBUG: Checking if PrePData() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py:414: UserWarning: Task PrePData() without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   RunAllTasks__99914b932b   has status   PENDING
ERROR: Luigi unexpected framework error while scheduling RunAllTasks()
Traceback (most recent call last):
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 794, in add
    for next in self._add(item, is_complete):
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 892, in _add
    self._validate_dependency(d)
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 917, in _validate_dependency
    raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
INFO: Worker Worker(salt=6506578324, workers=4, host=tunapc, username=tuna, pid=10710) was stopped. Shutting down Keep-Alive thread
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/retcodes.py", line 75, in run_with_retcodes
    worker = luigi.interface._run(argv).worker
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/interface.py", line 213, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/interface.py", line 171, in _schedule_and_run
    success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 794, in add
    for next in self._add(item, is_complete):
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 892, in _add
    self._validate_dependency(d)
  File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 917, in _validate_dependency
    raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class

Solution

  • You are getting the error in the first block because luigi.local_target is a module while luigi.LocalTarget is the class you were looking for.

    The second error is because you most likely don't want to be using self.requires directly in PrePData.run, but instead want to use self.input() (take a look at https://luigi.readthedocs.io/en/stable/tasks.html#task-run). self.input() will return the outputs of the required task, which in this case is DownloadData.

    Finally, there are a couple optimizations you can make to your code:

    1. In luigi, if a LocalTarget is specified as an output, its mere existence signifies that the task is complete. This is actually the default implementation of Task.complete, so you don't need to reimplement it yourself.
    2. You don't need to specify all the tasks in RunAllTasks. Luigi will automatically discover required tasks and construct the requirements tree before resolving the entire tree. Therefore, you only need to specify the top-level tasks, which in this case is just the PrePData task.