# -*- coding: utf-8 -*- from odoo import api, models, tools from odoo.utils.model_util import const_model from odoo.utils.model_util.query_model_helper import QueryModelHelper from odoo.utils.model_util.const import get_digit_capacity_4_save_setting, DATE_LIST, get_database_value_from_value, \ get_date_range_string, DEFAULT_COUNT_PER_PAGE from .report_sources.source_base import SourceBase, GROUP_FIELD, SUM_FIELD, EXPRESSION_FIELD from odoo.utils.constant import REPORT_TYPE, ANALYSE, ER_WEI, CHART, SINGLE_HEAD, DOUBLE_HEAD, TABLE_HEAD_TYPE, \ CHART_TYPE, LINE, BAR from ..utils.chart import Chart, ChartType bi_result_dic = {} # 缓存。导出Excel、翻页时,就不用再查询一遍 class Base(models.AbstractModel): _inherit = 'base' _module = 'jc_bi' def _get_bi_helper(self, detail_field, reference_field): # print('_get_bi_helper') model = self._get_model_from(detail_field, reference_field,self.env) helper = QueryModelHelper(self.env, model) # lock_column_count = 0 # setting_list = [lock_column_count, DEFAULT_COUNT_PER_PAGE] # helper.set_setting(self, [], setting_list, []) return helper @api.model def super_query_dropdown(self, comodel): # todo: 是否需要权限管起来?在bi中调用这个方法 return self.super_query_dropdown2(comodel, ['id', 'name']) @api.model def super_query_dropdown2(self, comodel, field_list): # todo: 是否需要权限管起来?在bi中调用这个方法 res = self.env[comodel].sudo().search_read([], field_list) return res @api.model def get_source_info(self, source_name, report_style_id, report_type): # print('source_name:', source_name) # print('report_style_id:', report_style_id) # print('report_type:', report_type) exist_source = SourceBase.get_source(source_name) if not exist_source: return '' model = exist_source.model if report_type == ANALYSE: res = self.env[model].sudo().get_show_field(exist_source, report_style_id) # print('ANALYSE:', res) return res if report_type == ER_WEI: return self.env[model].sudo().get_er_wei(exist_source, report_style_id) if report_type == CHART: return self.env[model].sudo().get_chart(exist_source, report_style_id) return '' def get_show_field(self, source, report_style_id): style = self._query_style(report_style_id) _setting = style.detail if style else [] helper = self._get_bi_helper(source.detail_field, source.reference_field) setting_list = [style.lock_column_count, style.count_per_page] if style else [0, DEFAULT_COUNT_PER_PAGE] helper.set_setting(_setting, [], setting_list, None) show_field_setting = helper.get_show_field_setting() # print('show_field_setting:', show_field_setting) return show_field_setting def get_er_wei(self, source, report_style_id): style = self._query_style(report_style_id) helper = self._get_bi_helper(source.detail_field, source.reference_field) all_fields = helper.get_all_chart_fields() # print('all_fields:', all_fields) indicator_list, er_wei_info = Base._get_er_wei_setting(style) return all_fields, indicator_list, er_wei_info def get_chart(self, source, report_style_id): style = self._query_style(report_style_id) helper = self._get_bi_helper(source.detail_field, source.reference_field) all_fields = helper.get_all_chart_fields() # print('all_fields:', all_fields) indicator_list, chart_info = Base._get_chart_setting(style) return all_fields, indicator_list, chart_info, CHART_TYPE def _query_style(self, report_style_id): if report_style_id: report_style_id = int(report_style_id) style = self.env['jc_bi.report_style'].browse(report_style_id) else: style = None return style @staticmethod def _get_er_wei_setting(style): indicator_list = [(detail.indicator_field, detail.indicator_name, detail.indicator_show_name) for detail in style.indication_detail] er_wei_info = [style.row_name, style.row_name_show, style.first_column_width, style.column_name, style.column_name_show, style.other_column_width] return indicator_list, er_wei_info @staticmethod def _get_chart_setting(style): indicator_list = [(detail.indicator_field, detail.indicator_name, detail.indicator_show_name) for detail in style.chart_detail] chart_info = [style.unit_name, style.unit_name_show, style.x_name, style.x_name_show, style.chart_type] return indicator_list, chart_info @api.model def query_condition_setting(self, report_set_id, condition_id): res = [] bill = self.env[self._name].sudo().browse(int(report_set_id)) if not bill: return [] condition = self.env['jc_bi.report_label_condition'].sudo().browse(int(condition_id)) for d in bill.detail: if not d.report_style_id: continue style_id = d.report_style_id.id style = self.env['jc_bi.report_style'].sudo().browse(style_id) source_name = style.source # print('style_id,source:', style_id, source_name) exist_source = SourceBase.get_source(source_name) if not exist_source: res.append([style_id, []]) continue model = exist_source.model condition_list = self.env[model].sudo().get_style_condition(style, exist_source, condition) common_condition_field = Base._get_style_common_condition_field(style.id, condition) res.append([style_id, style.name, condition_list, common_condition_field]) # condition_list = helper.get_pre_query_setting_condition(setting.pre_query_condition_detail) return res def get_style_condition(self, style, source, condition): detail = [x for x in condition.detail if x.report_style_id.id == style.id] helper = self._get_bi_helper(source.detail_field, source.reference_field) setting_list = [style.lock_column_count, style.count_per_page] if style else [0, DEFAULT_COUNT_PER_PAGE] _setting = style.detail if style else [] helper.set_setting([], _setting, setting_list, None) condition_list = helper.get_setting_condition(detail) # print('style_id, condition_list:', style.id, condition_list) return condition_list @staticmethod def _get_style_common_condition_field(style_id, condition): detail = [x for x in condition.common_condition_detail if x.report_style_id.id == style_id] if not detail: return '' return detail[0].field @api.model def search_bi(self, label_id, style_id, page_no, date_range_condition): """ BI的每一个报表的查询方法 :param label_id: 标签ID :param style_id: 样式ID :param page_no: 第几页,用于分析 :param date_range_condition: 右上角的日期通用条件 :return: 查询结果 """ label_id = int(label_id) style_id = int(style_id) if page_no == -1: # 点击了查询按钮,或者点击了预定义查询按钮 self._query_search_and_set_cache_bi(label_id, style_id, date_range_condition) return self._get_page_data_bi(label_id, style_id, page_no) def _query_search_and_set_cache_bi(self, label_id, style_id, date_range_condition): style = self.env['jc_bi.report_style'].sudo().browse(style_id) if not style: return label = self.env['jc_bi.report_label'].sudo().browse(int(label_id)) condition_id = label.report_label_condition_id.id condition = self.env['jc_bi.report_label_condition'].sudo().browse(condition_id) if condition_id else None helper, count_per_page = self._get_helper_bi_query(style) condition_list = Base._get_condition(helper, condition, style_id) condition_list = Base._change_to_condition_format(condition_list) date_condition_list = Base._get_date_condition_list(date_range_condition, condition, style_id) if date_condition_list: condition_list += date_condition_list # print('condition_list:', condition_list) helper.set_condition_list(condition_list) if style.report_type == ANALYSE: self._query_analyse(helper, style, label_id, count_per_page) return if style.report_type == ER_WEI: self._query_er_wei(helper, style, label_id) return if style.report_type == CHART: self._query_chart(helper, style, label_id) return return def _query_analyse(self, helper, style, label_id, count_per_page): show_field_list = self._get_show_field_list(style) # print('show_field_list:', show_field_list) helper.set_query_fields_list(show_field_list) # # print('condition_list:', condition_list) # helper.set_condition_list(condition_list) res = helper.execute_query() column_name_list, rows, sum_row = res bi_result_dic[(label_id, style.id)] = (ANALYSE, (column_name_list, rows, sum_row, count_per_page)) return def _query_er_wei(self, helper, style, label_id): column_list = helper.query_column(style.column_name) indicator_list = Base._get_indicator_list(style.indication_detail) # print('indicator_list:', indicator_list) field_expression_list, all_fields, heads_list = Base._get_indicator_expression(column_list, indicator_list, style.row_name, style.row_name_show, style.column_name) # print('start er wei:') # print('all fields:', all_fields) er_wei_data = helper.query_chart(field_expression_list, all_fields, heads_list, False) if er_wei_data: width_pair = style.get_er_wei_column_width() er_wei_data = er_wei_data[0], er_wei_data[1:], width_pair # print('er_wei_data:', er_wei_data) bi_result_dic[(label_id, style.id)] = (ER_WEI, er_wei_data) return def _query_chart(self, helper, style, label_id): column_list = helper.query_column(style.unit_name) indicator_list = Base._get_indicator_list(style.chart_detail) # print('indicator_list:', indicator_list) field_expression_list, all_fields, heads_list = Base._get_indicator_expression(column_list, indicator_list, style.x_name, style.x_name_show, style.unit_name) chart_data = helper.query_chart(field_expression_list, all_fields, heads_list) # print('chart_data:', chart_data) res = Base._get_chart(style.chart_type, chart_data) # print('chart:', res) bi_result_dic[(label_id, style.id)] = (CHART, res) return @staticmethod def _get_chart(chart_type, chart_data): helper = Chart(None) helper.set_data(chart_data) # helper.set_zoom(0, 100) helper.set_chart_type(Base._get_chart_type(chart_type)) # helper.set_change_xy() return helper.get() @staticmethod def _get_chart_type(chart_type): if chart_type == LINE: return ChartType().line if chart_type == BAR: return ChartType().bar return ChartType().line @staticmethod def _get_indicator_list(style_detail): indicator_list = [] for detail in style_detail: name = detail.indicator_show_name if detail.indicator_show_name else detail.indicator_name indicator_list.append((detail.indicator_field, name)) return indicator_list @staticmethod def _get_indicator_expression(column_value_list, indicator_list, field_x_or_first, show_x_or_first, field_unit_or_other): all_fields = [field_x_or_first] selected_exp_list = [field_x_or_first] heads_list = [show_x_or_first] _format = "sum(case when {}={} then {} end) as {}" _format_none = "sum(case when {} is null then {} end) as {}" for name, _id in column_value_list: for table_field, show in indicator_list: all_fields.append(table_field) if not _id: name = '=无=' selected_exp_list.append(_format_none.format(field_unit_or_other, table_field, show)) else: selected_exp_list.append(_format.format(field_unit_or_other, _id, table_field, show)) heads_list.append("{}/{}".format(name, show)) # return ','.join(selected_exp_list), ','.join(heads_list) return selected_exp_list, all_fields, heads_list @staticmethod def _get_condition(helper, condition, style_id): if not condition: return [] detail = [d for d in condition.detail if d.report_style_id.id == style_id] condition_list = helper.get_setting_condition(detail) # print('condition_list, before:', condition_list) condition_list = Base._clear_empty_vale_and_reset_date_value(condition_list) # print('condition_list, after:', condition_list) return condition_list @staticmethod def _get_date_condition_list(date_range_condition, condition, style_id): if not date_range_condition: return None if not date_range_condition[0] and not date_range_condition[1]: return None if not condition: return None common_condition_field = Base._get_style_common_condition_field(style_id, condition) if not common_condition_field: return None res = [] if date_range_condition[0]: start = [common_condition_field, 'date', [date_range_condition[0], True]] res.append(start) if date_range_condition[1]: end = [common_condition_field, 'date', [date_range_condition[1], False]] res.append(end) return res @staticmethod def _clear_empty_vale_and_reset_date_value(condition_list): res = [] for i, item in enumerate(condition_list): field, name, _type, value = item if not value: continue if _type in ('selection', 'many2one') and not value[0]: continue if _type != 'date': res.append((field, _type, value)) continue start_date_define_pair, end_date_define_pair = value start_date_range = get_date_range_string(start_date_define_pair[0]) end_date_range = get_date_range_string(end_date_define_pair[0]) res.append((field, _type, [start_date_range[0], end_date_range[1]])) return res @staticmethod def _change_to_condition_format(condition_list): res = [] for table_field, _type, value in condition_list: if _type == 'many2one': res.append([table_field, _type, value[0][0]]) continue if _type == 'date': start, end = value if start: res.append([table_field, _type, [start, True]]) if end: res.append([table_field, _type, [end, False]]) continue res.append([table_field, _type, value]) return res def _get_show_field_list(self, style): # res = [] _list = [d for d in style.detail if d.sequence] # print('_list:', _list) _list.sort(key=lambda b: b.sequence) # print('style list order by sequence:', _list) return [d.field for d in _list if not d.field.endswith('.number_type') and not d.field.endswith('.bill_show')] def _get_helper_bi_query(self, style): model = self._get_model_from_bi(style) helper = QueryModelHelper(self.env, model) count_per_page = style.count_per_page if count_per_page <= 0: count_per_page = DEFAULT_COUNT_PER_PAGE setting_list = [style.lock_column_count, count_per_page] helper.set_setting(style.detail, [], setting_list, []) return helper, count_per_page def _get_model_from_bi(self, style): exist_source = SourceBase.get_source(style.source) if not exist_source: return '' model = exist_source.model table = exist_source.sql_table detail_field = exist_source.detail_field reference_field = exist_source.reference_field # print(' [获取 model 信息, do:', model, table) return const_model.get_model_from2(self, model, table, detail_field, reference_field, self._get_rec_name, self._get_field_info, self._get_table,self.env) def _get_page_data_bi(self, label_id, style_id, page_no): report_type, data = self.get_result_bi(label_id, style_id) if report_type == ANALYSE: return ANALYSE, self._get_analyse_result(data, page_no, label_id, style_id) if report_type == ER_WEI: return ER_WEI, (label_id, style_id, data) if report_type == CHART: return CHART, (label_id, style_id, data) return '', [] def _get_analyse_result(self, data, page_no, label_id, style_id): column_name_list, rows, sum_row, count_per_page = data page_count = self._get_page_count(rows, count_per_page) if page_no == '全部': return column_name_list, rows, sum_row, page_count, len(rows), label_id, style_id page_no = self._get_page_no(page_no, page_count) start = (page_no - 1) * count_per_page end = page_no * count_per_page page_rows = rows if page_count == 1 else rows[start:end] return column_name_list, page_rows, sum_row, page_count, len(rows), label_id, style_id def get_result_bi(self, label_id, style_id): if (label_id, style_id) not in bi_result_dic: return ANALYSE, ([], [], [], DEFAULT_COUNT_PER_PAGE) return bi_result_dic[(label_id, style_id)]