python-3.x, python-2.7, api, splunk, splunk-sdk

Splunk Python SDK: job.results is limited to 50k results; trying to set an offset to pull multiple 50k chunks but can't get it to work


I have a job whose job['resultCount'] is 367k, but no matter what I do, I can't pull back more than the first 50,000 results.

I took this snippet from an answer to a question with a similar goal and setup: https://answers.splunk.com/answers/114045/python-sdk-results-resultsreader-extremely-slow.html

rs = job.results(count=maxRecords, offset=self._offset)
results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))
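
(For what it's worth, ResponseReaderWrapper isn't part of the SDK itself; it appears to be a small adapter class defined in that answer that gives the raw response object the file-like read() interface io.BufferedReader expects.)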

I wrote the code below around that snippet and have fiddled with it a bit, but I can't get offset=self._offset to do anything, and I don't understand what it's supposed to be doing.

class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        json_log.debug('Starting %s customerGuid=%s' % (label, self.customerGuid))
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
        job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}

                json_log.debug(stats)

                if stats["isDone"] == "1":
                    json_log.debug("\n\nDone!\n\n")
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                json_log.debug(stats)

                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)

            except binding.HTTPError as e:
                json_log.debug(str(e))
                pass
            except AttributeError:
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}

                json_log.debug(stats)

                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)

        # Get the results and display them
        result_count = job['resultCount']
        rs = job.results(count=0)
        rr = results.ResultsReader(io.BufferedReader(rs))
        results_list = []
        for result in rr:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                json_log.debug('%s: %s label=%s customerGuid=%s'
                               % (result.type, result.message, label, customerGuid))
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []

                for header in search_headers:
                    if header not in result.keys():
                        print(header)
                        result[header] = ''

                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if not results_list == []:
                    results_list.append(values)
                else:
                    results_list.append(keys)
                    results_list.append(values)

        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        assert rr.is_preview is False

        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def searches(self, query_list):
        print(query_list)
        if type(query_list) == dict:
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            results = pool.map(self.search, query_list)
            pool.terminate()

        print(results)
        search_results = {item[0]: item[1] for item in results}
        print(search_results)
        return search_results

Solution

  • I was able to get this working. My code below demonstrates how it's done.
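
    The trick is paging through the job with count and offset: a single call to job.results() never returns more than the server-side maxresultrows limit (50,000 by default), so results_getter() is called repeatedly, advancing offset by count on each pass until every row has been collected. The first flag makes sure the CSV header row is only emitted once, on the first page.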

    import io
    import csv
    import logging
    from time import sleep
    import splunklib.results as results
    import splunklib.client as client
    import splunklib.binding as binding
    from multiprocessing.pool import ThreadPool
    from contextlib import closing

    # json_log is the logger used throughout; in my real code it is configured elsewhere
    json_log = logging.getLogger('json_log')


    class SplunkConnector(object):
        def __init__(self, username, password, customerGuid):
            self.username = username
            self.password = password
            self.customerGuid = customerGuid
            flag = True
            while flag:
                try:
                    self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
                    flag = False
                except binding.HTTPError as e:
                    json_log.debug(str(e))
    
        def search(self, query_dict):
            query = query_dict['search']
            label = query_dict['label']
            search_headers = query_dict['headers']
            customer = query_dict['customer']
            customerGuid = query_dict['customerGuid']
            try:
                earliest_time = query_dict['earliest_time']
                latest_time = query_dict['latest_time']
            except KeyError:
                earliest_time = '-1d@d'
                latest_time = '@d'
            kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
            flag = True
            while flag:
                try:
                    job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
                    flag = False
                except binding.HTTPError as e:
                    json_log.debug(str(e))
            while True:
                try:
                    while not job.is_ready():
                        pass
                    stats = {"isDone": job["isDone"],
                             "label": label,
                             "customer": customer,
                             "customerGuid": customerGuid,
                             "doneProgress": float(job["doneProgress"]) * 100,
                             "scanCount": int(job["scanCount"]),
                             "eventCount": int(job["eventCount"]),
                             "resultCount": int(job["resultCount"])}
    
                    if stats["isDone"] == "1":
                        break
                    sleep(2)
                    stats = {"isDone": job["isDone"],
                             "label": label,
                             "customer": customer,
                             "customerGuid": customerGuid,
                             "doneProgress": float(job["doneProgress"]) * 100}
    
                    if stats["isDone"] == "1":
                        break
                    sleep(2)
    
                except binding.HTTPError as e:
                    pass
                except AttributeError:
    
                    stats = {"isDone": job["isDone"],
                             "label": label,
                             "customer": customer,
                             "customerGuid": customerGuid,
                             "doneProgress": float(job["doneProgress"]) * 100}
    
                    if stats["isDone"] == "1":
                        break
                    sleep(2)
    
            result_count = job['resultCount']
            offset = 0      # index of the first row to fetch
            count = 50000   # rows per fetch; a single results call is capped server-side (maxresultrows, 50,000 by default)

            # First page carries the CSV header row (first=True).
            results_list = self.results_getter(job, label, customerGuid, search_headers, True, count, offset, result_count)

            # Keep paging until all result rows (plus the one header row) are collected.
            while len(results_list) < int(result_count) + 1:
                offset += count
                placeholder = self.results_getter(job, label, customerGuid, search_headers, False, count, offset, result_count)
                results_list.extend(placeholder)
    
            output = io.BytesIO()
            writer = csv.writer(output, delimiter=',')
            writer.writerows(results_list)
            output_string = output.getvalue()
            job.cancel()
            return [label, output_string.replace('\r\n', '\n').replace('---', '')]
    
        def results_getter(self, job, label, customerGuid, search_headers, first, count, offset, result_count):
            # Get the results and display them
            kwargs_paginate = {"count": count,
                               "offset": offset}
            blocksearch_results = job.results(**kwargs_paginate)
            results_list = []
    
            reader = results.ResultsReader(blocksearch_results)
    
            for result in reader:
                if isinstance(result, results.Message):
                    # Diagnostic messages may be returned in the results
                    json_log.debug('%s: %s label=%s customerGuid=%s'
                                   % (result.type, result.message, label, customerGuid))
                elif isinstance(result, dict):
                    # Normal events are returned as dicts
                    keys, values = [], []
    
                    for header in search_headers:
                        if header not in result.keys():
                            result[header] = ''
    
                    for key, value in result.items():
                        if key in search_headers:
                            keys.append(str(key))
                            values.append(str(value))
                    if results_list:
                        # Header row already captured on this page; just add the values.
                        results_list.append(values)
                    elif first:
                        # Very first row of the first page: emit the header row once, then the values.
                        results_list.append(keys)
                        results_list.append(values)
                    else:
                        # First row of a later page: skip the header row.
                        results_list.append(values)
    
            assert not reader.is_preview
            return results_list
    
        def searches(self, query_list):
            if isinstance(query_list, dict):
                query_list = list(query_list.values())
            with closing(ThreadPool(processes=len(query_list))) as pool:
                # named search_output so it doesn't shadow the imported results module
                search_output = pool.map(self.search, query_list)
                pool.terminate()

            search_results = {item[0]: item[1] for item in search_output}
            return search_results
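
    For reference, this is roughly how the class gets driven once the host placeholder is filled in. The credentials and the query here are made-up examples; search() expects the dict keys shown below, and earliest_time/latest_time are optional (they default to '-1d@d' and '@d'):

        connector = SplunkConnector('admin', 'changeme', 'example-guid')  # placeholder credentials

        queries = {
            'daily_errors': {'search': 'search index=main log_level=ERROR | table _time host message',
                             'label': 'daily_errors',
                             'headers': ['_time', 'host', 'message'],
                             'customer': 'ExampleCo',
                             'customerGuid': 'example-guid'}}

        # searches() fans the queries out over a thread pool and returns
        # a {label: csv_string} mapping
        search_results = connector.searches(queries)
        print(search_results['daily_errors'])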